跳到主要内容

PyFunc 实践

·阅读约23分钟
Hugo Carvalho
adidas 机器学习分析师
Joana Ferreira
adidas 机器学习工程师
Rahul Pandey
Adidas 高级解决方案架构师
Filipe Miranda
adidas 高级数据工程师

如果您希望充分利用 mlflow.pyfunc 的功能并了解如何在机器学习项目中运用它,本篇博文将指导您完成整个过程。MLflow PyFunc 提供了创造性的自由和灵活性,允许将复杂系统封装为 MLflow 模型,并遵循与传统模型相同的生命周期。本博客将展示如何创建多模型设置,无缝连接到数据库,并在您的 MLflow PyFunc 模型中实现自定义的 fit 方法。

简介

本篇博文演示了 MLflow PyFunc 的功能,以及如何将其用于构建封装为 MLflow 中 PyFunc 类型模型的多模型设置。这种方法使集成模型能够遵循与 MLflow 中传统 内置模型类型 相同的生命周期。

但首先,让我们用一个类比来帮助您熟悉集成模型的概念,以及为什么您应该在下一个机器学习项目中考虑这种解决方案。

想象一下您正在市场上购买一套房子。您会仅仅根据您看的第一套房子和一位房地产经纪人的建议就做出决定吗?当然不会!购房过程需要考虑多种因素,并从各种来源收集信息,才能做出明智的决定。

购房过程解析

  • 明确您的需求:确定您想要新房还是二手房,房屋类型、型号和建造年份。
  • 研究:查找可用房屋列表,查看折扣和优惠,阅读客户评价,并征求亲友的意见。
  • 评估:考虑性能、位置、社区设施和价格范围。
  • 比较:比较多套房屋,找到最符合您需求和预算的。

简而言之,您不会直接得出结论,而是会考虑所有上述因素,然后才做出最佳选择。

机器学习中的集成模型也基于类似的思想。集成学习通过结合多个模型来提高预测性能,优于单一模型,从而改善机器学习结果。性能提升可能归因于多种因素,例如通过平均多个模型来降低方差,或通过关注先前模型的误差来减少偏差。存在多种类型的集成学习技术,例如:

  • 平均法
  • 加权平均法
  • Bagging
  • Boosting
  • Stacking

然而,开发此类系统需要仔细管理集成模型的生命周期,因为集成多样化的模型可能非常复杂。这正是 MLflow PyFunc 的宝贵之处。它提供了构建复杂系统的灵活性,将整个集成模型视为一个遵循与传统模型相同生命周期过程的模型。本质上,MLflow PyFunc 允许创建为集成模型量身定制的自定义方法,作为流行的框架(如 scikit-learn、PyTorch 和 LangChain)提供的内置 MLflow 类型的替代方案。

本博客利用 Kaggle 的房价数据集,通过 MLflow 演示集成模型的开发和管理。

我们将利用各种工具和技术来突出 MLflow PyFunc 模型的功能。在深入研究集成模型本身之前,我们将探讨这些组件如何集成以创建强大而高效的机器学习管道。

项目组件

DuckDB
DuckDB 是一个高性能的分析型数据库系统,设计宗旨是快速、可靠、便携且易于使用。在此项目中,它展示了在模型上下文内集成数据库连接,从而直接在模型内部高效处理数据。了解更多关于 DuckDB 的信息

scikit-learn (sklearn)
scikit-learn 是一个用于 Python 的机器学习库,提供了高效的数据分析和建模工具。在此项目中,它用于开发和评估集成模型中集成的各种机器学习模型。了解更多关于 scikit-learn 的信息

MLflow
MLflow 是一个开源平台,用于管理端到端的机器学习生命周期,包括实验、可复现性和部署。在此项目中,它跟踪实验、管理模型版本,并以我们熟悉单个类型的方式,促进 MLflow PyFunc 模型的部署。了解更多关于 MLflow 的信息

注意:要复现此项目,请参阅 MLflow 官方文档,了解有关设置简单本地 MLflow 追踪服务器 的更多详细信息。

创建集成模型

与使用内置类型记录和处理流行机器学习框架相比,创建 MLflow PyFunc 集成模型需要额外的步骤。

