自定义Python模型

Notebook提供灵活的算法建模环境,用户可通过代码自定义实现模型。

Python单个模型构建

自定义Python模型部署后,会调用此模型的"predict"接口,因此必须在自定的python模型中实现predict接口,为了统一api,还需要指定预测的列名,以及predict接口需要传入pandas的dataframe。总结如下:

  1. 构造一个类,类名随意;

  2. 实现predict接口;

  3. predict接口必须接收一个pandas的DataFrame对象;

示例如下:

import pandas as pd
from aiworks_plugins.hdfs_plugins import save_python_model_to_hdfs


class TestModel:
  	def __init__(self):
      	pass
    def predict(self, data:pd.DataFrame):
      	pass

my_model = TestModel()
save_python_model_to_hdfs(my_model)

您可以实现自己的模型,也可以调用其他的Python依赖库去实现。下面是一个比较好的实现方法:

import pandas as pd
from sklearn.datasets import load_iris
from sklearn.ensemble import GradientBoostingClassifier

"""
假设你想用sklearn的gbdt去对iris建模,并且只选用iris数据集的前三个特征列。
"""
iris = load_iris()
x = pd.DataFrame(iris.data,columns=iris.feature_names)
y = iris.target

gbdt = GradientBoostingClassifier()
gbdt.fit(x[['sepal length (cm)', 'sepal width (cm)','petal length (cm)']], y)


class TestModel:
  	def __init__(self, model, col):
      	self.model = model
        self.col = col
    def predict(self, data:pd.DataFrame):
      	return self.model.predict(data[self.col])

my_model = TestModel(model=gbdt,col=['sepal length (cm)',
                                     'sepal width (cm)','petal length (cm)'])

# 保存模型
from aiworks_plugins.hdfs_plugins import save_python_model_to_hdfs
save_python_model_to_hdfs(my_model)

Python的pipeline构建

Notebook中Python代码的pipeline构建过程如下:

1.特征工程的封装

由于特征工程需要对特定列进行处理,因此需要对其进行封装。在特征工程封装中必须实现transform过程。封装的过程如下:
ⅰ. 构造一个类(类名是任意的);
ⅱ. 实现transform接口;
ⅲ. transform接口必须接收一个pandas的DataFrame对象;

举例:

(1)对于自己写的特征工程方法的封装,例如二次函数转化;

class FeatureSquare:

    def __init__(self, col):
        self.col = col

    def transform(self, data):
        for col in self.col:
            data[f'{col}_pow'] = data[col].apply(lambda x: x**2)
        return data

feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])

(2)对于调用其他库特征工程方法的封装,例如标准化处理;

class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.model = model

    def transform(self, data: pd.DataFrame):
        data[self.col] = self.model.transform(data[self.col])
        return data

from sklearn.preprocessing import MinMaxScaler
stdscale = MinMaxScaler()
stdscale.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']])
std_feature_model = FeatureScale(stdscale, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                                             'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])

2.预测工程的封装

自定义的python模型部署后,会调用此模型的"predict"接口,因此必须在自定的python模型中实现predict接口,为了统一api,还需要指定预测的列名,以及predict接口需要传入pandas的dataframe。总结一下:
ⅰ. 构造一个类(类名是任意的);
ⅱ. 实现predict接口;
ⅲ. predict接口必须接收一个pandas的DataFrame对象;

举例:
class PredictModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col

    def predict(self, data: pd.DataFrame):
        return self.model.predict(data[self.col])
rf = RandomForestClassifier()
rf.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']], y)
predict_model = PredictModel(model=rf, col=['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])

3.整理流程封装

pipeline_model = [f2i, std_feature_model, predict_model]

from aiworks_plugins import save_python_pipeline_model_to_hdfs

save_python_pipeline_model_to_hdfs(pipeline_model)

4.整体实例举例

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

# 建模
iris = load_iris()
x = pd.DataFrame(iris.data, columns=iris.feature_names)
y = iris.target

# 特征处理1,col中不应该包含标签列
class FeatureSquare:

    def __init__(self, col):
        self.col = col

    def transform(self, data):
        for col in self.col:
            data[f'{col}_pow'] = data[col].apply(lambda x: x**2)
        return data

feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])

x = feat_square.transform(x)

# 特征处理2,引用了sklearn中的模型,col中不应该包含标签列
class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.model = model

    def transform(self, data: pd.DataFrame):
        data[self.col] = self.model.transform(data[self.col])
        return data

from sklearn.preprocessing import MinMaxScaler
stdscale = MinMaxScaler()
stdscale.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']])
std_feature_model = FeatureScale(stdscale, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                                             'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])
x = std_feature_model.transform(x)


# 创建预测模型,col中不应该包含标签列
class PredictModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col

    def predict(self, data: pd.DataFrame):
        return self.model.predict(data[self.col])
rf = RandomForestClassifier()
rf.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']], y)
predict_model = PredictModel(model=rf, col=['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])

# 构建流程表, 按照特征处理的顺序构建
pipeline_model = [feat_square, std_feature_model, predict_model]

from aiworks_plugins.hdfs_plugins import save_python_pipeline_to_hdfs
save_python_pipeline_to_hdfs(pipeline_model, model_name='rf_pipeline')

该算法Pipeline部署后,进行模型预测,可输入以下参数进行模型测试:

# 部署预测时输入的代码块
{"sepal length (cm)" : 5.9, "sepal width (cm)": 3.0, "petal length (cm)": 5.1, "petal width (cm)": 1.8}
# 或者
{"sepal length (cm)" : 5.1, "sepal width (cm)": 3.5, "petal length (cm)": 1.4, "petal width (cm)": 0.2}