谈谈机器学习模型的部署 - naughty的个人页面 - 开源中国

随着机器学习的广泛应用,如何高效的把训练好的机器学习的模型部署到生产环境,正在被越来越多的工具所支持。我们今天就来看一看不同的工具是如何解决这个问题的。

上图的过程是一个数据科学项目所要经历的典型的过程。从数据采集开始,经历数据分析,数据变形,数据验证,数据拆分,训练,模型创建,模型验证,大规模训练,模型发布,到提供服务,监控和日志。诸多的机器学习工具如Scikt-Learn,Spark, Tensorflow, MXnet, PyTorch提供给数据科学家们不同的选择,同时也给模型的部署带来了不同的挑战。

我们先来简单的看一看机器学习的模型是如何部署,它又会遇到那些挑战。

模型持久化

模型部署一般就是把训练的模型持久化,然后运行服务器加载模型,并提供REST或其它形式的服务接口。我们以RandomForestClassification为例,看一下Sklearn,Spark和Tensorflow是如何持久化模型。

Sklearn

我们使用Iris数据集,利用RandomForestClassifier分类。

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.externals import joblib

data = load_iris()

X, y = data["data"], data["target"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.33, random_state=42)

clf = RandomForestClassifier(max_depth=2, random_state=0)
clf.fit(X_train, y_train)

print(clf.feature_importances_)

print(classification_report(y_test, clf.predict(
    X_test), target_names=data["target_names"]))

joblib.dump(clf, 'classification.pkl')

训练的代码如上。这里模型导出的代码在最后一句。joblib.dump(),参考这里。Sklearn的模型到处本质上是利用Python的Pickle机制。Python的函数进行序列化,也就是说把训练好的Transformer函数序列化并存为文件。

要加载模型也很简单,只要调用joblib.load()就好了。

from sklearn.externals import joblib

from sklearn.datasets import load_iris
from sklearn.metrics import classification_report

data = load_iris()

X, y = data["data"], data["target"]

clf = joblib.load('classification.pkl')

print(clf.feature_importances_)
print(classification_report(y, clf.predict(
    X), target_names=data["target_names"]))

Sklearn对Pickle做了一下封装和优化,但这并不能解决Pickle本身的一些限制,例如:

  • 版本兼容问题,不同的Python,Pickle,Sklearn的版本,生成的序列化文件并不兼容
  • 安全性问题,例如序列化的文件中被人注入恶意代码
  • 扩展问题,你自己写了一个扩展类,无法序列化,或者你在Python中调用了C函数
  • 模型的管理,如果我生成了不同版本的模型,该如何管理

Spark

Spark的Pipeline和Model都支持Save到文件,然后可以很方便的在另一个Context中加载。

训练的代码如下:

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

from pyspark.sql.types import DoubleType

from pyspark import SparkFiles
from pyspark import SparkContext

url = "https://server/iris.csv"

spark.sparkContext.addFile(url)

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.csv(SparkFiles.get("iris.csv"), header=True)

data = data.withColumn("sepal_length", data["sepal_length"].cast(DoubleType()))
data = data.withColumn("sepal_width", data["sepal_width"].cast(DoubleType()))
data = data.withColumn("petal_width", data["petal_width"].cast(DoubleType()))
data = data.withColumn("petal_length", data["petal_length"].cast(DoubleType()))

#data.show()
data.printSchema()

assembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_width", "petal_length"],
    outputCol="features")

