
Machine Learning with PySpark

Contents

  • Background
  • Part 1: Environment Setup
  • Part 2: Reading Data
  • Part 3: Data Cleaning
  • Part 4: Model Training
  • Part 5: Model Evaluation
  • Part 6: Model Application
  • References

Background

Part 1: Environment Setup

Configuring and starting Jupyter Notebook

1.1 Generate the configuration

root@hadoop01:/opt# jupyter notebook --generate-config
Writing default config to: /root/.jupyter/jupyter_notebook_config.py

1.2 Edit the kernel configuration file

root@hadoop01:/opt# vi /root/anaconda3/share/jupyter/kernels/python3/kernel.json

{
  "argv": [
    "/root/anaconda3/bin/python",
    "-m",
    "ipykernel_launcher",
    "-f",
    "{connection_file}"
  ],
  "display_name": "Python 3",
  "language": "python",
  "env": {
    "SPARK_HOME": "/opt/spark-2.3.2/",
    "PYSPARK_PYTHON": "/root/anaconda3/bin/python",
    "PYSPARK_DRIVER_PYTHON": "ipython3",
    "PYTHONPATH": "/opt/spark-2.3.2/python/:/opt/spark-2.3.2/python/lib/py4j-0.10.7-src.zip",
    "PYTHONSTARTUP": "/opt/spark-2.3.2/python/pyspark/shell.py",
    "PYSPARK_SUBMIT_ARGS": "--name pyspark --master local pyspark-shell"
  }
}

1.3 Start the notebook server:

nohup jupyter notebook --ip=0.0.0.0 --no-browser --allow-root --notebook-dir=/opt/jupyter &

1.4 Set a login password

# jupyter notebook password
Enter password:
Verify password:

1.5 Verification

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
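
If the kernel picks up the environment correctly, a trivial job should run end to end. A minimal smoke test (the toy data below is made up purely for illustration):

# Print the Spark version and run a tiny job
print(spark.version)
data = [("Alice", 1), ("Bob", 2)]
test_df = spark.createDataFrame(data, ["name", "value"])
test_df.show()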

Part 2: Reading Data

Data typically arrives as CSV files, which PySpark can read directly.

df = spark.read.option("header",True).csv("/data/test.csv")

To read from HDFS instead:

df = spark.read.options(header='True', delimiter=',').csv("hdfs://hadoop01:9000/data/test.csv")

Use the show method to inspect the result:

df.show()
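
Note that csv() with only a header option reads every column as a string. It is worth checking the schema, and optionally letting Spark infer column types (at the cost of an extra pass over the file):

df.printSchema()

# Re-read with type inference instead of all-string columns
df = spark.read.options(header='True', inferSchema='True').csv("hdfs://hadoop01:9000/data/test.csv")
df.printSchema()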

Part 3: Data Cleaning

First, clean the data by dropping records that contain null fields.

from pyspark.sql.functions import isnull, when, count, col

# Schema: |user_id|age_range|gender|merchant_id|label|
# Drop records where any field is null
df = df.filter(df.label.isNotNull()
               & df.merchant_id.isNotNull()
               & df.gender.isNotNull()
               & df.age_range.isNotNull()
               & df.user_id.isNotNull())
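
The functions imported above can also report how many nulls each column contains, which shows up front how much data the filter will drop:

# Count null values per column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()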

At other times we want to replace values that match a condition instead of dropping the row, e.g. replacing nulls.

# Replace null fields with 0 across all columns
df = df.na.fill(value=0)
# Or restrict the fill to specific columns
df = df.fillna(value=0, subset=["age_range"])
df.show()
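
Beyond nulls, specific literal values can be swapped with DataFrame.na.replace (the 'unknown' sentinel here is hypothetical):

# Replace a placeholder value in the gender column
df = df.na.replace(['unknown'], ['0'], subset=['gender'])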

Columns that are not needed can simply be dropped.

# Drop an unneeded column
df = df.drop('user_id')
df.show()

Converting column values: categorical string columns must be mapped to numbers before modeling. The helper below string-indexes and one-hot encodes a list of columns.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

def encode_columns(df, col_list):
    # String-index each categorical column, keeping unseen labels
    indexers = [
        StringIndexer(inputCol=c, outputCol=f'{c}_indexed').setHandleInvalid("keep")
        for c in col_list
    ]
    # One-hot encode the indexed columns
    encoder = OneHotEncoderEstimator(
        inputCols=[indexer.getOutputCol() for indexer in indexers],
        outputCols=[f'{c}_encoded' for c in col_list])  # .setDropLast(False)

    pipeline = Pipeline(stages=indexers + [encoder])
    piped_encoder = pipeline.fit(df)
    encoded_df = piped_encoder.transform(df)

    # Recover a readable name for each position of the one-hot vector
    newColumns = []
    for f in col_list:
        colMap = encoded_df.select(f'{f}', f'{f}_indexed').distinct().rdd.collectAsMap()
        colTuple = sorted((v, f'{f}_{k}') for k, v in colMap.items())
        newColumns.extend(name for _, name in colTuple)

    return piped_encoder, encoded_df, newColumns

piped_encoder, df, new_columns = encode_columns(df, ['gender'])
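
Note that OneHotEncoderEstimator matches the Spark 2.3.2 install configured above; in Spark 3.x the class was renamed to OneHotEncoder, so the import and class name change there.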

Vectorizing the features: VectorAssembler packs the selected columns into a single vector column.
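
VectorAssembler accepts only numeric inputs, so if the CSV was read without inferSchema, cast the feature and label columns first (a minimal sketch assuming these column names):

from pyspark.sql.functions import col

# Cast the all-string CSV columns to numeric types before assembling
for c in ['age_range', 'gender', 'merchant_id', 'label']:
    df = df.withColumn(c, col(c).cast('int'))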

# Assemble all the features with VectorAssembler
from pyspark.ml.feature import VectorAssembler

required_features = ['age_range','gender','merchant_id']
assembler = VectorAssembler(inputCols=required_features, outputCol='features')

df = assembler.transform(df)
df.show()

Splitting the dataset:

# Decide on the split between training and testing data from the dataframe
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the dataframe into test and training dataframes
training_data, test_data = df.randomSplit([trainingFraction, testingFraction], seed=seed)

Part 4: Model Training

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='label',
                            featuresCol='features',
                            maxDepth=5)
model = rf.fit(training_data)
rf_predictions = model.transform(test_data)
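
The fitted model exposes quick diagnostics; featureImportances, for example, reports the relative contribution of each assembled feature:

# Relative importance of each feature in the assembled vector
print(model.featureImportances)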

Part 5: Model Evaluation

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multi_evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
print('Random Forest classifier Accuracy:', multi_evaluator.evaluate(rf_predictions))
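
If the label is binary (as in a repeat-buyer style dataset), area under the ROC curve is often more informative than raw accuracy; a sketch with BinaryClassificationEvaluator under that assumption:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# areaUnderROC is the default metric
binary_evaluator = BinaryClassificationEvaluator(labelCol='label',
                                                 rawPredictionCol='rawPrediction')
print('Random Forest classifier AUC:', binary_evaluator.evaluate(rf_predictions))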

Part 6: Model Application

Data preparation: write the resulting data out, either to the local filesystem or to HDFS:

# The CSV writer cannot store vector columns, so drop them first
out_df = df.drop('features')
out_df.write.option("header", True).csv("/data/output.csv")
out_df.write.option("header", True).csv("hdfs://hadoop01:9000/data/output")
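
The trained model itself can also be persisted and reloaded in a later session (the path here is hypothetical):

from pyspark.ml.classification import RandomForestClassificationModel

# Save the fitted model, then load it back elsewhere
model.write().overwrite().save("hdfs://hadoop01:9000/models/rf_model")
loaded_model = RandomForestClassificationModel.load("hdfs://hadoop01:9000/models/rf_model")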

References

1. Example walkthrough: https://hackernoon.com/building-a-machine-learning-model-with-pyspark-a-step-by-step-guide-1z2d3ycd

2. Example walkthrough: https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-machine-learning-mllib-notebook

3. PySpark fillna/fill reference: https://sparkbyexamples.com/pyspark/pyspark-fillna-fill-replace-null-values/
