MLflow Prophet 集成
简介
Prophet 是 Meta 公司推出的开源时间序列预测库,专为商业预测任务设计。它将时间序列分解为趋势、季节性和节假日效应,能够处理缺失数据和异常值,并生成可解释的预测结果。
MLflow 的 Prophet 集成提供了实验跟踪、模型版本管理和部署功能,适用于时间序列预测工作流。
Prophet 无自动日志记录功能
MLflow 不为 Prophet 提供自动日志记录(autolog)功能,以避免给跟踪服务器带来过大负担。时间序列预测通常需要训练数百甚至数千个模型(例如,每个产品或地点一个模型),如果启用自动日志记录,跟踪服务器将承受过高的负载。对于大规模预测工作流,请使用批量 API(例如 mlflow.log_metrics 和 mlflow.log_params)进行手动日志记录。
为什么选择 MLflow + Prophet?
模型跟踪
记录 Prophet 模型及其参数、交叉验证指标和预测组件,以实现全面的实验跟踪。
实验比较
比较预测实验中的不同季节性配置、节假日效应和超参数组合。
预测验证
将 Prophet 的交叉验证指标直接集成到 MLflow 跟踪中,以实现可复现的模型评估。
模型注册表
使用 MLflow 的模型注册表和部署基础设施来管理 Prophet 预测模型的版本和部署。
基本模型日志记录
使用 MLflow 记录 Prophet 模型以跟踪预测实验
python
import mlflow
import mlflow.prophet
import pandas as pd
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
# Prophet requires a frame with 'ds' and 'y' columns; this example CSV provides them.
url = "https://raw.githubusercontent.com/facebook/prophet/main/examples/example_wp_log_peyton_manning.csv"
df = pd.read_csv(url)

# One dict drives both the model constructor and the logged parameters, so the
# values recorded in MLflow cannot drift from the ones the model was built with.
prophet_params = {
    "changepoint_prior_scale": 0.05,
    "seasonality_prior_scale": 10,
    "yearly_seasonality": True,
    "weekly_seasonality": True,
}
metric_columns = ["mse", "rmse", "mae", "mape"]

with mlflow.start_run():
    # Build and fit the forecaster
    model = Prophet(**prophet_params)
    model.fit(df)

    # Record the hyperparameters on the active run
    mlflow.log_params(prophet_params)

    # Cross-validate the fitted model
    cv_results = cross_validation(
        model,
        initial="730 days",
        period="180 days",
        horizon="365 days",
    )

    # Average each selected column of the performance table and log the
    # resulting values with a single call
    metrics = performance_metrics(cv_results)
    averaged = metrics[metric_columns].mean()
    mlflow.log_metrics(averaged.to_dict())

    # Store the fitted model, using the first few 'ds' rows as the input example
    sample_input = df[["ds"]].head()
    mlflow.prophet.log_model(
        pr_model=model, name="prophet_model", input_example=sample_input
    )
交叉验证跟踪
Prophet 的交叉验证结果与 MLflow 集成,用于全面的预测评估
python
def validate_prophet_model(model, df):
    """Cross-validate ``model`` under several window settings and log the results.

    Three configurations ("short", "medium", "long") are evaluated inside a
    single MLflow run. For each one, the mean of the mse/rmse/mae/mape columns
    of the performance table is logged as ``<name>_<metric>``.

    Args:
        model: Prophet model handed to ``cross_validation``.
            NOTE(review): presumably already fitted by the caller -- confirm.
        df: Not used by this function; kept so the call signature is unchanged.
    """
    cv_configs = [
        {
            "name": "short",
            "initial": "365 days",
            "period": "90 days",
            "horizon": "90 days",
        },
        {
            "name": "medium",
            "initial": "730 days",
            "period": "180 days",
            "horizon": "180 days",
        },
        {
            "name": "long",
            "initial": "1095 days",
            "period": "180 days",
            "horizon": "365 days",
        },
    ]
    window_keys = ("initial", "period", "horizon")
    metric_columns = ["mse", "rmse", "mae", "mape"]

    with mlflow.start_run():
        for config in cv_configs:
            label = config["name"]
            # Everything except the label is forwarded to cross_validation
            window = {key: config[key] for key in window_keys}

            cv_results = cross_validation(model, **window)
            performance = performance_metrics(cv_results)
            averaged = performance[metric_columns].mean()

            # Prefix each metric with the configuration label
            for metric_name, metric_value in averaged.items():
                mlflow.log_metric(f"{label}_{metric_name}", metric_value)
