PyFunc 实践
如果您希望充分利用 mlflow.pyfunc 的功能,并了解如何将其用于机器学习项目中,那么本博文将指导您完成整个过程。MLflow PyFunc 提供了创作自由和灵活性,允许将复杂的系统封装为 MLflow 中的模型,这些模型遵循与传统模型相同的生命周期。本博文将展示如何创建多模型设置、无缝连接到数据库以及在 MLflow PyFunc 模型中实现自己的自定义拟合方法。
引言
本博文演示了 MLflow PyFunc 的功能及其在 MLflow 中封装为 PyFunc 风格模型的构建多模型设置中的应用。这种方法允许集成模型遵循 MLflow 中传统 内置模型风格 的相同生命周期。
但在那之前,让我们用一个类比来熟悉集成模型的概念,以及为什么您应该在下一个机器学习项目中考虑这种解决方案。
想象一下,您正在市场上购买一套房子。您会仅根据您访问的第一套房子和一位房地产经纪人的建议来做决定吗?当然不会!购房过程涉及考虑多个因素并从各种来源收集信息,以便做出明智的决定。
购房流程解释
- 确定您的需求:确定您想要新房还是二手房、房屋类型、款式和建造年份。
- 研究:查找可用房屋列表,查看折扣和优惠,阅读客户评论,并征求朋友和家人的意见。
- 评估:考虑性能、位置、社区便利设施和价格范围。
- 比较:比较多套房屋,以找到最适合您需求和预算的选择。
简而言之,您不会直接得出结论,而是会在考虑上述所有因素后,再决定最佳选择。
机器学习中的集成模型遵循类似的想法。集成学习通过组合多个模型来提高机器学习结果,与单个模型相比,可以提高预测性能。性能的提高可能归因于多种因素,例如通过平均多个模型来减少方差,或通过关注先前模型的错误来减少偏差。存在几种集成学习技术,例如
- 平均法
- 加权平均法
- Bagging
- Boosting
- Stacking
然而,开发此类系统需要仔细管理集成模型的生命周期,因为集成不同模型可能非常复杂。这就是 MLflow PyFunc 变得无价的地方。它提供了构建复杂系统的灵活性,将整个集成视为一个模型,遵循与传统模型相同的生命周期过程。从本质上讲,MLflow PyFunc 允许创建针对集成模型量身定制的自定义方法,作为 scikit-learn、PyTorch 和 LangChain 等流行框架的内置 MLflow 风格的替代方案。
本博文利用来自 Kaggle 的房价数据集来演示如何通过 MLflow 开发和管理集成模型。
我们将利用各种工具和技术来突出 MLflow PyFunc 模型的功能。在深入研究集成模型本身之前,我们将探讨这些组件如何集成以创建强大而高效的机器学习管道。
项目组成部分
DuckDB
DuckDB 是一种高性能分析数据库系统,设计为快速、可靠、便携且易于使用。在此项目中,它展示了数据库连接在模型上下文中的集成,有助于直接在模型内高效处理数据。了解有关 DuckDB 的更多信息。
scikit-learn (sklearn)
scikit-learn 是一个用于 Python 的机器学习库,它为数据分析和建模提供了高效的工具。在此项目中,它用于开发和评估各种集成到我们的集成模型中的机器学习模型。了解有关 scikit-learn 的更多信息。
MLflow
MLflow 是一个开源平台,用于管理端到端的机器学习生命周期,包括实验、可重现性和部署。在此项目中,它以我们熟悉的方式跟踪实验、管理模型版本并促进 MLflow PyFunc 模型的部署,类似于我们对单个风格所做的方式。了解有关 MLflow 的更多信息。
注意: 要重现此项目,请参阅官方 MLflow 文档,了解有关设置简单的本地 MLflow 跟踪服务器 的更多详细信息。
创建集成模型
创建 MLflow PyFunc 集成模型与使用内置风格来记录和处理流行的机器学习框架相比,需要额外的步骤。
要实现集成模型,您需要定义一个 mlflow.pyfunc 模型,这涉及创建一个继承自 PythonModel 类的 Python 类,并实现其构造函数和类方法。虽然创建 PyFunc 模型的基本要求仅是实现 predict 方法,但集成模型需要额外的方法来管理模型并获得多模型预测。实例化集成模型后,您必须使用自定义的 fit 方法来训练集成模型的子模型。与开箱即用的 MLflow 模型类似,您需要在训练运行时与模型一起记录模型及其工件,然后将模型注册到 MLflow 模型注册表中。还将向模型添加一个名为 production 的模型别名,以简化模型更新和推理。模型别名允许您将可变的命名引用分配给注册模型的特定版本。通过将别名分配给特定的模型版本,可以通过模型 URI 或模型注册表 API 轻松引用它。这种设置允许在不更改服务工作负载代码的情况下,无缝更新用于推理的模型版本。有关更多详细信息,请参阅 使用别名和标签部署和组织模型。
以下部分,如图所示,详细介绍了集成模型的每个方法的实现,从而全面了解如何使用 MLflow PyFunc 定义、管理和利用集成模型。

