How to Build a Custom PySpark Model

Building a Single PySpark Model

The Notebook provides a flexible algorithm modeling environment in which users can implement custom models in code.

A custom PySpark model currently supports wrapping only a single model from PySpark's ML library, and the wrapper class must record which ML model it wraps in an attribute; everything else works the same way as a custom Python model. In summary:

  1. Define a class (any class name); the class must have an attribute named "model" that holds a model from PySpark's ML library;

  2. Implement a predict method;

  3. The predict method must accept a pandas DataFrame object.

An example is shown below:

import pandas as pd
from aiworks_plugins.hdfs_plugins import save_pyspark_model_to_hdfs


class TestModel:
    def __init__(self, model):
        # `model` must be a model from pyspark.ml
        self.model = model

    def predict(self, data: pd.DataFrame):
        pass


my_model = TestModel(model)  # `model` is a trained pyspark.ml model
save_pyspark_model_to_hdfs(my_model)

Below is a more complete implementation:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from sklearn.datasets import load_iris
import pandas as pd

# Initialize the Spark session
conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

# Training data
iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)
data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)


# Convert to a PySpark DataFrame
data = spark.createDataFrame(data)
# Assemble the feature columns into a single vector column
vecAssembler = VectorAssembler(inputCols=col, outputCol="features")
train_data = vecAssembler.transform(data).select(['features', label])

# Define and train the model
gbdt = GBTClassifier(featuresCol='features',
                     labelCol=label)
gbdt_model = gbdt.fit(train_data)


class TestModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col
        # The following line is required: it records the class name of the
        # wrapped pyspark.ml model
        self.__estimator__ = \
            str(model.__class__).split('.')[-1].split("""'""")[0]

    def predict(self, data: pd.DataFrame):
        data = data[self.col]
        data = spark.createDataFrame(data)

        vecAssembler = VectorAssembler(inputCols=self.col,
                                       outputCol="features")
        test_data = vecAssembler.transform(data).select(['features'])

        df_res = \
            self.model.transform(test_data).select('prediction').toPandas()
        return df_res

model = TestModel(model=gbdt_model,
                  col= ['sepal length (cm)', 'sepal width (cm)',
                        'petal length (cm)', 'petal width (cm)'])

from aiworks_plugins.hdfs_plugins import save_pyspark_model_to_hdfs
save_pyspark_model_to_hdfs(model)
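
Before saving, the wrapper can be checked locally; a minimal sketch using the iris column names from the example above:

# Optional local check: predict on a one-row pandas DataFrame
sample = pd.DataFrame(
    [[5.1, 3.5, 1.4, 0.2]],
    columns=['sepal length (cm)', 'sepal width (cm)',
             'petal length (cm)', 'petal width (cm)'])
print(model.predict(sample))  # returns a pandas DataFrame with a 'prediction' column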

Building a PySpark Pipeline

1. Data preparation

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from sklearn.datasets import load_iris
import pandas as pd
from aiworks_plugins.hdfs_plugins.saved_model_pyspark import save_PickleFile_spark
import re
import dill
import os


# Initialize the Spark session
conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()


iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)

data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)

# Convert to a PySpark DataFrame
data = spark.createDataFrame(data)

x = spark.createDataFrame(pd.DataFrame(iris.data, columns=col))

2. Wrapping the feature engineering

Because feature engineering needs to operate on specific columns, it has to be wrapped in a class. The wrapper must satisfy the following:

  • It must implement a transform step;

  • Any machine learning class from pyspark.ml that it uses must already be fitted (trained);

  • It must implement a get_spark_model_info method that returns the attribute names under which the pyspark.ml models are stored in the class, together with the corresponding models; if no pyspark model is used, it returns an empty dict. VectorAssembler is the exception and does not need to be returned.

The wrapping process is as follows:

ⅰ. Define a class (any class name);
ⅱ. Implement a transform method;
ⅲ. The transform method must accept a pyspark.sql.DataFrame object;
ⅳ. Implement get_spark_model_info and return the corresponding dict.

Examples of wrapping feature engineering are shown below.

(1) Wrapping a feature engineering method you implement yourself, such as a squaring transform:

class FeatureSquare:

    def __init__(self, col):
        self.col = col

    def get_spark_model_info(self):
        # This method is required: return the attribute names under which the
        # pyspark models are stored in the class, together with the models;
        # if no pyspark model is used, return an empty dict
        return {}

    def transform(self, data):
        for col in self.col:
            data = data.withColumn(f'{col}_pow', data[col] ** 2)
        return data


feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])
data = feat_square.transform(data)
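
For each configured column, the transform adds a new `<col>_pow` column to the Spark DataFrame; an optional quick check (a minimal sketch):

# Inspect one original column and its newly added squared column
data.select(data['sepal length (cm)'], data['sepal length (cm)_pow']).show(3)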

(2) Wrapping a feature engineering method from another library, here the MinMaxScaler model from Spark MLlib (pyspark.ml):

vecAssembler = VectorAssembler(inputCols=data.columns, outputCol="features")
data_assemble = vecAssembler.transform(data)

from pyspark.ml.feature import MinMaxScaler

min_max_scale = MinMaxScaler(inputCol='features', outputCol='scale_features')
min_max_model = min_max_scale.fit(data_assemble)


class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.scale_model = model

    def get_spark_model_info(self):
        return {'scale_model': self.scale_model}

    def transform(self, data: DataFrame):
        vecAssembler = VectorAssembler(inputCols=self.col, outputCol="features")
        data = vecAssembler.transform(data).drop(*self.col)
        data = self.scale_model.transform(data).drop('features')
        return data


std_feature_model = FeatureScale(min_max_model, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                                             'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])

data = std_feature_model.transform(data)

3. Wrapping the prediction model

The wrapping process is as follows:

ⅰ. Define a class (any class name);
ⅱ. Implement a transform method;
ⅲ. The transform method must accept a pyspark.sql.DataFrame object;
ⅳ. Implement get_spark_model_info and return the corresponding dict.

# Define and train the model
gbdt = GBTClassifier(featuresCol='scale_features',
                     labelCol=label)

gbdt_model = gbdt.fit(data)


class GBDTModel:
    def __init__(self, model, col):
        self.gbdt_model = model
        self.col = col

    def get_spark_model_info(self):
        return {'gbdt_model': self.gbdt_model}

    def transform(self, data: DataFrame):
        df_res = self.gbdt_model.transform(data).select('prediction').toPandas()
        return df_res

predict_model = GBDTModel(gbdt_model, 'scale_features')

4. Assembling the pipeline

Arrange the wrapped feature processing steps and the prediction model into a list, in order.

pipeline_list = [feat_square, std_feature_model, predict_model]
path="/user/hive/warehouse/yufu_test.db/credit_data/"
save_spark_pipeline_model(pipeline_list, sc, 'spark_pipeline_pm', path)

5. Complete example

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from sklearn.datasets import load_iris
import pandas as pd


# Initialize the Spark session
conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()


iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)

data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)

# Convert to a PySpark DataFrame
data = spark.createDataFrame(data)

x = spark.createDataFrame(pd.DataFrame(iris.data, columns=col))

# Feature processing 1; `col` must not include the label column
class FeatureSquare:

    def __init__(self, col):
        self.col = col

    def get_spark_model_info(self):
        """
        This method is required: return the attribute names under which the
        pyspark models are stored in this class, together with the
        corresponding models; if no pyspark model is used, return an empty dict
        """
        return {}

    def transform(self, data):
        # The input data must be a pyspark.sql.DataFrame
        for col in self.col:
            data = data.withColumn(f'{col}_pow', data[col] ** 2)
        return data


feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])
data = feat_square.transform(data)


vecAssembler = VectorAssembler(inputCols=data.columns, outputCol="features")
data_assemble = vecAssembler.transform(data)

from pyspark.ml.feature import MinMaxScaler

min_max_scale = MinMaxScaler(inputCol='features', outputCol='scale_features')
min_max_model = min_max_scale.fit(data_assemble)

# Feature processing 2: uses a pyspark.ml model; `col` must not include the label column
class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.scale_model = model

    def get_spark_model_info(self):
        """
        This method is required: return the attribute names under which the
        pyspark models are stored in this class, together with the
        corresponding models; if no pyspark model is used, return an empty dict
        """
        return {'scale_model': self.scale_model}

    def transform(self, data: DataFrame):
        # The input data must be a pyspark.sql.DataFrame
        from pyspark.ml.feature import VectorAssembler
        vecAssembler = VectorAssembler(inputCols=self.col, outputCol="features")
        data = vecAssembler.transform(data).drop(*self.col)
        data = self.scale_model.transform(data).drop('features')
        return data


std_feature_model = FeatureScale(min_max_model, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                                             'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])

data = std_feature_model.transform(data)

# Define and train the model
gbdt = GBTClassifier(featuresCol='scale_features',
                     labelCol=label)

gbdt_model = gbdt.fit(data)

# Create the prediction model; `col` must not include the label column
class GBDTModel:
    def __init__(self, model, col):
        self.gbdt_model = model
        self.col = col

    def get_spark_model_info(self):
        """
        This method is required: return the attribute names under which the
        pyspark models are stored in this class, together with the
        corresponding models; if no pyspark model is used, return an empty dict
        """
        return {'gbdt_model': self.gbdt_model}

    def transform(self, data: DataFrame):
        # The input data must be a pyspark.sql.DataFrame
        df_res = self.gbdt_model.transform(data).select('prediction').toPandas()
        return df_res

predict_model = GBDTModel(gbdt_model, 'scale_features')

pipeline_list = [feat_square, std_feature_model, predict_model]

from aiworks_plugins.hdfs_plugins import save_spark_pipeline_to_hdfs

save_spark_pipeline_to_hdfs(pipeline_list, sc, model_name='spark_pipeline_pm')
# pipeline_list must be a list, sc is the SparkContext, and 'spark_pipeline_pm' is the name the model is saved under

After this pipeline is deployed, the model can be called for debugging with parameters such as the following:

# Input parameters for the deployed prediction service
{
    "sepal length (cm)":5.9,
    "sepal width (cm)":3,
    "petal length (cm)":5.1,
    "petal width (cm)":1.8
}
# or
{
    "sepal length (cm)":5.1,
    "sepal width (cm)":3.5,
    "petal length (cm)":1.4,
    "petal width (cm)":0.2
}
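
For reference, a minimal sketch of calling the deployed pipeline over HTTP; the endpoint URL below is hypothetical and depends on how the platform exposes the deployed model:

import requests

# Hypothetical serving endpoint; replace with the actual URL of the deployed pipeline
url = 'http://<serving-host>/predict'
payload = {
    "sepal length (cm)": 5.9,
    "sepal width (cm)": 3,
    "petal length (cm)": 5.1,
    "petal width (cm)": 1.8
}
resp = requests.post(url, json=payload)
print(resp.json())  # expected to contain the 'prediction' result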