MLflow 模型
MLflow 模型是一种标准格式,用于打包机器学习模型,这些模型可用于各种下游工具——例如,通过 REST API 进行实时服务或在 Apache Spark 上进行批处理推理。该格式定义了一种约定,允许您以不同“风格”(flavors)保存模型,这些风格可以被不同的下游工具理解。
存储格式
每个 MLflow 模型是一个目录,其中包含任意文件,以及目录根目录下的一个 MLmodel 文件,该文件可以定义模型可以拥有的多种风格。
MLflow 模型的模型方面可以是序列化对象(例如,pickled 的 scikit-learn 模型)或 Python 脚本(如果在 Databricks 中运行,则为 notebook),其中包含使用 mlflow.models.set_model() API 定义的模型实例。
风格是使 MLflow 模型强大的关键概念:它们是部署工具可以用来理解模型的一种约定,这使得编写可以处理来自任何 ML 库的模型的工具成为可能,而无需将每种工具与每个库集成。MLflow 定义了几种“标准”风格,其内置部署工具都支持这些风格,例如描述如何将模型作为 Python 函数运行的“Python 函数”风格。然而,库也可以定义和使用其他风格。例如,MLflow 的 mlflow.sklearn 库允许将模型加载回 scikit-learn Pipeline 对象以在了解 scikit-learn 的代码中使用,或者加载为仅需要应用模型的通用 Python 函数(例如,使用选项 -t sagemaker 将模型部署到 Amazon SageMaker 的 mlflow deployments 工具)。
MLmodel 文件
特定模型支持的所有风格都定义在其 MLmodel 文件中,采用 YAML 格式。例如,在 MLflow 仓库内运行 python examples/sklearn_logistic_regression/train.py 将在 model 目录下创建以下文件
# Directory written by mlflow.sklearn.save_model(model, "model", input_example=...)
model/
├── MLmodel
├── model.pkl
├── conda.yaml
├── python_env.yaml
├── requirements.txt
├── input_example.json (optional, only logged when input example is provided and valid during model logging)
├── serving_input_example.json (optional, only logged when input example is provided and valid during model logging)
└── environment_variables.txt (optional, only logged when environment variables are used during model inference)
其 MLmodel 文件描述了两种风格
time_created: 2018-05-25T17:28:53.35
flavors:
sklearn:
sklearn_version: 0.19.1
pickled_model: model.pkl
python_function:
loader_module: mlflow.sklearn
除了列出模型风格的flavors字段外,MLmodel YAML 格式还可以包含以下字段
time_created: 模型创建的日期和时间,采用 UTC ISO 8601 格式。run_id: 创建模型的运行 ID,如果模型是使用 跟踪保存的。signature: JSON 格式的模型签名。input_example: 对包含输入示例的工件的引用。databricks_runtime: 如果模型是在 Databricks notebook 或作业中训练的,则为 Databricks 运行时版本和类型。mlflow_version: 用于记录模型的 MLflow 版本。
附加记录的文件
为了重建环境,每当记录模型时,我们会自动记录 conda.yaml、python_env.yaml 和 requirements.txt 文件。然后可以使用 conda 或 virtualenv(使用 pip)来重新安装依赖项。有关这些文件的更多详细信息,请参阅MLflow 如何记录依赖项。
如果在记录模型时提供了模型输入示例,还会记录另外两个文件 input_example.json 和 serving_input_example.json。有关更多详细信息,请参阅模型输入示例。
记录模型时,模型元数据文件(MLmodel、conda.yaml、python_env.yaml、requirements.txt)将复制到名为 metadata 的子目录中。对于轮式模型,还会复制 original_requirements.txt 文件。
当下载在 MLflow 模型注册表中注册的模型时,会在下载者的模型目录中添加一个名为 registered_model_meta 的 YAML 文件。该文件包含 MLflow 模型注册表中引用的模型的名称和版本,将用于部署和其他目的。
如果在 Databricks 中记录模型,MLflow 还会在模型目录中创建一个 metadata 子目录。此子目录包含内部使用的元数据文件的轻量级副本。
环境变量文件
在记录模型时,MLflow 会将在模型推理期间使用的环境变量记录在 environment_variables.txt 文件中。
environment_variables.txt 文件仅包含模型推理期间使用的环境变量的名称,不存储值。
目前,MLflow 只记录名称包含以下任何关键字的环境变量
RECORD_ENV_VAR_ALLOWLIST = {
# api key related
"API_KEY", # e.g. OPENAI_API_KEY
"API_TOKEN",
# databricks auth related
"DATABRICKS_HOST",
"DATABRICKS_USERNAME",
"DATABRICKS_PASSWORD",
"DATABRICKS_TOKEN",
"DATABRICKS_INSECURE",
"DATABRICKS_CLIENT_ID",
"DATABRICKS_CLIENT_SECRET",
"_DATABRICKS_WORKSPACE_HOST",
"_DATABRICKS_WORKSPACE_ID",
}
使用环境变量的 pyfunc 模型的示例
import mlflow
import os
os.environ["TEST_API_KEY"] = "test_api_key"
class MyModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input, params=None):
if os.environ.get("TEST_API_KEY"):
return model_input
raise Exception("API key not found")
with mlflow.start_run():
model_info = mlflow.pyfunc.log_model(
name="model", python_model=MyModel(), input_example="data"
)
环境变量 TEST_API_KEY 会像下面这样记录在 environment_variables.txt 文件中
# This file records environment variable names that are used during model inference.
# They might need to be set when creating a serving endpoint from this model.
# Note: it is not guaranteed that all environment variables listed here are required
TEST_API_KEY
在将模型部署到服务终结点之前,请检查 environment_variables.txt 文件,以确保设置了模型推理所需的所有必要环境变量。请注意,文件中列出的并非所有环境变量始终是模型推理所必需的。有关在 Databricks 服务终结点上设置环境变量的详细说明,请参阅此指南。
要禁用此功能,请将环境变量 MLFLOW_RECORD_ENV_VARS_IN_MODEL_LOGGING 设置为 false。
管理模型依赖项
MLflow 模型会推断模型风格所需的依赖项并自动记录它们。但是,它也允许您定义额外的依赖项或自定义 Python 代码,并提供一个工具在沙盒环境中验证它们。有关 MLflow 模型中依赖项管理的更多详细信息,请参阅管理 MLflow 模型中的依赖项。
模型签名和输入示例
在 MLflow 中,理解模型签名和输入示例的复杂性对于有效的模型管理和部署至关重要。
- 模型签名:定义模型输入、输出和附加推理参数的模式,促进模型交互的标准接口。
- 模型输入示例:提供有效模型输入的具体实例,有助于理解和测试模型要求。此外,如果在记录模型时提供了输入示例,如果没有明确提供,将自动推断并存储模型签名。
- 模型服务有效负载示例:提供用于查询已部署模型终结点的 json 有效负载示例。如果在记录模型时提供了输入示例,将根据输入示例自动生成服务有效负载示例并保存为
serving_input_example.json。
我们的文档深入探讨了几个关键领域
- 支持的签名类型:我们介绍所支持的不同数据类型,例如传统机器学习模型的表格数据和深度学习模型的张量。
- 签名强制执行:讨论 MLflow 如何强制执行模式合规性,确保提供的输入与模型的期望相匹配。
- 记录带签名的模型:指导如何在记录模型时纳入签名,以增强模型操作的清晰度和可靠性。
有关这些概念的详细探索,包括示例和最佳实践,请访问模型签名和示例指南。如果您想在实践中看到签名强制执行,请参阅模型签名笔记本教程以了解更多信息。
模型 API
您可以通过多种方式保存和加载 MLflow 模型。首先,MLflow 包含与一些常用库的集成。例如,mlflow.sklearn 包含用于 scikit-learn 模型的 save_model、log_model 和 load_model 函数。其次,您可以使用 mlflow.models.Model 类来创建和写入模型。此类有四个关键函数
- add_flavor 用于向模型添加风格。每种风格都有一个字符串名称和一个键值属性字典,其中值可以是任何可以序列化为 YAML 的对象。
- save 用于将模型保存到本地目录。
- log 用于使用 MLflow Tracking 将模型记录为当前运行中的工件。
- load 用于从本地目录或先前运行中的工件加载模型。
来自代码的模型
要了解有关“来自代码的模型”功能的更多信息,请访问深入指南以获取更深入的解释并查看其他示例。
“来自代码的模型”功能在 MLflow 2.12.2 及更高版本中可用。此功能是实验性的,未来版本可能会有所更改。
“来自代码的模型”功能允许您直接从独立 Python 脚本定义和记录模型。当您想要记录可以有效地作为代码表示(不需要通过训练优化的权重)存储的模型或依赖于外部服务的应用程序(例如 LangChain chains)时,此功能特别有用。另一个好处是,这种方法完全避免了在 Python 中使用 pickle 或 cloudpickle 模块,这在加载不受信任的模型时可能带有安全风险。
此功能仅支持 LangChain、LlamaIndex 和 PythonModel 模型。
为了从代码记录模型,您可以利用 mlflow.models.set_model() API。此 API 允许您通过在定义模型的文件中直接指定模型类的实例来定义模型。记录此类模型时,指定的是文件路径(而不是对象),该路径指向包含模型类定义和应用于自定义模型实例的 set_model API 用法的 Python 文件。
下图比较了标准模型记录过程和适用于可使用“来自代码的模型”功能保存的模型的“来自代码的模型”功能

