Online Model Deployment

What Is Online Model Deployment?

Online model deployment means saving a trained machine learning model, or a model pipeline, to HDFS and exposing it as a model service through a RESTful API. The caller passes input parameters through the API, the system runs the model, and the prediction result is returned to the caller, enabling online prediction.
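
As an illustration of how a deployed service is called, the sketch below sends one prediction request with Python's requests library; the service URL, any authentication, and the exact response format are placeholders and should be taken from the API details shown after deployment (see the deployment steps at the end of this page).

# Hypothetical client call to a deployed model service; the URL and response format are assumptions
import requests

url = "http://aiworks.example.com/model-service/my_model/v1/predict"  # placeholder URL
payload = {"sepal length (cm)": 5.1, "sepal width (cm)": 3.5,
           "petal length (cm)": 1.4, "petal width (cm)": 0.2}
resp = requests.post(url, json=payload, timeout=10)
print(resp.json())  # prediction returned by the deployed model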

Saving Models from a Notebook

The first step in deploying a Notebook model online is saving it: the model must first be saved to HDFS.

AIWorks' Notebook deployment system provides pre-packaged model-saving functions; you only need to call the appropriate function in your code to save a model from the Notebook.

The following functions are currently provided:

  1. Saving a single model

    • save_python_model_to_hdfs(): save function for Python models;

    • save_pyspark_model_to_hdfs(): save function for PySpark models;

    • save_tf_model_to_hdfs(): save function for TensorFlow models;

    • save_keras_model_to_hdfs(): save function for Keras models;

    • save_pytorch_model_to_hdfs(): save function for PyTorch models;

  2. Saving a pipeline

    • save_python_pipeline_to_hdfs(): save function for Python pipelines;

    • save_pyspark_pipeline_to_hdfs(): save function for PySpark pipelines;

In Notebook, pipeline saving is only provided for Python and PySpark; pipeline save functions for deep learning tasks are not yet available.
In Notebook, only code written in WEB editing mode supports online model deployment; code in resource-upload mode is not yet supported.

Saving a Single Python Model

from aiworks_plugins.hdfs_plugins import save_python_model_to_hdfs
save_python_model_to_hdfs(model)

Code example:

import pandas as pd
from sklearn import linear_model
from sklearn.datasets import load_iris

# Build the model
iris = load_iris()
x = pd.DataFrame(iris.data, columns=iris.feature_names)
y = iris.target

lr = linear_model.LinearRegression()
lr.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)']], y)

class TestModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col

    def predict(self, data: pd.DataFrame):
        return self.model.predict(data[self.col])

my_model = TestModel(model=lr, col=['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)'])

# Save the model
from aiworks_plugins.hdfs_plugins import save_python_model_to_hdfs
save_python_model_to_hdfs(my_model)
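
Before saving, it can be worth checking locally that the wrapped object exposes the interface the service will call, i.e. a predict method that accepts a pandas DataFrame; this is an optional sanity check, not something the platform requires.

# Optional local check: predict must accept a pandas DataFrame
print(my_model.predict(x.head(3)))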

Saving a Single PySpark Model

from aiworks_plugins.hdfs_plugins import save_pyspark_model_to_hdfs
save_pyspark_model_to_hdfs(model, sc)

Code example:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from sklearn.datasets import load_iris
import pandas as pd

# Initialize the Spark session
conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

# Training data
iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)
data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)

# Convert to a PySpark DataFrame
data = spark.createDataFrame(data)
# Assemble the features into a vector column
vecAssembler = VectorAssembler(inputCols=col, outputCol="features")
train_data = vecAssembler.transform(data).select(['features', label])

# Define and train the model
gbdt = GBTClassifier(featuresCol='features',
                     labelCol=label)
gbdt_model = gbdt.fit(train_data)


class TestModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col
        # The following line is required
        self.__estimator__ = str(model.__class__).split('.')[-1].split("""'""")[0]

    def predict(self, data: pd.DataFrame):
        data = data[self.col]
        data = spark.createDataFrame(data)

        vecAssembler = VectorAssembler(inputCols=self.col,
                                       outputCol="features")
        test_data = vecAssembler.transform(data).select(['features'])

        df_res = self.model.transform(test_data).select('prediction').toPandas()
        return df_res


model = TestModel(model=gbdt_model,
                  col=['sepal length (cm)', 'sepal width (cm)',
                       'petal length (cm)', 'petal width (cm)'])

# Save the model
from aiworks_plugins.hdfs_plugins import save_pyspark_model_to_hdfs
save_pyspark_model_to_hdfs(model, sc)

Saving a Single TensorFlow Model

from aiworks_plugins.tf_plugins import save_tf_model_to_hdfs
save_tf_model_to_hdfs(sess, ["output", "loss"])  # the list names the output nodes to export

Code example:

import tensorflow as tf
from sklearn.datasets import load_breast_cancer

tf.reset_default_graph()

bc = load_breast_cancer()

x = tf.placeholder(tf.float32, [None, 30], name="x")
y = tf.placeholder(tf.int32, [None], name="y")
out = tf.layers.dense(x, 32, activation=tf.nn.relu)
out = tf.layers.dense(out, 32, activation=tf.nn.relu)
logits = tf.layers.dense(out, 2)
out = tf.nn.softmax(logits)
out = tf.argmax(out, axis=1, name="output")

loss = tf.losses.sparse_softmax_cross_entropy(y, logits)
loss = tf.identity(loss, "loss")

optimizer = tf.train.AdamOptimizer(0.0001).minimize(loss)

sess = tf.Session()
sess.run(tf.global_variables_initializer())

for i in range(1000):
    sess.run([optimizer], feed_dict={x: bc.data, y: bc.target})

# Save the model
from aiworks_plugins.tf_plugins import save_tf_model_to_hdfs
save_tf_model_to_hdfs(sess, ["output", "loss"])

Saving a Single Keras Model

from aiworks_plugins.tf_plugins import save_keras_model_to_hdfs
save_keras_model_to_hdfs(network)

Code example:

from sklearn import datasets
from keras import models
from keras import layers
from keras.utils import to_categorical

digits = datasets.load_digits()
n_samples = len(digits.images)
data = digits.images.reshape((n_samples, -1))

train_images = data[:n_samples // 2]
train_images = train_images.astype('float32') / 255

test_images = data[n_samples // 2:]
test_images = test_images.astype('float32') / 255

train_labels = to_categorical(digits.target[:n_samples // 2])

test_labels = to_categorical(digits.target[n_samples // 2:])
network = models.Sequential()
network.add(layers.Dense(35, activation='relu', input_shape=(8 * 8,)))
network.add(layers.Dense(10, activation='softmax'))
network.compile(optimizer='rmsprop', loss='categorical_crossentropy', metrics=['accuracy'])
network.fit(train_images, train_labels, epochs=300, batch_size=100, verbose=0)

# Save the model
from aiworks_plugins.tf_plugins import save_keras_model_to_hdfs
save_keras_model_to_hdfs(network)

Saving a Single PyTorch Model

from aiworks_plugins.tf_plugins import save_torch_model_to_hdfs
save_torch_model_to_hdfs(net, torch.Tensor(1, 30))  # the second argument is an example input matching the network's input shape

Code example:

import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np

# Define the network
class pNet(nn.Module):
    def __init__(self):
        super(pNet, self).__init__()
        self.l1 = nn.Linear(30, 60)
        self.a1 = nn.Sigmoid()
        self.l2 = nn.Linear(60, 2)
        self.a2 = nn.ReLU()
        self.l3 = nn.Softmax(dim=1)
    def forward(self, x):
        x = self.l1(x)
        x = self.a1(x)
        x = self.l2(x)
        x = self.a2(x)
        x = self.l3(x)
        return x
net = pNet()

# Load the data and convert it to torch tensors (the network expects float inputs and integer class labels)
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()
x_train = torch.from_numpy(breast_cancer.data).float()
y_train = torch.from_numpy(breast_cancer.target).long()

# Train the network
epochs = 500
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.005)  # a small learning rate suits this setup
for epoch in range(epochs):
    optimizer.zero_grad()
    y_pred = net(x_train)
    loss = criterion(y_pred, y_train)
    loss.backward()
    optimizer.step()

# Save the model
from aiworks_plugins.tf_plugins import save_torch_model_to_hdfs
save_torch_model_to_hdfs(net, torch.Tensor(1, 30))

Saving a Python Pipeline

In a Notebook, a pipeline for Python code is built as follows:

1. Wrapping the feature engineering

Because feature engineering operates on specific columns, it needs to be wrapped in a class, and the wrapper must implement a transform step. The wrapping works as follows:

ⅰ. Define a class (the class name is arbitrary); +
ⅱ. Implement a transform method; +
ⅲ. The transform method must accept a pandas DataFrame object; +

Examples:

(1) Wrapping a feature engineering method you wrote yourself, for example a quadratic transformation:

class FeatureSquare:

    def __init__(self, col):
        self.col = col

    def transform(self, data):
        for col in self.col:
            data[f'{col}_pow'] = data[col].apply(lambda x: x**2)
        return data

feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])

(2) Wrapping a feature engineering method from another library, for example scaling:

class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.model = model

    def transform(self, data: pd.DataFrame):
        data[self.col] = self.model.transform(data[self.col])
        return data

from sklearn.preprocessing import MinMaxScaler
stdscale = MinMaxScaler()
stdscale.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']])
std_feature_model = FeatureScale(stdscale, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                                             'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])

2. Wrapping the prediction model

After a custom Python model is deployed, the service calls its "predict" interface, so the wrapped model must implement a predict method. To unify the API, you also need to specify the column names used for prediction, and predict must accept a pandas DataFrame. In summary:

ⅰ. Define a class (the class name is arbitrary); +
ⅱ. Implement a predict method; +
ⅲ. The predict method must accept a pandas DataFrame object; +
Example:
class PredictModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col

    def predict(self, data: pd.DataFrame):
        return self.model.predict(data[self.col])
from sklearn.ensemble import RandomForestClassifier
rf = RandomForestClassifier()
rf.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']], y)
predict_model = PredictModel(model=rf, col=['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])

3. Assembling the pipeline

pipeline_model = [feat_square, std_feature_model, predict_model]

from aiworks_plugins.hdfs_plugins import save_python_pipeline_to_hdfs

save_python_pipeline_to_hdfs(pipeline_model)

4. Complete example

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

# Build the model
iris = load_iris()
x = pd.DataFrame(iris.data, columns=iris.feature_names)
y = iris.target

# Feature step 1; col must not include the label column
class FeatureSquare:

    def __init__(self, col):
        self.col = col

    def transform(self, data):
        for col in self.col:
            data[f'{col}_pow'] = data[col].apply(lambda x: x**2)
        return data

feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])

x = feat_square.transform(x)

# Feature step 2, using a model from sklearn; col must not include the label column
class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.model = model

    def transform(self, data: pd.DataFrame):
        data[self.col] = self.model.transform(data[self.col])
        return data

from sklearn.preprocessing import MinMaxScaler
stdscale = MinMaxScaler()
stdscale.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']])
std_feature_model = FeatureScale(stdscale, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                                             'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])
x = std_feature_model.transform(x)


# Build the prediction model; col must not include the label column
class PredictModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col

    def predict(self, data: pd.DataFrame):
        return self.model.predict(data[self.col])
rf = RandomForestClassifier()
rf.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']], y)
predict_model = PredictModel(model=rf, col=['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])

# Build the pipeline list, in the order the feature steps are applied
pipeline_model = [feat_square, std_feature_model, predict_model]

from aiworks_plugins.hdfs_plugins import save_python_pipeline_to_hdfs
save_python_pipeline_to_hdfs(pipeline_model, model_name='rf_pipeline')

After this pipeline is deployed, you can test the model by passing in, for example, either of the following parameter sets:

# Example input for the deployed prediction service
{"sepal length (cm)" : 5.9, "sepal width (cm)": 3.0, "petal length (cm)": 5.1, "petal width (cm)": 1.8}
# or
{"sepal length (cm)" : 5.1, "sepal width (cm)": 3.5, "petal length (cm)": 1.4, "petal width (cm)": 0.2}

Saving a PySpark Pipeline

1. Data preparation

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from sklearn.datasets import load_iris
import pandas as pd

# Initialize the Spark session
conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()


iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)

data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)

# Convert to a PySpark DataFrame
data = spark.createDataFrame(data)

x = spark.createDataFrame(pd.DataFrame(iris.data, columns=col))

2. Wrapping the feature engineering

Because feature engineering operates on specific columns, it must be wrapped in a class. The wrapper must satisfy the following:

  • It must implement a transform method;

  • Any pyspark.ml machine learning class it uses must already be fitted;

  • It must implement a get_spark_model_info method that returns a dict mapping the attribute names of the pyspark.ml models used in the class to the corresponding model objects; if no pyspark model is used, return an empty dict. VectorAssembler is an exception and does not need to be included.

The wrapping works as follows:

ⅰ. Define a class (the class name is arbitrary); +
ⅱ. Implement a transform method; +
ⅲ. The transform method must accept a pyspark.sql.DataFrame object; +
ⅳ. Implement get_spark_model_info and return the corresponding dict; +

Examples of feature engineering wrappers are shown below.

(1) Wrapping a feature engineering method you wrote yourself, for example a quadratic transformation:

class FeatureSquare:

    def __init__(self, col):
        self.col = col

    def get_spark_model_info(self):
        # Required: return a dict mapping the attribute names of any pyspark
        # models used in this class to the models themselves; return an empty
        # dict if no pyspark model is used.
        return {}

    def transform(self, data):
        for col in self.col:
            data = data.withColumn(f'{col}_pow', data[col] ** 2)
        return data


feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])
data = feat_square.transform(data)

(2) Wrapping a feature engineering method from another library, here the MinMaxScaler model from Spark MLlib:

# Exclude the label column so the fitted scaler matches the columns used in FeatureScale below
feature_cols = [c for c in data.columns if c != label]
vecAssembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data_assemble = vecAssembler.transform(data)

from pyspark.ml.feature import MinMaxScaler

min_max_scale = MinMaxScaler(inputCol='features', outputCol='scale_features')
min_max_model = min_max_scale.fit(data_assemble)


class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.scale_model = model

    def get_spark_model_info(self):
        return {'scale_model': self.scale_model}

    def transform(self, data: DataFrame):
        vecAssembler = VectorAssembler(inputCols=self.col, outputCol="features")
        data = vecAssembler.transform(data).drop(*self.col)
        data = self.scale_model.transform(data).drop('features')
        return data


std_feature_model = FeatureScale(min_max_model, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])

data = std_feature_model.transform(data)

3. Wrapping the prediction model

The wrapping works as follows:

ⅰ. Define a class (the class name is arbitrary); +
ⅱ. Implement the prediction interface; in the PySpark case this is a transform method, as in the examples below; +
ⅲ. This transform method must accept a pyspark.sql.DataFrame object; +
ⅳ. Implement get_spark_model_info and return the corresponding dict. +
# Define and train the model
gbdt = GBTClassifier(featuresCol='scale_features',
                     labelCol=label)

gbdt_model = gbdt.fit(data)


class GBDTModel:
    def __init__(self, model, col):
        self.gbdt_model = model
        self.col = col

    def get_spark_model_info(self):
        return {'gbdt_model': self.gbdt_model}

    def transform(self, data: DataFrame):
        df_res = self.gbdt_model.transform(data).select('prediction').toPandas()
        return df_res

predict_model = GBDTModel(gbdt_model, 'scale_features')

4. Assembling the pipeline

Put the wrapped feature-processing steps and the prediction model into a list, in the order they should be applied.

pipeline_list = [feat_square, std_feature_model, predict_model]

from aiworks_plugins.hdfs_plugins import save_spark_pipeline_to_hdfs
save_spark_pipeline_to_hdfs(pipeline_list, sc, model_name='spark_pipeline_pm')

5. Complete example

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from sklearn.datasets import load_iris
import pandas as pd
# Initialize the Spark session


conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()


iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)

data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)

# Convert to a PySpark DataFrame
data = spark.createDataFrame(data)

x = spark.createDataFrame(pd.DataFrame(iris.data, columns=col))

# Feature step 1; col must not include the label column
class FeatureSquare:

    def __init__(self, col):
        self.col = col

    def get_spark_model_info(self):
        """
        Required: return a dict mapping the attribute names of any pyspark
        models used in this class to the models themselves; return an empty
        dict if no pyspark model is used.
        """
        return {}

    def transform(self, data):
        # The input data must be a pyspark.sql.DataFrame
        for col in self.col:
            data = data.withColumn(f'{col}_pow', data[col] ** 2)
        return data


feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])
data = feat_square.transform(data)


# Exclude the label column so the fitted scaler matches the columns used in FeatureScale below
feature_cols = [c for c in data.columns if c != label]
vecAssembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data_assemble = vecAssembler.transform(data)

from pyspark.ml.feature import MinMaxScaler

min_max_scale = MinMaxScaler(inputCol='features', outputCol='scale_features')
min_max_model = min_max_scale.fit(data_assemble)

# Feature step 2, using a model from pyspark.ml; col must not include the label column
class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.scale_model = model

    def get_spark_model_info(self):
        """
        该函数必须要实现,需要返回使用到的pyspark模型再类中的属性名称,
        以及对应的模型,没有使用到pyspark模型,则为返回空字典
        """
        return {'scale_model': self.scale_model}

    def transform(self, data: DataFrame):
        # The input data must be a pyspark.sql.DataFrame
        from pyspark.ml.feature import VectorAssembler
        vecAssembler = VectorAssembler(inputCols=self.col, outputCol="features")
        data = vecAssembler.transform(data).drop(*self.col)
        data = self.scale_model.transform(data).drop('features')
        return data


std_feature_model = FeatureScale(min_max_model, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
                                             'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])

data = std_feature_model.transform(data)

# Define and train the model
gbdt = GBTClassifier(featuresCol='scale_features',
                     labelCol=label)

gbdt_model = gbdt.fit(data)

# Build the prediction model; col must not include the label column
class GBDTModel:
    def __init__(self, model, col):
        self.gbdt_model = model
        self.col = col

    def get_spark_model_info(self):
        """
        该函数必须要实现,需要返回使用到的pyspark模型再类中的属性名称,
        以及对应的模型,没有使用到pyspark模型,则为返回空字典
        """
        return {'gbdt_model': self.gbdt_model}

    def transform(self, data: DataFrame):
        # The input data must be a pyspark.sql.DataFrame
        df_res = self.gbdt_model.transform(data).select('prediction').toPandas()
        return df_res

predict_model = GBDTModel(gbdt_model, 'scale_features')

pipeline_list = [feat_square, std_feature_model, predict_model]

from aiworks_plugins.hdfs_plugins import save_spark_pipeline_to_hdfs

save_spark_pipeline_to_hdfs(pipeline_list, sc, model_name='spark_pipeline_pm')
# pipeline_list must be a list, sc is the SparkContext, and 'spark_pipeline_pm' is the name under which the model is saved

After this pipeline is deployed, you can call the model with either of the following parameter sets to debug it:

# Example input for the deployed prediction service
{
    "sepal length (cm)":5.9,
    "sepal width (cm)":3,
    "petal length (cm)":5.1,
    "petal width (cm)":1.8
}
# or
{
    "sepal length (cm)":5.1,
    "sepal width (cm)":3.5,
    "petal length (cm)":1.4,
    "petal width (cm)":0.2
}
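
The same kind of local simulation works for the PySpark pipeline, again assuming the request is turned into a one-row DataFrame before being handed to the first step.

# Simulate one request flowing through the PySpark pipeline list (assumed convention)
req = {"sepal length (cm)": 5.9, "sepal width (cm)": 3.0,
       "petal length (cm)": 5.1, "petal width (cm)": 1.8}
sample = spark.createDataFrame(pd.DataFrame([req]))
for step in pipeline_list:
    sample = step.transform(sample)  # every step in the PySpark pipeline exposes transform()
print(sample)  # a pandas DataFrame containing the 'prediction' column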

Deploying a Notebook Model

After the model is saved, the next stage is deployment.

Step 1: On the Notebook editing page, click the online model deployment button in the toolbar to enter the deployment flow. +
Step 2: Select the model to deploy and the deployment type, and define the model name and description.

  • Deployment type: either create a new model service or add a version to an existing model service.

    • New model service: registers a new model; you need to define a model name and a model description.

      • Model name: at most 32 characters; letters, digits, and underscores are supported;

      • Model description: at most 200 characters; apart from spaces, no restriction on the characters used;

    • Add a version to an existing model service: select an already deployed model and add a new version to it. All versions of a model share the same invocation URL, and you can choose which version serves requests.

      • Select an existing model: choose a single deployed model;

      • Model version number: incremented automatically by the system to identify this version of the model;

Step 3: After the model is deployed successfully, the system provides its RESTful API. You can then go to the model management page to debug the model and start calling it.