要实现集成模型,您需要定义一个 mlflow.pyfunc 模型,这涉及创建一个继承自 PythonModel 类并实现其构造函数和类方法的 Python 类。虽然 PyFunc 模型的基本创建只需要实现 predict 方法,但集成模型需要额外的管理模型和获取多模型预测的方法。实例化集成模型后,您必须使用自定义的 fit 方法来训练集成模型的子模型。与开箱即用的 MLflow 模型类似,您需要在训练运行期间记录模型及其工件,然后将模型注册到 MLflow 模型注册表。模型的 production 别名也将被添加,以简化模型更新和推理。模型别名允许您将一个可变的、命名的引用分配给已注册模型的特定版本。通过将别名分配给特定的模型版本,可以通过模型 URI 或模型注册表 API 轻松引用它。这种设置允许无缝更新用于推理的模型版本,而无需更改服务工作负载代码。欲了解更多详情,请参阅 使用别名和标签部署和组织模型

如下所示图表中的后续章节将详细介绍集成模型中每个方法的实现,全面阐释如何使用 MLflow PyFunc 定义、管理和利用集成模型。

Ensemble Model Architecture

在深入探讨每个方法的详细实现之前,让我们首先回顾一下 EnsembleModel 类的骨架。这个骨架是理解集成模型结构的蓝图。随后的章节将概述 MLflow PyFunc 提供的默认方法以及为集成模型实现的自定义方法,并提供相关代码。

以下是 EnsembleModel 类的骨架

import mlflow

class EnsembleModel(mlflow.pyfunc.PythonModel):
"""Ensemble model class leveraging Pyfunc for multi-model integration in MLflow."""

def __init__(self):
"""Initialize the EnsembleModel instance."""
...

def add_strategy_and_save_to_db(self):
"""Add strategies to the DuckDB database."""
...

def feature_engineering(self):
"""Perform feature engineering on input data."""
...

def initialize_models(self):
"""Initialize models and their hyperparameter grids."""
...

def fit(self):
"""Train the ensemble of models."""
...

def predict(self):
"""Predict using the ensemble of models."""
...

def load_context(self):
"""Load the preprocessor and models from the MLflow context."""
...

初始化 EnsembleModel

集成模型中的构造方法对于设置其基本元素至关重要。它建立关键属性,例如预处理器、用于存储训练模型的字典、DuckDB 数据库的路径以及用于管理不同集成策略的 pandas DataFrame。此外,它还利用 initialize_models 方法来定义集成中的子模型。

import pandas as pd

def __init__(self):
"""
Initializes the EnsembleModel instance.

Sets up an empty preprocessing pipeline, a dictionary for fitted models,
and a DataFrame to store strategies. Also calls the method to initialize sub-models.
"""
self.preprocessor = None
self.fitted_models = {}
self.db_path = None
self.strategies = pd.DataFrame(columns=["strategy", "model_list", "weights"])
self.initialize_models()

添加策略并保存到数据库

自定义的 add_strategy_and_save_to_db 方法支持向模型添加新的集成策略并将其存储在 DuckDB 数据库中。此方法接受包含策略的 pandas DataFrame 和数据库路径作为输入。它将新策略附加到现有策略中,并将其保存在集成模型初始化时指定的数据库中。此方法有助于管理各种集成策略并确保其持久存储以供将来使用。

import duckdb
import pandas as pd

def add_strategy_and_save_to_db(self, strategy_df: pd.DataFrame, db_path: str) -> None:
"""Add strategies from a DataFrame and save them to the DuckDB database.

Args:
strategy_df (pd.DataFrame): DataFrame containing strategies.
db_path (str): Path to the DuckDB database.
"""
# Update the instance-level database path for the current object
self.db_path = db_path

# Attempt to concatenate new strategies with the existing DataFrame
try:
self.strategies = pd.concat([self.strategies, strategy_df], ignore_index=True)
except Exception as e:
# Print an error message if any exceptions occur during concatenation
print(f"Error concatenating DataFrames: {e}")
return # Exit early to prevent further errors

