Online Model Deployment
What Is Online Model Deployment
Online model deployment means saving a trained machine learning model, or a model Pipeline, to HDFS and exposing it as a RESTful API. The caller passes input parameters through the API, the system runs the model, and the result is returned to the caller, which enables online prediction.
Saving a Model in Notebook
The first step of deploying a Notebook model online is saving it: the model must first be saved to HDFS.
The AIWorks Notebook deployment system provides pre-wrapped model-saving functions; calling the appropriate function in your code is all that is needed to save a Notebook model.
The following functions are currently provided:
- Saving a single model
  - save_python_model_to_hdfs(): saves a Python model;
  - save_pyspark_model_to_hdfs(): saves a PySpark model;
  - save_tf_model_to_hdfs(): saves a TensorFlow model;
  - save_keras_model_to_hdfs(): saves a Keras model;
  - save_pytorch_model_to_hdfs(): saves a PyTorch model;
- Saving a pipeline
  - save_python_pipeline_to_hdfs(): saves a Python pipeline;
  - save_pyspark_pipeline_to_hdfs(): saves a PySpark pipeline;
In Notebook, pipeline saving is wrapped only for Python and PySpark; pipeline-saving functions for deep learning tasks are not yet provided.
In Notebook, only code written in WEB editing mode supports online model deployment; code in resource-upload mode is not yet supported.
Saving a Single Python Model
from aiworks_plugins.hdfs_plugins import save_python_model_to_hdfs
save_python_model_to_hdfs(model)
Code example:
import pandas as pd
from sklearn import linear_model
from sklearn.datasets import load_iris
# Build the model
iris = load_iris()
x = pd.DataFrame(iris.data, columns=iris.feature_names)
y = iris.target
lr = linear_model.LinearRegression()
lr.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)']], y)
class TestModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col
    def predict(self, data: pd.DataFrame):
        return self.model.predict(data[self.col])
my_model = TestModel(model=lr, col=['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)'])
# Save the model
from aiworks_plugins.hdfs_plugins import save_python_model_to_hdfs
save_python_model_to_hdfs(my_model)
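You can also verify the wrapped object locally by calling its predict interface on a one-row DataFrame, which is presumably how the online service will feed the request parameters to the model. A minimal sketch (the sample values are arbitrary):
# Optional local sanity check: build a one-row DataFrame and call predict,
# mirroring how the deployed service is expected to invoke the model.
sample = pd.DataFrame([{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4}])
print(my_model.predict(sample))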
Saving a Single PySpark Model
from aiworks_plugins.hdfs_plugins import save_pyspark_model_to_hdfs
save_pyspark_model_to_hdfs(model, sc)
Code example:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from sklearn.datasets import load_iris
import pandas as pd
# Initialize the Spark session
conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
# Training data
iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)
data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)
# Convert to a PySpark DataFrame
data = spark.createDataFrame(data)
# Assemble the features into a vector column
vecAssembler = VectorAssembler(inputCols=col, outputCol="features")
train_data = vecAssembler.transform(data).select(['features', label])
# Define and train the model
gbdt = GBTClassifier(featuresCol='features',
labelCol=label)
gbdt_model = gbdt.fit(train_data)
class TestModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col
        # The following line is required: it records the class name of the wrapped pyspark.ml model
        self.__estimator__ = str(model.__class__).split('.')[-1].split("""'""")[0]
    def predict(self, data: pd.DataFrame):
        data = data[self.col]
        data = spark.createDataFrame(data)
        vecAssembler = VectorAssembler(inputCols=self.col,
                                       outputCol="features")
        test_data = vecAssembler.transform(data).select(['features'])
        df_res = self.model.transform(test_data).select('prediction').toPandas()
        return df_res
model = TestModel(model=gbdt_model,
col=['sepal length (cm)', 'sepal width (cm)',
'petal length (cm)', 'petal width (cm)'])
# Save the model
from aiworks_plugins.hdfs_plugins import save_pyspark_model_to_hdfs
save_pyspark_model_to_hdfs(model,sc)
Saving a Single TensorFlow Model
from aiworks_plugins.tf_plugins import save_tf_model_to_hdfs
save_tf_model_to_hdfs(sess, ["output", "loss"])
Code example:
import tensorflow as tf
from sklearn.datasets import load_breast_cancer
tf.reset_default_graph()
bc = load_breast_cancer()
x = tf.placeholder(tf.float32, [None, 30], name="x")
y = tf.placeholder(tf.int32, [None], name="y")
out = tf.layers.dense(x, 32, activation=tf.nn.relu)
out = tf.layers.dense(out, 32, activation=tf.nn.relu)
logits = tf.layers.dense(out, 2)
out = tf.nn.softmax(logits)
out = tf.argmax(out, axis=1, name="output")
loss = tf.losses.sparse_softmax_cross_entropy(y, logits)
loss = tf.identity(loss, "loss")
optimizer = tf.train.AdamOptimizer(0.0001).minimize(loss)
sess = tf.Session()
sess.run(tf.global_variables_initializer())
for i in range(1000):
    sess.run([optimizer], feed_dict={x: bc.data, y: bc.target})
# Save the model
from aiworks_plugins.tf_plugins import save_tf_model_to_hdfs
save_tf_model_to_hdfs(sess, ["output", "loss"])
Saving a Single Keras Model
from aiworks_plugins.tf_plugins import save_keras_model_to_hdfs
save_keras_model_to_hdfs(network)
Code example:
from sklearn import datasets
from keras import models
from keras import layers
from keras.utils import to_categorical
digits = datasets.load_digits()
n_samples = len(digits.images)
data = digits.images.reshape((n_samples, -1))
train_images = data[:n_samples // 2]
train_images = train_images.astype('float32') / 255
test_images = data[n_samples // 2:]
test_images = test_images.astype('float32') / 255
train_labels = to_categorical(digits.target[:n_samples // 2])
test_labels = to_categorical(digits.target[n_samples // 2:])
network = models.Sequential()
network.add(layers.Dense(35, activation='relu', input_shape=(8 * 8,)))
network.add(layers.Dense(10, activation='softmax'))
network.compile(optimizer='rmsprop', loss='categorical_crossentropy', metrics=['accuracy'])
network.fit(train_images, train_labels, epochs=300, batch_size=100,verbose=0)
# Save the model
from aiworks_plugins.tf_plugins import save_keras_model_to_hdfs
save_keras_model_to_hdfs(network)
Saving a Single PyTorch Model
from aiworks_plugins.tf_plugins import save_torch_model_to_hdfs
save_torch_model_to_hdfs(net, torch.Tensor(1, 30))
Code example:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
#network init.
class pNet(nn.Module):
    def __init__(self):
        super(pNet, self).__init__()
        self.l1 = nn.Linear(30, 60)
        self.a1 = nn.Sigmoid()
        self.l2 = nn.Linear(60, 2)
        self.a2 = nn.ReLU()
        self.l3 = nn.Softmax(dim=1)
    def forward(self, x):
        x = self.l1(x)
        x = self.a1(x)
        x = self.l2(x)
        x = self.a2(x)
        x = self.l3(x)
        return x
net = pNet()
#data load.
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()
# convert the numpy arrays to tensors so they can be fed to the network
x_train = torch.from_numpy(breast_cancer.data).float()
y_train = torch.from_numpy(breast_cancer.target).long()
#training network.
epochs = 500
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.005)  # a small learning rate works well here
for epoch in range(epochs):
    optimizer.zero_grad()
    y_pred = net(x_train)
    loss = criterion(y_pred, y_train)
    loss.backward()
    optimizer.step()
#save model.
from aiworks_plugins.tf_plugins import save_torch_model_to_hdfs
save_torch_model_to_hdfs(net,torch.Tensor(1,30))
Saving a Python Pipeline
In Notebook, a Python pipeline is built as follows.
1. Wrapping the feature engineering steps
Because feature engineering operates on specific columns, each step must be wrapped in a class, and the wrapper must implement a transform method. The wrapping rules are:
ⅰ. define a class (any class name is fine);
ⅱ. implement a transform method;
ⅲ. the transform method must accept a pandas DataFrame.
Examples:
(1) Wrapping a feature engineering step written by yourself, e.g. a square transformation:
class FeatureSquare:
    def __init__(self, col):
        self.col = col
    def transform(self, data):
        for col in self.col:
            data[f'{col}_pow'] = data[col].apply(lambda x: x**2)
        return data
feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])
(2) Wrapping a feature engineering step that calls another library, e.g. feature scaling:
class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.model = model
    def transform(self, data: pd.DataFrame):
        data[self.col] = self.model.transform(data[self.col])
        return data
from sklearn.preprocessing import MinMaxScaler
stdscale = MinMaxScaler()
stdscale.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']])
std_feature_model = FeatureScale(stdscale, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])
2. Wrapping the prediction step
After a custom Python model is deployed, the service calls the model's "predict" interface, so the custom model must implement predict. To keep the API uniform, the wrapper should also record the column names used for prediction, and predict must take a pandas DataFrame. In summary:
ⅰ. define a class (any class name is fine);
ⅱ. implement a predict method;
ⅲ. the predict method must accept a pandas DataFrame.
Example:
class PredictModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col
    def predict(self, data: pd.DataFrame):
        return self.model.predict(data[self.col])
rf = RandomForestClassifier()
rf.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']], y)
predict_model = PredictModel(model=rf, col=['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])
3. Assembling the pipeline
pipeline_model = [feat_square, std_feature_model, predict_model]
from aiworks_plugins.hdfs_plugins import save_python_pipeline_to_hdfs
save_python_pipeline_to_hdfs(pipeline_model)
4. Complete example
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
# Build the model
iris = load_iris()
x = pd.DataFrame(iris.data, columns=iris.feature_names)
y = iris.target
# Feature step 1; col must not contain the label column
class FeatureSquare:
    def __init__(self, col):
        self.col = col
    def transform(self, data):
        for col in self.col:
            data[f'{col}_pow'] = data[col].apply(lambda x: x**2)
        return data
feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])
x = feat_square.transform(x)
# Feature step 2, using a model from sklearn; col must not contain the label column
class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.model = model
    def transform(self, data: pd.DataFrame):
        data[self.col] = self.model.transform(data[self.col])
        return data
from sklearn.preprocessing import MinMaxScaler
stdscale = MinMaxScaler()
stdscale.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']])
std_feature_model = FeatureScale(stdscale, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])
x = std_feature_model.transform(x)
# Build the prediction model; col must not contain the label column
class PredictModel:
    def __init__(self, model, col):
        self.model = model
        self.col = col
    def predict(self, data: pd.DataFrame):
        return self.model.predict(data[self.col])
rf = RandomForestClassifier()
rf.fit(x[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow']], y)
predict_model = PredictModel(model=rf, col=['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])
# Build the pipeline list, in the same order the feature steps are applied
pipeline_model = [feat_square, std_feature_model, predict_model]
from aiworks_plugins.hdfs_plugins import save_python_pipeline_to_hdfs
save_python_pipeline_to_hdfs(pipeline_model, model_name='rf_pipeline')
After this pipeline has been deployed, the model can be tested by passing in parameters such as the following:
# Parameters passed in when testing the deployed model
{"sepal length (cm)" : 5.9, "sepal width (cm)": 3.0, "petal length (cm)": 5.1, "petal width (cm)": 1.8}
# or
{"sepal length (cm)" : 5.1, "sepal width (cm)": 3.5, "petal length (cm)": 1.4, "petal width (cm)": 0.2}
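To see what the deployed pipeline will do with such a request, you can emulate it locally. This is only a sketch: it assumes the service builds a one-row DataFrame from the JSON parameters, applies the transform steps in list order, and finally calls predict on the last element.
# Hypothetical local emulation of the deployed pipeline
request = {"sepal length (cm)": 5.9, "sepal width (cm)": 3.0, "petal length (cm)": 5.1, "petal width (cm)": 1.8}
df = pd.DataFrame([request])
for step in pipeline_model[:-1]:   # apply the feature steps in order
    df = step.transform(df)
print(pipeline_model[-1].predict(df))  # call predict on the final model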
Saving a PySpark Pipeline
1. Data preparation
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from sklearn.datasets import load_iris
import pandas as pd
from aiworks_plugins.hdfs_plugins.saved_model_pyspark import save_PickleFile_spark
# Initialize the Spark session
import re
import dill
import os
conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)
data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)
# Convert to a PySpark DataFrame
data = spark.createDataFrame(data)
x = spark.createDataFrame(pd.DataFrame(iris.data, columns=col))
2. Wrapping the feature engineering steps
Because feature engineering operates on specific columns, each step must be wrapped in a class. The wrapper must satisfy the following:
- it must implement a transform method;
- any machine learning class from pyspark.ml that it uses must already be fitted;
- it must implement a get_spark_model_info method that returns a dict mapping the attribute name of each pyspark.ml model used in the class to the model itself; if no pyspark model is used, it returns an empty dict. VectorAssembler is the exception and does not need to be reported.
The wrapping rules are:
ⅰ. define a class (any class name is fine);
ⅱ. implement a transform method;
ⅲ. the transform method must accept a pyspark.sql.DataFrame;
ⅳ. implement get_spark_model_info and return the corresponding dict.
The examples below show how feature engineering steps are wrapped.
(1) Wrapping a feature engineering step written by yourself, e.g. a square transformation:
class FeatureSquare:
    def __init__(self, col):
        self.col = col
    def get_spark_model_info(self):
        # This method must be implemented. It returns a dict mapping the attribute name of each
        # pyspark model used in this class to the model itself; return an empty dict if none is used.
        return {}
    def transform(self, data):
        for col in self.col:
            data = data.withColumn(f'{col}_pow', data[col] ** 2)
        return data
feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])
data = feat_square.transform(data)
(2) Wrapping a feature engineering step from another library, here the MinMaxScaler model from Spark MLlib:
vecAssembler = VectorAssembler(inputCols=data.columns, outputCol="features")
data_assemble = vecAssembler.transform(data)
from pyspark.ml.feature import MinMaxScaler
min_max_scale = MinMaxScaler(inputCol='features', outputCol='scale_features')
min_max_model = min_max_scale.fit(data_assemble)
class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.scale_model = model
    def get_spark_model_info(self):
        return {'scale_model': self.scale_model}
    def transform(self, data: DataFrame):
        vecAssembler = VectorAssembler(inputCols=self.col, outputCol="features")
        data = vecAssembler.transform(data).drop(*self.col)
        data = self.scale_model.transform(data).drop('features')
        return data
std_feature_model = FeatureScale(min_max_model, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])
data = std_feature_model.transform(data)
3. Wrapping the prediction model
The wrapping rules are:
ⅰ. define a class (any class name is fine);
ⅱ. implement a transform method;
ⅲ. the transform method must accept a pyspark.sql.DataFrame;
ⅳ. implement get_spark_model_info and return the corresponding dict.
# Define and train the model
gbdt = GBTClassifier(featuresCol='scale_features',
                     labelCol=label)
gbdt_model = gbdt.fit(data)
class GBDTModel:
    def __init__(self, model, col):
        self.gbdt_model = model
        self.col = col
    def get_spark_model_info(self):
        return {'gbdt_model': self.gbdt_model}
    def transform(self, data: DataFrame):
        df_res = self.gbdt_model.transform(data).select('prediction').toPandas()
        return df_res
predict_model = GBDTModel(gbdt_model, 'scale_features')
4. Assembling the pipeline
Put the wrapped feature steps and the prediction model into a list, in processing order.
pipeline_list = [feat_square, std_feature_model, predict_model]
from aiworks_plugins.hdfs_plugins import save_spark_pipeline_to_hdfs
save_spark_pipeline_to_hdfs(pipeline_list, sc, model_name='spark_pipeline_pm')
5. Complete example
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from sklearn.datasets import load_iris
import pandas as pd
# Initialize the Spark session
conf = SparkConf().setAppName("pyspark_test")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
iris = load_iris()
col = iris.feature_names
label = 'label'
data = pd.DataFrame(iris.data, columns=col)
data[label] = iris.target
data[label] = data[label].map(lambda x: 1 if x > 1 else 0)
# Convert to a PySpark DataFrame
data = spark.createDataFrame(data)
x = spark.createDataFrame(pd.DataFrame(iris.data, columns=col))
# Feature step 1; col must not contain the label column
class FeatureSquare:
    def __init__(self, col):
        self.col = col
    def get_spark_model_info(self):
        """
        This method must be implemented. It returns a dict mapping the attribute name of
        each pyspark model used in this class to the model itself; if no pyspark model is
        used, it returns an empty dict.
        """
        return {}
    def transform(self, data):
        # the data passed to transform must be a pyspark.sql.DataFrame
        for col in self.col:
            data = data.withColumn(f'{col}_pow', data[col] ** 2)
        return data
feat_square = FeatureSquare(['sepal length (cm)', 'sepal width (cm)'])
data = feat_square.transform(data)
vecAssembler = VectorAssembler(inputCols=data.columns, outputCol="features")
data_assemble = vecAssembler.transform(data)
from pyspark.ml.feature import MinMaxScaler
min_max_scale = MinMaxScaler(inputCol='features', outputCol='scale_features')
min_max_model = min_max_scale.fit(data_assemble)
# Feature step 2, using a model from pyspark.ml; col must not contain the label column
class FeatureScale:
    def __init__(self, model, col):
        self.col = col
        self.scale_model = model
    def get_spark_model_info(self):
        """
        This method must be implemented. It returns a dict mapping the attribute name of
        each pyspark model used in this class to the model itself; if no pyspark model is
        used, it returns an empty dict.
        """
        return {'scale_model': self.scale_model}
    def transform(self, data: DataFrame):
        # the data passed to transform must be a pyspark.sql.DataFrame
        from pyspark.ml.feature import VectorAssembler
        vecAssembler = VectorAssembler(inputCols=self.col, outputCol="features")
        data = vecAssembler.transform(data).drop(*self.col)
        data = self.scale_model.transform(data).drop('features')
        return data
std_feature_model = FeatureScale(min_max_model, ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)',
'petal width (cm)', 'sepal length (cm)_pow', 'sepal width (cm)_pow'])
data = std_feature_model.transform(data)
# Define and train the model
gbdt = GBTClassifier(featuresCol='scale_features',
labelCol=label)
gbdt_model = gbdt.fit(data)
# Build the prediction model; col must not contain the label column
class GBDTModel:
    def __init__(self, model, col):
        self.gbdt_model = model
        self.col = col
    def get_spark_model_info(self):
        """
        This method must be implemented. It returns a dict mapping the attribute name of
        each pyspark model used in this class to the model itself; if no pyspark model is
        used, it returns an empty dict.
        """
        return {'gbdt_model': self.gbdt_model}
    def transform(self, data: DataFrame):
        # the data passed to transform must be a pyspark.sql.DataFrame
        df_res = self.gbdt_model.transform(data).select('prediction').toPandas()
        return df_res
predict_model = GBDTModel(gbdt_model, 'scale_features')
pipeline_list = [feat_square, std_feature_model, predict_model]
from aiworks_plugins.hdfs_plugins import save_spark_pipeline_to_hdfs
save_spark_pipeline_to_hdfs(pipeline_list, sc, model_name='spark_pipeline_pm')
# pipeline_list must be a list, sc is the SparkContext, and 'spark_pipeline_pm' is the name the model is saved under
After this pipeline has been deployed, the model can be called and debugged with parameters such as the following:
# Parameters passed in when testing the deployed model
{
    "sepal length (cm)": 5.9,
    "sepal width (cm)": 3,
    "petal length (cm)": 5.1,
    "petal width (cm)": 1.8
}
# or
{
    "sepal length (cm)": 5.1,
    "sepal width (cm)": 3.5,
    "petal length (cm)": 1.4,
    "petal width (cm)": 0.2
}
Deploying a Model in Notebook
After the model has been saved, the next step is deployment.
Step 1: On the Notebook editing page, click the online model deployment button in the toolbar to start the deployment flow.
Step 2: Select the model to deploy and the deployment type, and define the model name and description.
- Deployment type: either create a new model service or add a version to an existing model service.
  - Create a new model service: adds a new model, for which a model name and description must be defined.
    - Model name: at most 32 characters; letters, digits and underscores are supported;
    - Model description: at most 200 characters; apart from spaces, no restriction on the characters used;
  - Add a version to an existing model service: select an already deployed model and add a new version number to it. All versions of a model share the same invocation URL, and you can choose which version serves requests.
    - Existing model: select a single deployed model;
    - Model version number: assigned automatically by the system to identify this version;
Step 3: After the model is deployed successfully, the system provides its RESTful API. You can then go to the model management page to debug the model and start calling it.
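Once the service is up, its RESTful API can be exercised from any HTTP client. Below is a minimal sketch using the Python requests library; the endpoint URL is a placeholder and the exact response format depends on your AIWorks environment, so use the invocation address shown on the model management page.
import requests
# Hypothetical endpoint; replace it with the invocation URL provided after deployment
url = "http://<aiworks-host>/<model-service-path>/predict"
payload = {"sepal length (cm)": 5.9, "sepal width (cm)": 3.0, "petal length (cm)": 5.1, "petal width (cm)": 1.8}
resp = requests.post(url, json=payload)
print(resp.status_code, resp.text)  # the response body carries the prediction result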