在深入研究每个方法的详细实现之前,我们首先回顾一下 EnsembleModel 类的骨架。此骨架作为理解集成模型结构的蓝图。后续部分将概述和提供 MLflow PyFunc 提供的默认方法和为集成模型实现的自定义方法的代码。
以下是 EnsembleModel 类的骨架
import mlflow
class EnsembleModel(mlflow.pyfunc.PythonModel):
"""Ensemble model class leveraging Pyfunc for multi-model integration in MLflow."""
def __init__(self):
"""Initialize the EnsembleModel instance."""
...
def add_strategy_and_save_to_db(self):
"""Add strategies to the DuckDB database."""
...
def feature_engineering(self):
"""Perform feature engineering on input data."""
...
def initialize_models(self):
"""Initialize models and their hyperparameter grids."""
...
def fit(self):
"""Train the ensemble of models."""
...
def predict(self):
"""Predict using the ensemble of models."""
...
def load_context(self):
"""Load the preprocessor and models from the MLflow context."""
...
初始化 EnsembleModel
集成模型中的构造函数方法对于设置其基本元素至关重要。它建立关键属性,例如预处理器、用于存储已训练模型的字典、DuckDB 数据库的路径以及用于管理不同集成策略的 pandas DataFrame。此外,它还利用 initialize_models 方法来定义集成到集成中的子模型。
import pandas as pd
def __init__(self):
"""
Initializes the EnsembleModel instance.
Sets up an empty preprocessing pipeline, a dictionary for fitted models,
and a DataFrame to store strategies. Also calls the method to initialize sub-models.
"""
self.preprocessor = None
self.fitted_models = {}
self.db_path = None
self.strategies = pd.DataFrame(columns=["strategy", "model_list", "weights"])
self.initialize_models()
添加策略并保存到数据库
自定义定义的 add_strategy_and_save_to_db 方法支持将新的集成策略添加到模型中,并将其存储在 DuckDB 数据库中。此方法接受包含策略的 pandas DataFrame 和数据库路径作为输入。它将新策略附加到现有策略并将其保存在集成模型初始化期间指定的数据库中。此方法有助于管理各种集成策略,并确保其持久存储以供将来使用。
import duckdb
import pandas as pd
def add_strategy_and_save_to_db(self, strategy_df: pd.DataFrame, db_path: str) -> None:
"""Add strategies from a DataFrame and save them to the DuckDB database.
Args:
strategy_df (pd.DataFrame): DataFrame containing strategies.
db_path (str): Path to the DuckDB database.
"""
# Update the instance-level database path for the current object
self.db_path = db_path
# Attempt to concatenate new strategies with the existing DataFrame
try:
self.strategies = pd.concat([self.strategies, strategy_df], ignore_index=True)
except Exception as e:
# Print an error message if any exceptions occur during concatenation
print(f"Error concatenating DataFrames: {e}")
return # Exit early to prevent further errors
# Use context manager for the database connection
try:
with duckdb.connect(self.db_path) as con:
# Register the strategies DataFrame as a temporary table in DuckDB
con.register("strategy_df", self.strategies)
# Drop any existing strategies table and create a new one with updated strategies
con.execute("DROP TABLE IF EXISTS strategies")
con.execute("CREATE TABLE strategies AS SELECT * FROM strategy_df")
except Exception as e:
# Print an error message if any exceptions occur during database operations
print(f"Error executing database operations: {e}")
以下示例演示了如何使用此方法将策略添加到数据库中。
import pandas as pd
# Initialize ensemble model
ensemble_model = EnsembleModel()
# Define strategies for the ensemble model
strategy_data = {
"strategy": ["average_1"],
"model_list": ["random_forest,xgboost,decision_tree,gradient_boosting,adaboost"],
"weights": ["1"],
}
# Create a DataFrame to hold the strategy information
strategies_df = pd.DataFrame(strategy_data)
# Add strategies to the database
ensemble_model.add_strategy_and_save_to_db(strategies_df, "models/strategies.db")
DataFrame strategy_data 包含
- strategy:用于模型预测的策略名称。
- model_list:包含在策略中的模型名称的逗号分隔列表。
- weights:分配给
model_list中每个模型的权重的逗号分隔列表。如果未提供,则表示权重相等或使用默认值。
| strategy | model_list | weights |
|---|---|---|
| average_1 | random_forest,xgboost,decision_tree,gradient_boosting,adaboost | 1 |
特征工程
feature_engineering 方法通过处理缺失值、缩放数值特征和编码分类特征来预处理输入数据。它对数值特征和分类特征应用不同的转换,并将处理后的特征作为 NumPy 数组返回。此方法对于以适合模型训练的格式准备数据至关重要,确保一致性并提高模型性能。
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
def feature_engineering(self, X: pd.DataFrame) -> np.ndarray:
"""
Applies feature engineering to the input data X, including imputation, scaling, and encoding.
Args:
X (pd.DataFrame): Input features with potential categorical and numerical columns.
Returns:
np.ndarray: Processed feature array after transformations.
"""
# Convert columns with 'object' dtype to 'category' dtype for proper handling of categorical features
X = X.apply(
lambda col: col.astype("category") if col.dtypes == "object" else col
)
# Identify categorical and numerical features from the DataFrame
categorical_features = X.select_dtypes(include=["category"]).columns
numerical_features = X.select_dtypes(include=["number"]).columns
# Define the pipeline for numerical features: imputation followed by scaling
numeric_transformer = Pipeline(
steps=[
(
"imputer",
SimpleImputer(strategy="median"),
), # Replace missing values with the median
(
"scaler",
StandardScaler(),
), # Standardize features by removing the mean and scaling to unit variance
]
)
# Define the pipeline for categorical features: imputation followed by one-hot encoding
categorical_transformer = Pipeline(
steps=[
(
"imputer",
SimpleImputer(strategy="most_frequent"),
), # Replace missing values with the most frequent value
(
"onehot",
OneHotEncoder(handle_unknown="ignore"),
), # Encode categorical features as a one-hot numeric array
]
)
# Create a ColumnTransformer to apply the appropriate pipelines to the respective feature types
preprocessor = ColumnTransformer(
transformers=[
(
"num",
numeric_transformer,
numerical_features,
), # Apply the numeric pipeline to numerical features
(
"cat",
categorical_transformer,
categorical_features,
), # Apply the categorical pipeline to categorical features
]
)
# Fit and transform the input data using the preprocessor
X_processed = preprocessor.fit_transform(X)
# Store the preprocessor for future use in the predict method
self.preprocessor = preprocessor
return X_processed
初始化模型
initialize_models 方法设置一个字典,其中包含各种机器学习模型及其超参数网格。这包括 RandomForest、XGBoost、DecisionTree、GradientBoosting 和 AdaBoost 等模型。此步骤对于准备集成的子模型和指定在训练期间调整的超参数至关重要,确保每个模型配置正确并准备好进行训练。
from sklearn.ensemble import (
AdaBoostRegressor,
GradientBoostingRegressor,
RandomForestRegressor,
)
from sklearn.tree import DecisionTreeRegressor
from xgboost import XGBRegressor
def initialize_models(self) -> None:
"""
Initializes a dictionary of models along with their hyperparameter grids for grid search.
"""
# Define various regression models with their respective hyperparameter grids for tuning
self.models = {
"random_forest": (
RandomForestRegressor(random_state=42),
{"n_estimators": [50, 100, 200], "max_depth": [None, 10, 20]},
),
"xgboost": (
XGBRegressor(random_state=42),
{"n_estimators": [50, 100, 200], "max_depth": [3, 6, 10]},
),
"decision_tree": (
DecisionTreeRegressor(random_state=42),
{"max_depth": [None, 10, 20]},
),
"gradient_boosting": (
GradientBoostingRegressor(random_state=42),
{"n_estimators": [50, 100, 200], "max_depth": [3, 5, 7]},
),
"adaboost": (
AdaBoostRegressor(random_state=42),
{"n_estimators": [50, 100, 200], "learning_rate": [0.01, 0.1, 1.0]},
),
}
定义自定义 fit 方法以训练和保存多模型
如前一个方法中已经强调的,MLflow PyFunc 模型的一个关键特性是能够定义自定义方法,为各种任务提供显著的灵活性和定制性。在多模型 PyFunc 设置中,fit 方法对于自定义和优化多个子模型至关重要。它管理 RandomForestRegressor、XGBRegressor、DecisionTreeRegressor、GradientBoostingRegressor 和 AdaBoostRegressor 等算法的训练和微调。为演示目的,使用了网格搜索(Grid Search),尽管它很简单,但计算量很大且耗时,特别是对于集成模型。为提高效率,建议使用更先进的优化方法,例如贝叶斯优化。像 Optuna 和 Hyperopt 这样的工具利用概率模型智能地导航搜索空间,显著减少了识别最佳配置所需的评估次数。
import os
import joblib
import pandas as pd
from sklearn.model_selection import GridSearchCV
def fit(
self, X_train_processed: pd.DataFrame, y_train: pd.Series, save_path: str
) -> None:
"""
Trains the ensemble of models using the provided preprocessed training data.
Args:
X_train_processed (pd.DataFrame): Preprocessed feature matrix for training.
y_train (pd.Series): Target variable for training.
save_path (str): Directory path where trained models will be saved.
"""
# Create the directory for saving models if it does not exist
os.makedirs(save_path, exist_ok=True)
# Iterate over each model and its parameter grid
for model_name, (model, param_grid) in self.models.items():
# Perform GridSearchCV to find the best hyperparameters for the current model
grid_search = GridSearchCV(
model, param_grid, cv=5, n_jobs=-1, scoring="neg_mean_squared_error"
)
grid_search.fit(
X_train_processed, y_train
) # Fit the model with the training data
# Save the best estimator from GridSearchCV
best_model = grid_search.best_estimator_
self.fitted_models[model_name] = best_model
# Save the trained model to disk
joblib.dump(best_model, os.path.join(save_path, f"{model_name}.pkl"))
定义自定义 predict 方法以聚合多模型预测
为了简化推理过程,每个 PyFunc 模型都应定义一个自定义 predict 方法作为推理的唯一入口点。这种方法在推理时抽象了模型的内部工作原理,无论是处理自定义 PyFunc 模型还是流行的 ML 框架的开箱即用的 MLflow 内置风格。
集成模型的自定义 predict 方法旨在收集和组合来自子模型的预测,支持各种聚合策略(例如,平均、加权)。该过程涉及以下步骤
- 根据用户定义的方案加载子模型预测的聚合策略。
- 加载用于推理的模型。
- 预处理输入数据。
- 从各个模型收集预测。
- 根据指定的策略聚合模型预测。
import duckdb
import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
def predict(self, context, model_input: pd.DataFrame) -> np.ndarray:
"""
Predicts the target variable using the ensemble of models based on the selected strategy.
Args:
context: MLflow context object.
model_input (pd.DataFrame): Input features for prediction.
Returns:
np.ndarray: Array of predicted values.
Raises:
ValueError: If the strategy is unknown or no models are fitted.
"""
# Check if the 'strategy' column is present in the input DataFrame
if "strategy" in model_input.columns:
# Extract the strategy and drop it from the input features
print(f"Strategy: {model_input['strategy'].iloc[0]}")
strategy = model_input["strategy"].iloc[0]
model_input.drop(columns=["strategy"], inplace=True)
else:
# Default to 'average' strategy if none is provided
strategy = "average"
# Load the strategy details from the pre-loaded strategies DataFrame
loaded_strategy = self.strategies[self.strategies["strategy"] == strategy]
if loaded_strategy.empty:
# Raise an error if the specified strategy is not found
raise ValueError(
f"Strategy '{strategy}' not found in the pre-loaded strategies."
)
# Parse the list of models to be used for prediction
model_list = loaded_strategy["model_list"].iloc[0].split(",")
# Transform input features using the preprocessor, if available
if self.preprocessor is None:
# Feature engineering is required if the preprocessor is not set
X_processed = self.feature_engineering(model_input)
else:
# Use the existing preprocessor to transform the features
X_processed = self.preprocessor.transform(model_input)
if not self.fitted_models:
# Raise an error if no models are fitted
raise ValueError("No fitted models found. Please fit the models first.")
# Collect predictions from all models specified in the strategy
predictions = np.array(
[self.fitted_models[model].predict(X_processed) for model in model_list]
)
# Apply the specified strategy to combine the model predictions
if "average" in strategy:
# Calculate the average of predictions from all models
return np.mean(predictions, axis=0)
elif "weighted" in strategy:
# Extract weights from the strategy and normalize them
weights = [float(w) for w in loaded_strategy["weights"].iloc[0].split(",")]
weights = np.array(weights)
weights /= np.sum(weights) # Ensure weights sum to 1
# Compute the weighted average of predictions
return np.average(predictions, axis=0, weights=weights)
else:
# Raise an error if an unknown strategy is encountered
raise ValueError(f"Unknown strategy: {strategy}")
定义 load_context 自定义方法以初始化 Ensemble Model
使用 mlflow.pyfunc.load_model 加载集成模型时,将执行自定义的 load_context 方法,以在推理之前处理所需的模型初始化步骤。
此初始化过程包括
- 使用包含工件引用的上下文对象加载模型工件,包括预训练的模型和预处理器。
- 从 DuckDB 数据库中获取策略定义。
import duckdb
import joblib
import pandas as pd
def load_context(self, context) -> None:
"""
Loads the preprocessor and models from the MLflow context.
Args:
context: MLflow context object which provides access to saved artifacts.
"""
# Load the preprocessor if its path is specified in the context artifacts
preprocessor_path = context.artifacts.get("preprocessor", None)
if preprocessor_path:
self.preprocessor = joblib.load(preprocessor_path)
# Load each model from the context artifacts and store it in the fitted_models dictionary
for model_name in self.models.keys():
model_path = context.artifacts.get(model_name, None)
if model_path:
self.fitted_models[model_name] = joblib.load(model_path)
else:
# Print a warning if a model is not found in the context artifacts
print(
f"Warning: {model_name} model not found in artifacts. Initialized but not fitted."
)
# Reconnect to the DuckDB database to load the strategies
conn = duckdb.connect(self.db_path)
# Fetch strategies from the DuckDB database into the strategies DataFrame
self.strategies = conn.execute("SELECT * FROM strategies").fetchdf()
# Close the database connection
conn.close()
将所有内容整合在一起
在详细探索了每个方法之后,下一步是将它们集成起来,以观察完整的实现过程。这将提供一个全面的视图,了解组件如何相互作用以实现项目的目标。
您可以使用 创建集成模型 部分中提供的骨架来组装整个 EnsembleModel 类。每个方法都已附带其特定的依赖项进行了演示。现在,您只需将这些方法组合到类定义中,遵循给出的轮廓。请随时添加任何适合您的特定用例或增强集成模型功能的自定义逻辑。
在所有内容都封装到 PyFunc 模型中之后,集成模型的生命周期就与传统 MLflow 模型非常相似。下图描绘了模型的生命周期。