# Use context manager for the database connection
try:
with duckdb.connect(self.db_path) as con:
# Register the strategies DataFrame as a temporary table in DuckDB
con.register("strategy_df", self.strategies)

# Drop any existing strategies table and create a new one with updated strategies
con.execute("DROP TABLE IF EXISTS strategies")
con.execute("CREATE TABLE strategies AS SELECT * FROM strategy_df")
except Exception as e:
# Print an error message if any exceptions occur during database operations
print(f"Error executing database operations: {e}")

以下示例演示了如何使用此方法将策略添加到数据库。

import pandas as pd

# Initialize ensemble model
ensemble_model = EnsembleModel()

# Define strategies for the ensemble model
strategy_data = {
"strategy": ["average_1"],
"model_list": ["random_forest,xgboost,decision_tree,gradient_boosting,adaboost"],
"weights": ["1"],
}

# Create a DataFrame to hold the strategy information
strategies_df = pd.DataFrame(strategy_data)

# Add strategies to the database
ensemble_model.add_strategy_and_save_to_db(strategies_df, "models/strategies.db")

DataFrame strategy_data 包含:

  • strategy(策略):模型预测策略的名称。
  • model_list(模型列表):策略中包含的模型名称的逗号分隔列表。
  • weights(权重)model_list 中分配给每个模型的逗号分隔权重列表。如果未提供,则表示等权重或默认值。
策略模型列表权重
average_1random_forest,xgboost,decision_tree,gradient_boosting,adaboost1

特征工程

feature_engineering 方法通过处理缺失值、缩放数值特征和编码分类特征来预处理输入数据。它对数值和分类特征应用不同的转换,并将处理后的特征作为 NumPy 数组返回。此方法对于准备适合模型训练的数据格式至关重要,可确保一致性并提升模型性能。

import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

def feature_engineering(self, X: pd.DataFrame) -> np.ndarray:
"""
Applies feature engineering to the input data X, including imputation, scaling, and encoding.

Args:
X (pd.DataFrame): Input features with potential categorical and numerical columns.

Returns:
np.ndarray: Processed feature array after transformations.
"""
# Convert columns with 'object' dtype to 'category' dtype for proper handling of categorical features
X = X.apply(
lambda col: col.astype("category") if col.dtypes == "object" else col
)

# Identify categorical and numerical features from the DataFrame
categorical_features = X.select_dtypes(include=["category"]).columns
numerical_features = X.select_dtypes(include=["number"]).columns

# Define the pipeline for numerical features: imputation followed by scaling
numeric_transformer = Pipeline(
steps=[
(
"imputer",
SimpleImputer(strategy="median"),
), # Replace missing values with the median
(
"scaler",
StandardScaler(),
), # Standardize features by removing the mean and scaling to unit variance
]
)

# Define the pipeline for categorical features: imputation followed by one-hot encoding
categorical_transformer = Pipeline(
steps=[
(
"imputer",
SimpleImputer(strategy="most_frequent"),
), # Replace missing values with the most frequent value
(
"onehot",
OneHotEncoder(handle_unknown="ignore"),
), # Encode categorical features as a one-hot numeric array
]
)

# Create a ColumnTransformer to apply the appropriate pipelines to the respective feature types
preprocessor = ColumnTransformer(
transformers=[
(
"num",
numeric_transformer,
numerical_features,
), # Apply the numeric pipeline to numerical features
(
"cat",
categorical_transformer,
categorical_features,
), # Apply the categorical pipeline to categorical features
]
)

# Fit and transform the input data using the preprocessor
X_processed = preprocessor.fit_transform(X)

# Store the preprocessor for future use in the predict method
self.preprocessor = preprocessor
return X_processed

初始化模型

initialize_models 方法设置了一个包含各种机器学习模型及其超参数网格的字典。这包括 RandomForestXGBoostDecisionTreeGradientBoostingAdaBoost 等模型。这一步对于准备集成的子模型并指定训练期间要调整的超参数至关重要,可确保每个模型都配置正确并为训练做好准备。

from sklearn.ensemble import (
AdaBoostRegressor,
GradientBoostingRegressor,
RandomForestRegressor,
)
from sklearn.tree import DecisionTreeRegressor
from xgboost import XGBRegressor