例如,在一个名为 my_model.py 的独立文件中定义一个模型
import mlflow
from mlflow.models import set_model
class MyModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input):
return model_input
# Define the custom PythonModel instance that will be used for inference
set_model(MyModel())
“来自代码的模型”功能不支持捕获来自外部文件引用的导入语句。如果您的依赖项没有通过 pip 安装捕获,则需要通过使用 code_paths 功能通过适当的绝对路径导入引用来包含和解析依赖项。为简单起见,建议将模型定义所需的所有本地依赖项封装在同一 Python 脚本文件中,因为 code_paths 依赖路径解析存在限制。
当从代码定义模型并使用 mlflow.models.set_model() API 时,将在内部执行脚本中定义的代码以确保它是有效代码。如果您的脚本中包含对外部服务的连接(例如,您正在 LangChain 中连接到 GenAI 服务),请注意,在记录模型时会产生对该服务的连接请求。
然后,从不同 Python 脚本中的文件路径记录模型
import mlflow
model_path = "my_model.py"
with mlflow.start_run():
model_info = mlflow.pyfunc.log_model(
python_model=model_path, # Define the model as the path to the Python file
name="my_model",
)
# Loading the model behaves exactly as if an instance of MyModel had been logged
my_model = mlflow.pyfunc.load_model(model_info.model_uri)
mlflow.models.set_model() API不是线程安全的。如果您从多个线程并发记录模型,请勿尝试使用此功能。此流畅的 API 使用全局活动模型状态,该状态没有任何一致性保证。如果您有兴趣使用线程安全的记录 API,请使用 mlflow.client.MlflowClient API 来记录模型。
内置模型风格
MLflow 提供了一些在您的应用程序中可能很有用的标准风格。特别是,它的许多部署工具都支持这些风格,因此您可以以其中一种风格导出自己的模型,以便从所有这些工具中受益
- Python 函数 (
python_function) - R 函数 (
crate) - H2O (
h2o) - Keras (
keras) - PyTorch (
pytorch) - Scikit-learn (
sklearn) - Spark MLlib (
spark) - TensorFlow (
tensorflow) - ONNX (
onnx) - XGBoost (
xgboost) - LightGBM (
lightgbm) - CatBoost (
catboost) - Spacy(
spaCy) - Statsmodels (
statsmodels) - Prophet (
prophet) - Pmdarima (
pmdarima) - John Snow Labs (
johnsnowlabs) - Transformers (
transformers) - SentenceTransformers (
sentence_transformers)
Python 函数 (python_function)
python_function 模型风格是 MLflow Python 模型的默认模型接口。任何 MLflow Python 模型都应可加载为 python_function 模型。这使得其他 MLflow 工具可以处理任何 Python 模型,而不管使用哪个持久化模块或框架生成了该模型。这种互操作性非常强大,因为它允许任何 Python 模型在各种环境中投入生产。
此外,python_function 模型风格定义了 Python 模型的通用文件系统 模型格式,并提供了将模型保存到此格式和从中加载模型的实用程序。该格式是自包含的,因为它包括加载和使用模型所需的所有信息。依赖项要么直接与模型一起存储,要么通过 conda 环境引用。这种模型格式允许其他工具将其模型与 MLflow 集成。
如何将模型保存为 Python 函数
大多数 python_function 模型是作为其他模型风格的一部分保存的——例如,所有 mlflow 内置风格都在导出的模型中包含 python_function 风格。此外,mlflow.pyfunc 模块定义了显式创建 python_function 模型的函数。此模块还包括创建自定义 Python 模型的实用程序,这是向 ML 模型添加自定义 Python 代码的一种便捷方式。有关更多信息,请参阅自定义 Python 模型文档。
有关如何从 Python 脚本(来自代码的模型功能)存储自定义模型的信息,请参阅来自代码的模型指南以了解推荐的方法。
如何加载和评分 Python 函数模型
加载模型
您可以通过使用 mlflow.pyfunc.load_model() 函数在 Python 中加载 python_function 模型。重要的是要注意,load_model 假设所有依赖项都可用,并且不会执行任何依赖项检查或安装。有关处理依赖项的部署选项,请参阅模型部署部分。
评分模型
加载模型后,可以通过两种主要方式对其进行评分
-
同步评分 标准的评分方法是使用
predict方法,该方法支持各种输入类型并根据输入数据返回标量或集合。该方法签名是textpredict(data: Union[pandas.Series, pandas.DataFrame, numpy.ndarray, csc_matrix, csr_matrix, List[Any], Dict[str, Any], str],
params: Optional[Dict[str, Any]] = None) → Union[pandas.Series, pandas.DataFrame, numpy.ndarray, list, str] -
同步流式评分
注意predict_stream是 MLflow 在 2.12.2 版本中添加的新接口。以前版本的 MLflow 不支持此接口。为了在自定义 Python 函数模型中使用predict_stream,您必须在模型类中实现predict_stream方法并返回一个生成器类型。对于支持流数据处理的模型,可以使用
predict_stream方法。此方法返回一个generator,它会产生响应流,从而可以高效地处理大型数据集或连续数据流。请注意,predict_stream方法并非对所有模型类型都可用。使用方法涉及迭代生成器以消耗响应textpredict_stream(data: Any, params: Optional[Dict[str, Any]] = None) → GeneratorType
演示 predict_stream()
下面是一个示例,演示如何使用 predict_stream() 方法定义、保存、加载和使用可流式传输的模型
import mlflow
import os
# Define a custom model that supports streaming
class StreamableModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input, params=None):
# Regular predict method implementation (optional for this demo)
return "regular-predict-output"
def predict_stream(self, context, model_input, params=None):
# Yielding elements one at a time
for element in ["a", "b", "c", "d", "e"]:
yield element
# Save the model to a directory
tmp_path = "/tmp/test_model"
pyfunc_model_path = os.path.join(tmp_path, "pyfunc_model")
python_model = StreamableModel()
mlflow.pyfunc.save_model(path=pyfunc_model_path, python_model=python_model)
# Load the model
loaded_pyfunc_model = mlflow.pyfunc.load_model(model_uri=pyfunc_model_path)
# Use predict_stream to get a generator
stream_output = loaded_pyfunc_model.predict_stream("single-input")
# Consuming the generator using next
print(next(stream_output)) # Output: 'a'
print(next(stream_output)) # Output: 'b'
# Alternatively, consuming the generator using a for-loop
for response in stream_output:
print(response) # This will print 'c', 'd', 'e'
Python 函数模型接口
所有 PyFunc 模型都将支持 pandas.DataFrame 作为输入。除了 pandas.DataFrame 之外,DL PyFunc 模型还将支持 numpy.ndarrays 形式的张量输入。要验证模型风格是否支持张量输入,请查看该风格的文档。
对于具有基于列的模式的模型,输入通常以 pandas.DataFrame 的形式提供。如果为具有命名列的模式提供映射列名到值的字典作为输入,或者为具有未命名列的模式提供 Python List 或 numpy.ndarray 作为输入,MLflow 将输入转换为 DataFrame。针对 DataFrame 执行有关预期数据类型的模式强制和类型转换。
对于具有基于张量的模式的模型,输入通常以 numpy.ndarray 或映射张量名称到其 np.ndarray 值的字典的形式提供。模式强制将检查提供的输入的形状和类型与模型模式中指定的形状和类型是否匹配,如果不匹配则引发错误。
对于未定义模式的模型,不对模型输入和输出进行任何更改。如果模型不接受提供的输入类型,MLflow 将传播模型引发的任何错误。
用于预测或推理的 PyFunc 模型加载到的 Python 环境可能与训练模型时的环境不同。在环境不匹配的情况下,调用 mlflow.pyfunc.load_model() 时将打印警告消息。此警告语句将识别在训练期间使用的包与当前环境中使用的包版本存在版本不匹配的包。为了获得模型训练所用环境的完整依赖项,您可以调用 mlflow.pyfunc.get_model_dependencies()。此外,如果您想在模型训练时使用的相同环境中运行模型推理,可以通过将 env_manager 参数设置为“conda”来调用 mlflow.pyfunc.spark_udf()。这将根据 conda.yaml 文件生成环境,确保 Python UDF 以训练期间使用的确切包版本执行。
某些 PyFunc 模型可能接受模型加载配置,该配置控制模型的加载方式和预测的计算方式。您可以通过检查模型的风格元数据来了解模型支持哪些配置
model_info = mlflow.models.get_model_info(model_uri)
model_info.flavors[mlflow.pyfunc.FLAVOR_NAME][mlflow.pyfunc.MODEL_CONFIG]
或者,您可以加载 PyFunc 模型并检查 model_config 属性
pyfunc_model = mlflow.pyfunc.load_model(model_uri)
pyfunc_model.model_config
在加载时间,可以通过在 mlflow.pyfunc.load_model() 方法中指定 model_config 参数来更改模型配置
pyfunc_model = mlflow.pyfunc.load_model(model_uri, model_config=dict(temperature=0.93))
更改模型配置值时,这些值会覆盖模型保存时的配置。指示无效的模型配置键会导致该配置被忽略。将显示一条警告消息,提及被忽略的条目。
模型配置与签名中带有默认值的参数:当您需要为模型发布者提供一种更改模型如何加载到内存以及如何为所有样本计算预测的方式时(例如,一个键如 user_gpu),请使用模型配置。模型消费者无法在预测时间更改这些值。使用签名中带有默认值的参数,为用户提供更改如何对每个数据样本计算预测的能力。
R 函数 (crate)
crate 模型风格定义了一种通用的模型格式,用于使用 carrier 包中的 crate 函数将任意 R 预测函数表示为 MLflow 模型。预计预测函数接受一个数据框作为输入,并输出一个数据框、一个向量或一个包含预测的列表。
要使用此风格,需要安装 R。
crate 用法
对于最小的 crate 模型,预测函数的示例配置是
library(mlflow)
library(carrier)
# Load iris dataset
data("iris")
# Learn simple linear regression model
model <- lm(Sepal.Width~Sepal.Length, data = iris)
# Define a crate model
# call package functions with an explicit :: namespace.
crate_model <- crate(
function(new_obs) stats::predict(model, data.frame("Sepal.Length" = new_obs)),
model = model
)
# log the model
model_path <- mlflow_log_model(model = crate_model, artifact_path = "iris_prediction")
# load the logged model and make a prediction
model_uri <- paste0(mlflow_get_run()$artifact_uri, "/iris_prediction")
mlflow_model <- mlflow_load_model(model_uri = model_uri,
flavor = NULL,
client = mlflow_client())
prediction <- mlflow_predict(model = mlflow_model, data = 5)
print(prediction)
H2O (h2o)
h2o 模型风格支持记录和加载 H2O 模型。
Python 中的 mlflow.h2o 模块定义了 save_model() 和 log_model() 方法,R 中的 mlflow_save_model 和 mlflow_log_model 用于以 MLflow 模型格式保存 H2O 模型。这些方法生成带有 python_function 风格的 MLflow 模型,允许您通过 mlflow.pyfunc.load_model() 将它们加载为用于推理的通用 Python 函数。加载的 PyFunc 模型只能使用 DataFrame 输入进行评分。当您使用 mlflow.pyfunc.load_model() 加载带有 h2o 风格的 MLflow 模型时,会调用 h2o.init() 方法。因此,加载器的环境中必须安装正确版本的 h2o(-py)。您可以通过修改持久化 H2O 模型的 YAML 配置文件 model.h2o/h2o.yaml 中的 init 条目来定制传递给 h2o.init() 的参数。
最后,您可以使用 mlflow.h2o.load_model() 方法将带有 h2o 风格的 MLflow 模型加载为 H2O 模型对象。
有关更多信息,请参阅 mlflow.h2o。
h2o pyfunc 用法
对于一个最小的 h2o 模型,下面是一个在分类场景中,pyfunc predict() 方法的示例
import mlflow
import h2o
h2o.init()
from h2o.estimators.glm import H2OGeneralizedLinearEstimator
# import the prostate data
df = h2o.import_file(
"http://s3.amazonaws.com/h2o-public-test-data/smalldata/prostate/prostate.csv.zip"
)
# convert the columns to factors
df["CAPSULE"] = df["CAPSULE"].asfactor()
df["RACE"] = df["RACE"].asfactor()
df["DCAPS"] = df["DCAPS"].asfactor()
df["DPROS"] = df["DPROS"].asfactor()
# split the data
train, test, valid = df.split_frame(ratios=[0.7, 0.15])
# generate a GLM model
glm_classifier = H2OGeneralizedLinearEstimator(
family="binomial", lambda_=0, alpha=0.5, nfolds=5, compute_p_values=True
)
with mlflow.start_run():
glm_classifier.train(
y="CAPSULE", x=["AGE", "RACE", "VOL", "GLEASON"], training_frame=train
)
metrics = glm_classifier.model_performance()
metrics_to_track = ["MSE", "RMSE", "r2", "logloss"]
metrics_to_log = {
key: value
for key, value in metrics._metric_json.items()
if key in metrics_to_track
}
params = glm_classifier.params
mlflow.log_params(params)
mlflow.log_metrics(metrics_to_log)
model_info = mlflow.h2o.log_model(glm_classifier, name="h2o_model_info")
# load h2o model and make a prediction
h2o_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
test_df = test.as_data_frame()
predictions = h2o_pyfunc.predict(test_df)
print(predictions)
# it is also possible to load the model and predict using h2o methods on the h2o frame
# h2o_model = mlflow.h2o.load_model(model_info.model_uri)
# predictions = h2o_model.predict(test)
Keras (keras)
使用 keras 风格的完整指南可在此处查看。
PyTorch (pytorch)
使用 pytorch 风格的完整指南可在此处查看。
有关更多信息,请参阅 mlflow.pytorch。
Scikit-learn (sklearn)
使用 sklearn 风格的完整指南可在此处查看。
有关 API 信息,请参阅 mlflow.sklearn。
Spark MLlib (spark)
使用 spark 风格的完整指南可在此处查看。
有关更多信息,请参阅 mlflow.spark。
TensorFlow (tensorflow)
tensorflow 集成的完整指南可在此处查看。
ONNX (onnx)
onnx 模型风格通过 mlflow.onnx.save_model() 和 mlflow.onnx.log_model() 方法,支持将 ONNX 模型以 MLflow 格式记录。这些方法还会向它们生成的 MLflow 模型添加 python_function 风格,允许将模型解释为通用 Python 函数,以便通过 mlflow.pyfunc.load_model() 进行推理。加载的 PyFunc 模型可以使用 DataFrame 输入和 numpy 数组输入进行评分。MLflow ONNX 模型的 python_function 表示使用 ONNX Runtime 执行引擎进行评估。最后,您可以使用 mlflow.onnx.load_model() 方法以原生 ONNX 格式加载带有 onnx 风格的 MLflow 模型。
有关更多信息,请参阅 mlflow.onnx 和 https://onnx.com.cn/。
保存 ONNX 文件的默认行为是使用 ONNX 保存选项 save_as_external_data=True,以支持超过 2GB 的模型文件。对于小型模型的边缘部署,这可能会引起问题。如果您需要将小型模型保存为单个文件以供此类部署考虑,可以在 mlflow.onnx.save_model() 或 mlflow.onnx.log_model() 中设置参数 save_as_external_data=False 以强制将模型序列化为小文件。请注意,如果模型超过 2GB,保存为单个文件将不起作用。
ONNX pyfunc 用法示例
对于 ONNX 模型,一个示例配置是使用 pytorch 训练一个虚拟模型,将其转换为 ONNX,记录到 mlflow 并使用 pyfunc predict() 方法进行预测是
import numpy as np
import mlflow
from mlflow.models import infer_signature
import onnx
import torch
from torch import nn
# define a torch model
net = nn.Linear(6, 1)
loss_function = nn.L1Loss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-4)
X = torch.randn(6)
y = torch.randn(1)
# run model training
epochs = 5
for epoch in range(epochs):
optimizer.zero_grad()
outputs = net(X)
loss = loss_function(outputs, y)
loss.backward()
optimizer.step()
# convert model to ONNX and load it
torch.onnx.export(net, X, "model.onnx")
onnx_model = onnx.load_model("model.onnx")
# log the model into a mlflow run
with mlflow.start_run():
signature = infer_signature(X.numpy(), net(X).detach().numpy())
model_info = mlflow.onnx.log_model(onnx_model, name="model", signature=signature)
# load the logged model and make a prediction
onnx_pyfunc = mlflow.pyfunc.load_model(model_info.model_uri)
predictions = onnx_pyfunc.predict(X.numpy())
print(predictions)
XGBoost (xgboost)
xgboost 集成的完整指南可在此处查看。
有关更多信息,请参阅 mlflow.xgboost。
LightGBM (lightgbm)
lightgbm 模型口味支持通过 LightGBM 模型的 MLflow 格式日志记录,记录方式为使用 mlflow.lightgbm.save_model() 和 mlflow.lightgbm.log_model() 方法。这些方法还会向其生成的 MLflow 模型添加 python_function 口味,允许通过 mlflow.pyfunc.load_model() 将模型解释为通用的 Python 函数进行推理。您也可以使用 mlflow.lightgbm.load_model() 方法以原生的 LightGBM 格式加载带有 lightgbm 模型口味的 MLflow 模型。
请注意,LightGBM 的 scikit-learn API 现已受支持。有关更多信息,请参阅 mlflow.lightgbm。
LightGBM pyfunc 用法
以下示例
- 从
scikit-learn加载 IRIS 数据集 - 训练一个 LightGBM
LGBMClassifier - 使用
mlflow记录模型和特征重要性 - 加载已记录的模型并进行预测
from lightgbm import LGBMClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import mlflow
from mlflow.models import infer_signature
data = load_iris()
# Remove special characters from feature names to be able to use them as keys for mlflow metrics
feature_names = [
name.replace(" ", "_").replace("(", "").replace(")", "")
for name in data["feature_names"]
]
X_train, X_test, y_train, y_test = train_test_split(
data["data"], data["target"], test_size=0.2
)
# create model instance
lgb_classifier = LGBMClassifier(
n_estimators=10,
max_depth=3,
learning_rate=1,
objective="binary:logistic",
random_state=123,
)
# Fit and save model and LGBMClassifier feature importances as mlflow metrics
with mlflow.start_run():
lgb_classifier.fit(X_train, y_train)
feature_importances = dict(zip(feature_names, lgb_classifier.feature_importances_))
feature_importance_metrics = {
f"feature_importance_{feature_name}": imp_value
for feature_name, imp_value in feature_importances.items()
}
mlflow.log_metrics(feature_importance_metrics)
signature = infer_signature(X_train, lgb_classifier.predict(X_train))
model_info = mlflow.lightgbm.log_model(
lgb_classifier, name="iris-classifier", signature=signature
)
# Load saved model and make predictions
lgb_classifier_saved = mlflow.pyfunc.load_model(model_info.model_uri)
y_pred = lgb_classifier_saved.predict(X_test)
print(y_pred)
CatBoost (catboost)
catboost 模型口味支持通过 CatBoost 模型的 MLflow 格式日志记录,记录方式为使用 mlflow.catboost.save_model() 和 mlflow.catboost.log_model() 方法。这些方法还会向其生成的 MLflow 模型添加 python_function 口味,允许通过 mlflow.pyfunc.load_model() 将模型解释为通用的 Python 函数进行推理。您也可以使用 mlflow.catboost.load_model() 方法以原生的 CatBoost 格式加载带有 catboost 模型口味的 MLflow 模型。
有关更多信息,请参阅 mlflow.catboost。
CatBoost pyfunc 用法
对于 CatBoost 分类器模型,pyfunc predict() 方法的示例配置如下
import mlflow
from mlflow.models import infer_signature
from catboost import CatBoostClassifier
from sklearn import datasets
# prepare data
X, y = datasets.load_wine(as_frame=False, return_X_y=True)
# train the model
model = CatBoostClassifier(
iterations=5,
loss_function="MultiClass",
allow_writing_files=False,
)
model.fit(X, y)
# create model signature
predictions = model.predict(X)
signature = infer_signature(X, predictions)
# log the model into a mlflow run
with mlflow.start_run():
model_info = mlflow.catboost.log_model(model, name="model", signature=signature)
# load the logged model and make a prediction
catboost_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
print(catboost_pyfunc.predict(X[:5]))
Spacy (spaCy)
spaCy 集成的完整指南可以在此处查看:can be viewed here。
Statsmodels (statsmodels)
statsmodels 模型口味支持通过 Statsmodels 模型的 MLflow 格式日志记录,记录方式为使用 mlflow.statsmodels.save_model() 和 mlflow.statsmodels.log_model() 方法。这些方法还会向其生成的 MLflow 模型添加 python_function 口味,允许通过 mlflow.pyfunc.load_model() 将模型解释为通用的 Python 函数进行推理。加载的 PyFunc 模型只能使用 DataFrame 输入进行评分。您也可以使用 mlflow.statsmodels.load_model() 方法以原生的 statsmodels 格式加载带有 statsmodels 模型口味的 MLflow 模型。
目前,自动日志记录仅限于通过对 statsmodels 模型调用 fit 生成的参数、指标和模型。
Statsmodels pyfunc 用法
以下 2 个示例说明了如何使用来自以下 statsmodels api 的基本回归模型 (OLS) 和 ARIMA 时间序列模型:statsmodels.formula.api 和 statsmodels.tsa.api
对于一个最小的 statsmodels 回归模型,下面是 pyfunc predict() 方法的示例
import mlflow
import pandas as pd
from sklearn.datasets import load_diabetes
import statsmodels.formula.api as smf
# load the diabetes dataset from sklearn
diabetes = load_diabetes()
# create X and y dataframes for the features and target
X = pd.DataFrame(data=diabetes.data, columns=diabetes.feature_names)
y = pd.DataFrame(data=diabetes.target, columns=["target"])
# concatenate X and y dataframes
df = pd.concat([X, y], axis=1)
# create the linear regression model (ordinary least squares)
model = smf.ols(
formula="target ~ age + sex + bmi + bp + s1 + s2 + s3 + s4 + s5 + s6", data=df
)
mlflow.statsmodels.autolog(
log_models=True,
disable=False,
exclusive=False,
disable_for_unsupported_versions=False,
silent=False,
registered_model_name=None,
)
with mlflow.start_run():
res = model.fit(method="pinv", use_t=True)
model_info = mlflow.statsmodels.log_model(res, name="OLS_model")
# load the pyfunc model
statsmodels_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
# generate predictions
predictions = statsmodels_pyfunc.predict(X)
print(predictions)
对于一个最小的时间序列 ARIMA 模型,下面是 pyfunc predict() 方法的示例
import mlflow
import numpy as np
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
# create a time series dataset with seasonality
np.random.seed(0)
# generate a time index with a daily frequency
dates = pd.date_range(start="2022-12-01", end="2023-12-01", freq="D")
# generate the seasonal component (weekly)
seasonality = np.sin(np.arange(len(dates)) * (2 * np.pi / 365.25) * 7)
# generate the trend component
trend = np.linspace(-5, 5, len(dates)) + 2 * np.sin(
np.arange(len(dates)) * (2 * np.pi / 365.25) * 0.1
)
# generate the residual component
residuals = np.random.normal(0, 1, len(dates))
# generate the final time series by adding the components
time_series = seasonality + trend + residuals
# create a dataframe from the time series
data = pd.DataFrame({"date": dates, "value": time_series})
data.set_index("date", inplace=True)
order = (1, 0, 0)
# create the ARIMA model
model = ARIMA(data, order=order)
mlflow.statsmodels.autolog(
log_models=True,
disable=False,
exclusive=False,
disable_for_unsupported_versions=False,
silent=False,
registered_model_name=None,
)
with mlflow.start_run():
res = model.fit()
mlflow.log_params(
{
"order": order,
"trend": model.trend,
"seasonal_order": model.seasonal_order,
}
)
mlflow.log_params(res.params)
mlflow.log_metric("aic", res.aic)
mlflow.log_metric("bic", res.bic)
model_info = mlflow.statsmodels.log_model(res, name="ARIMA_model")
# load the pyfunc model
statsmodels_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
# prediction dataframes for a TimeSeriesModel must have exactly one row and include columns called start and end
start = pd.to_datetime("2024-01-01")
end = pd.to_datetime("2024-01-07")
# generate predictions
prediction_data = pd.DataFrame({"start": start, "end": end}, index=[0])
predictions = statsmodels_pyfunc.predict(prediction_data)
print(predictions)
有关更多信息,请参阅 mlflow.statsmodels。
Prophet (prophet)
prophet 集成的完整指南可以在此处查看:can be viewed here。
有关更多信息,请参阅 mlflow.prophet。
Pmdarima (pmdarima)
pmdarima 模型口味支持通过 pmdarima 模型的 MLflow 格式日志记录,记录方式为使用 mlflow.pmdarima.save_model() 和 mlflow.pmdarima.log_model() 方法。这些方法还会向其生成的 MLflow 模型添加 python_function 口味,允许通过 mlflow.pyfunc.load_model() 将模型解释为通用的 Python 函数进行推理。加载的 PyFunc 模型只能使用 DataFrame 输入进行评分。您也可以使用 mlflow.pmdarima.load_model() 方法以原生的 pmdarima 格式加载带有 pmdarima 模型口味的 MLflow 模型。
用于生成预测的 pyfunc 类型的 pmdarima 模型的接口使用一个单行 Pandas DataFrame 配置参数。此配置 Pandas DataFrame 中支持以下列
n_periods(必需) - 指定要生成的目标未来周期数,从训练数据集的最后一个日期时间值开始,利用模型训练时输入训练序列的频率(例如,如果训练数据序列元素代表每小时一个值,为了预测未来 3 天的数据,将列n_periods设置为72)X(可选) - 外生回归变量值 (仅支持 pmdarima 版本 >= 1.8.0) 一个未来时间段事件的二维值数组。有关更多信息,请阅读底层库的解释。return_conf_int(可选) - 一个布尔值 (默认:False),表示是否返回置信区间值。请参阅上方的注释。alpha(可选) - 用于计算置信区间的显著性值。(默认:0.05)
下面展示了一个 pyfunc 预测 pmdarima 模型的示例配置,其中未来预测周期数为 100,计算置信区间,没有外生回归变量元素,默认 alpha 值为 0.05
| 索引 | n-periods | return_conf_int |
|---|---|---|
| 0 | 100 | True |
传递给 pmdarima pyfunc 口味的 Pandas DataFrame 必须只包含 1 行。
预测 pmdarima 口味时,predict 方法的 DataFrame 配置列 return_conf_int 的值控制输出格式。当该列的值设置为 False 或 None (如果未在配置 DataFrame 中提供此列,则为默认值) 时,返回的 Pandas DataFrame 的模式为单列:["yhat"]。当设置为 True 时,返回的 DataFrame 的模式为:["yhat", "yhat_lower", "yhat_upper"],其中包含预测值 (yhat) 的相应下限 (yhat_lower) 和上限 (yhat_upper) 置信区间。
将作为 pyfunc 加载的 pmdarima 制品的示例用法,并计算了置信区间
import pmdarima
import mlflow
import pandas as pd
data = pmdarima.datasets.load_airpassengers()
with mlflow.start_run():
model = pmdarima.auto_arima(data, seasonal=True)
mlflow.pmdarima.save_model(model, "/tmp/model.pmd")
loaded_pyfunc = mlflow.pyfunc.load_model("/tmp/model.pmd")
prediction_conf = pd.DataFrame(
[{"n_periods": 4, "return_conf_int": True, "alpha": 0.1}]
)
predictions = loaded_pyfunc.predict(prediction_conf)
输出 (Pandas DataFrame)
| 索引 | yhat | yhat_lower | yhat_upper |
|---|---|---|---|
| 0 | 467.573731 | 423.30995 | 511.83751 |
| 1 | 490.494467 | 416.17449 | 564.81444 |
| 2 | 509.138684 | 420.56255 | 597.71117 |
| 3 | 492.554714 | 397.30634 | 587.80309 |
如果 return_conf_int 设置为 True 且来自非 pyfunc 制品,则 pmdarima 的签名日志记录将无法正常工作。原生 ARIMA.predict() 在返回置信区间时的输出不是已识别的签名类型。
John Snow Labs (johnsnowlabs)
johnsnowlabs 模型口味为您提供了访问 200+ 种语言的 20,000+ 尖端企业 NLP 模型的权限,涵盖医疗、金融、法律和更多领域。
您可以使用 mlflow.johnsnowlabs.log_model() 来记录和导出您的模型为
这使您能够将任何 John Snow Labs 模型集成到 MLflow 框架中。您可以使用 MLflow 的服务功能轻松部署模型以进行推理。模型被解释为用于推理的通用 Python 函数,方法是使用 mlflow.pyfunc.load_model()。您也可以使用 mlflow.johnsnowlabs.load_model() 函数从存储的制品中加载带有 johnsnowlabs 口味的已保存或已记录的 MLflow 模型。
功能包括:大型语言模型 (LLM)、文本摘要、问答、命名实体识别、关系抽取、情感分析、拼写检查、图像分类、自动语音识别等等,由最新的 Transformer 架构提供支持。模型由 John Snow Labs 提供,需要 John Snow Labs 企业 NLP 许可证。您可以联系我们获取研究或行业许可证。
示例:将 John Snow Labs 模型导出为 MLflow 格式
import json
import os
import pandas as pd
from johnsnowlabs import nlp
import mlflow
from mlflow.pyfunc import spark_udf
# 1) Write your raw license.json string into the 'JOHNSNOWLABS_LICENSE_JSON' env variable for MLflow
creds = {
"AWS_ACCESS_KEY_ID": "...",
"AWS_SECRET_ACCESS_KEY": "...",
"SPARK_NLP_LICENSE": "...",
"SECRET": "...",
}
os.environ["JOHNSNOWLABS_LICENSE_JSON"] = json.dumps(creds)
# 2) Install enterprise libraries
nlp.install()
# 3) Start a Spark session with enterprise libraries
spark = nlp.start()
# 4) Load a model and test it
nlu_model = "en.classify.bert_sequence.covid_sentiment"
model_save_path = "my_model"
johnsnowlabs_model = nlp.load(nlu_model)
johnsnowlabs_model.predict(["I hate COVID,", "I love COVID"])
# 5) Export model with pyfunc and johnsnowlabs flavors
with mlflow.start_run():
model_info = mlflow.johnsnowlabs.log_model(johnsnowlabs_model, name=model_save_path)
# 6) Load model with johnsnowlabs flavor
mlflow.johnsnowlabs.load_model(model_info.model_uri)
# 7) Load model with pyfunc flavor
mlflow.pyfunc.load_model(model_save_path)
pandas_df = pd.DataFrame({"text": ["Hello World"]})
spark_df = spark.createDataFrame(pandas_df).coalesce(1)
pyfunc_udf = spark_udf(
spark=spark,
model_uri=model_save_path,
env_manager="virtualenv",
result_type="string",
)
new_df = spark_df.withColumn("prediction", pyfunc_udf(*pandas_df.columns))
# 9) You can now use the mlflow models serve command to serve the model see next section
# 10) You can also use x command to deploy model inside of a container see next section
将 John Snow Labs 模型作为容器部署
- 启动 Docker 容器
docker run -p 5001:8080 -e JOHNSNOWLABS_LICENSE_JSON=your_json_string "mlflow-pyfunc"
- 查询服务器
curl http://127.0.0.1:5001/invocations -H 'Content-Type: application/json' -d '{
"dataframe_split": {
"columns": ["text"],
"data": [["I hate covid"], ["I love covid"]]
}
}'
在不使用容器的情况下部署 John Snow Labs 模型
- 导出环境变量并启动服务器
export JOHNSNOWLABS_LICENSE_JSON=your_json_string
mlflow models serve -m <model_uri>
- 查询服务器
curl http://127.0.0.1:5000/invocations -H 'Content-Type: application/json' -d '{
"dataframe_split": {
"columns": ["text"],
"data": [["I hate covid"], ["I love covid"]]
}
}'
Transformers (transformers)
有关使用 transformers 集成的完整指南,包括教程和详细文档,请可在此处找到。
SentenceTransformers (sentence_transformers)
sentence-transformers 集成的完整指南可以在此处查看:can be viewed here。
模型评估
MLflow 评估文档已迁移,可在此处找到。
模型定制
虽然 MLflow 内置的模型持久化实用程序方便于将来自各种流行 ML 库的模型打包成 MLflow Model 格式,但它们并不涵盖所有用例。例如,您可能希望使用 MLflow 内置口味未明确支持的 ML 库中的模型。或者,您可能希望打包自定义推理代码和数据来创建 MLflow 模型。幸运的是,MLflow 提供了两种可用于完成这些任务的解决方案:自定义 Python 模型和自定义口味。
在本节中
自定义 Python 模型
mlflow.pyfunc 模块提供了 save_model() 和 log_model() 实用程序,用于创建带有 python_function 口味的 MLflow 模型,这些模型包含用户指定的代码和制品(文件)依赖项。这些制品依赖项可以包括由任何 Python ML 库生成的序列化模型。
由于这些自定义模型包含 python_function 口味,因此可以将它们部署到 MLflow 支持的任何生产环境,例如 SageMaker、AzureML 或本地 REST 端点。
以下示例演示了如何使用 mlflow.pyfunc 模块创建自定义 Python 模型。有关使用 MLflow 的 python_function 实用程序进行模型定制的附加信息,请参阅python_function 自定义模型文档。
示例:创建带有类型提示的模型
此示例演示了如何创建带有类型提示的自定义 Python 模型,使 MLflow 能够基于模型输入指定的类型提示执行数据验证。有关 PythonModel 类型提示支持的附加信息,请参阅PythonModel 类型提示指南。
带有类型提示的 PythonModel 从 MLflow 2.20.0 版本开始支持数据验证。
import pydantic
import mlflow
from mlflow.pyfunc import PythonModel
# Define the pydantic model input
class Message(pydantic.BaseModel):
role: str
content: str
class CustomModel(PythonModel):
# Define the model_input type hint
# NB: it must be list[...], check the python model type hints guide for more information
def predict(self, model_input: list[Message], params=None) -> list[str]:
return [m.content for m in model_input]
# Construct the model and test
model = CustomModel()
# The input_example can be a list of Message objects as defined in the type hint
input_example = [
Message(role="system", content="Hello"),
Message(role="user", content="Hi"),
]
assert model.predict(input_example) == ["Hello", "Hi"]
# The input example can also be a list of dictionaries that match the Message schema
input_example = [
{"role": "system", "content": "Hello"},
{"role": "user", "content": "Hi"},
]
assert model.predict(input_example) == ["Hello", "Hi"]
# Log the model
with mlflow.start_run():
model_info = mlflow.pyfunc.log_model(
name="model",
python_model=model,
input_example=input_example,
)
# Load the model as pyfunc
pyfunc_model = mlflow.pyfunc.load_model(model_info.model_uri)
assert pyfunc_model.predict(input_example) == ["Hello", "Hi"]
示例:创建一个自定义“add n”模型
此示例定义了一个自定义模型的类,该模型将指定的数值 n 添加到 Pandas DataFrame 输入的所有列中。然后,它使用 mlflow.pyfunc API 以 MLflow Model 格式保存该模型的实例,其中 n = 5。最后,它以 python_function 格式加载模型并使用它来评估样本输入。
import mlflow.pyfunc
# Define the model class
class AddN(mlflow.pyfunc.PythonModel):
def __init__(self, n):
self.n = n
def predict(self, context, model_input, params=None):
return model_input.apply(lambda column: column + self.n)
# Construct and save the model
model_path = "add_n_model"
add5_model = AddN(n=5)
mlflow.pyfunc.save_model(path=model_path, python_model=add5_model)
# Load the model in `python_function` format
loaded_model = mlflow.pyfunc.load_model(model_path)
# Evaluate the model
import pandas as pd
model_input = pd.DataFrame([range(10)])
model_output = loaded_model.predict(model_input)
assert model_output.equals(pd.DataFrame([range(5, 15)]))
示例:以 MLflow 格式保存 XGBoost 模型
此示例首先使用 XGBoost 库训练并保存梯度提升树模型。接下来,它在 XGBoost 模型周围定义一个包装器类,该包装器符合 MLflow 的 python_function 推理 API。然后,它使用包装器类和已保存的 XGBoost 模型来构建一个执行推理的 MLflow 模型,该推理使用梯度提升树。最后,它以 python_function 格式加载 MLflow 模型并使用它来评估测试数据。
# Load training and test datasets
from sys import version_info
import xgboost as xgb
from sklearn import datasets
from sklearn.model_selection import train_test_split
PYTHON_VERSION = f"{version_info.major}.{version_info.minor}.{version_info.micro}"
iris = datasets.load_iris()
x = iris.data[:, 2:]
y = iris.target
x_train, x_test, y_train, _ = train_test_split(x, y, test_size=0.2, random_state=42)
dtrain = xgb.DMatrix(x_train, label=y_train)
# Train and save an XGBoost model
xgb_model = xgb.train(params={"max_depth": 10}, dtrain=dtrain, num_boost_round=10)
xgb_model_path = "xgb_model.pth"
xgb_model.save_model(xgb_model_path)
# Create an `artifacts` dictionary that assigns a unique name to the saved XGBoost model file.
# This dictionary will be passed to `mlflow.pyfunc.save_model`, which will copy the model file
# into the new MLflow Model's directory.
artifacts = {"xgb_model": xgb_model_path}
# Define the model class
import mlflow.pyfunc
class XGBWrapper(mlflow.pyfunc.PythonModel):
def load_context(self, context):
import xgboost as xgb
self.xgb_model = xgb.Booster()
self.xgb_model.load_model(context.artifacts["xgb_model"])
def predict(self, context, model_input, params=None):
input_matrix = xgb.DMatrix(model_input.values)
return self.xgb_model.predict(input_matrix)
# Create a Conda environment for the new MLflow Model that contains all necessary dependencies.
import cloudpickle
conda_env = {
"channels": ["defaults"],
"dependencies": [
f"python={PYTHON_VERSION}",
"pip",
{
"pip": [
f"mlflow=={mlflow.__version__}",
f"xgboost=={xgb.__version__}",
f"cloudpickle=={cloudpickle.__version__}",
],
},
],
"name": "xgb_env",
}
# Save the MLflow Model
mlflow_pyfunc_model_path = "xgb_mlflow_pyfunc"
mlflow.pyfunc.save_model(
path=mlflow_pyfunc_model_path,
python_model=XGBWrapper(),
artifacts=artifacts,
conda_env=conda_env,
)
# Load the model in `python_function` format
loaded_model = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path)
# Evaluate the model
import pandas as pd
test_predictions = loaded_model.predict(pd.DataFrame(x_test))
print(test_predictions)
示例:使用 hf:/ 模式记录 transformers 模型以避免复制大文件
此示例展示了如何使用特殊的 hf:/ 模式直接从 huggingface hub 记录 transformers 模型。当模型太大,尤其是在您想直接服务该模型时,这很有用,但如果您想在本地下载和测试模型,则不会节省空间。
import mlflow
from mlflow.models import infer_signature
import numpy as np
import transformers
# Define a custom PythonModel
class QAModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
"""
This method initializes the tokenizer and language model
using the specified snapshot location from model context.
"""
snapshot_location = context.artifacts["bert-tiny-model"]
# Initialize tokenizer and language model
tokenizer = transformers.AutoTokenizer.from_pretrained(snapshot_location)
model = transformers.BertForQuestionAnswering.from_pretrained(snapshot_location)
self.pipeline = transformers.pipeline(
task="question-answering", model=model, tokenizer=tokenizer
)
def predict(self, context, model_input, params=None):
question = model_input["question"][0]
if isinstance(question, np.ndarray):
question = question.item()
ctx = model_input["context"][0]
if isinstance(ctx, np.ndarray):
ctx = ctx.item()
return self.pipeline(question=question, context=ctx)
# Log the model
data = {"question": "Who's house?", "context": "The house is owned by Run."}
pyfunc_artifact_path = "question_answering_model"
with mlflow.start_run() as run:
model_info = mlflow.pyfunc.log_model(
name=pyfunc_artifact_path,
python_model=QAModel(),
artifacts={"bert-tiny-model": "hf:/prajjwal1/bert-tiny"},
input_example=data,
signature=infer_signature(data, ["Run"]),
extra_pip_requirements=["torch", "accelerate", "transformers", "numpy"],
)
自定义口味
要了解如何构建自定义集成以及查看社区开发的扩展库支持的示例,请访问社区模型口味页面。
在部署前验证模型
使用 MLflow Tracking 记录模型后,强烈建议在将模型部署到生产环境之前在本地验证模型。mlflow.models.predict() API 提供了一种方便的方式来在虚拟环境中测试您的模型,提供隔离的执行和若干优势
- 模型依赖项验证:API 通过在虚拟环境中用输入示例执行模型,帮助确保与模型一起记录的依赖项是正确和充分的。有关更多详细信息,请参阅验证预测环境。
- 输入数据验证:API 可用于验证输入数据是否按预期与模型交互,方法是模拟模型服务期间的相同数据处理过程。确保输入数据是与 pyfunc 模型的 predict 函数要求对齐的有效示例。
- 额外环境变量验证:通过指定
extra_envs参数,您可以测试运行模型是否需要额外的环境变量。请注意,os.environ中所有现有的环境变量会自动传递到虚拟环境中。
import mlflow
class MyModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input, params=None):
return model_input
with mlflow.start_run():
model_info = mlflow.pyfunc.log_model(
name="model",
python_model=MyModel(),
input_example=["a", "b", "c"],
)
mlflow.models.predict(
model_uri=model_info.model_uri,
input_data=["a", "b", "c"],
pip_requirements_override=["..."],
extra_envs={"MY_ENV_VAR": "my_value"},
)
如果您的模型依赖项(参见模型制品的 requirements.txt)包含预发布包(例如,mlflow==3.2.0rc0),请通过 extra_envs 字段将环境变量 UV_PRERELEASE=allow 设置为允许。
mlflow.models.predict(
model_uri=model_info.model_uri,
input_data=["a", "b", "c"],
extra_envs={"UV_PRERELEASE": "allow"},
)
环境管理器
mlflow.models.predict() API 支持以下环境管理器来创建用于预测的虚拟环境
- virtualenv:默认的环境管理器。
- uv:一个用 Rust 编写的极快的环境管理器。自 MLflow 2.20.0 起这是一个实验性功能。
- conda:使用 conda 创建环境。
local:使用当前环境运行模型。请注意,此模式下不支持pip_requirements_override。
从 MLflow 2.20.0 开始,uv 可用,并且速度极快。运行 pip install uv 来安装 uv,或参考uv 安装指南获取其他安装方法。
使用 uv 为预测创建虚拟环境的示例
import mlflow
mlflow.models.predict(
model_uri="models:/<model_id>",
input_data="your_data",
env_manager="uv",
)
内置部署工具
此信息已移至MLflow 部署页面。
将 python_function 模型导出为 Apache Spark UDF
如果您的模型推理延迟非常长(例如 transformers 模型),可能超过默认的 60 秒超时,您可以在定义 MLflow 模型的 spark_udf 实例时使用 extra_env 参数,指定覆盖环境变量 MLFLOW_SCORING_SERVER_REQUEST_TIMEOUT。有关进一步指导,请参阅 :py:func:mlflow.pyfunc.spark_udf。
您可以将 python_function 模型输出为 Apache Spark UDF,它可以上传到 Spark 集群并用于对模型进行评分。
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, "<path-to-model>")
df = spark_df.withColumn("prediction", pyfunc_udf(struct([...])))
如果模型包含签名,则可以在不指定列名参数的情况下调用 UDF。在这种情况下,UDF 将使用来自签名的列名进行调用,因此评估数据框的列名必须与模型签名的列名匹配。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, "<path-to-model-with-signature>")
df = spark_df.withColumn("prediction", pyfunc_udf())
如果模型包含带有张量规范输入的签名,您需要将一个数组类型的列作为相应的 UDF 参数传递。此列中的值必须由一维数组组成。UDF 将使用 'C' 顺序(即使用类 C 的索引顺序读取/写入元素)重塑数组值并将值转换为所需的张量规范类型。例如,假设一个模型需要输入 'a' 形状为 (-1, 2, 3),输入 'b' 形状为 (-1, 4, 5)。为了对这些数据进行推理,我们需要准备一个 Spark DataFrame,其中包含列 'a'(包含长度为 6 的数组)和列 'b'(包含长度为 20 的数组)。然后我们可以像下面的示例代码一样调用 UDF
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Assuming the model requires input 'a' of shape (-1, 2, 3) and input 'b' of shape (-1, 4, 5)
model_path = "<path-to-model-requiring-multidimensional-inputs>"
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_path)
# The `spark_df` has column 'a' containing arrays of length 6 and
# column 'b' containing arrays of length 20
df = spark_df.withColumn("prediction", pyfunc_udf(struct("a", "b")))
生成的 UDF 基于 Spark 的 Pandas UDF,目前仅限于为每次观测生成单个值、值数组或包含多个相同类型字段的结构体。默认情况下,我们返回第一个适合 double 的数值列。您可以通过提供 result_type 参数来控制返回的结果。支持以下值
'int'或 IntegerType:返回最左边且能适配int32结果的整数,如果没有则引发异常。'long'或 LongType:返回最左边且能适配int64结果的长整数,如果没有则引发异常。- ArrayType (IntegerType | LongType):返回所有能适配请求大小的整数列。
'float'或 FloatType:返回最左边且能转换为float32的数值结果,如果没有数值列则引发异常。'double'或 DoubleType:返回最左边且能转换为double的数值结果,如果没有数值列则引发异常。- ArrayType ( FloatType | DoubleType ): 返回所有转换为请求类型的数值列。如果没有数值列,则引发异常。
'string'或 StringType:结果是转换为字符串的最左边列。- ArrayType ( StringType ): 返回所有转换为字符串的列。
'bool'或'boolean'或 BooleanType:返回最左边且能转换为bool的列,如果值无法转换则引发异常。'field1 FIELD1_TYPE, field2 FIELD2_TYPE, ...':一个结构体类型,包含由逗号分隔的多个字段,每个字段类型必须是上述列出的类型之一。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Suppose the PyFunc model `predict` method returns a dict like:
# `{'prediction': 1-dim_array, 'probability': 2-dim_array}`
# You can supply result_type to be a struct type containing
# 2 fields 'prediction' and 'probability' like following.
pyfunc_udf = mlflow.pyfunc.spark_udf(
spark, "<path-to-model>", result_type="prediction float, probability: array<float>"
)
df = spark_df.withColumn("prediction", pyfunc_udf())
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(
spark, "path/to/model", result_type=ArrayType(FloatType())
)
# The prediction column will contain all the numeric columns returned by the model as floats
df = spark_df.withColumn("prediction", pyfunc_udf(struct("name", "age")))
如果您想使用 conda 来恢复用于训练模型的 python 环境,请在调用 mlflow.pyfunc.spark_udf() 时设置 env_manager 参数。
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(
spark,
"path/to/model",
result_type=ArrayType(FloatType()),
env_manager="conda", # Use conda to restore the environment used in training
)
df = spark_df.withColumn("prediction", pyfunc_udf(struct("name", "age")))
如果您想通过远程客户端的 Databricks connect 调用 mlflow.pyfunc.spark_udf(),则需要在 Databricks 运行时中首先构建模型环境。
from mlflow.pyfunc import build_model_env
# Build the model env and save it as an archive file to the provided UC volume directory
# and print the saved model env archive file path (like '/Volumes/.../.../XXXXX.tar.gz')
print(build_model_env(model_uri, "/Volumes/..."))
# print the cluster id. Databricks Connect client needs to use the cluster id.
print(spark.conf.get("spark.databricks.clusterUsageTags.clusterId"))
预构建模型环境后,您可以通过 Databricks connect 在远程客户端使用 'prebuilt_model_env' 参数运行 mlflow.pyfunc.spark_udf(),
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.remote(
host=os.environ["DATABRICKS_HOST"],
token=os.environ["DATABRICKS_TOKEN"],
cluster_id="<cluster id>", # get cluster id by spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
).getOrCreate()
# The path generated by `build_model_env` in Databricks runtime.
model_env_uc_uri = "dbfs:/Volumes/.../.../XXXXX.tar.gz"
pyfunc_udf = mlflow.pyfunc.spark_udf(
spark, model_uri, prebuilt_env_uri=model_env_uc_uri
)
部署到自定义目标
除了内置的部署工具外,MLflow 还提供了一个可插拔的 mlflow.deployments() 和 mlflow deployments CLI,用于将模型部署到自定义目标和环境。要部署到自定义目标,您必须首先安装适当的第三方 Python 插件。请在此处查看已知社区维护的插件列表:here。
命令
mlflow deployments CLI 包含以下命令,也可以使用 mlflow.deployments Python API 编程式调用
- Create:将 MLflow 模型部署到指定的自定义目标
- Delete:删除部署
- Update:更新现有部署,例如部署新模型版本或更改部署的配置(例如增加副本数)
- List:列出所有部署的 ID
- Get:打印特定部署的详细描述
- Run Local:在本地部署模型以进行测试
- Help:显示指定目标的帮助字符串
更多信息,请参阅
mlflow deployments --help
mlflow deployments create --help
mlflow deployments delete --help
mlflow deployments update --help
mlflow deployments list --help
mlflow deployments get --help
mlflow deployments run-local --help
mlflow deployments help --help
社区模型口味
请访问社区模型口味页面,以获取 MLflow 社区开发和维护的其他有用 MLflow 口味的概述。