MLflow 跟踪
使用 fit 方法训练子模型
数据预处理后,我们使用自定义的 fit 方法来训练集成模型中的所有子模型。此方法应用网格搜索来为每个子模型找到最佳超参数,将它们拟合到训练数据上,并保存训练好的模型以供将来使用。
注意: 对于以下代码块,如果您没有使用托管的 MLflow,可能需要设置 MLflow 跟踪服务器。在 项目组成部分 中,有一条关于设置简单的本地 MLflow 跟踪服务器的注释。对于项目的这一步,您需要将 MLflow 指向已配置并正在运行的服务器 URI。不要忘记设置服务器 URI 变量
remote_server_uri。您可以参考官方 MLflow 文档,了解有关 记录到跟踪服务器 的更多详细信息。
import datetime
import os
import joblib
import mlflow
import pandas as pd
from mlflow.models.signature import infer_signature
from sklearn.model_selection import train_test_split
# Initialize the MLflow client
client = mlflow.MlflowClient()
# Set the URI of your MLflow Tracking Server
remote_server_uri = "..." # Replace with your server URI
# Point MLflow to your MLflow Tracking Server
mlflow.set_tracking_uri(remote_server_uri)
# Set the experiment name for organizing runs in MLflow
mlflow.set_experiment("Ensemble Model")
# Load dataset from the provided URL
data = pd.read_csv(
"https://github.com/zobi123/Machine-Learning-project-with-Python/blob/master/Housing.csv?raw=true"
)
# Separate features and target variable
X = data.drop("price", axis=1)
y = data["price"]
# Split dataset into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Create a directory to save the models and related files
os.makedirs("models", exist_ok=True)
# Initialize and train the EnsembleModel
ensemble_model = EnsembleModel()
# Preprocess the training data using the defined feature engineering method
X_train_processed = ensemble_model.feature_engineering(X_train)
# Fit the models with the preprocessed training data and save them
ensemble_model.fit(X_train_processed, y_train, save_path="models")
# Infer the model signature using a small example from the training data
example_input = X_train[:1] # Use a single sample for signature inference
example_input["strategy"] = "average"
example_output = y_train[:1]
signature = infer_signature(example_input, example_output)
# Save the preprocessing pipeline to disk
joblib.dump(ensemble_model.preprocessor, "models/preprocessor.pkl")
# Define strategies for the ensemble model
strategy_data = {
"strategy": [
"average_1",
"average_2",
"weighted_1",
"weighted_2",
"weighted_3",
"weighted_4",
],
"model_list": [
"random_forest,xgboost,decision_tree,gradient_boosting,adaboost",
"decision_tree",
"random_forest,xgboost,decision_tree,gradient_boosting,adaboost",
"random_forest,xgboost,gradient_boosting",
"decision_tree,adaboost",
"xgboost,gradient_boosting",
],
"weights": ["1", "1", "0.2,0.3,0.1,0.2,0.2", "0.4,0.4,0.2", "0.5,0.5", "0.7,0.3"],
}
# Create a DataFrame to hold the strategy information
strategies_df = pd.DataFrame(strategy_data)
# Add strategies to the database
ensemble_model.add_strategy_and_save_to_db(strategies_df, "models/strategies.db")
# Define the Conda environment configuration for the MLflow model
conda_env = {
"name": "mlflow-env",
"channels": ["conda-forge"],
"dependencies": [
"python=3.8",
"scikit-learn=1.3.0",
"xgboost=2.0.3",
"joblib=1.2.0",
"pandas=1.5.3",
"numpy=1.23.5",
"duckdb=1.0.0",
{
"pip": [
"mlflow==2.14.1",
]
},
],
}
# Get current timestamp
timestamp = datetime.datetime.now().isoformat()
# Log the model using MLflow
with mlflow.start_run(run_name=timestamp) as run:
# Log parameters, artifacts, and model signature
mlflow.log_param("model_type", "EnsembleModel")
artifacts = {
model_name: os.path.join("models", f"{model_name}.pkl")
for model_name in ensemble_model.models.keys()
}
artifacts["preprocessor"] = os.path.join("models", "preprocessor.pkl")
artifacts["strategies_db"] = os.path.join("models", "strategies.db")
mlflow.pyfunc.log_model(
artifact_path="ensemble_model",
python_model=ensemble_model,
artifacts=artifacts,
conda_env=conda_env,
signature=signature,
)
print(f"Model logged in run {run.info.run_id}")
将模型注册到 MLflow
模型训练完成后,下一步是将集成模型注册到 MLflow。此过程包括将训练好的模型、预处理管道和相关策略记录到 MLflow 跟踪服务器。这确保了集成模型的所有组件都得到系统地保存和版本化,从而便于可重现性和可追溯性。
此外,我们将为此模型的初始版本分配一个生产别名。此指定建立了一个基准模型,未来的迭代可以基于此模型进行评估。通过将此版本标记为 production 模型,我们可以有效地对改进进行基准测试,并确认后续版本比此已建立的基线带来了可衡量的进步。
# Register the model in MLflow and assign a production alias
model_uri = f"runs:/{run.info.run_id}/ensemble_model"
model_details = mlflow.register_model(model_uri=model_uri, name="ensemble_model")
client.set_registered_model_alias(
name="ensemble_model", alias="production", version=model_details.version
)
下图展示了我们的集成模型在 MLflow UI 中直到此步骤的完整生命周期。

