How to Build Custom PySpark Models
Building a Single PySpark Model
Notebook provides a flexible environment for algorithm modeling, in which users can implement custom models in code.
A custom PySpark model currently supports wrapping only a single model from PySpark's ML library, and the class must record in one of its attributes which ML model it wraps; everything else works the same as for a Python model. In summary:
- Define a class (any name you like), which must have an attribute named "model" holding a model from PySpark's ML library;
- Implement a predict method;
- The predict method must accept a pandas DataFrame.
A skeleton looks like this:
import pandas as pd
from aiworks_plugins.hdfs_plugins import save_pyspark_model_to_hdfs

class TestModel:
    def __init__(self, model):
        # model must be a trained model from the pyspark.ml library
        self.model = model
        # required: record the class name of the wrapped pyspark.ml model
        self.__estimator__ = str(model.__class__).split('.')[-1].split("""'""")[0]

    def predict(self, data: pd.DataFrame):
        # data is a pandas DataFrame; return the predictions here
        pass

# construct the wrapper with a trained pyspark.ml model
# (trained_spark_ml_model is a placeholder; see the full example below)
my_model = TestModel(model=trained_spark_ml_model)
save_pyspark_model_to_hdfs(my_model)
A more complete implementation looks like this:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from sklearn.datasets import load_iris
import pandas as pd

# initialize the Spark session
conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

# training data: binarize the iris target
iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)
data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)

# convert to a PySpark DataFrame
data = spark.createDataFrame(data)

# assemble the feature columns into a single vector column
vecAssembler = VectorAssembler(inputCols=col, outputCol="features")
train_data = vecAssembler.transform(data).select(['features', label])

# define and train the model
gbdt = GBTClassifier(featuresCol='features', labelCol=label)
gbdt_model = gbdt.fit(train_data)

class TestModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col
        # the following line is required: it records which pyspark.ml model is wrapped
        # (e.g. 'GBTClassificationModel' for the model trained above)
        self.__estimator__ = \
            str(model.__class__).split('.')[-1].split("""'""")[0]

    def predict(self, data: pd.DataFrame):
        data = data[self.col]
        data = spark.createDataFrame(data)
        vecAssembler = VectorAssembler(inputCols=self.col, outputCol="features")
        test_data = vecAssembler.transform(data).select(['features'])
        df_res = self.model.transform(test_data).select('prediction').toPandas()
        return df_res

model = TestModel(model=gbdt_model,
                  col=['sepal length (cm)', 'sepal width (cm)',
                       'petal length (cm)', 'petal width (cm)'])

from aiworks_plugins.hdfs_plugins import save_pyspark_model_to_hdfs
save_pyspark_model_to_hdfs(model)
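Before relying on the saved model, you can sanity-check the wrapper locally by calling predict on a small pandas DataFrame (a quick sketch; the values below are arbitrary iris-like measurements):

# quick local check of the wrapper; the input columns must match TestModel.col
sample = pd.DataFrame([[5.9, 3.0, 5.1, 1.8]],
                      columns=['sepal length (cm)', 'sepal width (cm)',
                               'petal length (cm)', 'petal width (cm)'])
print(model.predict(sample))  # pandas DataFrame with a 'prediction' column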
Building a PySpark Pipeline
1. Data preparation
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from sklearn.datasets import load_iris
import pandas as pd

# initialize the Spark session
conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

# training data: binarize the iris target
iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)
data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)

# convert to a PySpark DataFrame
data = spark.createDataFrame(data)
# raw feature columns only (no label); kept for testing the assembled pipeline later
x = spark.createDataFrame(pd.DataFrame(iris.data, columns=col))
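Optionally, inspect the prepared Spark DataFrame to confirm its layout (four iris feature columns plus the binary label):

data.printSchema()
data.show(3)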
2. Wrapping the feature engineering
Because feature engineering operates on specific columns, it has to be wrapped in a class. The wrapper must satisfy the following:
- It must provide a transform step;
- Any machine-learning class from pyspark.ml that it uses must already be trained (fitted);
- It must implement a get_spark_model_info function that returns a dict mapping the attribute name of each pyspark.ml model used in the class to the corresponding model; if no pyspark model is used, return an empty dict. VectorAssembler does not need to be reported.
The wrapping steps are:
ⅰ. Define a class (any name);
ⅱ. Implement a transform method;
ⅲ. The transform method must accept a pyspark.sql.DataFrame;
ⅳ. Implement get_spark_model_info and return the corresponding dict.
The examples below show how to wrap feature engineering.
(1) Wrapping a hand-written feature-engineering step, e.g. a squaring transformation:
class FeatureSquare:
    def __init__(self, col):
        self.col = col

    def get_spark_model_info(self):
        # required: return the attribute names and corresponding pyspark.ml models
        # used in this class; return an empty dict if no pyspark model is used
        return {}

    def transform(self, data):
        for col in self.col:
            data = data.withColumn(f'{col}_pow', data[col] ** 2)
        return data

feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])
data = feat_square.transform(data)
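After this step the DataFrame gains one squared column per input column, which you can verify directly:

data.select(data['sepal length (cm)'], data['sepal length (cm)_pow']).show(3)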
(2) Wrapping a feature-engineering step that calls another library, here the MinMaxScaler model from Spark MLlib (pyspark.ml):
# assemble only the feature columns (the label column must not be included)
vecAssembler = VectorAssembler(inputCols=[c for c in data.columns if c != label],
                               outputCol="features")
data_assemble = vecAssembler.transform(data)

from pyspark.ml.feature import MinMaxScaler
min_max_scale = MinMaxScaler(inputCol='features', outputCol='scale_features')
min_max_model = min_max_scale.fit(data_assemble)

class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.scale_model = model

    def get_spark_model_info(self):
        # required: report the fitted pyspark.ml model held by this class
        return {'scale_model': self.scale_model}

    def transform(self, data: DataFrame):
        vecAssembler = VectorAssembler(inputCols=self.col, outputCol="features")
        data = vecAssembler.transform(data).drop(*self.col)
        data = self.scale_model.transform(data).drop('features')
        return data

std_feature_model = FeatureScale(min_max_model,
                                 ['sepal length (cm)', 'sepal width (cm)',
                                  'petal length (cm)', 'petal width (cm)',
                                  'sepal length (cm)_pow', 'sepal width (cm)_pow'])
data = std_feature_model.transform(data)
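At this point the six feature columns have been replaced by a single scale_features vector column, while the label column is kept for training:

data.printSchema()  # expected: 'label' and 'scale_features'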
3. Wrapping the prediction model
The wrapping steps are:
ⅰ. Define a class (any name);
ⅱ. Implement a transform method that performs the prediction;
ⅲ. The transform method must accept a pyspark.sql.DataFrame;
ⅳ. Implement get_spark_model_info and return the corresponding dict.
# define and train the model
gbdt = GBTClassifier(featuresCol='scale_features', labelCol=label)
gbdt_model = gbdt.fit(data)

class GBDTModel:
    def __init__(self, model, col):
        self.gbdt_model = model
        self.col = col

    def get_spark_model_info(self):
        # required: report the fitted pyspark.ml model held by this class
        return {'gbdt_model': self.gbdt_model}

    def transform(self, data: DataFrame):
        df_res = self.gbdt_model.transform(data).select('prediction').toPandas()
        return df_res

predict_model = GBDTModel(gbdt_model, 'scale_features')
4. Assembling the pipeline
Put the wrapped feature-processing steps and the prediction model into a list, in execution order.
pipeline_list = [feat_square, std_feature_model, predict_model]
from aiworks_plugins.hdfs_plugins import save_spark_pipeline_to_hdfs
save_spark_pipeline_to_hdfs(pipeline_list, sc, model_name='spark_pipeline_pm')
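To check the assembled chain locally before deployment, run the steps in order on the raw feature DataFrame x created in step 1 (a sketch of what the serving side presumably does; the actual execution logic lives inside the aiworks plugin):

out = x  # raw feature columns only, no label column
for step in pipeline_list:
    out = step.transform(out)
print(out.head())  # pandas DataFrame with a 'prediction' column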
5. Complete example
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from sklearn.datasets import load_iris
import pandas as pd

# initialize the Spark session
conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

# training data: binarize the iris target
iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)
data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)

# convert to a PySpark DataFrame
data = spark.createDataFrame(data)
# raw feature columns only (no label column)
x = spark.createDataFrame(pd.DataFrame(iris.data, columns=col))
# feature step 1; col must not include the label column
class FeatureSquare:
    def __init__(self, col):
        self.col = col

    def get_spark_model_info(self):
        """
        This function is required. It must return the attribute names of the
        pyspark.ml models used in this class together with the corresponding
        models; if no pyspark model is used, return an empty dict.
        """
        return {}

    def transform(self, data):
        # the input data must be a pyspark.sql.DataFrame
        for col in self.col:
            data = data.withColumn(f'{col}_pow', data[col] ** 2)
        return data

feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])
data = feat_square.transform(data)
# assemble only the feature columns (the label column must not be included)
vecAssembler = VectorAssembler(inputCols=[c for c in data.columns if c != label],
                               outputCol="features")
data_assemble = vecAssembler.transform(data)

from pyspark.ml.feature import MinMaxScaler
min_max_scale = MinMaxScaler(inputCol='features', outputCol='scale_features')
min_max_model = min_max_scale.fit(data_assemble)

# feature step 2, which uses a pyspark.ml model; col must not include the label column
class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.scale_model = model

    def get_spark_model_info(self):
        """
        This function is required. It must return the attribute names of the
        pyspark.ml models used in this class together with the corresponding
        models; if no pyspark model is used, return an empty dict.
        """
        return {'scale_model': self.scale_model}

    def transform(self, data: DataFrame):
        # the input data must be a pyspark.sql.DataFrame
        from pyspark.ml.feature import VectorAssembler
        vecAssembler = VectorAssembler(inputCols=self.col, outputCol="features")
        data = vecAssembler.transform(data).drop(*self.col)
        data = self.scale_model.transform(data).drop('features')
        return data

std_feature_model = FeatureScale(min_max_model,
                                 ['sepal length (cm)', 'sepal width (cm)',
                                  'petal length (cm)', 'petal width (cm)',
                                  'sepal length (cm)_pow', 'sepal width (cm)_pow'])
data = std_feature_model.transform(data)
# define and train the model
gbdt = GBTClassifier(featuresCol='scale_features', labelCol=label)
gbdt_model = gbdt.fit(data)

# prediction wrapper; col must not include the label column
class GBDTModel:
    def __init__(self, model, col):
        self.gbdt_model = model
        self.col = col

    def get_spark_model_info(self):
        """
        This function is required. It must return the attribute names of the
        pyspark.ml models used in this class together with the corresponding
        models; if no pyspark model is used, return an empty dict.
        """
        return {'gbdt_model': self.gbdt_model}

    def transform(self, data: DataFrame):
        # the input data must be a pyspark.sql.DataFrame
        df_res = self.gbdt_model.transform(data).select('prediction').toPandas()
        return df_res

predict_model = GBDTModel(gbdt_model, 'scale_features')
pipeline_list = [feat_square, std_feature_model, predict_model]

from aiworks_plugins.hdfs_plugins import save_spark_pipeline_to_hdfs
# pipeline_list must be a list, sc is the SparkContext,
# and 'spark_pipeline_pm' is the name under which the model is saved
save_spark_pipeline_to_hdfs(pipeline_list, sc, model_name='spark_pipeline_pm')
After this pipeline is deployed, it can be invoked with the following parameters to debug the model:
# input parameters for a prediction request against the deployed model
{
    "sepal length (cm)": 5.9,
    "sepal width (cm)": 3,
    "petal length (cm)": 5.1,
    "petal width (cm)": 1.8
}
# or
{
    "sepal length (cm)": 5.1,
    "sepal width (cm)": 3.5,
    "petal length (cm)": 1.4,
    "petal width (cm)": 0.2
}
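If the deployed pipeline is exposed as an HTTP service, a debug call from Python might look like the sketch below; the endpoint URL is platform-specific and purely hypothetical here, as is any required authentication:

import requests

# hypothetical endpoint; replace with the URL shown by the deployment platform
url = "http://<serving-host>/api/models/spark_pipeline_pm/predict"
payload = {
    "sepal length (cm)": 5.9,
    "sepal width (cm)": 3,
    "petal length (cm)": 5.1,
    "petal width (cm)": 1.8
}
resp = requests.post(url, json=payload)
print(resp.json())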