超参数优化
使用 MLflow 跟踪 Prophet 超参数调优实验
python
import optuna
def objective(trial, df):
    """Optuna objective for Prophet hyperparameter tuning.

    Each trial runs in its own nested MLflow run: hyperparameters are sampled,
    logged, used to fit a Prophet model, and the model is scored with
    cross-validation.

    Args:
        trial: Optuna trial used to sample the hyperparameters.
        df: Training data passed to ``Prophet.fit``.

    Returns:
        The mean of the ``mape`` column of the cross-validation performance
        table for this trial.
    """
    with mlflow.start_run(nested=True):
        # The prior scales are regularisation strengths whose ranges span
        # several orders of magnitude, so they are sampled on a log scale
        # rather than uniformly.
        params = {
            "changepoint_prior_scale": trial.suggest_float(
                "changepoint_prior_scale", 0.001, 0.5, log=True
            ),
            "seasonality_prior_scale": trial.suggest_float(
                "seasonality_prior_scale", 0.01, 10, log=True
            ),
            "holidays_prior_scale": trial.suggest_float(
                "holidays_prior_scale", 0.01, 10, log=True
            ),
            "seasonality_mode": trial.suggest_categorical(
                "seasonality_mode", ["additive", "multiplicative"]
            ),
        }

        # Log the configuration before fitting so that a trial which fails
        # during fit or cross-validation still records what was attempted.
        mlflow.log_params(params)

        # Train model
        model = Prophet(**params)
        model.fit(df)

        # Cross-validation
        cv_results = cross_validation(
            model, initial="730 days", period="180 days", horizon="365 days"
        )
        metrics = performance_metrics(cv_results)
        mape = float(metrics["mape"].mean())

        mlflow.log_metric("mape", mape)

        return mape
# Parent run for the search; each trial opens its own nested run inside it
with mlflow.start_run(run_name="Prophet HPO"):
    study = optuna.create_study(direction="minimize")

    def _run_trial(trial):
        """Adapt ``objective`` to the single-argument callable Optuna expects."""
        return objective(trial, df)

    study.optimize(_run_trial, n_trials=50)

    # Record the winning configuration and its score on the parent run
    best_params = {f"best_{key}": value for key, value in study.best_params.items()}
    mlflow.log_params(best_params)
    mlflow.log_metric("best_mape", study.best_value)
模型注册表集成
注册 Prophet 模型以进行版本控制和部署
python
from mlflow import MlflowClient
client = MlflowClient()

with mlflow.start_run():
    # Train and log model; registered_model_name also creates a new version
    # of "sales_forecast_model" in the Model Registry.
    model = Prophet()
    model.fit(df)
    model_info = mlflow.prophet.log_model(
        pr_model=model,
        name="prophet_model",
        registered_model_name="sales_forecast_model",
    )

    # Tag for deployment tracking
    mlflow.set_tags(
        {
            "model_type": "prophet",
            "forecast_horizon": "365_days",
            "data_frequency": "daily",
        }
    )

