MLflow Models
MLflow Model 是一种用于打包机器学习模型的标准格式,可用于各种下游工具,例如通过 REST API 进行实时服务或在 Apache Spark 上进行批量推理。该格式定义了一种约定,允许您以不同的“风格(flavor)”保存模型,这些风格可由不同的下游工具理解。
存储格式
每个 MLflow Model 都是一个目录,包含任意文件,以及目录根目录中的 MLmodel
文件,该文件可以定义模型的多种可查看的风格(flavor)。
MLflow Model 的模型方面可以是序列化对象(例如,pickle 化的 scikit-learn
模型),也可以是包含使用 mlflow.models.set_model()
API 定义的模型实例的 Python 脚本(如果在 Databricks 中运行,也可以是 notebook)。
风格(Flavors)是使 MLflow Models 强大的关键概念:它们是部署工具用来理解模型的约定,这使得编写可以与任何机器学习库中的模型协同工作的工具成为可能,而无需将每个工具与每个库集成。MLflow 定义了几种“标准”风格,其所有内置部署工具都支持这些风格,例如描述如何将模型作为 Python 函数运行的“Python 函数”风格。但是,库也可以定义和使用其他风格。例如,MLflow 的 mlflow.sklearn
库允许将模型作为 scikit-learn Pipeline
对象加载回来,用于了解 scikit-learn 的代码中,或者作为通用 Python 函数用于只需要应用模型的工具中(例如,使用选项 -t sagemaker
的 mlflow deployments
工具,用于将模型部署到 Amazon SageMaker)。
MLmodel 文件
特定模型支持的所有风格都在其 YAML 格式的 MLmodel
文件中定义。例如,从 MLflow 仓库运行 python examples/sklearn_logistic_regression/train.py
将在 model
目录下创建以下文件
# Directory written by mlflow.sklearn.save_model(model, "model", input_example=...)
model/
├── MLmodel
├── model.pkl
├── conda.yaml
├── python_env.yaml
├── requirements.txt
├── input_example.json (optional, only logged when input example is provided and valid during model logging)
├── serving_input_example.json (optional, only logged when input example is provided and valid during model logging)
└── environment_variables.txt (optional, only logged when environment variables are used during model inference)
并且其 MLmodel
文件描述了两种风格
time_created: 2018-05-25T17:28:53.35
flavors:
sklearn:
sklearn_version: 0.19.1
pickled_model: model.pkl
python_function:
loader_module: mlflow.sklearn
除了列出模型风格的 flavors 字段外,MLmodel YAML 格式还可以包含以下字段
time_created
: 模型创建的日期和时间,采用 UTC ISO 8601 格式。run_id
: 创建模型的运行 ID,如果模型是使用跟踪保存的。signature
: JSON 格式的模型签名。input_example
: 引用包含输入示例的 artifact。databricks_runtime
: Databricks 运行时版本和类型,如果模型是在 Databricks notebook 或 job 中训练的。mlflow_version
: 用于记录模型的 MLflow 版本。
附加记录的文件
为了重新创建环境,每当记录模型时,我们都会自动记录 conda.yaml
、python_env.yaml
和 requirements.txt
文件。然后,这些文件可用于使用 conda
或 virtualenv
和 pip
重新安装依赖项。有关这些文件的更多详细信息,请参阅MLflow Model 如何记录依赖项。
如果在记录模型时提供了模型输入示例,则会记录两个附加文件 input_example.json
和 serving_input_example.json
。有关详细信息,请参阅模型输入示例。
记录模型时,模型元数据文件(MLmodel
、conda.yaml
、python_env.yaml
、requirements.txt
)会复制到名为 metadata
的子目录中。对于 wheeled 模型,还会复制 original_requirements.txt
文件。
当从 MLflow Model Registry 下载注册的模型时,下载器一侧的模型目录中会添加一个名为 registered_model_meta
的 YAML 文件。此文件包含 MLflow Model Registry 中引用的模型的名称和版本,将用于部署及其他目的。
如果您在 Databricks 中记录模型,MLflow 也会在模型目录中创建一个名为 metadata
的子目录。该子目录包含上述元数据文件的轻量级副本,供内部使用。
环境变量文件
MLflow 在记录模型时,会将模型推理期间使用的环境变量记录到 environment_variables.txt
文件中。
environment_variables.txt
文件仅包含模型推理期间使用的环境变量的名称,值不存储。
目前,MLflow 仅记录名称中包含以下任何关键字的环境变量
RECORD_ENV_VAR_ALLOWLIST = {
# api key related
"API_KEY", # e.g. OPENAI_API_KEY
"API_TOKEN",
# databricks auth related
"DATABRICKS_HOST",
"DATABRICKS_USERNAME",
"DATABRICKS_PASSWORD",
"DATABRICKS_TOKEN",
"DATABRICKS_INSECURE",
"DATABRICKS_CLIENT_ID",
"DATABRICKS_CLIENT_SECRET",
"_DATABRICKS_WORKSPACE_HOST",
"_DATABRICKS_WORKSPACE_ID",
}
使用环境变量的 pyfunc 模型示例
import mlflow
import os
os.environ["TEST_API_KEY"] = "test_api_key"
class MyModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input, params=None):
if os.environ.get("TEST_API_KEY"):
return model_input
raise Exception("API key not found")
with mlflow.start_run():
model_info = mlflow.pyfunc.log_model(
"model", python_model=MyModel(), input_example="data"
)
环境变量 TEST_API_KEY
在 environment_variables.txt 文件中记录如下
# This file records environment variable names that are used during model inference.
# They might need to be set when creating a serving endpoint from this model.
# Note: it is not guaranteed that all environment variables listed here are required
TEST_API_KEY
在将模型部署到服务端点之前,请检查 environment_variables.txt 文件,以确保已设置模型推理所需的所有环境变量。请注意,文件中列出的所有环境变量并非始终是模型推理所必需的。有关在 databricks 服务端点上设置环境变量的详细说明,请参阅此指南。
要禁用此功能,请将环境变量 MLFLOW_RECORD_ENV_VARS_IN_MODEL_LOGGING
设置为 false
。
管理模型依赖项
MLflow Model 会推断模型风格所需的依赖项并自动记录它们。但是,它也允许您定义额外的依赖项或自定义 Python 代码,并提供一个工具在沙箱环境中验证它们。有关更多详细信息,请参阅MLflow Models 中的依赖管理。
模型签名和输入示例
在 MLflow 中,理解模型签名和输入示例的复杂性对于有效的模型管理和部署至关重要。
- 模型签名:定义模型输入、输出和附加推理参数的模式,促进模型交互的标准化接口。
- 模型输入示例:提供有效模型输入的具体实例,有助于理解和测试模型要求。此外,如果在记录模型时提供了输入示例,则如果未明确提供,将自动推断并存储模型签名。
- 模型服务有效载荷示例:提供用于查询部署模型端点的 json 有效载荷示例。如果在记录模型时提供了输入示例,则会根据输入示例自动生成服务有效载荷示例,并将其保存为
serving_input_example.json
。
我们的文档深入探讨了几个关键领域
- 支持的签名类型:我们涵盖了支持的不同数据类型,例如传统机器学习模型的表格数据和深度学习模型的张量。
- 签名强制执行:讨论 MLflow 如何强制执行模式合规性,确保提供的输入与模型预期相符。
- 记录带签名的模型:指导如何在记录模型时包含签名,提高模型操作的清晰度和可靠性。
要详细了解这些概念,包括示例和最佳实践,请访问模型签名和示例指南。如果您想查看签名强制执行的实际应用,请参阅模型签名 notebook 教程以了解更多信息。
模型 API
您可以通过多种方式保存和加载 MLflow Models。首先,MLflow 集成了几个常用库。例如,mlflow.sklearn 包含用于 scikit-learn 模型的 save_model、log_model 和 load_model 函数。其次,您可以使用 mlflow.models.Model
类来创建和写入模型。此类具有四个关键功能
- add_flavor 用于向模型添加风格。每种风格都有一个字符串名称和一个键值属性字典,其中值可以是任何可以序列化为 YAML 的对象。
- save 用于将模型保存到本地目录。
- log 用于使用 MLflow Tracking 将模型作为 artifact 记录到当前运行中。
- load 用于从本地目录或从先前运行中的 artifact 加载模型。
代码模型
要详细了解代码模型功能,请访问深入指南以获取更详细的解释和更多示例。
代码模型功能在 MLflow 2.12.2 及更高版本中可用。此功能是实验性的,在未来版本中可能会有所更改。
代码模型功能允许您直接从独立的 python 脚本定义和记录模型。当您希望记录可以有效存储为代码表示的模型(无需通过训练优化权重的模型)或依赖外部服务(例如 LangChain 链)的应用程序时,此功能特别有用。另一个好处是,此方法完全绕过了 Python 中使用 pickle
或 cloudpickle
模块,因为在加载不受信任的模型时,这些模块可能存在安全风险。
此功能仅支持 LangChain、LlamaIndex 和 PythonModel 模型。
为了从代码记录模型,您可以利用 mlflow.models.set_model()
API。此 API 允许您通过直接在定义模型的文件中指定模型类实例来定义模型。记录此类模型时,会指定一个文件路径(而不是对象),该路径指向包含模型类定义以及应用于自定义模型实例的 set_model
API 用法的 Python 文件。
下图比较了标准模型记录过程和适用于使用代码模型功能保存的模型的代码模型功能
例如,在名为 my_model.py
的单独文件中定义模型
import mlflow
from mlflow.models import set_model
class MyModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input):
return model_input
# Define the custom PythonModel instance that will be used for inference
set_model(MyModel())
代码模型功能不支持捕获来自外部文件引用的 import 语句。如果您有未通过 pip
安装捕获的依赖项,则需要使用 code_paths 功能,通过适当的绝对路径 import 引用来包含和解析依赖项。为简单起见,由于 code_paths
依赖路径解析的限制,建议将从代码定义模型所需的所有本地依赖项封装在同一个 python 脚本文件中。
从代码定义模型并使用 mlflow.models.set_model()
API 时,内部会执行被记录脚本中定义的代码,以确保其为有效代码。如果您的脚本内有连接到外部服务(例如,您正在 LangChain 中连接到 GenAI 服务),请注意在记录模型时会向该服务产生连接请求。
然后,在另一个 python 脚本中从文件路径记录模型
import mlflow
model_path = "my_model.py"
with mlflow.start_run():
model_info = mlflow.pyfunc.log_model(
python_model=model_path, # Define the model as the path to the Python file
artifact_path="my_model",
)
# Loading the model behaves exactly as if an instance of MyModel had been logged
my_model = mlflow.pyfunc.load_model(model_info.model_uri)
mlflow.models.set_model()
API 不是线程安全的。如果您从多个线程同时记录模型,请勿尝试使用此功能。此 fluent API 利用一个全局活动模型状态,不保证一致性。如果您对线程安全的记录 API 感兴趣,请使用 mlflow.client.MlflowClient
API 记录模型。
内置模型风格
MLflow 提供了一些标准风格,这些风格可能在您的应用程序中有用。具体来说,其许多部署工具都支持这些风格,因此您可以导出自己的模型,使用其中一种风格,以受益于所有这些工具
- Python Function (
python_function
) - R Function (
crate
) - H2O (
h2o
) - Keras (
keras
) - MLeap (
mleap
) - PyTorch (
pytorch
) - Scikit-learn (
sklearn
) - Spark MLlib (
spark
) - TensorFlow (
tensorflow
) - ONNX (
onnx
) - XGBoost (
xgboost
) - LightGBM (
lightgbm
) - CatBoost (
catboost
) - Spacy(
spaCy
) - Fastai(
fastai
) - Statsmodels (
statsmodels
) - Prophet (
prophet
) - Pmdarima (
pmdarima
) - OpenAI (
openai
) (实验性) - LangChain (
langchain
) (实验性) - John Snow Labs (
johnsnowlabs
) (实验性) - Diviner (
diviner
) - Transformers (
transformers
) (实验性) - SentenceTransformers (
sentence_transformers
) (实验性) - Promptflow (
promptflow
) (实验性)
Python Function (python_function
)
python_function
模型风格作为 MLflow Python 模型的默认模型接口。任何 MLflow Python 模型都应可加载为 python_function
模型。这使得其他 MLflow 工具可以与任何 python 模型协同工作,无论使用哪种持久化模块或框架生成模型。这种互操作性非常强大,因为它允许任何 Python 模型在各种环境中投入生产。
此外,python_function
模型风格定义了一种用于 Python 模型的通用文件系统模型格式,并提供了用于将模型保存和加载到此格式的实用程序。该格式是自包含的,因为它包含了加载和使用模型所需的所有信息。依赖项可以直接与模型一起存储,也可以通过 conda 环境引用。此模型格式允许其他工具将其模型与 MLflow 集成。
如何将模型保存为 Python 函数
大多数 python_function
模型是作为其他模型风格的一部分保存的 - 例如,所有 mlflow 内置风格在导出的模型中都包含 python_function
风格。此外,mlflow.pyfunc
模块定义了明确创建 python_function
模型的函数。此模块还包括用于创建自定义 Python 模型的实用程序,这是向 ML 模型添加自定义 python 代码的便捷方法。有关更多信息,请参阅自定义 Python 模型文档。
有关如何从 python 脚本存储自定义模型(代码模型功能)的信息,请参阅代码模型指南以获取推荐方法。
如何加载和评估 Python 函数模型
加载模型
您可以使用 mlflow.pyfunc.load_model()
函数在 Python 中加载 python_function
模型。需要注意的是,load_model
假定所有依赖项都已可用,并且不会执行任何依赖项检查或安装。对于处理依赖项的部署选项,请参阅模型部署部分。
评估模型
模型加载后,可以通过两种主要方式进行评估
-
同步评估 标准的评估方法是使用
predict
方法,该方法支持各种输入类型,并根据输入数据返回标量或集合。该方法的签名是predict(data: Union[pandas.Series, pandas.DataFrame, numpy.ndarray, csc_matrix, csr_matrix, List[Any], Dict[str, Any], str],
params: Optional[Dict[str, Any]] = None) → Union[pandas.Series, pandas.DataFrame, numpy.ndarray, list, str] -
同步流式评估
注意predict_stream
是 MLflow 2.12.2 版本中新增的接口。MLflow 的早期版本不支持此接口。要在自定义 Python Function Model 中使用predict_stream
,您必须在模型类中实现predict_stream
方法并返回生成器类型。对于支持流式数据处理的模型,可以使用 predict_stream 方法。此方法返回一个
generator
,它产生响应流,从而能够高效处理大型数据集或连续数据流。请注意,并非所有模型类型都支持predict_stream
方法。其用法涉及迭代生成器以消费响应predict_stream(data: Any, params: Optional[Dict[str, Any]] = None) → GeneratorType
演示 predict_stream()
以下是演示如何使用 predict_stream()
方法定义、保存、加载和使用可流式模型的示例
import mlflow
import os
# Define a custom model that supports streaming
class StreamableModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input, params=None):
# Regular predict method implementation (optional for this demo)
return "regular-predict-output"
def predict_stream(self, context, model_input, params=None):
# Yielding elements one at a time
for element in ["a", "b", "c", "d", "e"]:
yield element
# Save the model to a directory
tmp_path = "/tmp/test_model"
pyfunc_model_path = os.path.join(tmp_path, "pyfunc_model")
python_model = StreamableModel()
mlflow.pyfunc.save_model(path=pyfunc_model_path, python_model=python_model)
# Load the model
loaded_pyfunc_model = mlflow.pyfunc.load_model(model_uri=pyfunc_model_path)
# Use predict_stream to get a generator
stream_output = loaded_pyfunc_model.predict_stream("single-input")
# Consuming the generator using next
print(next(stream_output)) # Output: 'a'
print(next(stream_output)) # Output: 'b'
# Alternatively, consuming the generator using a for-loop
for response in stream_output:
print(response) # This will print 'c', 'd', 'e'
Python Function 模型接口
所有 PyFunc 模型都支持将 pandas.DataFrame
作为输入。除了 pandas.DataFrame
,DL PyFunc 模型还支持以 numpy.ndarrays
形式的张量输入。要验证模型风格是否支持张量输入,请查看该风格的文档。
对于具有基于列模式的模型,输入通常以 pandas.DataFrame
的形式提供。如果为具有命名列的模式提供了将列名映射到值的字典作为输入,或者为具有未命名列的模式提供了 python List
或 numpy.ndarray
作为输入,MLflow 将把输入转换为 DataFrame。针对 DataFrame 执行模式强制和预期数据类型的类型转换。
对于具有基于张量模式的模型,输入通常以 numpy.ndarray
或将张量名称映射到其 np.ndarray 值的字典形式提供。模式强制将检查提供的输入的形状和类型是否与模型模式中指定的形状和类型匹配,如果不匹配则会引发错误。
对于未定义模式的模型,不对模型输入和输出进行任何更改。如果模型不接受提供的输入类型,MLflow 将传播模型引发的任何错误。
用于预测或推理加载 PyFunc 模型的 python 环境可能与训练模型时的环境不同。如果环境不匹配,调用 mlflow.pyfunc.load_model()
时会打印一条警告消息。此警告语句将识别训练期间使用的包与当前环境之间版本不匹配的包。为了获取训练模型时使用的环境的完整依赖项,您可以调用 mlflow.pyfunc.get_model_dependencies()
。此外,如果您想在模型训练使用的相同环境中运行模型推理,您可以调用 mlflow.pyfunc.spark_udf()
并将 env_manager
参数设置为“conda”。这将从 conda.yaml
文件生成环境,确保 python UDF 以训练期间使用的确切包版本执行。
一些 PyFunc 模型可能接受模型加载配置,该配置控制模型的加载方式和预测计算方式。您可以通过检查模型的风格元数据来了解模型支持哪些配置
model_info = mlflow.models.get_model_info(model_uri)
model_info.flavors[mlflow.pyfunc.FLAVOR_NAME][mlflow.pyfunc.MODEL_CONFIG]
或者,您可以加载 PyFunc 模型并检查 model_config
属性
pyfunc_model = mlflow.pyfunc.load_model(model_uri)
pyfunc_model.model_config
可以在加载时通过在 mlflow.pyfunc.load_model()
方法中指定 model_config
参数来更改模型配置
pyfunc_model = mlflow.pyfunc.load_model(model_uri, model_config=dict(temperature=0.93))
更改模型配置值时,这些值会使用保存模型时的配置。为模型指定无效的模型配置键会导致该配置被忽略。将显示警告,提及被忽略的条目。
模型配置与签名中带有默认值的参数:当您需要为模型发布者提供一种方式来更改模型加载到内存的方式以及如何计算所有样本的预测时,请使用模型配置。例如,像 user_gpu
这样的键。模型使用者无法在预测时更改这些值。使用签名中带有默认值的参数,为用户提供更改每个数据样本预测计算方式的能力。
R Function (crate
)
crate
模型风格定义了一种通用模型格式,用于使用 carrier 包中的 crate
函数将任意 R 预测函数表示为 MLflow 模型。预测函数应接受一个 dataframe 作为输入,并生成一个 dataframe、一个向量或一个列表作为输出,其中包含预测结果。
此风格需要安装 R 才能使用。
crate
用法
对于一个最小的 crate 模型,predict 函数的配置示例如下
library(mlflow)
library(carrier)
# Load iris dataset
data("iris")
# Learn simple linear regression model
model <- lm(Sepal.Width~Sepal.Length, data = iris)
# Define a crate model
# call package functions with an explicit :: namespace.
crate_model <- crate(
function(new_obs) stats::predict(model, data.frame("Sepal.Length" = new_obs)),
model = model
)
# log the model
model_path <- mlflow_log_model(model = crate_model, artifact_path = "iris_prediction")
# load the logged model and make a prediction
model_uri <- paste0(mlflow_get_run()$artifact_uri, "/iris_prediction")
mlflow_model <- mlflow_load_model(model_uri = model_uri,
flavor = NULL,
client = mlflow_client())
prediction <- mlflow_predict(model = mlflow_model, data = 5)
print(prediction)
H2O (h2o
)
h2o
模型风格支持记录和加载 H2O 模型。
mlflow.h2o 模块在 python 中定义了 save_model() 和 log_model() 方法,并在 R 中定义了 mlflow_save_model 和 mlflow_log_model 方法,用于以 MLflow Model 格式保存 H2O 模型。这些方法生成具有 python_function
风格的 MLflow Models,允许您通过 mlflow.pyfunc.load_model()
将它们加载为通用 Python 函数进行推理。此加载的 PyFunc 模型只能使用 DataFrame 输入进行评估。当您使用 mlflow.pyfunc.load_model()
加载具有 h2o
风格的 MLflow Models 时,会调用 h2o.init() 方法。因此,必须在加载器的环境中安装正确版本的 h2o(-py)
。您可以通过修改持久化 H2O 模型的 YAML 配置文件:model.h2o/h2o.yaml
的 init
条目来自定义传递给 h2o.init() 的参数。
最后,您可以使用 mlflow.h2o.load_model()
方法将具有 h2o
风格的 MLflow Models 作为 H2O 模型对象加载。
有关更多信息,请参阅 mlflow.h2o。
h2o pyfunc 用法
对于一个最小的 h2o 模型,以下是分类场景中 pyfunc predict() 方法的示例
import mlflow
import h2o
h2o.init()
from h2o.estimators.glm import H2OGeneralizedLinearEstimator
# import the prostate data
df = h2o.import_file(
"http://s3.amazonaws.com/h2o-public-test-data/smalldata/prostate/prostate.csv.zip"
)
# convert the columns to factors
df["CAPSULE"] = df["CAPSULE"].asfactor()
df["RACE"] = df["RACE"].asfactor()
df["DCAPS"] = df["DCAPS"].asfactor()
df["DPROS"] = df["DPROS"].asfactor()
# split the data
train, test, valid = df.split_frame(ratios=[0.7, 0.15])
# generate a GLM model
glm_classifier = H2OGeneralizedLinearEstimator(
family="binomial", lambda_=0, alpha=0.5, nfolds=5, compute_p_values=True
)
with mlflow.start_run():
glm_classifier.train(
y="CAPSULE", x=["AGE", "RACE", "VOL", "GLEASON"], training_frame=train
)
metrics = glm_classifier.model_performance()
metrics_to_track = ["MSE", "RMSE", "r2", "logloss"]
metrics_to_log = {
key: value
for key, value in metrics._metric_json.items()
if key in metrics_to_track
}
params = glm_classifier.params
mlflow.log_params(params)
mlflow.log_metrics(metrics_to_log)
model_info = mlflow.h2o.log_model(glm_classifier, artifact_path="h2o_model_info")
# load h2o model and make a prediction
h2o_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
test_df = test.as_data_frame()
predictions = h2o_pyfunc.predict(test_df)
print(predictions)
# it is also possible to load the model and predict using h2o methods on the h2o frame
# h2o_model = mlflow.h2o.load_model(model_info.model_uri)
# predictions = h2o_model.predict(test)
Keras (keras
)
keras
模型风格支持记录和加载 Keras 模型。它在 Python 和 R 客户端中都可用。在 R 中,您可以使用 mlflow_save_model
和 mlflow_log_model
保存或记录模型。这些函数使用 Keras 库的内置模型持久化函数将 Keras 模型序列化为 HDF5 文件。您可以使用 R 中的 mlflow_load_model
函数将具有 keras
风格的 MLflow Models 加载为 Keras Model 对象。
Keras pyfunc 用法
对于一个最小的 Sequential 模型,以下是 pyfunc predict() 方法的配置示例
import mlflow
import numpy as np
import pathlib
import shutil
from tensorflow import keras
mlflow.tensorflow.autolog()
X = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)
y = np.array([0, 0, 1, 1, 1, 0])
model = keras.Sequential(
[
keras.Input(shape=(1,)),
keras.layers.Dense(1, activation="sigmoid"),
]
)
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
model.fit(X, y, batch_size=3, epochs=5, validation_split=0.2)
local_artifact_dir = "/tmp/mlflow/keras_model"
pathlib.Path(local_artifact_dir).mkdir(parents=True, exist_ok=True)
model_uri = f"runs:/{mlflow.last_active_run().info.run_id}/model"
keras_pyfunc = mlflow.pyfunc.load_model(
model_uri=model_uri, dst_path=local_artifact_dir
)
data = np.array([-4, 1, 0, 10, -2, 1]).reshape(-1, 1)
predictions = keras_pyfunc.predict(data)
shutil.rmtree(local_artifact_dir)
MLeap (mleap
)
自 MLflow 2.6.0 起,mleap
模型风格已弃用,并将在未来版本中移除。
mleap
模型风格支持使用 MLeap 持久化机制以 MLflow 格式保存 Spark 模型。MLeap 是一种推理优化的格式和执行引擎,用于不依赖于 SparkContext 评估输入的 Spark 模型。
您可以通过指定 mlflow.spark.save_model()
或 mlflow.spark.log_model()
方法的 sample_input
参数(推荐)来以 mleap
风格保存 Spark 模型为 MLflow 格式。有关更多详细信息,请参阅Spark MLlib。
mlflow.mleap 模块还定义了 save_model() 和 log_model() 方法,用于以 MLflow 格式保存 MLeap 模型,但这些方法在其生成的模型中不包含 python_function
风格。类似地,可以在 R 中使用 mlflow_save_model
保存 mleap
模型,并使用 mlflow_load_model
加载,其中 mlflow_save_model
需要将 sample_input
指定为包含模型输入数据的示例 Spark dataframe,这是 MLeap 进行数据模式推断所必需的。
mlflow/java
包中提供了用于加载具有 MLeap 风格的 MLflow Models 的配套模块。
有关更多信息,请参阅 mlflow.spark、mlflow.mleap 和 MLeap 文档。
PyTorch (pytorch
)
pytorch
模型风格支持记录和加载 PyTorch 模型。
mlflow.pytorch 模块定义了用于保存和加载具有 pytorch
风格的 MLflow Models 的实用程序。您可以使用 mlflow.pytorch.save_model()
和 mlflow.pytorch.log_model()
方法以 MLflow 格式保存 PyTorch 模型;这两个函数都使用 torch.save() 方法序列化 PyTorch 模型。此外,您可以使用 mlflow.pytorch.load_model()
方法将具有 pytorch
风格的 MLflow Models 作为 PyTorch 模型对象加载。此加载的 PyFunc 模型可以使用 DataFrame 输入和 numpy 数组输入进行评估。最后,mlflow.pytorch.save_model()
和 mlflow.pytorch.log_model()
生成的模型包含 python_function
风格,允许您通过 mlflow.pyfunc.load_model()
将它们加载为通用 Python 函数进行推理。
使用 PyTorch 风格时,如果在预测时有可用的 GPU,将使用默认 GPU 运行推理。要禁用此行为,用户可以使用 MLFLOW_DEFAULT_PREDICTION_DEVICE,或为 predict
函数传递带有 device
参数的设备。
在多 GPU 训练的情况下,请确保仅使用全局 rank 0 GPU 保存模型。这可以避免记录同一模型的多个副本。
PyTorch pyfunc 用法
对于一个最小的 PyTorch 模型,以下是 pyfunc predict() 方法的配置示例
import numpy as np
import mlflow
from mlflow.models import infer_signature
import torch
from torch import nn
net = nn.Linear(6, 1)
loss_function = nn.L1Loss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-4)
X = torch.randn(6)
y = torch.randn(1)
epochs = 5
for epoch in range(epochs):
optimizer.zero_grad()
outputs = net(X)
loss = loss_function(outputs, y)
loss.backward()
optimizer.step()
with mlflow.start_run() as run:
signature = infer_signature(X.numpy(), net(X).detach().numpy())
model_info = mlflow.pytorch.log_model(net, "model", signature=signature)
pytorch_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
predictions = pytorch_pyfunc.predict(torch.randn(6).numpy())
print(predictions)
有关更多信息,请参阅 mlflow.pytorch。
Scikit-learn (sklearn
)
sklearn
模型风格提供了一个易于使用的接口,用于保存和加载 scikit-learn 模型。mlflow.sklearn 模块定义了 save_model()
和 log_model()
函数,这些函数使用 Python 的 pickle 模块 (Pickle) 或 CloudPickle 进行模型序列化,以 MLflow 格式保存 scikit-learn 模型。这些函数生成具有 python_function
风格的 MLflow Models,允许通过 mlflow.pyfunc.load_model()
将它们加载为通用 Python 函数进行推理。此加载的 PyFunc 模型只能使用 DataFrame 输入进行评估。最后,您可以使用 mlflow.sklearn.load_model()
方法将具有 sklearn
风格的 MLflow Models 作为 scikit-learn 模型对象加载。
Scikit-learn pyfunc 用法
对于一个 Scikit-learn LogisticRegression 模型,以下是 pyfunc predict() 方法的配置示例
import mlflow
from mlflow.models import infer_signature
import numpy as np
from sklearn.linear_model import LogisticRegression
with mlflow.start_run():
X = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)
y = np.array([0, 0, 1, 1, 1, 0])
lr = LogisticRegression()
lr.fit(X, y)
signature = infer_signature(X, lr.predict(X))
model_info = mlflow.sklearn.log_model(
sk_model=lr, artifact_path="model", signature=signature
)
sklearn_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
data = np.array([-4, 1, 0, 10, -2, 1]).reshape(-1, 1)
predictions = sklearn_pyfunc.predict(data)
有关更多信息,请参阅 mlflow.sklearn。
Spark MLlib (spark
)
spark
模型风格支持将 Spark MLlib 模型导出为 MLflow Models。
mlflow.spark()
模块定义了
save_model()
将 Spark MLlib 模型保存到 DBFS 路径。log_model()
将 Spark MLlib 模型上传到跟踪服务器。load_model()
将具有spark
风格的 MLflow Models 加载为 Spark MLlib pipelines。
这些函数生成的 MLflow Models 包含 python_function
风格,允许您通过 mlflow.pyfunc.load_model()
将它们加载为通用 Python 函数。此加载的 PyFunc 模型只能使用 DataFrame 输入进行评估。当通过 mlflow.pyfunc.load_model()
将具有 spark
风格的模型加载为 Python 函数时,会为模型推理创建一个新的 SparkContext;此外,该函数在评估之前会将所有 Pandas DataFrame 输入转换为 Spark DataFrames。虽然这种初始化开销和格式转换延迟对于高性能用例来说并不理想,但它使您能够轻松地将任何 MLlib PipelineModel 部署到 MLflow 支持的任何生产环境(SageMaker、AzureML 等)。
Spark MLlib pyfunc 用法
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
import mlflow
# Prepare training data from a list of (label, features) tuples.
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()
training = spark.createDataFrame(
[
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, -1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, -0.5])),
],
["label", "features"],
)
# Create and fit a LogisticRegression instance
lr = LogisticRegression(maxIter=10, regParam=0.01)
lr_model = lr.fit(training)
# Serialize the Model
with mlflow.start_run():
model_info = mlflow.spark.log_model(lr_model, "spark-model")
# Load saved model
lr_model_saved = mlflow.pyfunc.load_model(model_info.model_uri)
# Make predictions on test data.
# The DataFrame used in the predict method must be a Pandas DataFrame
test = spark.createDataFrame(
[
(1.0, Vectors.dense([-1.0, 1.5, 1.3])),
(0.0, Vectors.dense([3.0, 2.0, -0.1])),
(1.0, Vectors.dense([0.0, 2.2, -1.5])),
],
["label", "features"],
).toPandas()
prediction = lr_model_saved.predict(test)
请注意,当向 log_model()
或 save_model()
提供 sample_input
参数时,Spark 模型会自动通过调用 mlflow.mleap.add_to_model()
保存为 mleap
风格。
例如,以下代码块
training_df = spark.createDataFrame(
[
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
],
["id", "text", "label"],
)
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training_df)
mlflow.spark.log_model(model, "spark-model", sample_input=training_df)
导致以下目录结构被记录到 MLflow Experiment 中
# Directory written by with the addition of mlflow.mleap.add_to_model(model, "spark-model", training_df)
# Note the addition of the mleap directory
spark-model/
├── mleap
├── sparkml
├── MLmodel
├── conda.yaml
├── python_env.yaml
└── requirements.txt
有关更多信息,请参阅 mlflow.mleap
。
有关更多信息,请参阅 mlflow.spark。
TensorFlow (tensorflow
)
下面的简单示例展示了如何使用低级 TensorFlow API 在 mlflow 中为自定义训练循环记录参数和指标。有关 mlflow 和 tf.keras
模型的示例,请参阅tf-keras-example。
import numpy as np
import tensorflow as tf
import mlflow
x = np.linspace(-4, 4, num=512)
y = 3 * x + 10
# estimate w and b where y = w * x + b
learning_rate = 0.1
x_train = tf.Variable(x, trainable=False, dtype=tf.float32)
y_train = tf.Variable(y, trainable=False, dtype=tf.float32)
# initial values
w = tf.Variable(1.0)
b = tf.Variable(1.0)
with mlflow.start_run():
mlflow.log_param("learning_rate", learning_rate)
for i in range(1000):
with tf.GradientTape(persistent=True) as tape:
# calculate MSE = 0.5 * (y_predict - y_train)^2
y_predict = w * x_train + b
loss = 0.5 * tf.reduce_mean(tf.square(y_predict - y_train))
mlflow.log_metric("loss", value=loss.numpy(), step=i)
# Update the trainable variables
# w = w - learning_rate * gradient of loss function w.r.t. w
# b = b - learning_rate * gradient of loss function w.r.t. b
w.assign_sub(learning_rate * tape.gradient(loss, w))
b.assign_sub(learning_rate * tape.gradient(loss, b))
print(f"W = {w.numpy():.2f}, b = {b.numpy():.2f}")
ONNX (onnx
)
onnx
模型风格支持通过 mlflow.onnx.save_model()
和 mlflow.onnx.log_model()
方法以 MLflow 格式记录 ONNX 模型。这些方法还向它们生成的 MLflow Models 添加了 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为通用 Python 函数进行推理。此加载的 PyFunc 模型可以使用 DataFrame 输入和 numpy 数组输入进行评估。最后,您可以使用 mlflow.onnx.load_model()
方法以原生 ONNX 格式加载具有 onnx
风格的 MLflow Models。
有关更多信息,请参阅 mlflow.onnx
和 https://onnx.org.cn/。
保存 ONNX 文件的默认行为是使用 ONNX 保存选项 save_as_external_data=True
,以支持超过 2GB 的模型文件。对于小型模型文件的边缘部署,这可能会产生问题。如果出于此类部署考虑需要将小型模型保存为单个文件,可以在 mlflow.onnx.save_model()
或 mlflow.onnx.log_model()
中设置参数 save_as_external_data=False
,以强制将模型序列化为小型文件。请注意,如果模型超过 2GB,保存为单个文件将不起作用。
ONNX pyfunc 用法示例
对于 ONNX 模型,一个使用 pytorch 训练一个虚拟模型、将其转换为 ONNX、记录到 mlflow 并使用 pyfunc predict() 方法进行预测的配置示例如下
import numpy as np
import mlflow
from mlflow.models import infer_signature
import onnx
import torch
from torch import nn
# define a torch model
net = nn.Linear(6, 1)
loss_function = nn.L1Loss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-4)
X = torch.randn(6)
y = torch.randn(1)
# run model training
epochs = 5
for epoch in range(epochs):
optimizer.zero_grad()
outputs = net(X)
loss = loss_function(outputs, y)
loss.backward()
optimizer.step()
# convert model to ONNX and load it
torch.onnx.export(net, X, "model.onnx")
onnx_model = onnx.load_model("model.onnx")
# log the model into a mlflow run
with mlflow.start_run():
signature = infer_signature(X.numpy(), net(X).detach().numpy())
model_info = mlflow.onnx.log_model(onnx_model, "model", signature=signature)
# load the logged model and make a prediction
onnx_pyfunc = mlflow.pyfunc.load_model(model_info.model_uri)
predictions = onnx_pyfunc.predict(X.numpy())
print(predictions)
XGBoost (xgboost
)
xgboost
模型风格支持通过 python 中的 XGBoost 模型以及 R 中的 mlflow.xgboost.save_model()
和 mlflow.xgboost.log_model()
方法以 MLflow 格式记录 XGBoost 模型。这些方法还向它们生成的 MLflow Models 添加了 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为通用 Python 函数进行推理。此加载的 PyFunc 模型只能使用 DataFrame 输入进行评估。您还可以使用 mlflow.xgboost.load_model()
方法以原生 XGBoost 格式加载具有 xgboost
模型风格的 MLflow Models。
请注意,xgboost
模型风格仅支持 xgboost.Booster 的实例,而不支持实现 scikit-learn API 的模型。
XGBoost
pyfunc 用法
以下示例
- 从
scikit-learn
加载 IRIS 数据集 - 训练一个 XGBoost 分类器
- 使用
mlflow
记录模型和参数 - 加载记录的模型并进行预测
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
import mlflow
from mlflow.models import infer_signature
data = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
data["data"], data["target"], test_size=0.2
)
xgb_classifier = XGBClassifier(
n_estimators=10,
max_depth=3,
learning_rate=1,
objective="binary:logistic",
random_state=123,
)
# log fitted model and XGBClassifier parameters
with mlflow.start_run():
xgb_classifier.fit(X_train, y_train)
clf_params = xgb_classifier.get_xgb_params()
mlflow.log_params(clf_params)
signature = infer_signature(X_train, xgb_classifier.predict(X_train))
model_info = mlflow.xgboost.log_model(
xgb_classifier, "iris-classifier", signature=signature
)
# Load saved model and make predictions
xgb_classifier_saved = mlflow.pyfunc.load_model(model_info.model_uri)
y_pred = xgb_classifier_saved.predict(X_test)
有关更多信息,请参阅 mlflow.xgboost。
LightGBM (lightgbm
)
lightgbm
模型风格支持通过 mlflow.lightgbm.save_model()
和 mlflow.lightgbm.log_model()
方法以 MLflow 格式记录 LightGBM 模型。这些方法还向它们生成的 MLflow Models 添加了 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为通用 Python 函数进行推理。您还可以使用 mlflow.lightgbm.load_model()
方法以原生 LightGBM 格式加载具有 lightgbm
模型风格的 MLflow Models。
请注意,现在支持 LightGBM 的 scikit-learn API。有关更多信息,请参阅 mlflow.lightgbm
。
LightGBM
pyfunc 用法
以下示例
- 从
scikit-learn
加载 IRIS 数据集 - 训练一个 LightGBM
LGBMClassifier
- 使用
mlflow
记录模型和特征重要性 - 加载记录的模型并进行预测
from lightgbm import LGBMClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import mlflow
from mlflow.models import infer_signature
data = load_iris()
# Remove special characters from feature names to be able to use them as keys for mlflow metrics
feature_names = [
name.replace(" ", "_").replace("(", "").replace(")", "")
for name in data["feature_names"]
]
X_train, X_test, y_train, y_test = train_test_split(
data["data"], data["target"], test_size=0.2
)
# create model instance
lgb_classifier = LGBMClassifier(
n_estimators=10,
max_depth=3,
learning_rate=1,
objective="binary:logistic",
random_state=123,
)
# Fit and save model and LGBMClassifier feature importances as mlflow metrics
with mlflow.start_run():
lgb_classifier.fit(X_train, y_train)
feature_importances = dict(zip(feature_names, lgb_classifier.feature_importances_))
feature_importance_metrics = {
f"feature_importance_{feature_name}": imp_value
for feature_name, imp_value in feature_importances.items()
}
mlflow.log_metrics(feature_importance_metrics)
signature = infer_signature(X_train, lgb_classifier.predict(X_train))
model_info = mlflow.lightgbm.log_model(
lgb_classifier, "iris-classifier", signature=signature
)
# Load saved model and make predictions
lgb_classifier_saved = mlflow.pyfunc.load_model(model_info.model_uri)
y_pred = lgb_classifier_saved.predict(X_test)
print(y_pred)
CatBoost (catboost
)
catboost
模型风格支持通过 mlflow.catboost.save_model()
和 mlflow.catboost.log_model()
方法以 MLflow 格式记录 CatBoost 模型。这些方法还向它们生成的 MLflow Models 添加了 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为通用 Python 函数进行推理。您还可以使用 mlflow.catboost.load_model()
方法以原生 CatBoost 格式加载具有 catboost
模型风格的 MLflow Models。
有关更多信息,请参阅 mlflow.catboost
。
CatBoost
pyfunc 用法
对于一个 CatBoost 分类器模型,以下是 pyfunc predict() 方法的配置示例
import mlflow
from mlflow.models import infer_signature
from catboost import CatBoostClassifier
from sklearn import datasets
# prepare data
X, y = datasets.load_wine(as_frame=False, return_X_y=True)
# train the model
model = CatBoostClassifier(
iterations=5,
loss_function="MultiClass",
allow_writing_files=False,
)
model.fit(X, y)
# create model signature
predictions = model.predict(X)
signature = infer_signature(X, predictions)
# log the model into a mlflow run
with mlflow.start_run():
model_info = mlflow.catboost.log_model(model, "model", signature=signature)
# load the logged model and make a prediction
catboost_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
print(catboost_pyfunc.predict(X[:5]))
Spacy(spaCy
)
spaCy
模型风格支持通过 mlflow.spacy.save_model()
和 mlflow.spacy.log_model()
方法以 MLflow 格式记录 spaCy 模型。此外,这些方法还向它们生成的 MLflow Models 添加了 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为通用 Python 函数进行推理。此加载的 PyFunc 模型只能使用 DataFrame 输入进行评估。您还可以使用 mlflow.spacy.load_model()
方法以原生 spaCy 格式加载具有 spacy
模型风格的 MLflow Models。
有关更多信息,请参阅 mlflow.spacy
。
Spacy
pyfunc 用法
以下示例展示了如何训练 Spacy
TextCategorizer
模型,将模型 artifact 和指标记录到 mlflow tracking server,然后加载保存的模型进行预测。对于此示例,我们将使用 nltk
包中可用的 Polarity 2.0
数据集。该数据集包含 10000 条正面和 10000 条负面的简短影评。
首先,我们将文本和情感标签(“pos”或“neg”)从 NLTK 原生格式转换为 Spacy
的 DocBin
格式
import pandas as pd
import spacy
from nltk.corpus import movie_reviews
from spacy import Language
from spacy.tokens import DocBin
nltk.download("movie_reviews")
def get_sentences(sentiment_type: str) -> pd.DataFrame:
"""Reconstruct the sentences from the word lists for each review record for a specific ``sentiment_type``
as a pandas DataFrame with two columns: 'sentence' and 'sentiment'.
"""
file_ids = movie_reviews.fileids(sentiment_type)
sent_df = []
for file_id in file_ids:
sentence = " ".join(movie_reviews.words(file_id))
sent_df.append({"sentence": sentence, "sentiment": sentiment_type})
return pd.DataFrame(sent_df)
def convert(data_df: pd.DataFrame, target_file: str):
"""Convert a DataFrame with 'sentence' and 'sentiment' columns to a
spacy DocBin object and save it to 'target_file'.
"""
nlp = spacy.blank("en")
sentiment_labels = data_df.sentiment.unique()
spacy_doc = DocBin()
for _, row in data_df.iterrows():
sent_tokens = nlp.make_doc(row["sentence"])
# To train a Spacy TextCategorizer model, the label must be attached to the "cats" dictionary of the "Doc"
# object, e.g. {"pos": 1.0, "neg": 0.0} for a "pos" label.
for label in sentiment_labels:
sent_tokens.cats[label] = 1.0 if label == row["sentiment"] else 0.0
spacy_doc.add(sent_tokens)
spacy_doc.to_disk(target_file)
# Build a single DataFrame with both positive and negative reviews, one row per review
review_data = [get_sentences(sentiment_type) for sentiment_type in ("pos", "neg")]
review_data = pd.concat(review_data, axis=0)
# Split the DataFrame into a train and a dev set
train_df = review_data.groupby("sentiment", group_keys=False).apply(
lambda x: x.sample(frac=0.7, random_state=100)
)
dev_df = review_data.loc[review_data.index.difference(train_df.index), :]
# Save the train and dev data files to the current directory as "corpora.train" and "corpora.dev", respectively
convert(train_df, "corpora.train")
convert(dev_df, "corpora.dev")
为了设置训练作业,我们首先需要按照Spacy 文档中的描述生成一个配置文件。为简单起见,我们只在 pipeline 中使用一个 TextCategorizer
。
python -m spacy init config --pipeline textcat --lang en mlflow-textcat.cfg
将配置文件中的默认 train 和 dev 路径更改为当前目录
[paths]
- train = null
- dev = null
+ train = "."
+ dev = "."
在 Spacy
中,训练循环在 Spacy 的代码内部定义。Spacy 提供了一个“logging”扩展点,我们可以在其中使用 mlflow
。为此,
- 我们必须定义一个函数将指标 / 模型输入写入
mlfow
- 在
Spacy
的组件注册表中将其注册为 logger - 更改
Spacy
配置文件 (mlflow-textcat.cfg
) 中的默认 console logger
from typing import IO, Callable, Tuple, Dict, Any, Optional
import spacy
from spacy import Language
import mlflow
@spacy.registry.loggers("mlflow_logger.v1")
def mlflow_logger():
"""Returns a function, ``setup_logger`` that returns two functions:
* ``log_step`` is called internally by Spacy for every evaluation step. We can log the intermediate train and
validation scores to the mlflow tracking server here.
* ``finalize``: is called internally by Spacy after training is complete. We can log the model artifact to the
mlflow tracking server here.
"""
def setup_logger(
nlp: Language,
stdout: IO = sys.stdout,
stderr: IO = sys.stderr,
) -> Tuple[Callable, Callable]:
def log_step(info: Optional[Dict[str, Any]]):
if info:
step = info["step"]
score = info["score"]
metrics = {}
for pipe_name in nlp.pipe_names:
loss = info["losses"][pipe_name]
metrics[f"{pipe_name}_loss"] = loss
metrics[f"{pipe_name}_score"] = score
mlflow.log_metrics(metrics, step=step)
def finalize():
uri = mlflow.spacy.log_model(nlp, "mlflow_textcat_example")
mlflow.end_run()
return log_step, finalize
return setup_logger
查看 spacy-loggers 库以获取更完整的实现。
在 Spacy
配置文件中指向我们的 mlflow logger。对于此示例,我们将降低训练步数和评估频率
[training.logger]
- @loggers = "spacy.ConsoleLogger.v1"
- dev = null
+ @loggers = "mlflow_logger.v1"
[training]
- max_steps = 20000
- eval_frequency = 100
+ max_steps = 100
+ eval_frequency = 10
训练我们的模型
from spacy.cli.train import train as spacy_train
spacy_train("mlflow-textcat.cfg")
为了进行预测,我们加载上次运行中保存的模型
from mlflow import MlflowClient
# look up the last run info from mlflow
client = MlflowClient()
last_run = client.search_runs(experiment_ids=["0"], max_results=1)[0]
# We need to append the spacy model directory name to the artifact uri
spacy_model = mlflow.pyfunc.load_model(
f"{last_run.info.artifact_uri}/mlflow_textcat_example"
)
predictions_in = dev_df.loc[:, ["sentence"]]
predictions_out = spacy_model.predict(predictions_in).squeeze().tolist()
predicted_labels = [
"pos" if row["pos"] > row["neg"] else "neg" for row in predictions_out
]
print(dev_df.assign(predicted_sentiment=predicted_labels))
Fastai(fastai
)
fastai
模型风格支持通过 mlflow.fastai.save_model()
和 mlflow.fastai.log_model()
方法以 MLflow 格式记录 fastai Learner 模型。此外,这些方法还向它们生成的 MLflow Models 添加了 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为通用 Python 函数进行推理。此加载的 PyFunc 模型只能使用 DataFrame 输入进行评估。您还可以使用 mlflow.fastai.load_model()
方法以原生 fastai 格式加载具有 fastai
模型风格的 MLflow Models。
利用加载为 pyfunc 类型的 fastai
模型生成预测的接口使用 Pandas DataFrame 参数。
此示例运行 fastai 表格教程,记录实验,以 fastai
格式保存模型,并加载模型使用 fastai
data loader 获取预测结果
from fastai.data.external import URLs, untar_data
from fastai.tabular.core import Categorify, FillMissing, Normalize, TabularPandas
from fastai.tabular.data import TabularDataLoaders
from fastai.tabular.learner import tabular_learner
from fastai.data.transforms import RandomSplitter
from fastai.metrics import accuracy
from fastcore.basics import range_of
import pandas as pd
import mlflow
import mlflow.fastai
def print_auto_logged_info(r):
tags = {k: v for k, v in r.data.tags.items() if not k.startswith("mlflow.")}
artifacts = [
f.path for f in mlflow.MlflowClient().list_artifacts(r.info.run_id, "model")
]
print(f"run_id: {r.info.run_id}")
print(f"artifacts: {artifacts}")
print(f"params: {r.data.params}")
print(f"metrics: {r.data.metrics}")
print(f"tags: {tags}")
def main(epochs=5, learning_rate=0.01):
path = untar_data(URLs.ADULT_SAMPLE)
path.ls()
df = pd.read_csv(path / "adult.csv")
dls = TabularDataLoaders.from_csv(
path / "adult.csv",
path=path,
y_names="salary",
cat_names=[
"workclass",
"education",
"marital-status",
"occupation",
"relationship",
"race",
],
cont_names=["age", "fnlwgt", "education-num"],
procs=[Categorify, FillMissing, Normalize],
)
splits = RandomSplitter(valid_pct=0.2)(range_of(df))
to = TabularPandas(
df,
procs=[Categorify, FillMissing, Normalize],
cat_names=[
"workclass",
"education",
"marital-status",
"occupation",
"relationship",
"race",
],
cont_names=["age", "fnlwgt", "education-num"],
y_names="salary",
splits=splits,
)
dls = to.dataloaders(bs=64)
model = tabular_learner(dls, metrics=accuracy)
mlflow.fastai.autolog()
with mlflow.start_run() as run:
model.fit(5, 0.01)
mlflow.fastai.log_model(model, "model")
print_auto_logged_info(mlflow.get_run(run_id=run.info.run_id))
model_uri = f"runs:/{run.info.run_id}/model"
loaded_model = mlflow.fastai.load_model(model_uri)
test_df = df.copy()
test_df.drop(["salary"], axis=1, inplace=True)
dl = learn.dls.test_dl(test_df)
predictions, _ = loaded_model.get_preds(dl=dl)
px = pd.DataFrame(predictions).astype("float")
px.head(5)
main()
输出 (Pandas DataFrame
)
索引 | 第一类的概率 | 第二类的概率 |
---|---|---|
0 | 0.545088 | 0.454912 |
1 | 0.503172 | 0.496828 |
2 | 0.962663 | 0.037337 |
3 | 0.206107 | 0.793893 |
4 | 0.807599 | 0.192401 |
或者,当使用 python_function
风格时,从 DataFrame 获取预测结果。
from fastai.data.external import URLs, untar_data
from fastai.tabular.core import Categorify, FillMissing, Normalize, TabularPandas
from fastai.tabular.data import TabularDataLoaders
from fastai.tabular.learner import tabular_learner
from fastai.data.transforms import RandomSplitter
from fastai.metrics import accuracy
from fastcore.basics import range_of
import pandas as pd
import mlflow
import mlflow.fastai
model_uri = ...
path = untar_data(URLs.ADULT_SAMPLE)
df = pd.read_csv(path / "adult.csv")
test_df = df.copy()
test_df.drop(["salary"], axis=1, inplace=True)
loaded_model = mlflow.pyfunc.load_model(model_uri)
loaded_model.predict(test_df)
输出 (Pandas DataFrame
)
索引 | 第一类的概率,第二类的概率 |
---|---|
0 | [0.5450878, 0.45491222] |
1 | [0.50317234, 0.49682766] |
2 | [0.9626626, 0.037337445] |
3 | [0.20610662, 0.7938934] |
4 | [0.8075987, 0.19240129] |
有关更多信息,请参阅 mlflow.fastai
。
Statsmodels (statsmodels
)
statsmodels
模型风格支持通过 Statsmodels 模型和 mlflow.statsmodels.save_model()
和 mlflow.statsmodels.log_model()
方法以 MLflow 格式记录 Statsmodels 模型。这些方法还向它们生成的 MLflow Models 添加了 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为通用 Python 函数进行推理。此加载的 PyFunc 模型只能使用 DataFrame 输入进行评估。您还可以使用 mlflow.statsmodels.load_model()
方法以原生 statsmodels 格式加载具有 statsmodels
模型风格的 MLflow Models。
目前,自动记录仅限于对 statsmodels
模型调用 fit
生成的参数、指标和模型。
Statsmodels pyfunc 用法
以下两个示例说明了基本回归模型 (OLS) 和来自 statsmodels apis:statsmodels.formula.api 和 statsmodels.tsa.api 的 ARIMA 时间序列模型的用法
对于一个最小的 statsmodels 回归模型,以下是 pyfunc predict() 方法的示例
import mlflow
import pandas as pd
from sklearn.datasets import load_diabetes
import statsmodels.formula.api as smf
# load the diabetes dataset from sklearn
diabetes = load_diabetes()
# create X and y dataframes for the features and target
X = pd.DataFrame(data=diabetes.data, columns=diabetes.feature_names)
y = pd.DataFrame(data=diabetes.target, columns=["target"])
# concatenate X and y dataframes
df = pd.concat([X, y], axis=1)
# create the linear regression model (ordinary least squares)
model = smf.ols(
formula="target ~ age + sex + bmi + bp + s1 + s2 + s3 + s4 + s5 + s6", data=df
)
mlflow.statsmodels.autolog(
log_models=True,
disable=False,
exclusive=False,
disable_for_unsupported_versions=False,
silent=False,
registered_model_name=None,
)
with mlflow.start_run():
res = model.fit(method="pinv", use_t=True)
model_info = mlflow.statsmodels.log_model(res, artifact_path="OLS_model")
# load the pyfunc model
statsmodels_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
# generate predictions
predictions = statsmodels_pyfunc.predict(X)
print(predictions)
对于一个最小的时间序列 ARIMA 模型,以下是 pyfunc predict() 方法的示例
import mlflow
import numpy as np
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
# create a time series dataset with seasonality
np.random.seed(0)
# generate a time index with a daily frequency
dates = pd.date_range(start="2022-12-01", end="2023-12-01", freq="D")
# generate the seasonal component (weekly)
seasonality = np.sin(np.arange(len(dates)) * (2 * np.pi / 365.25) * 7)
# generate the trend component
trend = np.linspace(-5, 5, len(dates)) + 2 * np.sin(
np.arange(len(dates)) * (2 * np.pi / 365.25) * 0.1
)
# generate the residual component
residuals = np.random.normal(0, 1, len(dates))
# generate the final time series by adding the components
time_series = seasonality + trend + residuals
# create a dataframe from the time series
data = pd.DataFrame({"date": dates, "value": time_series})
data.set_index("date", inplace=True)
order = (1, 0, 0)
# create the ARIMA model
model = ARIMA(data, order=order)
mlflow.statsmodels.autolog(
log_models=True,
disable=False,
exclusive=False,
disable_for_unsupported_versions=False,
silent=False,
registered_model_name=None,
)
with mlflow.start_run():
res = model.fit()
mlflow.log_params(
{
"order": order,
"trend": model.trend,
"seasonal_order": model.seasonal_order,
}
)
mlflow.log_params(res.params)
mlflow.log_metric("aic", res.aic)
mlflow.log_metric("bic", res.bic)
model_info = mlflow.statsmodels.log_model(res, artifact_path="ARIMA_model")
# load the pyfunc model
statsmodels_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
# prediction dataframes for a TimeSeriesModel must have exactly one row and include columns called start and end
start = pd.to_datetime("2024-01-01")
end = pd.to_datetime("2024-01-07")
# generate predictions
prediction_data = pd.DataFrame({"start": start, "end": end}, index=[0])
predictions = statsmodels_pyfunc.predict(prediction_data)
print(predictions)
有关更多信息,请参阅 mlflow.statsmodels
。
Prophet (prophet
)
prophet
模型风格支持通过 mlflow.prophet.save_model()
和 mlflow.prophet.log_model()
方法以 MLflow 格式记录 Prophet 模型。这些方法还向它们生成的 MLflow Models 添加了 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为通用 Python 函数进行推理。此加载的 PyFunc 模型只能使用 DataFrame 输入进行评估。您还可以使用 mlflow.prophet.load_model()
方法以原生 prophet 格式加载具有 prophet
模型风格的 MLflow Models。
Prophet pyfunc 用法
此示例使用来自 Prophet 的 GitHub 仓库的时间序列数据集,其中包含 Peyton Manning 的 Wikipedia 页面的每日访问量日志,持续数年。数据集示例如下
ds | y |
---|---|
2007-12-10 | 9.59076113897809 |
2007-12-11 | 8.51959031601596 |
2007-12-12 | 8.18367658262066 |
2007-12-13 | 8.07246736935477 |
import numpy as np
import pandas as pd
from prophet import Prophet, serialize
from prophet.diagnostics import cross_validation, performance_metrics
import mlflow
from mlflow.models import infer_signature
# URL to the dataset
SOURCE_DATA = "https://raw.githubusercontent.com/facebook/prophet/main/examples/example_wp_log_peyton_manning.csv"
np.random.seed(12345)
def extract_params(pr_model):
params = {attr: getattr(pr_model, attr) for attr in serialize.SIMPLE_ATTRIBUTES}
return {k: v for k, v in params.items() if isinstance(v, (int, float, str, bool))}
# Load the training data
train_df = pd.read_csv(SOURCE_DATA)
# Create a "test" DataFrame with the "ds" column containing 10 days after the end date in train_df
test_dates = pd.date_range(start="2016-01-21", end="2016-01-31", freq="D")
test_df = pd.DataFrame({"ds": test_dates})
# Initialize Prophet model with specific parameters
prophet_model = Prophet(changepoint_prior_scale=0.5, uncertainty_samples=7)
with mlflow.start_run():
# Fit the model on the training data
prophet_model.fit(train_df)
# Extract and log model parameters
params = extract_params(prophet_model)
mlflow.log_params(params)
# Perform cross-validation
cv_results = cross_validation(
prophet_model,
initial="900 days",
period="30 days",
horizon="30 days",
parallel="threads",
disable_tqdm=True,
)
# Calculate and log performance metrics
cv_metrics = performance_metrics(cv_results, metrics=["mse", "rmse", "mape"])
average_metrics = cv_metrics.drop(columns=["horizon"]).mean(axis=0).to_dict()
mlflow.log_metrics(average_metrics)
# Generate predictions and infer model signature
train = prophet_model.history
# Log the Prophet model with MLflow
model_info = mlflow.prophet.log_model(
prophet_model,
artifact_path="prophet_model",
input_example=train[["ds"]].head(10),
)
# Load the saved model as a pyfunc
prophet_model_saved = mlflow.pyfunc.load_model(model_info.model_uri)
# Generate predictions for the test set
predictions = prophet_model_saved.predict(test_df)
# Truncate and display the forecast if needed
forecast = predictions[["ds", "yhat"]]
print(f"forecast:\n{forecast.head(5)}")
输出 (Pandas DataFrame
)
索引 | ds | yhat | yhat_upper | yhat_lower |
---|---|---|---|---|
0 | 2016-01-21 | 8.526513 | 8.827397 | 8.328563 |
1 | 2016-01-22 | 8.541355 | 9.434994 | 8.112758 |
2 | 2016-01-23 | 8.308332 | 8.633746 | 8.201323 |
3 | 2016-01-24 | 8.676326 | 9.534593 | 8.020874 |
4 | 2016-01-25 | 8.983457 | 9.430136 | 8.121798 |
有关更多信息,请参阅 mlflow.prophet
。
Pmdarima (pmdarima
)
pmdarima
模型风格支持通过 mlflow.pmdarima.save_model()
和 mlflow.pmdarima.log_model()
方法以 MLflow 格式记录 pmdarima 模型。这些方法还向它们生成的 MLflow Models 添加了 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为通用 Python 函数进行推理。此加载的 PyFunc 模型只能使用 DataFrame 输入进行评估。您还可以使用 mlflow.pmdarima.load_model()
方法以原生 pmdarima 格式加载具有 pmdarima
模型风格的 MLflow Models。
利用加载为 pyfunc
类型的 pmdarima
模型生成预测的接口使用一个单行 Pandas DataFrame
配置参数。此配置 Pandas DataFrame
中支持以下列
n_periods
(必需) - 指定从训练数据集的最后一个日期时间值开始生成的未来期数,利用训练模型时输入训练序列的频率(例如,如果训练数据序列元素每小时表示一个值,为了预测未来 3 天的数据,将列n_periods
设置为72
)X
(可选) - 外部回归变量值(仅 pmdarima 版本 >= 1.8.0 支持)用于未来时间段事件的 2D 值数组。有关更多信息,请阅读底层库的解释。return_conf_int
(可选) - 一个布尔值(默认值:False
),表示是否返回置信区间值。参见上面的注意。alpha
(可选) - 计算置信区间的显著性值。(默认值:0.05
)
下面展示了 pmdarima
模型的 pyfunc
predict 的配置示例,未来预测期数为 100,生成置信区间计算,没有外部回归变量元素,默认 alpha 为 0.05
索引 | n-periods | return_conf_int |
---|---|---|
0 | 100 | True |
传递给 pmdarima
pyfunc
风格的 Pandas DataFrame
必须只包含 1 行。
预测 pmdarima
风格时,predict
方法的 DataFrame
配置列 return_conf_int
的值控制输出格式。当该列的值设置为 False
或 None
(如果在配置 DataFrame
中未提供此列时的默认值)时,返回的 Pandas DataFrame
的模式是单列:["yhat"]
。当设置为 True
时,返回的 DataFrame
的模式是:["yhat", "yhat_lower", "yhat_upper"]
,其中相应的下(yhat_lower
)和上(yhat_upper
)置信区间添加到预测结果(yhat
)中。
将加载为 pyfunc 的 pmdarima artifact 与计算的置信区间一起使用的示例
import pmdarima
import mlflow
import pandas as pd
data = pmdarima.datasets.load_airpassengers()
with mlflow.start_run():
model = pmdarima.auto_arima(data, seasonal=True)
mlflow.pmdarima.save_model(model, "/tmp/model.pmd")
loaded_pyfunc = mlflow.pyfunc.load_model("/tmp/model.pmd")
prediction_conf = pd.DataFrame(
[{"n_periods": 4, "return_conf_int": True, "alpha": 0.1}]
)
predictions = loaded_pyfunc.predict(prediction_conf)
输出 (Pandas DataFrame
)
索引 | yhat | yhat_lower | yhat_upper |
---|---|---|---|
0 | 467.573731 | 423.30995 | 511.83751 |
1 | 490.494467 | 416.17449 | 564.81444 |
2 | 509.138684 | 420.56255 | 597.71117 |
3 | 492.554714 | 397.30634 | 587.80309 |
如果从非 pyfunc
工件将 return_conf_int
设置为 True
,则 pmdarima
的签名日志功能将无法正常工作。原生 ARIMA.predict()
在返回置信区间时的输出不是可识别的签名类型。
OpenAI (openai
)(实验性)
使用 openai
风格的完整指南,包括教程和详细文档,可在此处查看。
LangChain (langchain
)(实验性)
使用 langchain 风格的完整指南可在此处查看,包括教程和详细文档。
John Snow Labs (johnsnowlabs
)(实验性)
johnsnowlabs
风格正在积极开发中,并被标记为实验性。公共 API 可能会发生变化,随着更多功能添加到该风格,新功能可能会有所增加。
johnsnowlabs
模型风格可让您访问适用于医疗、金融、法律和更多领域的 200 多种语言的 20,000 多个最先进的企业 NLP 模型。
您可以使用 mlflow.johnsnowlabs.log_model()
将模型记录并导出为
这使得您可以将 任何 John Snow Labs 模型 集成到 MLflow 框架中。您可以轻松部署模型进行推理,使用 MLflow 的服务功能。模型通过 mlflow.pyfunc.load_model()
被解释为用于推理的通用 Python 函数。您还可以使用 mlflow.johnsnowlabs.load_model()
函数从已存储的工件加载保存或记录的具有 johnsnowlabs
风格的 MLflow 模型。
功能包括:LLM、文本摘要、问答、命名实体识别、关系提取、情感分析、拼写检查、图像分类、自动语音识别等,由最新的 Transformer 架构提供支持。这些模型由 John Snow Labs 提供,需要 John Snow Labs 企业 NLP 许可证。您可以联系我们以获取研究或行业许可证。
示例:将 John Snow Labs 导出为 MLflow 格式
import json
import os
import pandas as pd
from johnsnowlabs import nlp
import mlflow
from mlflow.pyfunc import spark_udf
# 1) Write your raw license.json string into the 'JOHNSNOWLABS_LICENSE_JSON' env variable for MLflow
creds = {
"AWS_ACCESS_KEY_ID": "...",
"AWS_SECRET_ACCESS_KEY": "...",
"SPARK_NLP_LICENSE": "...",
"SECRET": "...",
}
os.environ["JOHNSNOWLABS_LICENSE_JSON"] = json.dumps(creds)
# 2) Install enterprise libraries
nlp.install()
# 3) Start a Spark session with enterprise libraries
spark = nlp.start()
# 4) Load a model and test it
nlu_model = "en.classify.bert_sequence.covid_sentiment"
model_save_path = "my_model"
johnsnowlabs_model = nlp.load(nlu_model)
johnsnowlabs_model.predict(["I hate COVID,", "I love COVID"])
# 5) Export model with pyfunc and johnsnowlabs flavors
with mlflow.start_run():
model_info = mlflow.johnsnowlabs.log_model(johnsnowlabs_model, model_save_path)
# 6) Load model with johnsnowlabs flavor
mlflow.johnsnowlabs.load_model(model_info.model_uri)
# 7) Load model with pyfunc flavor
mlflow.pyfunc.load_model(model_save_path)
pandas_df = pd.DataFrame({"text": ["Hello World"]})
spark_df = spark.createDataFrame(pandas_df).coalesce(1)
pyfunc_udf = spark_udf(
spark=spark,
model_uri=model_save_path,
env_manager="virtualenv",
result_type="string",
)
new_df = spark_df.withColumn("prediction", pyfunc_udf(*pandas_df.columns))
# 9) You can now use the mlflow models serve command to serve the model see next section
# 10) You can also use x command to deploy model inside of a container see next section
将 John Snow Labs 模型部署为容器
- 启动 Docker 容器
docker run -p 5001:8080 -e JOHNSNOWLABS_LICENSE_JSON=your_json_string "mlflow-pyfunc"
- 查询服务器
curl http://127.0.0.1:5001/invocations -H 'Content-Type: application/json' -d '{
"dataframe_split": {
"columns": ["text"],
"data": [["I hate covid"], ["I love covid"]]
}
}'
在不使用容器的情况下部署 John Snow Labs 模型
- 导出环境变量并启动服务器
export JOHNSNOWLABS_LICENSE_JSON=your_json_string
mlflow models serve -m <model_uri>
- 查询服务器
curl http://127.0.0.1:5000/invocations -H 'Content-Type: application/json' -d '{
"dataframe_split": {
"columns": ["text"],
"data": [["I hate covid"], ["I love covid"]]
}
}'
Diviner (diviner
)
diviner
模型风格支持通过 mlflow.diviner.save_model()
和 mlflow.diviner.log_model()
方法以 MLflow 格式记录 diviner 模型。这些方法还会为它们生成的 MLflow 模型添加 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为用于推理的通用 Python 函数。这个加载的 PyFunc 模型只能通过 DataFrame 输入进行评分。您还可以使用 mlflow.diviner.load_model()
方法以原生 diviner 格式加载具有 diviner
模型风格的 MLflow 模型。
Diviner 类型
Diviner 是一个库,提供了一个编排框架,用于对相关的系列组执行时间序列预测。diviner
中的预测通过包装流行的开源库实现,例如 prophet 和 pmdarima。diviner
库提供了一组简化的 API,可以使用单个输入 DataFrame 和统一的高级 API 为多个数据分组同时生成不同的时间序列预测。
Diviner 的指标和参数记录
与其他 MLflow 支持的风格不同,Diviner 有分组模型的概念。作为许多(可能数千个)独立预测模型的集合,日志记录服务器记录每个模型的个体指标和参数负担很大。因此,Diviner 的 API 将指标和参数公开为 Pandas
DataFrames
,而不是离散的原始值。
举例说明,假设我们正在预测世界各地主要城市的每小时用电量。我们的输入数据样本如下所示
country | city | datetime | watts |
---|---|---|---|
US | NewYork | 2022-03-01 00:01:00 | 23568.9 |
US | NewYork | 2022-03-01 00:02:00 | 22331.7 |
US | Boston | 2022-03-01 00:01:00 | 14220.1 |
US | Boston | 2022-03-01 00:02:00 | 14183.4 |
CA | Toronto | 2022-03-01 00:01:00 | 18562.2 |
CA | Toronto | 2022-03-01 00:02:00 | 17681.6 |
MX | MexicoCity | 2022-03-01 00:01:00 | 19946.8 |
MX | MexicoCity | 2022-03-01 00:02:00 | 19444.0 |
如果我们根据此数据进行 fit
,并提供分组键为
grouping_keys = ["country", "city"]
对于提供的每个分组键,都会生成一个模型
[("US", "NewYork"), ("US", "Boston"), ("CA", "Toronto"), ("MX", "MexicoCity")]
对于构建的每个模型,将其各自的指标和参数输入到 MLflow 跟踪服务器不会成为问题。然而,如果我们对地球上的每个主要城市建模并每天运行此预测场景,则会成为一个问题。如果我们遵守世界银行的条件,截至 2022 年,这意味着超过 10,000 个模型。仅运行此预测几周,我们就会得到一个非常大的指标表。
为了解决大规模预测的这一问题,diviner
的指标和参数被提取为按分组键索引的 Pandas DataFrame
,如下所示(浮点值因可见性而截断)
grouping_key_columns | country | city | mse | rmse | mae | mape | mdape | smape |
---|---|---|---|---|---|---|---|---|
("country", "city") | CA | Toronto | 8276851.6 | 2801.7 | 2417.7 | 0.16 | 0.16 | 0.159 |
("country", "city") | MX | MexicoCity | 3548872.4 | 1833.8 | 1584.5 | 0.15 | 0.16 | 0.159 |
("country", "city") | US | NewYork | 3167846.4 | 1732.4 | 1498.2 | 0.15 | 0.16 | 0.158 |
("country", "city") | US | Boston | 14082666.4 | 3653.2 | 3156.2 | 0.15 | 0.16 | 0.159 |
有两种推荐的方法来记录 diviner
模型的指标和参数
- 将 DataFrames 写入本地存储并使用
mlflow.log_artifacts()
import os
import mlflow
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
params = model.extract_model_params()
metrics = model.cross_validate_and_score(
horizon="72 hours",
period="240 hours",
initial="480 hours",
parallel="threads",
rolling_window=0.1,
monthly=False,
)
params.to_csv(f"{tmpdir}/params.csv", index=False, header=True)
metrics.to_csv(f"{tmpdir}/metrics.csv", index=False, header=True)
mlflow.log_artifacts(tmpdir, artifact_path="data")
- 使用
mlflow.log_dict()
直接写入为 JSON 工件
如果使用 pd.DataFrame.to_dict()
方法,从 diviner
模型提取的参数可能需要类型转换(或删除列),因为此方法无法序列化对象。
import mlflow
params = model.extract_model_params()
metrics = model.cross_validate_and_score(
horizon="72 hours",
period="240 hours",
initial="480 hours",
parallel="threads",
rolling_window=0.1,
monthly=False,
)
params["t_scale"] = params["t_scale"].astype(str)
params["start"] = params["start"].astype(str)
params = params.drop("stan_backend", axis=1)
mlflow.log_dict(params.to_dict(), "params.json")
mlflow.log_dict(metrics.to_dict(), "metrics.json")
模型工件的记录如下面的 pyfunc
示例所示。
Diviner pyfunc 用法
MLflow Diviner 风格包含 Diviner 模型的 pyfunc
接口实现。要控制预测行为,您可以在 Pandas DataFrame 输入的第一行中指定配置参数。
由于此配置取决于底层模型类型(即 diviner.GroupedProphet.forecast()
方法的签名与 diviner.GroupedPmdarima.predict()
不同),Diviner pyfunc 实现会尝试将参数强制转换为底层模型期望的类型。
Diviner 模型支持“全组”和“部分组”预测。如果配置 DataFrame
中存在名为 "groups"
的列,且该 DataFrame 提交给 pyfunc
风格,则第一行的分组键值将用于生成预测的子集。如果只需要少数(或一个)组的结果,此功能消除了从所有组预测的完整输出中过滤子集的需要。
对于 GroupedPmdarima
模型,pyfunc
predict()
方法的一个示例配置是
import mlflow
import pandas as pd
from pmdarima.arima.auto import AutoARIMA
from diviner import GroupedPmdarima
with mlflow.start_run():
base_model = AutoARIMA(out_of_sample_size=96, maxiter=200)
model = GroupedPmdarima(model_template=base_model).fit(
df=df,
group_key_columns=["country", "city"],
y_col="watts",
datetime_col="datetime",
silence_warnings=True,
)
mlflow.diviner.save_model(diviner_model=model, path="/tmp/diviner_model")
diviner_pyfunc = mlflow.pyfunc.load_model(model_uri="/tmp/diviner_model")
predict_conf = pd.DataFrame(
{
"n_periods": 120,
"groups": [
("US", "NewYork"),
("CA", "Toronto"),
("MX", "MexicoCity"),
], # NB: List of tuples required.
"predict_col": "wattage_forecast",
"alpha": 0.1,
"return_conf_int": True,
"on_error": "warn",
},
index=[0],
)
subset_forecasts = diviner_pyfunc.predict(predict_conf)
在几种情况下,提交给 pyfunc
predict()
方法的配置 DataFrame
会导致引发 MlflowException
- 如果未提供
horizon
或n_periods
。 n_periods
或horizon
的值不是整数。- 如果模型类型为
GroupedProphet
,则必须提供字符串类型的frequency
。 - 如果同时提供了
horizon
和n_periods
且值不同。
Transformers (transformers
)(实验性)
使用 transformers
风格的完整指南,包括教程和详细文档,可在此处查看。
SentenceTransformers (sentence_transformers
)(实验性)
sentence_transformers
风格正在积极开发中,并被标记为实验性。公共 API 可能会发生变化,随着更多功能添加到该风格,新功能可能会有所增加。
sentence_transformers
模型风格通过 mlflow.sentence_transformers.save_model()
和 mlflow.sentence_transformers.log_model()
函数,支持以 MLflow 格式记录 sentence-transformers 模型。使用这些函数还会为它们生成的 MLflow 模型添加 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为用于推理的通用 Python 函数。您还可以使用 mlflow.sentence_transformers.load_model()
函数以原生 sentence-transformers
模型加载保存或记录的具有 sentence_transformers
风格的 MLflow 模型。
示例
from sentence_transformers import SentenceTransformer
import mlflow
import mlflow.sentence_transformers
model = SentenceTransformer("all-MiniLM-L6-v2")
example_sentences = ["This is a sentence.", "This is another sentence."]
# Define the signature
signature = mlflow.models.infer_signature(
model_input=example_sentences,
model_output=model.encode(example_sentences),
)
# Log the model using mlflow
with mlflow.start_run():
logged_model = mlflow.sentence_transformers.log_model(
model=model,
artifact_path="sbert_model",
signature=signature,
input_example=example_sentences,
)
# Load option 1: mlflow.pyfunc.load_model returns a PyFuncModel
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
embeddings1 = loaded_model.predict(["hello world", "i am mlflow"])
# Load option 2: mlflow.sentence_transformers.load_model returns a SentenceTransformer
loaded_model = mlflow.sentence_transformers.load_model(logged_model.model_uri)
embeddings2 = loaded_model.encode(["hello world", "i am mlflow"])
print(embeddings1)
"""
>> [[-3.44772562e-02 3.10232025e-02 6.73496164e-03 2.61089969e-02
...
2.37922110e-02 -2.28897743e-02 3.89375277e-02 3.02067865e-02]
[ 4.81191138e-03 -9.33756605e-02 6.95968643e-02 8.09735525e-03
...
6.57437667e-02 -2.72239652e-02 4.02687863e-02 -1.05599344e-01]]
"""
Promptflow (promptflow
)(实验性)
promptflow
风格正在积极开发中,并被标记为实验性。公共 API 可能会发生变化,随着更多功能添加到该风格,新功能可能会有所增加。
promptflow
模型风格能够通过 mlflow.promptflow.save_model()
和 mlflow.promptflow.log_model()
函数将您的流程打包为 MLflow 格式。目前,要求流程目录中存在 flow.dag.yaml
文件。这些函数还会为 MLflow 模型添加 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为用于推理的通用 Python 函数。您还可以使用 mlflow.promptflow.load_model()
方法以原生 promptflow 格式加载具有 promptflow
模型风格的 MLflow 模型。
请注意,MLmodel
文件中的 signature
将不会自动从流程本身推断。要使用签名保存模型,您可以传递 input_example
或手动指定输入签名。
示例
通过 MLflow GitHub 仓库中的示例 获取流程源。
import os
from pathlib import Path
from promptflow import load_flow
import mlflow
assert (
"OPENAI_API_KEY" in os.environ
), "Please set the OPENAI_API_KEY environment variable."
# The example flow will write a simple code snippet that displays the greeting message with specific language.
flow_folder = Path(__file__).parent / "basic"
flow = load_flow(flow_folder)
with mlflow.start_run():
logged_model = mlflow.promptflow.log_model(flow, artifact_path="promptflow_model")
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(loaded_model.predict({"text": "Python Hello World!"}))
模型评估
构建和训练 MLflow 模型后,您可以使用 mlflow.evaluate()
API 在您选择的一个或多个数据集上评估其性能。mlflow.evaluate()
目前支持对具有 python_function (pyfunc) 模型风格 的 MLflow 模型进行分类、回归和多种语言建模任务的评估(参见使用 LLM 进行评估),计算各种任务特定性能指标、模型性能图表和模型解释。评估结果将记录到 MLflow Tracking 中。
以下 MLflow GitHub 仓库中的示例 使用 mlflow.evaluate()
评估分类器在 UCI Adult Data Set 上的性能,记录了全面的 MLflow Metrics 和 Artifacts 集合,提供了对模型性能和行为的深入了解
import xgboost
import shap
import mlflow
from mlflow.models import infer_signature
from sklearn.model_selection import train_test_split
# Load the UCI Adult Dataset
X, y = shap.datasets.adult()
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.33, random_state=42
)
# Fit an XGBoost binary classifier on the training data split
model = xgboost.XGBClassifier().fit(X_train, y_train)
# Create a model signature
signature = infer_signature(X_test, model.predict(X_test))
# Build the Evaluation Dataset from the test set
eval_data = X_test
eval_data["label"] = y_test
with mlflow.start_run() as run:
# Log the baseline model to MLflow
mlflow.sklearn.log_model(model, "model", signature=signature)
model_uri = mlflow.get_artifact_uri("model")
# Evaluate the logged model
result = mlflow.evaluate(
model_uri,
eval_data,
targets="label",
model_type="classifier",
evaluators=["default"],
)