使用 predict 方法执行推理
集成模型注册到 MLflow 模型注册表后,现在可以利用它通过聚合集成中各种子模型的预测来预测房价。
import pandas as pd
import mlflow
from sklearn.metrics import r2_score
# Load the registered model using its alias
loaded_model = mlflow.pyfunc.load_model(
model_uri=f"models:/ensemble_model@production"
)
# Define the different strategies for evaluation
strategies = [
"average_1",
"average_2",
"weighted_1",
"weighted_2",
"weighted_3",
"weighted_4",
]
# Initialize a DataFrame to store the results of predictions
results_df = pd.DataFrame()
# Iterate over each strategy, make predictions, and calculate R^2 scores
for strategy in strategies:
# Create a test DataFrame with the current strategy
X_test_with_params = X_test.copy()
X_test_with_params["strategy"] = strategy
# Use the loaded model to make predictions
y_pred = loaded_model.predict(X_test_with_params)
# Calculate R^2 score for the predictions
r2 = r2_score(y_test, y_pred)
# Store the results and R^2 score in the results DataFrame
results_df[strategy] = y_pred
results_df[f"r2_{strategy}"] = r2
# Add the actual target values to the results DataFrame
results_df["y_test"] = y_test.values
与开箱即用的 MLflow 模型类似,您首先使用 mlflow.pyfunc.load_model 加载集成模型以生成房价预测。定义聚合子模型预测的不同策略并创建包含房屋数据特征和聚合策略的模型输入后,只需调用集成模型的 predict 方法即可获得聚合的房价预测。
使用不同策略评估模型性能
为了评估我们集成模型的性能,我们计算了不同聚合策略的平均 R² 分数。这些策略包括简单的平均法和子模型的加权组合,具有不同的模型配置及其各自的权重。通过比较 R² 分数,我们可以评估哪些策略能提供最准确的预测。
下面的条形图说明了每种策略的平均 R² 分数。较高的值表示较好的预测性能。如图所示,集成策略通常优于如图中所示的第二种策略(依赖单个 DecisionTree (average_2))的单个模型,这证明了聚合来自多个子模型的预测的有效性。这种视觉比较凸显了使用集成方法的优势,特别是对于优化每个子模型贡献的加权策略。

总结
本博文重点介绍了 mlflow.pyfunc 的功能及其在机器学习项目中的应用。MLflow 的这一强大功能提供了创作自由和灵活性,使团队能够构建封装在 MLflow 中的复杂系统作为模型,遵循与传统模型相同的生命周期。该博文展示了集成模型的创建、与 DuckDB 的无缝集成以及使用此多功能模块实现自定义方法。
除了提供实现预期结果的结构化方法外,本博文还基于实践经验讨论了实际可能性,讨论了潜在的挑战及其解决方案。
附加资源
探索以下资源,以更深入地了解 MLflow PyFunc 模型