output = assembler.transform(data)

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="species", outputCol="indexedLabel").fit(output)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(output)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[assembler, labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "species", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[3]
print(rfModel)  # summary only

filebase="hdfs://server:9000/tmp"

pipeline.write().overwrite().save("{}/classification-pipeline".format(filebase))
model.write().overwrite().save("{}/classification-model".format(filebase))

模型加载的代码如下:

%pyspark
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel

from pyspark import SparkFiles

url = "https://server/iris.csv"
spark.sparkContext.addFile(url)

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.csv(SparkFiles.get("iris.csv"), header=True)

data = data.withColumn("sepal_length", data["sepal_length"].cast(DoubleType()))
data = data.withColumn("sepal_width", data["sepal_width"].cast(DoubleType()))
data = data.withColumn("petal_width", data["petal_width"].cast(DoubleType()))
data = data.withColumn("petal_length", data["petal_length"].cast(DoubleType()))

filebase="hdfs://server:9000/tmp/"

pipeline = Pipeline.read().load("{}/classification-pipeline".format(filebase))
model = PipelineModel.read().load("{}/classification-model".format(filebase))

# Make predictions.
predictions = model.transform(data)

# Select example rows to display.
predictions.select("predictedLabel", "species", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

调用model的toDebugString方法可以看到分类器的内部细节。

RandomForestClassificationModel (uid=rfc_225ef4968bf9) with 10 trees
  Tree 0 (weight 1.0):
    If (feature 3 <= 1.9)
     Predict: 1.0
    Else (feature 3 > 1.9)
     If (feature 3 <= 4.7)
      Predict: 2.0
     Else (feature 3 > 4.7)
      If (feature 3 <= 5.1)
       If (feature 1 <= 2.5)
        Predict: 0.0
       Else (feature 1 > 2.5)
        If (feature 1 <= 2.7)
         Predict: 2.0
        Else (feature 1 > 2.7)
         Predict: 0.0
      Else (feature 3 > 5.1)
       Predict: 0.0
  Tree 1 (weight 1.0):
    If (feature 3 <= 1.9)
     Predict: 1.0
    Else (feature 3 > 1.9)
     If (feature 3 <= 4.9)
      If (feature 2 <= 1.6)
       Predict: 2.0
      Else (feature 2 > 1.6)
       If (feature 0 <= 4.9)
        Predict: 0.0
       Else (feature 0 > 4.9)
        If (feature 0 <= 5.9)
         Predict: 2.0
        Else (feature 0 > 5.9)
         Predict: 0.0
     Else (feature 3 > 4.9)
      If (feature 1 <= 3.0)
       If (feature 3 <= 5.1)
        If (feature 2 <= 1.7)
         Predict: 0.0
        Else (feature 2 > 1.7)
         Predict: 0.0
       Else (feature 3 > 5.1)
        Predict: 0.0
      Else (feature 1 > 3.0)
       Predict: 0.0
  Tree 2 (weight 1.0):
    If (feature 3 <= 1.9)
     Predict: 1.0
    Else (feature 3 > 1.9)
     If (feature 3 <= 5.0)
      If (feature 2 <= 1.6)
       Predict: 2.0
      Else (feature 2 > 1.6)
       If (feature 1 <= 2.5)
        Predict: 0.0
       Else (feature 1 > 2.5)
        Predict: 2.0
     Else (feature 3 > 5.0)
      If (feature 0 <= 6.0)
       If (feature 1 <= 2.7)
        If (feature 0 <= 5.8)
         Predict: 0.0
        Else (feature 0 > 5.8)
         Predict: 2.0
       Else (feature 1 > 2.7)
        Predict: 0.0
      Else (feature 0 > 6.0)
       Predict: 0.0
  Tree 3 (weight 1.0):
    If (feature 3 <= 1.9)
     Predict: 1.0
    Else (feature 3 > 1.9)
     If (feature 3 <= 4.9)
      If (feature 2 <= 1.5)
       Predict: 2.0
      Else (feature 2 > 1.5)
       If (feature 2 <= 1.7)
        Predict: 0.0
       Else (feature 2 > 1.7)
        Predict: 2.0
     Else (feature 3 > 4.9)
      If (feature 3 <= 5.1)
       If (feature 0 <= 6.5)
        If (feature 0 <= 5.9)
         Predict: 0.0
        Else (feature 0 > 5.9)
         Predict: 0.0
       Else (feature 0 > 6.5)
        Predict: 2.0
      Else (feature 3 > 5.1)
       Predict: 0.0
  Tree 4 (weight 1.0):
    If (feature 2 <= 0.5)
     Predict: 1.0
    Else (feature 2 > 0.5)
     If (feature 2 <= 1.5)
      If (feature 2 <= 1.4)
       Predict: 2.0
      Else (feature 2 > 1.4)
       If (feature 3 <= 4.9)
        Predict: 2.0
       Else (feature 3 > 4.9)
        Predict: 0.0
     Else (feature 2 > 1.5)
      If (feature 2 <= 1.8)
       If (feature 3 <= 5.0)
        If (feature 0 <= 4.9)
         Predict: 0.0
        Else (feature 0 > 4.9)
         Predict: 2.0
       Else (feature 3 > 5.0)
        Predict: 0.0
      Else (feature 2 > 1.8)
       Predict: 0.0
  Tree 5 (weight 1.0):
    If (feature 2 <= 0.5)
     Predict: 1.0
    Else (feature 2 > 0.5)
     If (feature 2 <= 1.6)
      If (feature 2 <= 1.3)
       Predict: 2.0
      Else (feature 2 > 1.3)
       If (feature 3 <= 4.9)
        Predict: 2.0
       Else (feature 3 > 4.9)
        Predict: 0.0
     Else (feature 2 > 1.6)
      If (feature 3 <= 4.8)
       If (feature 2 <= 1.7)
        Predict: 0.0
       Else (feature 2 > 1.7)
        Predict: 2.0
      Else (feature 3 > 4.8)
       Predict: 0.0
  Tree 6 (weight 1.0):
    If (feature 3 <= 1.9)
     Predict: 1.0
    Else (feature 3 > 1.9)
     If (feature 3 <= 4.9)
      If (feature 2 <= 1.6)
       Predict: 2.0
      Else (feature 2 > 1.6)
       If (feature 1 <= 2.8)
        Predict: 0.0
       Else (feature 1 > 2.8)
        Predict: 2.0
     Else (feature 3 > 4.9)
      If (feature 1 <= 2.7)
       If (feature 2 <= 1.6)
        If (feature 3 <= 5.0)
         Predict: 0.0
        Else (feature 3 > 5.0)
         Predict: 2.0
       Else (feature 2 > 1.6)
        Predict: 0.0
      Else (feature 1 > 2.7)
       Predict: 0.0
  Tree 7 (weight 1.0):
    If (feature 0 <= 5.4)
     If (feature 2 <= 0.5)
      Predict: 1.0
     Else (feature 2 > 0.5)
      Predict: 2.0
    Else (feature 0 > 5.4)
     If (feature 2 <= 1.7)
      If (feature 3 <= 1.5)
       Predict: 1.0
      Else (feature 3 > 1.5)
       If (feature 0 <= 6.9)
        If (feature 3 <= 5.0)
         Predict: 2.0
        Else (feature 3 > 5.0)
         Predict: 0.0
       Else (feature 0 > 6.9)
        Predict: 0.0
     Else (feature 2 > 1.7)
      If (feature 0 <= 5.9)
       If (feature 2 <= 1.8)
        Predict: 2.0
       Else (feature 2 > 1.8)
        Predict: 0.0
      Else (feature 0 > 5.9)
       Predict: 0.0
  Tree 8 (weight 1.0):
    If (feature 3 <= 1.7)
     Predict: 1.0
    Else (feature 3 > 1.7)
     If (feature 3 <= 5.1)
      If (feature 2 <= 1.6)
       If (feature 2 <= 1.4)
        Predict: 2.0
       Else (feature 2 > 1.4)
        If (feature 1 <= 2.2)
         Predict: 0.0
        Else (feature 1 > 2.2)
         Predict: 2.0
      Else (feature 2 > 1.6)
       If (feature 1 <= 2.5)
        Predict: 0.0
       Else (feature 1 > 2.5)
        If (feature 3 <= 5.0)
         Predict: 2.0
        Else (feature 3 > 5.0)
         Predict: 0.0
     Else (feature 3 > 5.1)
      Predict: 0.0
  Tree 9 (weight 1.0):
    If (feature 2 <= 0.5)
     Predict: 1.0
    Else (feature 2 > 0.5)
     If (feature 0 <= 6.1)
      If (feature 3 <= 4.8)
       If (feature 0 <= 4.9)
        If (feature 2 <= 1.0)
         Predict: 2.0
        Else (feature 2 > 1.0)
         Predict: 0.0
       Else (feature 0 > 4.9)
        Predict: 2.0
      Else (feature 3 > 4.8)
       Predict: 0.0
     Else (feature 0 > 6.1)
      If (feature 3 <= 4.9)
       If (feature 1 <= 2.8)
        If (feature 0 <= 6.2)
         Predict: 0.0
        Else (feature 0 > 6.2)
         Predict: 2.0
       Else (feature 1 > 2.8)
        Predict: 2.0
      Else (feature 3 > 4.9)
       Predict: 0.0

下图是Spark存储的Piple模型的目录结构:

我们可以看到,它包含了元数据Pipeline的五个阶段的数据,这里的文件都是二进制的数据,只有Spark自己可以加载。

Tensorflow

最后我们来看一下Tensorflow。Tensorflow提供了tf.train.Saver来导出他的模型到元图(MetaGraph)。

from __future__ import print_function

import tensorflow as tf
from tensorflow.contrib.tensor_forest.python import tensor_forest
from tensorflow.python.ops import resources

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

import numpy as np

# Ignore all GPUs, tf random forest does not benefit from it.
import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""

data = load_iris()
dX, dy = data["data"], data["target"]
X_train, X_test, y_train, y_test = train_test_split(
    dX, dy, test_size=0.33, random_state=42)

# Parameters
num_steps = 500  # Total steps to train
batch_size = 10  # The number of samples per batch
num_classes = 3  # The 10 digits
num_features = 4  # Each image is 28x28 pixels
num_trees = 10
max_nodes = 100

# Input and Target data
X = tf.placeholder(tf.float32, shape=[None, num_features])
# For random forest, labels must be integers (the class id)
Y = tf.placeholder(tf.int32, shape=[None])

# Random Forest Parameters
hparams = tensor_forest.ForestHParams(num_classes=num_classes,
                                      num_features=num_features,
                                      num_trees=num_trees,
                                      max_nodes=max_nodes).fill()

# Build the Random Forest
forest_graph = tensor_forest.RandomForestGraphs(hparams)
# Get training graph and loss
train_op = forest_graph.training_graph(X, Y)
loss_op = forest_graph.training_loss(X, Y)

# Measure the accuracy
infer_op, _, _ = forest_graph.inference_graph(X)
correct_prediction = tf.equal(tf.argmax(infer_op, 1), tf.cast(Y, tf.int64))
accuracy_op = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

# Initialize the variables (i.e. assign their default value) and forest resources
init_vars = tf.group(tf.global_variables_initializer(),
                     resources.initialize_resources(resources.shared_resources()))


def next_batch(size):
    index = range(len(X_train))
    index_batch = np.random.choice(index, size)
    return X_train[index_batch], y_train[index_batch]


# Start TensorFlow session
sess = tf.Session()

# Run the initializer
sess.run(init_vars)

saver = tf.train.Saver()

# Training
for i in range(1, num_steps + 1):
    # Prepare Data
    # Get the next batch of MNIST data (only images are needed, not labels)
    batch_x, batch_y = next_batch(batch_size)
    _, l = sess.run([train_op, loss_op], feed_dict={X: batch_x, Y: batch_y})
    if i % 50 == 0 or i == 1:
        acc = sess.run(accuracy_op, feed_dict={X: batch_x, Y: batch_y})
        print('Step %i, Loss: %f, Acc: %f' % (i, l, acc))
# Test Model
print("Test Accuracy:", sess.run(
    accuracy_op, feed_dict={X: X_test, Y: y_test}))

# Print the tensors related to this model
print(accuracy_op)
print(infer_op)
print(X)
print(Y)

# save the model to a check point file
save_path = saver.save(sess, "/tmp/model.ckpt")

导出的模型会包含以下文件:

其中checkpoint是元数据,包含其它文件的路径信息。还包含了一个Pickle文件和其它几个checkpiont文件。可以看出,Tensorflow也利用了Python的Pickle机制来存储模型,并在这之外加入了额外的元数据。

模型加载的代码如下:

from __future__ import print_function

import tensorflow as tf

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# note: this has to be imported in case to support forest graph
from tensorflow.contrib.tensor_forest.python import tensor_forest

import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""

saver = tf.train.import_meta_graph('/tmp/model.ckpt.meta')

data = load_iris()

dX, dy = data["data"], data["target"]

graph = tf.get_default_graph()
with tf.Session() as sess:
    new_saver = tf.train.import_meta_graph('/tmp/model.ckpt.meta')
    new_saver.restore(sess, '/tmp/model.ckpt')
    #input = graph.get_operation_by_name("train")
    # print(graph.as_graph_def())
    load_infer_op = graph.get_tensor_by_name('probabilities:0')
    accuracy_op = graph.get_tensor_by_name('Mean_1:0')
    X = graph.get_tensor_by_name('Placeholder:0')
    Y = graph.get_tensor_by_name('Placeholder_1:0')
    print("Test Accuracy:", sess.run(accuracy_op, feed_dict={X: dX, Y: dy}))
    result = sess.run(load_infer_op, feed_dict={X: dX})
    prediction_result = [i.argmax() for i in result]
    print(classification_report(dy, prediction_result,
                                target_names=data["target_names"]))

这里要注意的是,RandomForest不是tensforflow的核心包,所以在模型加载的时候必须tensorflow.contrib.tensor_forest.python.tensor_forest, 否则模型是无法成功加载的。因为不加载的话tensor_forest中定义的一些属性会缺失。

另外就是Tensorflow也可以存储计算图,调用tf.train.write_graph()方法可以把图定义存储下来。当然也可以在TesnsorBoard中展示该图。

tf.train.write_graph(sess.graph_def, '/tmp', 'train.pbtxt')
%cat /tmp/train.pbtxt

好了,我们看到,Sklearn,Spark和Tensorflow都提供了自己的模型持久化的方法,那么简单来说,只要使用一个web服务器例如Flask,加一些模型加载和管理的方法,然后暴露REST API就可以提供预测服务了,是不是很简单呢?

其实要在生产环境下提供服务,还需要面对很多其它的挑战,例如:

  • 在云上如何扩展和伸缩
  • 如何进行性能调优
  • 如何管理模型的版本
  • 安全性
  • 如何持续集成和持续部署
  • 如何支持AB测试

为了解决模型部署的挑战,不同的组织开发了一些开源的工具,例如:ClipperSeldonMFlowMLeapOracle GraphpipeMXnet model server等等,我们就选其中几个看个究竟。

Clipper

Clipper是由UC Berkeley RISE Lab 开发的, 在用户应用和机器学习模型之间的一个提供预测服务的系统,通过解耦合用户应用和机器学习系统的方式,简化部署流程。

它有以下功能:

  • 利用简单标准化的REST接口来简化机器学习系统的集成,支持主要的机器学习框架。
  • 使用开发模型相同的库和环境简化模型部署
  • 利用可适配的Batching,缓存等技术改善吞吐量
  • 通过智能选择和合并模型来改善预测的准确率

Clipper的架构如下图:

Clipper使用了容器和微服务技术来构架架构。使用Redis来管理配置,Prometheus来进行监控。Clipper支持使用Kubernetes或者本地的Docker来管理容器。

Clipper支持以下几种模型:

  • 纯Python函数
  • PyShark
  • PyTorch
  • Tensorflow
  • MXnet
  • 自定义

Clipper模型部署的基本过程如下,大家可以参考我的这个notebook

  1. 创建Clipper集群(使用K8s或者本地Docker)
  2. 创建一个应用
  3. 训练模型
  4. 调用Clipper提供的模型部署方法部署模型,这里不同的工具需要调用不同的部署方法。部署时,会把训练好的Estimator利用CloudPickle之久化,本地构建一个容器镜像,部署到Docker或者K8s。
  5. 把模型和应用关联到一起,相当于发布模型。然后就可以调用对应的REST API来做预测了。

我试着把之前的三种工具的RomdomForest的例子用Clipper发布到我的Kubernetes集群,踩到了以下的坑坑:

  • 我本地的Cloudpickle的版本太新,导致模型不能反序列化,参考这个Issue
  • Tensorflow在Pickle的时候失败,应该是调用了C的code
  • 我的K8s运行在AWS上,我在K8S上使用内部IP失败,clipper连接一直在使用外部的域名,导致无法部署PySpark的模型。

总之,除了Sklearn成功部署之外,Tensorflow和Spark都失败了。大家可以参考这里的例子

Seldon

Seldon是一家创办于伦敦的公司,致力于提供对于基于开源软件的机器学习系统的控制。Seldon Core是该公司开源的提供在Kubernetes上部署机器学习模型的工具。它拥有以下功能:

  • Python/Spark/H2O/R 的模型支持
  • REST API和gRPC接口
  • 部署基于Model/Routers/Combiner/Transformers的图的微服务
  • 利用K8S来提供扩展,安全性,监控等等DevOps的功能

Seldon的使用过程如上图,

  1. 首先在K8s上安装Seldon Core,Seldon利用ksonnet,以CRD的形式安装seldon core
  2. 利用S2i(s2i是openshift开源的一款工具,用于把代码构建成容器镜像),构建运行时模型容器,并注册到容器注册表
  3. 编写你的运行图,并提交到K8s来部署你的模型

Seldon支持基于四种基本单元,Model,Transformer, Router, Combiner来构建你的运行图,并按照该图在K8s创建对应的资源和实例,来获得AB测试,模型ensemble的功能。

例如下图的几个例子:

AB 测试

模型ensemble

复杂图

图模式是Seldon最大的亮点,可以训练不同的模型,然后利用图来组合出不同的运行时,非常方便。更多的例子参考这里

笔者尝试在K8S上利用Seldon部署之前提到的三种工具生成的模型,都获得了成功(代码在这里)。这里分享一下遇到的几个问题:

  • Seldon支持Java的Python,然而用运行PySpark,这两个都需要,所以我不得不自己构建了一个镜像,手工在Python镜像上安装Java
  • 因为使用CDR的原因,我没有找到有效改变容器的liveness和readiness的设置,因为Spark初始化模型在Hadoop上,加载模型需要时间,总是readiness超时导致容器无法正常启动,K8s不断的重启容器。所以我只好修改代码,让模型加载变成Lazy Load,但是这样第一次REST Call会比较耗时,但是容器和服务总算是能够正常启动。

在我之前的一篇介绍Kubeflow的文章中,大家可以了解到,Kubeflow就是使用Seldon来管理模型部署的。

MLflow

MLflow是Databricks开发的开源系统,用于管理机器学习的端到端的生命周期。我之前写过一篇介绍该工具的文章

MLflow提供跟踪,项目管理和模型管理的功能。使用MLFlow来提供一个基于Sklearn的模型服务非常简单,

from __future__ import print_function

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

import mlflow
import mlflow.sklearn

if __name__ == "__main__":
    data = load_iris()

    X, y = data["data"], data["target"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.33, random_state=42)

    clf = RandomForestClassifier(max_depth=2, random_state=0)
    clf.fit(X_train, y_train)

    print(clf.feature_importances_)

    print(classification_report(y_test, clf.predict(
        X_test), target_names=data["target_names"]))

    mlflow.sklearn.log_model(clf, "model")
    print("Model saved in run %s" % mlflow.active_run().info.run_uuid)

调用mlflow.sklearn.log_model(), MLflow创建以下的目录来管理模型:

我们看到在artifacts目录下有Python的pickle文件和另一个元数据文件,MLModel。

artifact_path: model
flavors:
  python_function:
    data: model.pkl
    loader_module: mlflow.sklearn
    python_version: 2.7.10
  sklearn:
    pickled_model: model.pkl
    sklearn_version: 0.20.0
run_id: 44ae85c084904b4ea5bad5aa42c9ce05
utc_time_created: '2018-10-02 23:38:49.786871'

使用 mlflow sklearn serve -m model 就可以很方便的提供基于sklearn的模型服务了。

虽然MLFlow也号称支持Spark和Tensorflow,但是他们都是基于Python来做,我尝试使用,但是文档和例子比较少,所以没能成功。但原理上都是使用Pickle➕元数据的方式。大家有兴趣的可以尝试一下。

关于部署功能,MLFlow的一个亮点是和SagemakerAzureML的支持。

MLeap

MLeap的目标是提供一个在Spark和Sklearn之间可移植的模型格式,和运行引擎。它包含:

  • 基于JSON的序列化
  • 运行引擎
  • Benchmark

MLeap的架构如下图:

这是一个使用MLeap导出Sklearn模型的例子:

# Initialize MLeap libraries before Scikit/Pandas
import mleap.sklearn.preprocessing.data
import mleap.sklearn.pipeline
from mleap.sklearn.ensemble import forest
from mleap.sklearn.preprocessing.data import FeatureExtractor

# Import Scikit Transformer(s)
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier

data = pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv')
input_features = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

output_vector_name = 'extracted_features' # Used only for serialization purposes
output_features = [x for x in input_features]

feature_extractor_tf = FeatureExtractor(input_scalars=input_features,
                                        output_vector=output_vector_name,
                                        output_vector_items=output_features)

classification_tf = RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
                       max_depth=2, max_features='auto', max_leaf_nodes=None,
                       min_impurity_decrease=0.0, min_impurity_split=None,
                       min_samples_leaf=1, min_samples_split=2,
                       min_weight_fraction_leaf=0.0, n_estimators=10, n_jobs=1,
                       oob_score=False, random_state=0, verbose=0, warm_start=False)

classification_tf.mlinit(input_features="features", prediction_column='species',feature_names="features")

rf_pipeline = Pipeline([(feature_extractor_tf.name, feature_extractor_tf),
                        (classification_tf.name, classification_tf)])
rf_pipeline.mlinit()
rf_pipeline.fit(data[input_features],data['species'])

rf_pipeline.serialize_to_bundle('./', 'mleap-scikit-rf-pipeline', init=True)

导出的模型结构如下图所示:

这个是randonforest的模型json

{
   "attributes": {
      "num_features": {
         "long": 4
      }, 
      "trees": {
         "type": "list", 
         "string": [
            "tree0", 
            "tree1", 
            "tree2", 
            "tree3", 
            "tree4", 
            "tree5", 
            "tree6", 
            "tree7", 
            "tree8", 
            "tree9"
         ]
      }, 
      "tree_weights": {
         "double": [
            1.0, 
            1.0, 
            1.0, 
            1.0, 
            1.0, 
            1.0, 
            1.0, 
            1.0, 
            1.0, 
            1.0
         ], 
         "type": "list"
      }
   }, 
   "op": "random_forest_classifier"
}

我们可以看出MLeap把模型完全序列化成与代码无关的JSON文件,这样就可以在不同的运行时工具Spark/Sklearn之间做到可移植。

MLeap对模型提供服务,不需要依赖任何Sklearn或者Spark的代码。只要启动MLeap的Server,然后提交模型就好了。

docker run -p 65327:65327 -v /tmp/models:/models combustml/mleap-serving:0.9.0-SNAPSHOT
curl -XPUT -H "content-type: application/json" \
        -d '{"path":"/models/yourmodel.zip"}' \
        http://localhost:65327/model

下面的代码用Scala在Spark 上训练一个同样的Randonforest分类模型,并利用MLeap持久化模型。

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.types.{IntegerType, DoubleType}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

import ml.combust.bundle.BundleFile
import ml.combust.bundle.serializer.SerializationFormat
import ml.combust.mleap.spark.SparkSupport._
import resource._

import org.apache.spark.SparkFiles

spark.sparkContext.addFile("https://s3-us-west-2.amazonaws.com/mlapi-samples/demo/data/input/iris.csv")
val data = spark.read.format("csv").option("header", "true").load(SparkFiles.get("iris.csv"))

//data.show()
//data.printSchema()

// Transform, convert string coloumn to number
// this transform is not part of the pipeline
val featureDf = data.select(data("sepal_length").cast(DoubleType).as("sepal_length"),
                            data("sepal_width").cast(DoubleType).as("sepal_width"),
                            data("petal_width").cast(DoubleType).as("petal_width"),
                            data("petal_length").cast(DoubleType).as("petal_length"),
                            data("species") )

// assember the features
val assembler = new VectorAssembler()
  .setInputCols(Array("sepal_length", "sepal_width", "petal_width", "petal_length"))
  .setOutputCol("features")
  
val output = assembler.transform(featureDf)

// create lable and features
val labelIndexer = new StringIndexer()
  .setInputCol("species")
  .setOutputCol("indexedLabel")
  .fit(output)

val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(output)
  
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = featureDf.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(assembler, labelIndexer, featureIndexer, rf, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "species", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

val rfModel = model.stages(3).asInstanceOf[RandomForestClassificationModel]
println("Learned classification forest model:\n" + rfModel.toDebugString)

val pipelineModel = SparkUtil.createPipelineModel(Array(model))

for(bundle <- managed(BundleFile("file:/tmp/mleap-examples/rf"))) {
  pipelineModel.writeBundle.format(SerializationFormat.Json).save(bundle)
}

导出的模型和之前的Sklearn具有相同的格式。

MLeap的问题在于要支持所有的算法,对于每一个算法都要实现对应的序列化,这也使得它的需要很多的开发来支持客户自定义的算法。对于常用算法的支持,大家可以参考这里

其它

除了以上几个,还有一些我们没有涉及,有兴趣的读者可以自行搜索。

总结

Seldon Core和K8S结合的很好,它提供的运行图的方式非常强大,它也是我实验中唯一一个能够成功部署Sklearn,Spark和Tensorflow三种模型的工具,非常推荐!

Clipper提供基于K8s和Docker的模型部署,它的模型版本管理做得不错,但是代码不太稳定,小问题不少,基于CloudPickle也有不少的限制,只能支持Python也是个问题。推荐给数据科学家有比较多的本地交互的情况。

MLFlow能够提供很方便的基于Python的模型服务,但是缺乏和容器的结合。但是它能够支持和Sagemaker,AzureML等云的支持。推荐给已经在使用这些云的玩家。

MLeap的特色是支持模型的可交互性,也就是说我可以把sklearn训练的模型导出在Spark上运行,这的功能很有吸引力,但是要支持全部的算法,它还有很长的路要走。关于机器学习模型标准化的问题,大家也可以关注PMML。现阶段各个工具对PMML的支持比较有限,随着深度学习的广泛应用,PMML何去何从还未可知。

下表是对以上几个工具的简单总结,供大家参考

Model Persistent

ML Tools

Kubernetest Integration

Version

License

Implementation

Seldon Core

S2i + Pickle

Tensorflow, SKlearn, Keras, R, H2O, Nodejs, PMML

Yes

0.3.2

Apache

Docker + K8s CRD

Clipper

Pickle

Python, PySpark, PyTorch, Tensorflow, MXnet, Customer Container

Yes

0.3.0

Apache

CPP / Python

MLFlow

Directory + Metadata

Python, H2O, Kera, MLeap, PyTorch, Sklearn, Spark, Tensorflow, R

No

Alpha

Apache

Python

MLeap

 JSON

Spark,Sklearn, Tensorflow

No

0.12.0

Apache

Scala/Java

参考


Original url: Access

Created at: 2018-10-09 09:34:15

Category: default

Tags: none

请先后发表评论
  • 最新评论
  • 总共0条评论