使用 LLM 进行评估
从 MLflow 2.4.0 开始,mlflow.evaluate()
内置支持各种 LLM 任务,包括文本摘要、文本分类、问答和文本生成。以下示例使用 mlflow.evaluate()
评估一个回答有关 MLflow 问题(注意:您必须在当前系统环境中设置 OPENAI_API_TOKEN
环境变量才能运行此示例)的模型
import os
import pandas as pd
import mlflow
import openai
# Create a question answering model using prompt engineering with OpenAI. Log the
# prompt and the model to MLflow Tracking
mlflow.start_run()
system_prompt = (
"Your job is to answer questions about MLflow. When you are asked a question about MLflow,"
" respond to it. Make sure to include code examples. If the question is not related to"
" MLflow, refuse to answer and say that the question is unrelated."
)
mlflow.log_param("system_prompt", system_prompt)
logged_model = mlflow.openai.log_model(
model="gpt-4o-mini",
task=openai.chat.completions,
artifact_path="model",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": "{question}"},
],
)
# Evaluate the model on some example questions
questions = pd.DataFrame(
{
"question": [
"How do you create a run with MLflow?",
"How do you log a model with MLflow?",
"What is the capital of France?",
]
}
)
mlflow.evaluate(
model=logged_model.model_uri,
model_type="question-answering",
data=questions,
)
# Load and inspect the evaluation results
results: pd.DataFrame = mlflow.load_table(
"eval_results_table.json", extra_columns=["run_id", "params.system_prompt"]
)
print("Evaluation results:")
print(results)
MLflow 还提供了一个 Artifact View UI,用于比较使用 LLM 构建的多个模型的输入和输出。例如,在评估多个问答提示(参见 MLflow OpenAI 问答完整示例)后,您可以导航到 Artifact View 查看问题并比较每个模型的答案
有关演示如何将 mlflow.evaluate()
与 LLM 结合使用的其他示例,请查看 MLflow LLM 示例仓库。
使用额外指标进行评估
如果默认指标集不够充分,您可以向 mlflow.evaluate()
提供 extra_metrics
和 custom_artifacts
,以生成正在评估的模型(或多个模型)的额外指标和工件。
要定义额外指标,您应该定义一个 eval_fn
函数,该函数接受 predictions
和 targets
作为参数,并输出一个 MetricValue
对象。predictions
和 targets
是 pandas.Series
对象。如果在 mlflow.evaluate()
中指定的 predictions
或 targets
是 numpy.array
或 List
,它们将被转换为 pandas.Series
。
要使用其他指标的值来计算自定义指标,请将指标名称包含为 eval_fn
的参数。此参数将包含一个 MetricValue
对象,其中包含从指定指标计算的值,可用于计算自定义指标。
{
"accuracy_score": MetricValue(
scores=None, justifications=None, aggregate_results={"accuracy_score": 1.0}
)
}
MetricValue
类有三个属性
scores
: 一个列表,包含每行的指标。aggregate_results
: 一个字典,将聚合方法名称映射到相应的聚合值。这旨在用于聚合scores
。justifications
: 一个列表,包含scores
中值的逐行解释。这是可选的,通常与 genai 指标一起使用。
下面的代码块演示了如何定义自定义指标评估函数
from mlflow.metrics import MetricValue
def my_metric_eval_fn(predictions, targets):
scores = np.abs(predictions - targets)
return MetricValue(
scores=list(scores),
aggregate_results={
"mean": np.mean(scores),
"variance": np.var(scores),
"median": np.median(scores),
},
)
定义 eval_fn
后,然后使用 make_metric()
将此 eval_fn
函数封装为指标。除了 eval_fn
,make_metric()
还需要一个额外的参数 greater_is_better
,用于优化目的。此参数指示这是一个我们想要最大化还是最小化的指标。
from mlflow.metrics import make_metric
mymetric = make_metric(eval_fn=my_metric_eval_fn, greater_is_better=False)
额外指标允许您直接评估模型,或评估输出 dataframe。
要直接评估模型,您必须向 mlflow.evaluate()
提供 pyfunc 模型实例、指向 pyfunc 模型的 URI 或一个可调用函数,该函数将数据作为输入并输出预测。
def model(x):
return x["inputs"]
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"inputs": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
mlflow.evaluate(model, eval_dataset, targets="targets", extra_metrics=[mymetric])
要直接评估输出 dataframe,您可以省略 model
参数。但是,您需要设置 mlflow.evaluate()
中的 predictions
参数以评估推理输出 dataframe。
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"predictions": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
result = mlflow.evaluate(
data=eval_dataset,
predictions="predictions",
targets="targets",
extra_metrics=[mymetric],
)
当模型有多个输出时,模型必须返回一个具有多个列的 pandas DataFrame。您必须使用 predictions
参数指定模型输出列中的一列作为预测列,模型的其他输出列将可通过 eval_fn
根据其列名进行访问,通过 col_mapping
进行映射。例如,如果模型有两个输出 retrieved_context
和 answer
,您可以指定 answer
作为预测列,并且 retrieved_context
列将可通过 eval_fn
作为 context
参数访问,通过 col_mapping
进行映射
def eval_fn(predictions, targets, context):
scores = (predictions == targets) + context
return MetricValue(
scores=list(scores),
aggregate_results={"mean": np.mean(scores), "sum": np.sum(scores)},
)
mymetric = make_metric(eval_fn=eval_fn, greater_is_better=False, name="mymetric")
def model(x):
return pd.DataFrame({"retrieved_context": x["inputs"] + 1, "answer": x["inputs"]})
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"inputs": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
config = {"col_mapping": {"context": "retrieved_context"}}
result = mlflow.evaluate(
model,
eval_dataset,
predictions="answer",
targets="targets",
extra_metrics=[mymetric],
evaluator_config=config,
)
但是,如果 eval_fn
的参数与模型的输出列名相同,您也可以避免使用 col_mapping
。
def eval_fn(predictions, targets, retrieved_context):
scores = (predictions == targets) + retrieved_context
return MetricValue(
scores=list(scores),
aggregate_results={"mean": np.mean(scores), "sum": np.sum(scores)},
)
mymetric = make_metric(eval_fn=eval_fn, greater_is_better=False, name="mymetric")
def model(x):
return pd.DataFrame({"retrieved_context": x["inputs"] + 1, "answer": x["inputs"]})
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"inputs": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
result = mlflow.evaluate(
model,
eval_dataset,
predictions="answer",
targets="targets",
extra_metrics=[mymetric],
)
col_mapping
还允许您向额外指标函数传递附加参数,在此示例中传递值 k
。
def eval_fn(predictions, targets, k):
scores = k * (predictions == targets)
return MetricValue(scores=list(scores), aggregate_results={"mean": np.mean(scores)})
weighted_mymetric = make_metric(eval_fn=eval_fn, greater_is_better=False)
def model(x):
return x["inputs"]
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"inputs": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
config = {"col_mapping": {"k": 5}}
mlflow.evaluate(
model,
eval_dataset,
targets="targets",
extra_metrics=[weighted_mymetric],
evaluator_config=config,
)
您还可以将其他指标的名称添加为额外指标函数的参数,该参数将传递该指标计算的 MetricValue
。
def eval_fn(predictions, targets, retrieved_context):
scores = (predictions == targets) + retrieved_context
return MetricValue(
scores=list(scores),
aggregate_results={"mean": np.mean(scores), "sum": np.sum(scores)},
)
mymetric = make_metric(eval_fn=eval_fn, greater_is_better=False, name="mymetric")
def eval_fn_2(predictions, targets, mymetric):
scores = ["true" if score else "false" for score in mymetric.scores]
return MetricValue(
scores=list(scores),
)
mymetric2 = make_metric(eval_fn=eval_fn_2, greater_is_better=False, name="mymetric2")
def model(x):
return pd.DataFrame({"retrieved_context": x["inputs"] + 1, "answer": x["inputs"]})
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"inputs": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
result = mlflow.evaluate(
model,
eval_dataset,
predictions="answer",
targets="targets",
extra_metrics=[mymetric, mymetric2],
)
以下 MLflow GitHub 仓库中的短示例 使用具有额外指标函数的 mlflow.evaluate()
评估回归器在 California Housing Dataset 上的性能。
import os
import matplotlib.pyplot as plt
import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
import mlflow
from mlflow.models import infer_signature, make_metric
# loading the California housing dataset
cali_housing = fetch_california_housing(as_frame=True)
# split the dataset into train and test partitions
X_train, X_test, y_train, y_test = train_test_split(
cali_housing.data, cali_housing.target, test_size=0.2, random_state=123
)
# train the model
lin_reg = LinearRegression().fit(X_train, y_train)
# Infer model signature
predictions = lin_reg.predict(X_train)
signature = infer_signature(X_train, predictions)
# creating the evaluation dataframe
eval_data = X_test.copy()
eval_data["target"] = y_test
def squared_diff_plus_one(eval_df, _builtin_metrics):
"""
This example custom metric function creates a metric based on the ``prediction`` and
``target`` columns in ``eval_df`.
"""
return np.sum(np.abs(eval_df["prediction"] - eval_df["target"] + 1) ** 2)
def sum_on_target_divided_by_two(_eval_df, builtin_metrics):
"""
This example custom metric function creates a metric derived from existing metrics in
``builtin_metrics``.
"""
return builtin_metrics["sum_on_target"] / 2
def prediction_target_scatter(eval_df, _builtin_metrics, artifacts_dir):
"""
This example custom artifact generates and saves a scatter plot to ``artifacts_dir`` that
visualizes the relationship between the predictions and targets for the given model to a
file as an image artifact.
"""
plt.scatter(eval_df["prediction"], eval_df["target"])
plt.xlabel("Targets")
plt.ylabel("Predictions")
plt.title("Targets vs. Predictions")
plot_path = os.path.join(artifacts_dir, "example_scatter_plot.png")
plt.savefig(plot_path)
return {"example_scatter_plot_artifact": plot_path}
with mlflow.start_run() as run:
mlflow.sklearn.log_model(lin_reg, "model", signature=signature)
model_uri = mlflow.get_artifact_uri("model")
result = mlflow.evaluate(
model=model_uri,
data=eval_data,
targets="target",
model_type="regressor",
evaluators=["default"],
extra_metrics=[
make_metric(
eval_fn=squared_diff_plus_one,
greater_is_better=False,
),
make_metric(
eval_fn=sum_on_target_divided_by_two,
greater_is_better=True,
),
],
custom_artifacts=[prediction_target_scatter],
)
print(f"metrics:\n{result.metrics}")
print(f"artifacts:\n{result.artifacts}")
有关更全面的额外指标用法示例,请参阅 MLflow GitHub 仓库中的此示例。
使用函数进行评估
从 MLflow 2.8.0 开始,mlflow.evaluate()
支持评估 Python 函数,而无需将模型记录到 MLflow 中。当您不想记录模型而只想评估模型时,这很有用。函数的输入和输出要求与模型的输入和输出要求相同。
以下示例使用 mlflow.evaluate()
评估函数
import shap
import xgboost
from sklearn.model_selection import train_test_split
import mlflow
# Load the UCI Adult Dataset
X, y = shap.datasets.adult()
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.33, random_state=42
)
# Fit an XGBoost binary classifier on the training data split
model = xgboost.XGBClassifier().fit(X_train, y_train)
# Build the Evaluation Dataset from the test set
eval_data = X_test
eval_data["label"] = y_test
# Define a function that calls the model's predict method
def fn(X):
return model.predict(X)
with mlflow.start_run() as run:
# Evaluate the function without logging the model
result = mlflow.evaluate(
fn,
eval_data,
targets="label",
model_type="classifier",
evaluators=["default"],
)
print(f"metrics:\n{result.metrics}")
print(f"artifacts:\n{result.artifacts}")
使用静态数据集进行评估
从 MLflow 2.8.0 开始,mlflow.evaluate()
支持评估静态数据集,而无需指定模型。当您将模型输出保存到 Pandas DataFrame 或 MLflow PandasDataset 中的某一列,并希望评估静态数据集而无需重新运行模型时,这很有用。
如果您使用 Pandas DataFrame,您必须在 mlflow.evaluate()
中使用顶级 predictions
参数指定包含模型输出的列名
# Assume that the model output is saved to the pandas_df["model_output"] column
mlflow.evaluate(data=pandas_df, predictions="model_output", ...)
如果您使用 MLflow PandasDataset,您必须在 mlflow.data.from_pandas()
中使用 predictions
参数指定包含模型输出的列名,并在 mlflow.evaluate()
中为 predictions
参数指定 None
# Assume that the model output is saved to the pandas_df["model_output"] column
dataset = mlflow.data.from_pandas(pandas_df, predictions="model_output")
mlflow.evaluate(data=pandas_df, predictions=None, ...)
当模型有多个输出时,您必须在模型输出列中指定一列作为预测列。模型的其他输出列将被视为“输入”列。例如,如果模型有两个名为 retrieved_context
和 answer
的输出,您可以指定 answer
作为预测列。在计算指标时,retrieved_context
列将被视为“输入”列。
以下示例使用 mlflow.evaluate()
评估静态数据集
import shap
import xgboost
from sklearn.model_selection import train_test_split
import mlflow
# Load the UCI Adult Dataset
X, y = shap.datasets.adult()
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.33, random_state=42
)
# Fit an XGBoost binary classifier on the training data split
model = xgboost.XGBClassifier().fit(X_train, y_train)
# Build the Evaluation Dataset from the test set
y_test_pred = model.predict(X=X_test)
eval_data = X_test
eval_data["label"] = y_test
eval_data["predictions"] = y_test_pred
with mlflow.start_run() as run:
# Evaluate the static dataset without providing a model
result = mlflow.evaluate(
data=eval_data,
targets="label",
predictions="predictions",
model_type="classifier",
)
print(f"metrics:\n{result.metrics}")
print(f"artifacts:\n{result.artifacts}")
执行模型验证
MLflow 2.18.0 将模型验证功能从 mlflow.evaluate()
API 移至专用的 mlflow.validate_evaluation_results()
API。相关的参数(例如 baseline_model)已被弃用,并将在未来版本中从旧 API 中移除。
使用 mlflow.validate_evaluation_results()
API,您可以验证在模型评估期间生成的指标,以根据基准评估模型的质量。为此,首先使用 mlflow.evaluate()
评估候选模型和基准模型(或从本地存储加载持久化的评估结果)。然后,传递结果以及将指标名称映射到 mlflow.models.MetricThreshold
对象的 validation_thresholds 字典。如果您的模型未能满足指定的阈值,mlflow.validate_evaluation_results()
将引发 ModelValidationFailedException
,并附带验证失败的详细信息。
有关模型评估行为和输出的更多信息,请参阅 mlflow.evaluate()
API 文档。
根据基准模型验证模型
下面是如何使用预定义阈值根据基准验证候选模型性能的示例。
import xgboost
import shap
from sklearn.model_selection import train_test_split
from sklearn.dummy import DummyClassifier
import mlflow
from mlflow.models import MetricThreshold
# load UCI Adult Data Set; segment it into training and test sets
X, y = shap.datasets.adult()
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.33, random_state=42
)
# construct an evaluation dataset from the test set
eval_data = X_test
eval_data["label"] = y_test
# Log and evaluate the candidate model
candidate_model = xgboost.XGBClassifier().fit(X_train, y_train)
with mlflow.start_run(run_name="candidate") as run:
candidate_model_uri = mlflow.sklearn.log_model(
candidate_model, "candidate_model", signature=signature
).model_uri
candidate_result = mlflow.evaluate(
candidate_model_uri,
eval_data,
targets="label",
model_type="classifier",
)
# Log and evaluate the baseline model
baseline_model = DummyClassifier(strategy="uniform").fit(X_train, y_train)
with mlflow.start_run(run_name="baseline") as run:
baseline_model_uri = mlflow.sklearn.log_model(
baseline_model, "baseline_model", signature=signature
).model_uri
baseline_result = mlflow.evaluate(
baseline_model_uri,
eval_data,
targets="label",
model_type="classifier",
)
# Define criteria for model to be validated against
thresholds = {
"accuracy_score": MetricThreshold(
threshold=0.8, # accuracy should be >=0.8
min_absolute_change=0.05, # accuracy should be at least 0.05 greater than baseline model accuracy
min_relative_change=0.05, # accuracy should be at least 5 percent greater than baseline model accuracy
greater_is_better=True,
),
}
# Validate the candidate model against baseline
mlflow.validate_evaluation_results(
candidate_result=candidate_result,
baseline_result=baseline_result,
validation_thresholds=thresholds,
)
有关如何指定和检查阈值的详细信息,请参阅 mlflow.models.MetricThreshold
。
MLflow UI 中全面示例的记录输出如下所示。
模型验证结果不包含在活动的 MLflow 运行中。
根据静态阈值验证模型
mlflow.validate_evaluation_results()
API 还可以用于根据静态阈值验证候选模型,而不是将其与基准模型进行比较,只需将 None
传递给 baseline_result
参数。
import mlflow
from mlflow.models import MetricThreshold
thresholds = {
"accuracy_score": MetricThreshold(
threshold=0.8, # accuracy should be >=0.8
greater_is_better=True,
),
}
# Validate the candidate model against static threshold
mlflow.validate_evaluation_results(
candidate_result=candidate_result,
baseline_result=None,
validation_thresholds=thresholds,
)
重用基准结果进行多次验证
mlflow.models.EvaluationResult
对象由 mlflow.evaluate()
返回,可以保存到本地存储并从中加载。此功能允许您在不同的候选模型之间重用相同的基准结果,这对于自动化模型质量监控尤其有用。
import mlflow
from mlflow.models.evaluation import EvaluationResult
baseline_result = mlflow.evaluate(
baseline_model_uri,
eval_data,
targets="label",
model_type="classifier",
)
baseline_result.save("RESULT_PATH")
# Load the evaluation result for validation
baseline_result = EvaluationResult.load("RESULT_PATH")
有一些插件支持深度模型验证,其功能不是 MLflow 直接支持的。要了解更多信息,请参阅
多类和二元分类器之间曲线下面积-精确率-召回率分数(指标名称 precision_recall_auc
)计算的差异
评估多类分类器模型时,会利用 sklearn 的标准评分指标:sklearn.metrics.roc_auc_score
来计算精确率-召回率曲线下的面积。该算法利用梯形法则执行线性插值计算,以估计精确率-召回率曲线下的面积。它非常适合用于评估多类分类模型,以提供拟合质量的单个数值。
另一方面,二元分类器模型使用 sklearn.metrics.average_precision_score
,以避免在应用于二元分类中严重不平衡的类别时 roc_auc_score
实现的缺点。对不平衡数据集使用 roc_auc_score
可能会给出误导性的结果(比模型实际准确预测少数类成员的能力乐观地更好)。
有关为何对此采用不同算法的更多信息,以及指向影响 sklearn.metrics
模块中这些指标实现的论文链接,请参阅文档。
为简单起见,两种评估方法(无论是多类还是二元分类)的评估指标结果都统一在单个指标中:precision_recall_auc
。
使用 Giskard 插件进行模型验证
为了扩展 MLflow 的验证能力并在模型投入生产之前预测问题,Giskard 构建了一个插件,允许用户执行以下操作:
- 扫描模型以检测隐藏的漏洞,例如性能偏差、鲁棒性不足、过度自信、信心不足、伦理偏差、数据泄露、随机性、虚假相关等
- 探索数据中突出显示发现的漏洞的样本
- 将漏洞记录为定义明确且量化的指标
- 比较不同模型之间的指标
请参阅以下插件示例笔记本进行演示
有关插件的更多信息,请参阅 giskard-mlflow 文档。
使用 Trubrics 插件进行模型验证
为了扩展 MLflow 的验证功能,Trubrics 构建了一个插件,允许用户
- 使用大量现成的验证
- 使用任何自定义 Python 函数验证运行
- 在 .json 文件中查看所有验证结果,以诊断 MLflow 运行失败的原因
请参阅 插件示例笔记本进行演示。
有关插件的更多信息,请参阅 trubrics-mlflow 文档。
模型定制
虽然 MLflow 的内置模型持久化工具方便地将各种流行机器学习库的模型打包为 MLflow 模型格式,但它们不能涵盖所有用例。例如,您可能希望使用 MLflow 内置风格未明确支持的 ML 库中的模型。或者,您可能希望打包自定义推理代码和数据以创建 MLflow 模型。幸运的是,MLflow 提供了两种可以完成这些任务的解决方案:自定义 Python 模型 和 自定义风格。
本节内容
自定义 Python 模型
mlflow.pyfunc
模块提供了 save_model()
和 log_model()
工具,用于创建带有 python_function
风格、包含用户指定代码和工件(文件)依赖项的 MLflow 模型。这些工件依赖项可以包括由任何 Python ML 库生成的序列化模型。
由于这些自定义模型包含 python_function
风格,它们可以部署到 MLflow 支持的任何生产环境,例如 SageMaker、AzureML 或本地 REST 端点。
以下示例演示了如何使用 mlflow.pyfunc
模块创建自定义 Python 模型。有关使用 MLflow 的 python_function
工具进行模型定制的更多信息,请参阅 python_function 自定义模型文档。
示例:创建带类型提示的模型
此示例演示如何创建带有类型提示的自定义 Python 模型,使 MLflow 能够根据为模型输入指定的类型提示执行数据验证。有关 PythonModel 类型提示支持的更多信息,请参阅 PythonModel 类型提示指南。
PythonModel 支持带有类型提示的数据验证,从 MLflow 2.20.0 版本开始。
import pydantic
import mlflow
from mlflow.pyfunc import PythonModel
# Define the pydantic model input
class Message(pydantic.BaseModel):
role: str
content: str
class CustomModel(PythonModel):
# Define the model_input type hint
# NB: it must be list[...], check the python model type hints guide for more information
def predict(self, model_input: list[Message], params=None) -> list[str]:
return [m.content for m in model_input]
# Construct the model and test
model = CustomModel()
# The input_example can be a list of Message objects as defined in the type hint
input_example = [
Message(role="system", content="Hello"),
Message(role="user", content="Hi"),
]
assert model.predict(input_example) == ["Hello", "Hi"]
# The input example can also be a list of dictionaries that match the Message schema
input_example = [
{"role": "system", "content": "Hello"},
{"role": "user", "content": "Hi"},
]
assert model.predict(input_example) == ["Hello", "Hi"]
# Log the model
with mlflow.start_run():
model_info = mlflow.pyfunc.log_model(
artifact_path="model",
python_model=model,
input_example=input_example,
)
# Load the model as pyfunc
pyfunc_model = mlflow.pyfunc.load_model(model_info.model_uri)
assert pyfunc_model.predict(input_example) == ["Hello", "Hi"]
示例:创建自定义“添加 n”模型
此示例定义了一个自定义模型类,该模型将指定的数值 n
添加到 Pandas DataFrame 输入的所有列。然后,它使用 mlflow.pyfunc
API 以 MLflow 模型格式保存此模型的一个实例,其中 n = 5
。最后,它以 python_function
格式加载模型并用于评估样本输入。
import mlflow.pyfunc
# Define the model class
class AddN(mlflow.pyfunc.PythonModel):
def __init__(self, n):
self.n = n
def predict(self, context, model_input, params=None):
return model_input.apply(lambda column: column + self.n)
# Construct and save the model
model_path = "add_n_model"
add5_model = AddN(n=5)
mlflow.pyfunc.save_model(path=model_path, python_model=add5_model)
# Load the model in `python_function` format
loaded_model = mlflow.pyfunc.load_model(model_path)
# Evaluate the model
import pandas as pd
model_input = pd.DataFrame([range(10)])
model_output = loaded_model.predict(model_input)
assert model_output.equals(pd.DataFrame([range(5, 15)]))
示例:以 MLflow 格式保存 XGBoost 模型
此示例首先使用 XGBoost 库训练并保存一个梯度提升树模型。接下来,它定义了一个包装类,该类封装了 XGBoost 模型并符合 MLflow 的 python_function
推理 API。然后,它使用包装类和保存的 XGBoost 模型构建一个 MLflow 模型,该模型使用梯度提升树执行推理。最后,它以 python_function
格式加载 MLflow 模型并用于评估测试数据。
# Load training and test datasets
from sys import version_info
import xgboost as xgb
from sklearn import datasets
from sklearn.model_selection import train_test_split
PYTHON_VERSION = f"{version_info.major}.{version_info.minor}.{version_info.micro}"
iris = datasets.load_iris()
x = iris.data[:, 2:]
y = iris.target
x_train, x_test, y_train, _ = train_test_split(x, y, test_size=0.2, random_state=42)
dtrain = xgb.DMatrix(x_train, label=y_train)
# Train and save an XGBoost model
xgb_model = xgb.train(params={"max_depth": 10}, dtrain=dtrain, num_boost_round=10)
xgb_model_path = "xgb_model.pth"
xgb_model.save_model(xgb_model_path)
# Create an `artifacts` dictionary that assigns a unique name to the saved XGBoost model file.
# This dictionary will be passed to `mlflow.pyfunc.save_model`, which will copy the model file
# into the new MLflow Model's directory.
artifacts = {"xgb_model": xgb_model_path}
# Define the model class
import mlflow.pyfunc
class XGBWrapper(mlflow.pyfunc.PythonModel):
def load_context(self, context):
import xgboost as xgb
self.xgb_model = xgb.Booster()
self.xgb_model.load_model(context.artifacts["xgb_model"])
def predict(self, context, model_input, params=None):
input_matrix = xgb.DMatrix(model_input.values)
return self.xgb_model.predict(input_matrix)
# Create a Conda environment for the new MLflow Model that contains all necessary dependencies.
import cloudpickle
conda_env = {
"channels": ["defaults"],
"dependencies": [
f"python={PYTHON_VERSION}",
"pip",
{
"pip": [
f"mlflow=={mlflow.__version__}",
f"xgboost=={xgb.__version__}",
f"cloudpickle=={cloudpickle.__version__}",
],
},
],
"name": "xgb_env",
}
# Save the MLflow Model
mlflow_pyfunc_model_path = "xgb_mlflow_pyfunc"
mlflow.pyfunc.save_model(
path=mlflow_pyfunc_model_path,
python_model=XGBWrapper(),
artifacts=artifacts,
conda_env=conda_env,
)
# Load the model in `python_function` format
loaded_model = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path)
# Evaluate the model
import pandas as pd
test_predictions = loaded_model.predict(pd.DataFrame(x_test))
print(test_predictions)
示例:使用 hf:/ 模式记录 transformers 模型以避免复制大型文件
此示例展示了如何使用特殊的模式 hf:/
直接从 huggingface hub 记录 transformers 模型。当模型太大并且特别希望直接服务模型时,这很有用,但如果您希望在本地下载和测试模型,它不会节省额外空间。
import mlflow
from mlflow.models import infer_signature
import numpy as np
import transformers
# Define a custom PythonModel
class QAModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
"""
This method initializes the tokenizer and language model
using the specified snapshot location from model context.
"""
snapshot_location = context.artifacts["bert-tiny-model"]
# Initialize tokenizer and language model
tokenizer = transformers.AutoTokenizer.from_pretrained(snapshot_location)
model = transformers.BertForQuestionAnswering.from_pretrained(snapshot_location)
self.pipeline = transformers.pipeline(
task="question-answering", model=model, tokenizer=tokenizer
)
def predict(self, context, model_input, params=None):
question = model_input["question"][0]
if isinstance(question, np.ndarray):
question = question.item()
ctx = model_input["context"][0]
if isinstance(ctx, np.ndarray):
ctx = ctx.item()
return self.pipeline(question=question, context=ctx)
# Log the model
data = {"question": "Who's house?", "context": "The house is owned by Run."}
pyfunc_artifact_path = "question_answering_model"
with mlflow.start_run() as run:
model_info = mlflow.pyfunc.log_model(
artifact_path=pyfunc_artifact_path,
python_model=QAModel(),
artifacts={"bert-tiny-model": "hf:/prajjwal1/bert-tiny"},
input_example=data,
signature=infer_signature(data, ["Run"]),
extra_pip_requirements=["torch", "accelerate", "transformers", "numpy"],
)
自定义风格
您还可以通过编写自定义风格来创建自定义 MLflow 模型。
如 模型 API 和 存储格式 部分所述,MLflow 模型由包含 MLmodel
配置文件的文件目录定义。此 MLmodel
文件描述了各种模型属性,包括模型可以解释的风格。MLmodel
文件包含每个风格名称的条目;每个条目是一个 YAML 格式的风格特定属性集合。
要创建新风格以支持自定义模型,您需要定义要包含在 MLmodel
配置文件中的风格特定属性集,以及可以解释模型目录内容和风格属性的代码。下面展示了构建自定义模型风格及其用法的详细示例。不考虑正式包含在 MLflow 中的新自定义风格应作为单独的 GitHub 仓库引入,并在 社区模型风格 页面中提供文档。
示例:创建自定义“sktime”风格
此示例说明了为 sktime 时间序列库创建自定义风格。该库为包括时间序列预测在内的多种学习任务提供统一接口。虽然此示例中的自定义风格在 sktime
推理 API 和模型序列化格式方面是特定的,但其接口设计类似于许多现有的内置风格。特别是,利用加载为 python_function
风格的自定义模型生成预测的接口使用单行 Pandas DataFrame
配置参数来公开 sktime
推理 API 的参数。此示例的完整代码包含在 sktime
示例目录的 flavor.py 模块中。
让我们更详细地检查自定义风格模块。第一步是导入几个模块,包括 sktime
库、各种 MLflow 工具以及添加 pyfunc
规范到 MLflow 模型配置所需的 MLflow pyfunc
模块。还要注意导入 flavor
模块本身。这将传递给 mlflow.models.Model.log()
方法,以便将模型记录为当前 MLflow 运行的工件。
import logging
import os
import pickle
import flavor
import mlflow
import numpy as np
import pandas as pd
import sktime
import yaml
from mlflow import pyfunc
from mlflow.exceptions import MlflowException
from mlflow.models import Model
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.utils import _save_example
from mlflow.protos.databricks_pb2 import INTERNAL_ERROR, INVALID_PARAMETER_VALUE
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.environment import (
_CONDA_ENV_FILE_NAME,
_CONSTRAINTS_FILE_NAME,
_PYTHON_ENV_FILE_NAME,
_REQUIREMENTS_FILE_NAME,
_mlflow_conda_env,
_process_conda_env,
_process_pip_requirements,
_PythonEnv,
_validate_env_arguments,
)
from mlflow.utils.file_utils import write_to
from mlflow.utils.model_utils import (
_add_code_from_conf_to_system_path,
_get_flavor_configuration,
_validate_and_copy_code_paths,
_validate_and_prepare_target_save_path,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement
from sktime.utils.multiindex import flatten_multiindex
_logger = logging.getLogger(__name__)
我们继续定义一组在以下代码中使用的重要变量。
每个自定义风格都需要提供风格名称,并且应反映要支持的库的名称。它作为风格特定属性的一部分保存到 MLmodel
配置文件中。此示例还定义了一些 sktime
特定变量。出于说明目的,仅包含通过 _SktimeModelWrapper
类公开的可用预测方法的一个子集,当以 python_function
风格加载模型时(可以以类似方式添加其他方法)。此外,定义了模型序列化格式,即 pickle
(默认)和 cloudpickle
。请注意,这两个序列化模块都需要在使用此模型的任何环境中(无论是用于推理)使用相同的 python 环境(版本),以确保模型将加载适当版本的 pickle (cloudpickle)。
FLAVOR_NAME = "sktime"
SKTIME_PREDICT = "predict"
SKTIME_PREDICT_INTERVAL = "predict_interval"
SKTIME_PREDICT_QUANTILES = "predict_quantiles"
SKTIME_PREDICT_VAR = "predict_var"
SUPPORTED_SKTIME_PREDICT_METHODS = [
SKTIME_PREDICT,
SKTIME_PREDICT_INTERVAL,
SKTIME_PREDICT_QUANTILES,
SKTIME_PREDICT_VAR,
]
SERIALIZATION_FORMAT_PICKLE = "pickle"
SERIALIZATION_FORMAT_CLOUDPICKLE = "cloudpickle"
SUPPORTED_SERIALIZATION_FORMATS = [
SERIALIZATION_FORMAT_PICKLE,
SERIALIZATION_FORMAT_CLOUDPICKLE,
]
类似于 MLflow 内置风格,自定义风格通过 save_model()
和 log_model()
函数以 MLflow 格式记录模型。在 save_model()
函数中,sktime
模型保存到指定的输出目录。此外,save_model()
利用 mlflow.models.Model.add_flavor()
和 mlflow.models.Model.save()
方法生成包含 sktime
和 python_function
风格的 MLmodel
配置。生成的配置具有几个风格特定属性,例如风格名称和 sktime_version
,它表示用于训练模型的 sktime
库的版本。自定义 sktime
模型的输出目录示例如下所示
# Directory written by flavor.save_model(model, "my_model")
my_model/
├── MLmodel
├── conda.yaml
├── model.pkl
├── python_env.yaml
└── requirements.txt
其 YAML 格式的 MLmodel
文件描述了两种风格
flavors:
python_function:
env:
conda: conda.yaml
virtualenv: python_env.yaml
loader_module: flavor
model_path: model.pkl
python_version: 3.8.15
sktime:
code: null
pickled_model: model.pkl
serialization_format: pickle
sktime_version: 0.16.0
save_model()
函数还提供了添加附加参数的灵活性,这些参数可以作为风格特定属性添加到模型配置中。在此示例中,只有一个风格特定参数用于指定模型序列化格式。所有其他参数都是非风格特定的(有关这些参数的详细描述,请参阅 mlflow.sklearn.save_model)。注意:创建自己的自定义风格时,请务必在 save_model()
和 log_model()
函数中将 sktime_model
参数重命名,以反映自定义模型风格的名称。
def save_model(
sktime_model,
path,
conda_env=None,
code_paths=None,
mlflow_model=None,
signature=None,
input_example=None,
pip_requirements=None,
extra_pip_requirements=None,
serialization_format=SERIALIZATION_FORMAT_PICKLE,
):
_validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements)
if serialization_format not in SUPPORTED_SERIALIZATION_FORMATS:
raise MlflowException(
message=(
f"Unrecognized serialization format: {serialization_format}. "
"Please specify one of the following supported formats: "
"{SUPPORTED_SERIALIZATION_FORMATS}."
),
error_code=INVALID_PARAMETER_VALUE,
)
_validate_and_prepare_target_save_path(path)
code_dir_subpath = _validate_and_copy_code_paths(code_paths, path)
if mlflow_model is None:
mlflow_model = Model()
if signature is not None:
mlflow_model.signature = signature
if input_example is not None:
_save_example(mlflow_model, input_example, path)
model_data_subpath = "model.pkl"
model_data_path = os.path.join(path, model_data_subpath)
_save_model(
sktime_model, model_data_path, serialization_format=serialization_format
)
pyfunc.add_to_model(
mlflow_model,
loader_module="flavor",
model_path=model_data_subpath,
conda_env=_CONDA_ENV_FILE_NAME,
python_env=_PYTHON_ENV_FILE_NAME,
code=code_dir_subpath,
)
mlflow_model.add_flavor(
FLAVOR_NAME,
pickled_model=model_data_subpath,
sktime_version=sktime.__version__,
serialization_format=serialization_format,
code=code_dir_subpath,
)
mlflow_model.save(os.path.join(path, MLMODEL_FILE_NAME))
if conda_env is None:
if pip_requirements is None:
include_cloudpickle = (
serialization_format == SERIALIZATION_FORMAT_CLOUDPICKLE
)
default_reqs = get_default_pip_requirements(include_cloudpickle)
inferred_reqs = mlflow.models.infer_pip_requirements(
path, FLAVOR_NAME, fallback=default_reqs
)
default_reqs = sorted(set(inferred_reqs).union(default_reqs))
else:
default_reqs = None
conda_env, pip_requirements, pip_constraints = _process_pip_requirements(
default_reqs, pip_requirements, extra_pip_requirements
)
else:
conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env)
with open(os.path.join(path, _CONDA_ENV_FILE_NAME), "w") as f:
yaml.safe_dump(conda_env, stream=f, default_flow_style=False)
if pip_constraints:
write_to(os.path.join(path, _CONSTRAINTS_FILE_NAME), "\n".join(pip_constraints))
write_to(os.path.join(path, _REQUIREMENTS_FILE_NAME), "\n".join(pip_requirements))
_PythonEnv.current().to_yaml(os.path.join(path, _PYTHON_ENV_FILE_NAME))
def _save_model(model, path, serialization_format):
with open(path, "wb") as out:
if serialization_format == SERIALIZATION_FORMAT_PICKLE:
pickle.dump(model, out)
else:
import cloudpickle
cloudpickle.dump(model, out)
save_model()
函数还将模型依赖项写入模型输出目录中的 requirements.txt
和 conda.yaml
文件。为此,此风格产生的 pip
依赖项集需要添加到 get_default_pip_requirements()
函数中。在此示例中,仅提供最低必需的依赖项。实际上,可以包含预处理或后处理步骤所需的附加要求。请注意,对于任何自定义风格,save_model()
函数中的 mlflow.models.infer_pip_requirements()
方法将返回 get_default_pip_requirements()
中定义的默认要求,因为仅对内置风格推断包导入。
def get_default_pip_requirements(include_cloudpickle=False):
pip_deps = [_get_pinned_requirement("sktime")]
if include_cloudpickle:
pip_deps += [_get_pinned_requirement("cloudpickle")]
return pip_deps
def get_default_conda_env(include_cloudpickle=False):
return _mlflow_conda_env(
additional_pip_deps=get_default_pip_requirements(include_cloudpickle)
)
接下来,我们添加 log_model()
函数。此函数只是 mlflow.models.Model.log()
方法的一个包装器,用于将自定义模型作为工件记录到当前的 MLflow 运行中。在 save_model()
函数中引入的任何风格特定参数(例如 serialization_format
)也需要添加到 log_model()
函数中。我们还需要将 flavor
模块传递给 mlflow.models.Model.log()
方法,该方法内部调用上面的 save_model()
函数来持久化模型。
def log_model(
sktime_model,
artifact_path,
conda_env=None,
code_paths=None,
registered_model_name=None,
signature=None,
input_example=None,
await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS,
pip_requirements=None,
extra_pip_requirements=None,
serialization_format=SERIALIZATION_FORMAT_PICKLE,
**kwargs,
):
return Model.log(
artifact_path=artifact_path,
flavor=flavor,
registered_model_name=registered_model_name,
sktime_model=sktime_model,
conda_env=conda_env,
code_paths=code_paths,
signature=signature,
input_example=input_example,
await_registration_for=await_registration_for,
pip_requirements=pip_requirements,
extra_pip_requirements=extra_pip_requirements,
serialization_format=serialization_format,
**kwargs,
)
为了解释 save_model()
生成的模型目录,自定义风格还必须定义一个 load_model()
函数。load_model()
函数从指定的模型目录读取 MLmodel
配置,并使用配置属性从其序列化表示加载并返回 sktime
模型。
def load_model(model_uri, dst_path=None):
local_model_path = _download_artifact_from_uri(
artifact_uri=model_uri, output_path=dst_path
)
flavor_conf = _get_flavor_configuration(
model_path=local_model_path, flavor_name=FLAVOR_NAME
)
_add_code_from_conf_to_system_path(local_model_path, flavor_conf)
sktime_model_file_path = os.path.join(
local_model_path, flavor_conf["pickled_model"]
)
serialization_format = flavor_conf.get(
"serialization_format", SERIALIZATION_FORMAT_PICKLE
)
return _load_model(
path=sktime_model_file_path, serialization_format=serialization_format
)
def _load_model(path, serialization_format):
with open(path, "rb") as pickled_model:
if serialization_format == SERIALIZATION_FORMAT_PICKLE:
return pickle.load(pickled_model)
elif serialization_format == SERIALIZATION_FORMAT_CLOUDPICKLE:
import cloudpickle
return cloudpickle.load(pickled_model)
_load_pyfunc()
函数将由 mlflow.pyfunc.load_model()
方法调用,以将自定义模型风格加载为 pyfunc
类型。MLmodel 风格配置用于将任何风格特定属性传递给 _load_model()
函数(即模型目录中 python_function
风格的路径和模型序列化格式)。
def _load_pyfunc(path):
try:
sktime_flavor_conf = _get_flavor_configuration(
model_path=path, flavor_name=FLAVOR_NAME
)
serialization_format = sktime_flavor_conf.get(
"serialization_format", SERIALIZATION_FORMAT_PICKLE
)
except MlflowException:
_logger.warning(
"Could not find sktime flavor configuration during model "
"loading process. Assuming 'pickle' serialization format."
)
serialization_format = SERIALIZATION_FORMAT_PICKLE
pyfunc_flavor_conf = _get_flavor_configuration(
model_path=path, flavor_name=pyfunc.FLAVOR_NAME
)
path = os.path.join(path, pyfunc_flavor_conf["model_path"])
return _SktimeModelWrapper(
_load_model(path, serialization_format=serialization_format)
)
最后一步是创建定义 python_function
风格的模型包装类。包装类的设计决定了在使用 python_function
风格进行预测时,风格的推理 API 如何公开。就像内置风格一样,sktime
包装类的 predict()
方法接受单行 Pandas DataFrame
配置参数。有关如何构造此配置 DataFrame 的示例,请参阅下一节中的用法示例。有关支持的参数和输入格式的详细描述,请参阅 flavor.py 模块文档字符串。
class _SktimeModelWrapper:
def __init__(self, sktime_model):
self.sktime_model = sktime_model
def predict(self, dataframe, params=None) -> pd.DataFrame:
df_schema = dataframe.columns.values.tolist()
if len(dataframe) > 1:
raise MlflowException(
f"The provided prediction pd.DataFrame contains {len(dataframe)} rows. "
"Only 1 row should be supplied.",
error_code=INVALID_PARAMETER_VALUE,
)
# Convert the configuration dataframe into a dictionary to simplify the
# extraction of parameters passed to the sktime predcition methods.
attrs = dataframe.to_dict(orient="index").get(0)
predict_method = attrs.get("predict_method")
if not predict_method:
raise MlflowException(
f"The provided prediction configuration pd.DataFrame columns ({df_schema}) do not "
"contain the required column `predict_method` for specifying the prediction method.",
error_code=INVALID_PARAMETER_VALUE,
)
if predict_method not in SUPPORTED_SKTIME_PREDICT_METHODS:
raise MlflowException(
"Invalid `predict_method` value."
f"The supported prediction methods are {SUPPORTED_SKTIME_PREDICT_METHODS}",
error_code=INVALID_PARAMETER_VALUE,
)
# For inference parameters 'fh', 'X', 'coverage', 'alpha', and 'cov'
# the respective sktime default value is used if the value was not
# provided in the configuration dataframe.
fh = attrs.get("fh", None)
# Any model that is trained with exogenous regressor elements will need
# to provide `X` entries as a numpy ndarray to the predict method.
X = attrs.get("X", None)
# When the model is served via REST API the exogenous regressor must be
# provided as a list to the configuration DataFrame to be JSON serializable.
# Below we convert the list back to ndarray type as required by sktime
# predict methods.
if isinstance(X, list):
X = np.array(X)
# For illustration purposes only a subset of the available sktime prediction
# methods is exposed. Additional methods (e.g. predict_proba) could be added
# in a similar fashion.
if predict_method == SKTIME_PREDICT:
predictions = self.sktime_model.predict(fh=fh, X=X)
if predict_method == SKTIME_PREDICT_INTERVAL:
coverage = attrs.get("coverage", 0.9)
predictions = self.sktime_model.predict_interval(
fh=fh, X=X, coverage=coverage
)
if predict_method == SKTIME_PREDICT_QUANTILES:
alpha = attrs.get("alpha", None)
predictions = self.sktime_model.predict_quantiles(fh=fh, X=X, alpha=alpha)
if predict_method == SKTIME_PREDICT_VAR:
cov = attrs.get("cov", False)
predictions = self.sktime_model.predict_var(fh=fh, X=X, cov=cov)
# Methods predict_interval() and predict_quantiles() return a pandas
# MultiIndex column structure. As MLflow signature inference does not
# support MultiIndex column structure the columns must be flattened.
if predict_method in [SKTIME_PREDICT_INTERVAL, SKTIME_PREDICT_QUANTILES]:
predictions.columns = flatten_multiindex(predictions)
return predictions
示例:使用自定义“sktime”风格
此示例使用 Longley 数据集训练一个 sktime
NaiveForecaster 模型,用于带外生变量的预测。它展示了一个自定义模型类型实现,该实现记录训练超参数、评估指标和训练的模型作为工件。此示例的单行配置 DataFrame 定义了一个区间预测,名义覆盖值为 [0.9,0.95]
,未来预测 horizons 为四个周期,并包含一个外生回归器。
import json
import flavor
import pandas as pd
from sktime.datasets import load_longley
from sktime.forecasting.model_selection import temporal_train_test_split
from sktime.forecasting.naive import NaiveForecaster
from sktime.performance_metrics.forecasting import (
mean_absolute_error,
mean_absolute_percentage_error,
)
import mlflow
ARTIFACT_PATH = "model"
with mlflow.start_run() as run:
y, X = load_longley()
y_train, y_test, X_train, X_test = temporal_train_test_split(y, X)
forecaster = NaiveForecaster()
forecaster.fit(
y_train,
X=X_train,
)
# Extract parameters
parameters = forecaster.get_params()
# Evaluate model
y_pred = forecaster.predict(fh=[1, 2, 3, 4], X=X_test)
metrics = {
"mae": mean_absolute_error(y_test, y_pred),
"mape": mean_absolute_percentage_error(y_test, y_pred),
}
print(f"Parameters: \n{json.dumps(parameters, indent=2)}")
print(f"Metrics: \n{json.dumps(metrics, indent=2)}")
# Log parameters and metrics
mlflow.log_params(parameters)
mlflow.log_metrics(metrics)
# Log model using custom model flavor with pickle serialization (default).
flavor.log_model(
sktime_model=forecaster,
artifact_path=ARTIFACT_PATH,
serialization_format="pickle",
)
model_uri = mlflow.get_artifact_uri(ARTIFACT_PATH)
# Load model in native sktime flavor and pyfunc flavor
loaded_model = flavor.load_model(model_uri=model_uri)
loaded_pyfunc = flavor.pyfunc.load_model(model_uri=model_uri)
# Convert test data to 2D numpy array so it can be passed to pyfunc predict using
# a single-row Pandas DataFrame configuration argument
X_test_array = X_test.to_numpy()
# Create configuration DataFrame
predict_conf = pd.DataFrame(
[
{
"fh": [1, 2, 3, 4],
"predict_method": "predict_interval",
"coverage": [0.9, 0.95],
"X": X_test_array,
}
]
)
# Generate interval forecasts with native sktime flavor and pyfunc flavor
print(
f"\nNative sktime 'predict_interval':\n${loaded_model.predict_interval(fh=[1, 2, 3], X=X_test, coverage=[0.9, 0.95])}"
)
print(f"\nPyfunc 'predict_interval':\n${loaded_pyfunc.predict(predict_conf)}")
# Print the run id which is used for serving the model to a local REST API endpoint
print(f"\nMLflow run id:\n{run.info.run_id}")
打开 MLflow 运行详情页面时,将显示序列化模型工件,例如
要将模型服务到本地 REST API 端点,请运行以下 MLflow CLI 命令,替换上一块执行期间打印的运行 ID(有关更多详细信息,请参阅 部署 MLflow 模型 部分)
mlflow models serve -m runs:/<run_id>/model --env-manager local --host 127.0.0.1
下面显示了请求从服务模型进行预测的示例。外生回归器需要作为列表提供,以便可 JSON 序列化。包装器实例将把列表转换回 numpy ndarray
类型,这是 sktime
推理 API 所需的。
import pandas as pd
import requests
from sktime.datasets import load_longley
from sktime.forecasting.model_selection import temporal_train_test_split
y, X = load_longley()
y_train, y_test, X_train, X_test = temporal_train_test_split(y, X)
# Define local host and endpoint url
host = "127.0.0.1"
url = f"http://{host}:5000/invocations"
# Create configuration DataFrame
X_test_list = X_test.to_numpy().tolist()
predict_conf = pd.DataFrame(
[
{
"fh": [1, 2, 3, 4],
"predict_method": "predict_interval",
"coverage": [0.9, 0.95],
"X": X_test_list,
}
]
)
# Create dictionary with pandas DataFrame in the split orientation
json_data = {"dataframe_split": predict_conf.to_dict(orient="split")}
# Score model
response = requests.post(url, json=json_data)
print(f"\nPyfunc 'predict_interval':\n${response.json()}")
部署前验证模型
使用 MLflow Tracking 记录模型后,强烈建议在将其部署到生产环境之前在本地验证模型。mlflow.models.predict()
API 提供了一种便捷的方法,可在虚拟环境中测试您的模型,提供隔离执行并具有以下几个优点
- 模型依赖项验证:API 通过在虚拟环境中执行模型并使用输入示例,帮助确保与模型一起记录的依赖项正确且充分。有关更多详细信息,请参阅 验证预测环境。
- 输入数据验证:API 可用于验证输入数据与模型的交互是否符合预期,通过模拟模型服务期间的相同数据处理过程。确保输入数据是与 pyfunc 模型预测函数要求对齐的有效示例。
- 额外环境变量验证:通过指定
extra_envs
参数,您可以测试模型成功运行是否需要额外的环境变量。请注意,os.environ
中所有现有环境变量都会自动传递到虚拟环境中。
import mlflow
class MyModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input, params=None):
return model_input
with mlflow.start_run():
model_info = mlflow.pyfunc.log_model(
"model",
python_model=MyModel(),
input_example=["a", "b", "c"],
)
mlflow.models.predict(
model_uri=model_info.model_uri,
input_data=["a", "b", "c"],
pip_requirements_override=["..."],
extra_envs={"MY_ENV_VAR": "my_value"},
)
环境管理器
mlflow.models.predict()
API 支持以下环境管理器创建预测的虚拟环境
- virtualenv:默认环境管理器。
- uv:一个用 Rust 编写的极速环境管理器。这是 MLflow 2.20.0 以来的一项实验性功能。
- conda:使用 conda 创建环境。
local
:使用当前环境运行模型。请注意,此模式不支持pip_requirements_override
。
从 MLflow 2.20.0 开始,uv
可用,并且速度极快。运行 pip install uv
安装 uv,或参阅 uv 安装指南了解其他安装方法。
使用 uv
创建预测虚拟环境的示例
import mlflow
mlflow.models.predict(
model_uri="runs:/<run_id>/<model_path>",
input_data="your_data",
env_manager="uv",
)
内置部署工具
此信息已移至 MLflow Deployment 页面。
将 python_function
模型导出为 Apache Spark UDF
如果您使用的模型具有非常长的推理延迟(即,transformers
模型),可能需要比默认超时 60 秒更长的时间,您可以在为 MLflow 模型定义 spark_udf
实例时使用 extra_env
参数,指定覆盖环境变量 MLFLOW_SCORING_SERVER_REQUEST_TIMEOUT
。有关进一步指导,请参阅 :py:func:mlflow.pyfunc.spark_udf
。
您可以将 python_function
模型输出为 Apache Spark UDF,该 UDF 可以上传到 Spark 集群并用于为模型评分。
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, "<path-to-model>")
df = spark_df.withColumn("prediction", pyfunc_udf(struct([...])))
如果模型包含签名,则可以在不指定列名参数的情况下调用 UDF。在这种情况下,将使用签名中的列名调用 UDF,因此评估 DataFrame 的列名必须与模型签名的列名匹配。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, "<path-to-model-with-signature>")
df = spark_df.withColumn("prediction", pyfunc_udf())
如果模型包含具有张量规范输入的签名,则需要传递一列数组类型作为相应的 UDF 参数。此列中的值必须由一维数组组成。UDF 将按“C”顺序(即使用类似于 C 的索引顺序读/写元素)将数组值重塑为所需形状,并将值强制转换为所需的张量规范类型。例如,假设模型需要形状为 (-1, 2, 3) 的输入“a”和形状为 (-1, 4, 5) 的输入“b”。为了对此数据执行推理,我们需要准备一个 Spark DataFrame,其中列“a”包含长度为 6 的数组,列“b”包含长度为 20 的数组。然后我们可以像以下示例代码一样调用 UDF
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Assuming the model requires input 'a' of shape (-1, 2, 3) and input 'b' of shape (-1, 4, 5)
model_path = "<path-to-model-requiring-multidimensional-inputs>"
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_path)
# The `spark_df` has column 'a' containing arrays of length 6 and
# column 'b' containing arrays of length 20
df = spark_df.withColumn("prediction", pyfunc_udf(struct("a", "b")))
生成的 UDF 基于 Spark 的 Pandas UDF,目前仅限于每个观测生成单个值、值的数组或包含多个同类型字段值的结构。默认情况下,我们将第一个数值列作为双精度返回。您可以通过提供 result_type
参数来控制返回的结果。支持以下值
'int'
或 IntegerType:返回适合int32
结果的最左边的整数,如果没有则引发异常。'long'
或 LongType:返回适合int64
结果的最左边的长整数,如果没有则引发异常。- ArrayType (IntegerType | LongType):返回所有适合所请求大小的整数列。
'float'
或 FloatType:返回强制转换为float32
的最左边的数值结果,如果没有数值列则引发异常。'double'
或 DoubleType:返回强制转换为double
的最左边的数值结果,如果没有数值列则引发异常。- ArrayType ( FloatType | DoubleType ):返回强制转换为所请求类型的所有数值列。如果没有数值列则引发异常。
'string'
或 StringType:结果是强制转换为字符串的最左边的列。- ArrayType ( StringType ):返回强制转换为字符串的所有列。
'bool'
或'boolean'
或 BooleanType:返回强制转换为bool
的最左边的列,如果值不能被强制转换则引发异常。'field1 FIELD1_TYPE, field2 FIELD2_TYPE, ...'
:一个结构体类型,包含多个字段,用逗号分隔,每个字段类型必须是上面列出的类型之一。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Suppose the PyFunc model `predict` method returns a dict like:
# `{'prediction': 1-dim_array, 'probability': 2-dim_array}`
# You can supply result_type to be a struct type containing
# 2 fields 'prediction' and 'probability' like following.
pyfunc_udf = mlflow.pyfunc.spark_udf(
spark, "<path-to-model>", result_type="prediction float, probability: array<float>"
)
df = spark_df.withColumn("prediction", pyfunc_udf())
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(
spark, "path/to/model", result_type=ArrayType(FloatType())
)
# The prediction column will contain all the numeric columns returned by the model as floats
df = spark_df.withColumn("prediction", pyfunc_udf(struct("name", "age")))
如果要使用 conda 恢复用于训练模型的 Python 环境,请在调用 mlflow.pyfunc.spark_udf()
时设置 env_manager
参数。
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(
spark,
"path/to/model",
result_type=ArrayType(FloatType()),
env_manager="conda", # Use conda to restore the environment used in training
)
df = spark_df.withColumn("prediction", pyfunc_udf(struct("name", "age")))
如果您想通过远程客户端中的 Databricks connect 调用 mlflow.pyfunc.spark_udf()
,则需要首先在 Databricks 运行时中构建模型环境。
from mlflow.pyfunc import build_model_env
# Build the model env and save it as an archive file to the provided UC volume directory
# and print the saved model env archive file path (like '/Volumes/.../.../XXXXX.tar.gz')
print(build_model_env(model_uri, "/Volumes/..."))
# print the cluster id. Databricks Connect client needs to use the cluster id.
print(spark.conf.get("spark.databricks.clusterUsageTags.clusterId"))
一旦预构建了模型环境,您就可以通过远程客户端中的 Databricks connect 使用 'prebuilt_model_env' 参数运行 mlflow.pyfunc.spark_udf()
,
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.remote(
host=os.environ["DATABRICKS_HOST"],
token=os.environ["DATABRICKS_TOKEN"],
cluster_id="<cluster id>", # get cluster id by spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
).getOrCreate()
# The path generated by `build_model_env` in Databricks runtime.
model_env_uc_uri = "dbfs:/Volumes/.../.../XXXXX.tar.gz"
pyfunc_udf = mlflow.pyfunc.spark_udf(
spark, model_uri, prebuilt_env_uri=model_env_uc_uri
)
部署到自定义目标
除了内置部署工具外,MLflow 还提供了可插入的 mlflow.deployments()
和 mlflow deployments CLI,用于将模型部署到自定义目标和环境。要部署到自定义目标,您必须首先安装相应的第三方 Python 插件。请参阅此处列出的已知社区维护的插件。
命令
mlflow deployments
CLI 包含以下命令,这些命令也可以使用 mlflow.deployments Python API 以编程方式调用
- Create:将 MLflow 模型部署到指定的自定义目标
- Delete:删除部署
- Update:更新现有部署,例如部署新的模型版本或更改部署的配置(例如,增加副本数量)
- List:列出所有部署的 ID
- Get:打印特定部署的详细描述
- Run Local:在本地部署模型进行测试
- Help:显示指定目标的帮助字符串
有关更多信息,请参阅
mlflow deployments --help
mlflow deployments create --help
mlflow deployments delete --help
mlflow deployments update --help
mlflow deployments list --help
mlflow deployments get --help
mlflow deployments run-local --help
mlflow deployments help --help
社区模型风格
转到社区模型风格页面,获取 MLflow 社区开发和维护的其他有用 MLflow 风格的概述。