def initialize_models(self) -> None:
"""
Initializes a dictionary of models along with their hyperparameter grids for grid search.
"""
# Define various regression models with their respective hyperparameter grids for tuning
self.models = {
"random_forest": (
RandomForestRegressor(random_state=42),
{"n_estimators": [50, 100, 200], "max_depth": [None, 10, 20]},
),
"xgboost": (
XGBRegressor(random_state=42),
{"n_estimators": [50, 100, 200], "max_depth": [3, 6, 10]},
),
"decision_tree": (
DecisionTreeRegressor(random_state=42),
{"max_depth": [None, 10, 20]},
),
"gradient_boosting": (
GradientBoostingRegressor(random_state=42),
{"n_estimators": [50, 100, 200], "max_depth": [3, 5, 7]},
),
"adaboost": (
AdaBoostRegressor(random_state=42),
{"n_estimators": [50, 100, 200], "learning_rate": [0.01, 0.1, 1.0]},
),
}

定义自定义 fit 方法以训练和保存多模型

正如前面方法中已经强调的,MLflow PyFunc 模型的一个关键特性是能够定义自定义方法,为各种任务提供了显著的灵活性和定制能力。在多模型 PyFunc 设置中,fit 方法对于自定义和优化多个子模型至关重要。它管理着 RandomForestRegressorXGBRegressorDecisionTreeRegressorGradientBoostingRegressorAdaBoostRegressor 等算法的训练和微调。出于演示目的,此处使用了网格搜索(Grid Search),虽然直接,但计算密集且耗时,尤其对于集成模型而言。为了提高效率,建议采用先进的优化方法,如贝叶斯优化。像 OptunaHyperopt 这样的工具利用概率模型智能地导航搜索空间,显著减少了识别最佳配置所需的评估次数。

import os

import joblib
import pandas as pd
from sklearn.model_selection import GridSearchCV

def fit(
self, X_train_processed: pd.DataFrame, y_train: pd.Series, save_path: str
) -> None:
"""
Trains the ensemble of models using the provided preprocessed training data.

Args:
X_train_processed (pd.DataFrame): Preprocessed feature matrix for training.
y_train (pd.Series): Target variable for training.
save_path (str): Directory path where trained models will be saved.
"""
# Create the directory for saving models if it does not exist
os.makedirs(save_path, exist_ok=True)

# Iterate over each model and its parameter grid
for model_name, (model, param_grid) in self.models.items():
# Perform GridSearchCV to find the best hyperparameters for the current model
grid_search = GridSearchCV(
model, param_grid, cv=5, n_jobs=-1, scoring="neg_mean_squared_error"
)
grid_search.fit(
X_train_processed, y_train
) # Fit the model with the training data

# Save the best estimator from GridSearchCV
best_model = grid_search.best_estimator_
self.fitted_models[model_name] = best_model

# Save the trained model to disk
joblib.dump(best_model, os.path.join(save_path, f"{model_name}.pkl"))

定义自定义 predict 方法以聚合多模型预测

为了简化推理过程,每个 PyFunc 模型都应定义一个自定义的 predict 方法作为推理的单一入口点。这种方法在推理时抽象了模型的内部工作原理,无论处理的是自定义 PyFunc 模型还是开箱即用的 MLflow 内置类型模型。

集成模型的自定义 predict 方法旨在收集并结合来自子模型的预测,支持各种聚合策略(例如,平均、加权)。该过程包括以下步骤:

  1. 根据用户定义的方法加载子模型预测聚合策略。
  2. 加载用于推理的模型。
  3. 预处理输入数据。
  4. 收集单个模型的预测。
  5. 根据指定策略聚合模型预测。
import duckdb
import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

def predict(self, context, model_input: pd.DataFrame) -> np.ndarray:
"""
Predicts the target variable using the ensemble of models based on the selected strategy.

Args:
context: MLflow context object.
model_input (pd.DataFrame): Input features for prediction.

Returns:
np.ndarray: Array of predicted values.

Raises:
ValueError: If the strategy is unknown or no models are fitted.
"""
# Check if the 'strategy' column is present in the input DataFrame
if "strategy" in model_input.columns:
# Extract the strategy and drop it from the input features
print(f"Strategy: {model_input['strategy'].iloc[0]}")
strategy = model_input["strategy"].iloc[0]
model_input.drop(columns=["strategy"], inplace=True)
else:
# Default to 'average' strategy if none is provided
strategy = "average"