# Mark this version as the production model. Model stages (and
# transition_model_version_stage) are deprecated in MLflow; aliases are the
# supported replacement. The version can afterwards be loaded with the URI
# "models:/sales_forecast_model@production".
client.set_registered_model_alias(
    name="sales_forecast_model",
    alias="production",
    version=str(model_info.registered_model_version),
)
模型加载和推理
加载和使用已记录的 Prophet 模型
python
# Load as native Prophet model.
# Replace <run_id> with the ID of the run that logged "prophet_model".
model_uri = "runs:/<run_id>/prophet_model"
loaded_model = mlflow.prophet.load_model(model_uri)

# Generate forecast
future = loaded_model.make_future_dataframe(periods=365)
forecast = loaded_model.predict(future)

# Load as PyFunc for generic inference.
# The pyfunc model is given a DataFrame with a 'ds' column; the dates built
# above are reused so the example does not depend on an undefined variable.
pyfunc_model = mlflow.pyfunc.load_model(model_uri)
predictions = pyfunc_model.predict(future[["ds"]])
批量预测工作流
跟踪多个 Prophet 模型以进行分层预测
python
def _fit_and_log_series(series_name, series_data):
    """Fit, cross-validate and log a Prophet model for one series in a nested run."""
    with mlflow.start_run(run_name=f"Series_{series_name}", nested=True):
        forecaster = Prophet()
        forecaster.fit(series_data)

        # Series-specific info
        mlflow.log_param("series_name", series_name)
        mlflow.log_param("data_points", len(series_data))

        # Cross-validation; the column means of mape/rmse are logged together
        cv_frame = cross_validation(
            forecaster, initial="365 days", period="90 days", horizon="180 days"
        )
        performance = performance_metrics(cv_frame)
        averaged = performance[["mape", "rmse"]].mean()
        mlflow.log_metrics(averaged.to_dict())

        # Model artifact named after the series
        mlflow.prophet.log_model(pr_model=forecaster, name=f"model_{series_name}")


def train_hierarchical_forecasts(data_dict):
    """Train separate Prophet models for multiple series.

    Opens one parent run named "Hierarchical Forecasting" and, inside it, one
    nested run per entry of ``data_dict``.

    Args:
        data_dict: Mapping of series name to the data passed to ``Prophet.fit``.
    """
    with mlflow.start_run(run_name="Hierarchical Forecasting"):
        for series_name, series_data in data_dict.items():
            _fit_and_log_series(series_name, series_data)
大批量模型训练
训练大量 Prophet 模型时(例如,针对数千个产品),请使用批量日志记录来减轻跟踪服务器的负载:
python
# Accumulate values in memory first; nothing is sent to MLflow inside the loop.
# data_dict maps each series name to the data passed to Prophet.fit.
params_batch = {}
metrics_batch = {}

for series_name, series_data in data_dict.items():
    model = Prophet()
    model.fit(series_data)

    # Score the series with cross-validation
    cv_results = cross_validation(
        model, initial="365 days", period="45 days", horizon="90 days"
    )
    perf_metrics = performance_metrics(cv_results)

    # One metric and one param per series, keyed by the series name
    mape_key = f"{series_name}_mape"
    size_key = f"{series_name}_n_points"
    metrics_batch[mape_key] = perf_metrics["mape"].mean()
    params_batch[size_key] = len(series_data)

# Log everything in one run, after all series have been processed
with mlflow.start_run():
    mlflow.log_metrics(metrics_batch)
    mlflow.log_params(params_batch)
预测组件日志记录
将 Prophet 预测组件记录为工件
python
with mlflow.start_run():
    model = Prophet()
    model.fit(df)

    # Extend the frame by 365 periods and predict over it
    future = model.make_future_dataframe(periods=365)
    forecast = model.predict(future)

    # Build both figures, keyed by the artifact file name each is logged under
    figures = {
        "forecast_components.png": model.plot_components(forecast),
        "forecast_plot.png": model.plot(forecast),
    }
    for artifact_file, figure in figures.items():
        mlflow.log_figure(figure, artifact_file)

    # Store the fitted model alongside the plots
    mlflow.prophet.log_model(pr_model=model, name="prophet_model")