# Load the strategy details from the pre-loaded strategies DataFrame
loaded_strategy = self.strategies[self.strategies["strategy"] == strategy]

if loaded_strategy.empty:
# Raise an error if the specified strategy is not found
raise ValueError(
f"Strategy '{strategy}' not found in the pre-loaded strategies."
)

# Parse the list of models to be used for prediction
model_list = loaded_strategy["model_list"].iloc[0].split(",")

# Transform input features using the preprocessor, if available
if self.preprocessor is None:
# Feature engineering is required if the preprocessor is not set
X_processed = self.feature_engineering(model_input)
else:
# Use the existing preprocessor to transform the features
X_processed = self.preprocessor.transform(model_input)

if not self.fitted_models:
# Raise an error if no models are fitted
raise ValueError("No fitted models found. Please fit the models first.")

# Collect predictions from all models specified in the strategy
predictions = np.array(
[self.fitted_models[model].predict(X_processed) for model in model_list]
)

# Apply the specified strategy to combine the model predictions
if "average" in strategy:
# Calculate the average of predictions from all models
return np.mean(predictions, axis=0)
elif "weighted" in strategy:
# Extract weights from the strategy and normalize them
weights = [float(w) for w in loaded_strategy["weights"].iloc[0].split(",")]
weights = np.array(weights)
weights /= np.sum(weights) # Ensure weights sum to 1

# Compute the weighted average of predictions
return np.average(predictions, axis=0, weights=weights)
else:
# Raise an error if an unknown strategy is encountered
raise ValueError(f"Unknown strategy: {strategy}")

定义一个自定义的 load context 方法来初始化集成模型

当使用 mlflow.pyfunc.load_model 加载集成模型时,将执行自定义的 load_context 方法,以处理推理前所需的模型初始化步骤。

此初始化过程包括:

  1. 使用包含工件引用的上下文对象,加载模型工件,包括预训练模型和预处理器。
  2. 从 DuckDB 数据库获取策略定义。
import duckdb
import joblib
import pandas as pd

def load_context(self, context) -> None:
"""
Loads the preprocessor and models from the MLflow context.

Args:
context: MLflow context object which provides access to saved artifacts.
"""
# Load the preprocessor if its path is specified in the context artifacts
preprocessor_path = context.artifacts.get("preprocessor", None)
if preprocessor_path:
self.preprocessor = joblib.load(preprocessor_path)

# Load each model from the context artifacts and store it in the fitted_models dictionary
for model_name in self.models.keys():
model_path = context.artifacts.get(model_name, None)
if model_path:
self.fitted_models[model_name] = joblib.load(model_path)
else:
# Print a warning if a model is not found in the context artifacts
print(
f"Warning: {model_name} model not found in artifacts. Initialized but not fitted."
)

# Reconnect to the DuckDB database to load the strategies
conn = duckdb.connect(self.db_path)
# Fetch strategies from the DuckDB database into the strategies DataFrame
self.strategies = conn.execute("SELECT * FROM strategies").fetchdf()
# Close the database connection
conn.close()

整合所有部分

在详细探讨了每个方法之后,下一步是将其整合起来,观察完整的实现运行情况。这将提供一个全面的视角,了解组件如何交互以实现项目目标。

您可以使用 创建集成模型 部分中提供的骨架来组装完整的 EnsembleModel 类。每个方法都已演示并包含了其特定的依赖项。现在,您只需按照给定的大纲将这些方法组合到类定义中。您可以随意添加任何适合您特定用例或增强集成模型功能的自定义逻辑。

将所有内容封装到 PyFunc 模型中后,集成模型的生命周期将与传统 MLflow 模型的生命周期非常相似。下图描绘了模型的生命周期。

Ensemble Model Lifecycle

MLflow 追踪

使用 fit 方法训练子模型

数据预处理完成后,我们使用自定义的 fit 方法来训练集成模型中的所有子模型。此方法应用网格搜索为每个子模型找到最佳超参数,将其拟合到训练数据,并保存训练好的模型以备将来使用。

注意:对于以下代码块,如果您未使用托管式 MLflow,可能需要设置 MLflow 追踪服务器。在 项目组件 中,有一条关于设置简单本地 MLflow 追踪服务器的说明。对于项目的这一步,您需要将 MLflow 指向已配置且正在运行的服务器 URI。不要忘记设置服务器 URI 变量 remote_server_uri。您可以参考 MLflow 官方文档,了解有关 记录到追踪服务器 的更多详细信息。

import datetime
import os

import joblib
import mlflow
import pandas as pd
from mlflow.models.signature import infer_signature
from sklearn.model_selection import train_test_split

# Initialize the MLflow client
client = mlflow.MlflowClient()

# Set the URI of your MLflow Tracking Server
remote_server_uri = "..." # Replace with your server URI

# Point MLflow to your MLflow Tracking Server
mlflow.set_tracking_uri(remote_server_uri)

# Set the experiment name for organizing runs in MLflow
mlflow.set_experiment("Ensemble Model")

# Load dataset from the provided URL
data = pd.read_csv(
"https://github.com/zobi123/Machine-Learning-project-with-Python/blob/master/Housing.csv?raw=true"
)

# Separate features and target variable
X = data.drop("price", axis=1)
y = data["price"]

# Split dataset into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)

# Create a directory to save the models and related files
os.makedirs("models", exist_ok=True)

# Initialize and train the EnsembleModel
ensemble_model = EnsembleModel()

# Preprocess the training data using the defined feature engineering method
X_train_processed = ensemble_model.feature_engineering(X_train)

# Fit the models with the preprocessed training data and save them
ensemble_model.fit(X_train_processed, y_train, save_path="models")

# Infer the model signature using a small example from the training data
example_input = X_train[:1] # Use a single sample for signature inference
example_input["strategy"] = "average"
example_output = y_train[:1]
signature = infer_signature(example_input, example_output)

# Save the preprocessing pipeline to disk
joblib.dump(ensemble_model.preprocessor, "models/preprocessor.pkl")

# Define strategies for the ensemble model
strategy_data = {
"strategy": [
"average_1",
"average_2",
"weighted_1",
"weighted_2",
"weighted_3",
"weighted_4",
],
"model_list": [
"random_forest,xgboost,decision_tree,gradient_boosting,adaboost",
"decision_tree",
"random_forest,xgboost,decision_tree,gradient_boosting,adaboost",
"random_forest,xgboost,gradient_boosting",
"decision_tree,adaboost",
"xgboost,gradient_boosting",
],
"weights": ["1", "1", "0.2,0.3,0.1,0.2,0.2", "0.4,0.4,0.2", "0.5,0.5", "0.7,0.3"],
}

# Create a DataFrame to hold the strategy information
strategies_df = pd.DataFrame(strategy_data)

# Add strategies to the database
ensemble_model.add_strategy_and_save_to_db(strategies_df, "models/strategies.db")

# Define the Conda environment configuration for the MLflow model
conda_env = {
"name": "mlflow-env",
"channels": ["conda-forge"],
"dependencies": [
"python=3.8",
"scikit-learn=1.3.0",
"xgboost=2.0.3",
"joblib=1.2.0",
"pandas=1.5.3",
"numpy=1.23.5",
"duckdb=1.0.0",
{
"pip": [
"mlflow==2.14.1",
]
},
],
}

# Get current timestamp
timestamp = datetime.datetime.now().isoformat()

# Log the model using MLflow
with mlflow.start_run(run_name=timestamp) as run:
# Log parameters, artifacts, and model signature
mlflow.log_param("model_type", "EnsembleModel")

artifacts = {
model_name: os.path.join("models", f"{model_name}.pkl")
for model_name in ensemble_model.models.keys()
}
artifacts["preprocessor"] = os.path.join("models", "preprocessor.pkl")
artifacts["strategies_db"] = os.path.join("models", "strategies.db")

mlflow.pyfunc.log_model(
artifact_path="ensemble_model",
python_model=ensemble_model,
artifacts=artifacts,
conda_env=conda_env,
signature=signature,
)

print(f"Model logged in run {run.info.run_id}")

在 MLflow 中注册模型

模型训练完成后,下一步是将集成模型注册到 MLflow。此过程涉及将训练好的模型、预处理管道和相关策略记录到 MLflow 追踪服务器中。这确保了集成模型的所有组件都得到系统性保存和版本控制,从而便于复现和追溯。

此外,我们将为此模型的初始版本分配一个生产别名。此指定建立了一个基线模型,未来迭代可以以此为衡量标准。通过将此版本标记为 production 模型,我们可以有效衡量改进,并确认后续版本在此既定基线之上产生了可衡量的进步。

# Register the model in MLflow and assign a production alias
model_uri = f"runs:/{run.info.run_id}/ensemble_model"
model_details = mlflow.register_model(model_uri=model_uri, name="ensemble_model")

client.set_registered_model_alias(
name="ensemble_model", alias="production", version=model_details.version
)

下图展示了在此步骤之前,集成模型在 MLflow UI 中的完整生命周期。

Ensemble Model within MLflow UI

使用 predict 方法执行推理

集成模型注册到 MLflow 模型注册表后,现在可以通过聚合集成中各种子模型的预测来预测房价。

import pandas as pd

import mlflow
from sklearn.metrics import r2_score

# Load the registered model using its alias
loaded_model = mlflow.pyfunc.load_model(
model_uri=f"models:/ensemble_model@production"
)

# Define the different strategies for evaluation
strategies = [
"average_1",
"average_2",
"weighted_1",
"weighted_2",
"weighted_3",
"weighted_4",
]

# Initialize a DataFrame to store the results of predictions
results_df = pd.DataFrame()

# Iterate over each strategy, make predictions, and calculate R^2 scores
for strategy in strategies:
# Create a test DataFrame with the current strategy
X_test_with_params = X_test.copy()
X_test_with_params["strategy"] = strategy

# Use the loaded model to make predictions
y_pred = loaded_model.predict(X_test_with_params)

# Calculate R^2 score for the predictions
r2 = r2_score(y_test, y_pred)

# Store the results and R^2 score in the results DataFrame
results_df[strategy] = y_pred
results_df[f"r2_{strategy}"] = r2

# Add the actual target values to the results DataFrame
results_df["y_test"] = y_test.values

与开箱即用的 MLflow 模型类似,您首先使用 mlflow.pyfunc.load_model 加载集成模型以生成房价预测。在定义了聚合子模型预测的不同策略并创建了包含住房数据特征和聚合策略的模型输入后,只需调用集成模型的 predict 方法即可获得聚合的房价预测。

使用不同策略评估模型性能

为了评估集成模型的性能,我们计算了不同聚合策略的平均 R² 分数。这些策略包括简单平均和子模型的加权组合,具有不同的模型配置及其各自的权重。通过比较 R² 分数,我们可以评估哪些策略提供了最准确的预测。

下面的条形图展示了每种策略的平均 R² 分数。数值越高表示预测性能越好。如图所示,集成策略通常优于单个模型,这在我们的第二种依赖于单一 DecisionTree(average_2)的策略中有所体现,证明了聚合来自多个子模型的预测的有效性。这种视觉比较强调了使用集成方法的优势,尤其是采用优化每个子模型贡献的加权策略时。

Ensemble Model Evaluation

总结

本篇博文强调了 mlflow.pyfunc 的功能及其在机器学习项目中的应用。MLflow 的这一强大功能提供了创造性的自由和灵活性,使团队能够构建封装在 MLflow 中、遵循与传统模型相同生命周期的复杂系统。本文展示了集成模型设置的创建、与 DuckDB 的无缝集成以及使用此多功能模块实现自定义方法。

除了提供实现预期结果的结构化方法外,本博客还根据实践经验展示了实际可能性,并讨论了潜在挑战及其解决方案。

更多资源

浏览以下资源以更深入地了解 MLflow PyFunc 模型: