使用 MLflow 的 Prophet
在本综合指南中,我们将探讨如何将 Prophet 与 MLflow 结合使用,进行时间序列预测、实验跟踪和模型部署。我们将涵盖从基本预测工作流程到高级业务场景建模和生产部署模式的所有内容。
Prophet 和 MLflow 快速入门
Prophet 与 MLflow 无缝协作，可用于跟踪您的预测实验：
import mlflow
import mlflow.prophet
import pandas as pd
import numpy as np
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
# Load sample time series data (Prophet expects 'ds' and 'y' columns)
# This example uses the classic Peyton Manning Wikipedia page views dataset
url = "https://raw.githubusercontent.com/facebook/prophet/main/examples/example_wp_log_peyton_manning.csv"
df = pd.read_csv(url)
print(f"Data shape: {df.shape}")
print(f"Date range: {df['ds'].min()} to {df['ds'].max()}")
print(f"Data preview:\n{df.head()}")


def extract_prophet_params(prophet_model):
    """Extract Prophet model parameters for logging.

    Walks the attribute names listed in ``prophet.serialize.SIMPLE_ATTRIBUTES``
    and keeps those whose value on ``prophet_model`` is an int, float, str or
    bool, so the result can be handed to ``mlflow.log_params`` directly.

    Args:
        prophet_model: Prophet instance to inspect.

    Returns:
        dict mapping attribute name to its scalar value.
    """
    from prophet.serialize import SIMPLE_ATTRIBUTES

    params = {}
    for attr in SIMPLE_ATTRIBUTES:
        if hasattr(prophet_model, attr):
            value = getattr(prophet_model, attr)
            if isinstance(value, (int, float, str, bool)):
                params[attr] = value
    return params


with mlflow.start_run(run_name="Basic Prophet Forecast"):
    # Create Prophet model with specific parameters
    model = Prophet(
        changepoint_prior_scale=0.05,  # Flexibility of trend changes
        seasonality_prior_scale=10,  # Strength of seasonality
        holidays_prior_scale=10,  # Strength of holiday effects
        yearly_seasonality=True,
        weekly_seasonality=True,
        daily_seasonality=False,
    )

    # Fit the model
    model.fit(df)

    # Extract and log model parameters
    params = extract_prophet_params(model)
    mlflow.log_params(params)

    # Create future dataframe for forecasting
    future = model.make_future_dataframe(periods=365)  # Forecast 1 year ahead
    forecast = model.predict(future)

    # Cross-validation for model evaluation
    cv_results = cross_validation(
        model,
        initial="730 days",  # Initial training period
        period="180 days",  # Spacing between cutoff dates
        horizon="365 days",  # Forecast horizon
        parallel="threads",  # Use threading for speed
    )

    # Calculate performance metrics
    metrics = performance_metrics(cv_results)
    avg_metrics = metrics[["mse", "rmse", "mae", "mape"]].mean().to_dict()
    mlflow.log_metrics(avg_metrics)

    # Log the model with input example
    mlflow.prophet.log_model(
        pr_model=model, name="prophet_model", input_example=df[["ds"]].head(10)
    )

    print("Model trained and logged successfully!")
    # Prophet reports MAPE as a fraction (0.05 means 5%), so use the percent
    # format spec instead of appending a literal "%" to the raw fraction.
    print(f"Average MAPE: {avg_metrics['mape']:.2%}")
此示例会自动记录以下内容：
- 所有 Prophet 模型参数和配置
- 交叉验证性能指标
- 准备好部署的训练模型
- 用于模型文档的样本输入数据
了解 Prophet 的数据要求
Prophet 对数据格式有特定要求，理解这些要求非常重要：
数据格式和准备
import pandas as pd
from datetime import datetime, timedelta
def prepare_prophet_data(data, date_col, value_col, freq="D"):
    """
    Prepare data for Prophet training.

    Args:
        data: DataFrame with time series data
        date_col: Name of date column
        value_col: Name of value column
        freq: Frequency of the time series; a falsy value skips gap filling

    Returns:
        DataFrame with columns 'ds' (datetime) and 'y', sorted by 'ds'. When
        ``freq`` is set, every date of the regular calendar between the first
        and last observation is present; inserted dates carry NaN in 'y'.
    """
    # Prophet requires columns named 'ds' (datestamp) and 'y' (value)
    prophet_df = data[[date_col, value_col]].copy()
    prophet_df.columns = ["ds", "y"]

    # Ensure ds is datetime
    prophet_df["ds"] = pd.to_datetime(prophet_df["ds"])

    # Sort by date
    prophet_df = prophet_df.sort_values("ds").reset_index(drop=True)

    # An empty frame has no first/last date, so there is no calendar to build
    # (pd.date_range would raise on NaT bounds).
    if freq and not prophet_df.empty:
        full_date_range = pd.date_range(
            start=prophet_df["ds"].min(), end=prophet_df["ds"].max(), freq=freq
        )

        # Count calendar dates that were absent from the input. Counting NaN in
        # 'y' afterwards would also include values that were already missing.
        missing_dates = int((~full_date_range.isin(prophet_df["ds"])).sum())

        # Reindex to fill missing dates
        prophet_df = prophet_df.set_index("ds").reindex(full_date_range).reset_index()
        prophet_df.columns = ["ds", "y"]

        # Log data quality metrics
        print(f"Missing dates filled: {missing_dates}")

    return prophet_df
# Example usage
# Assuming you have a DataFrame with 'date' and 'sales' columns
# df_prepared = prepare_prophet_data(raw_data, 'date', 'sales', freq='D')
处理不同的时间序列模式
数据准备模式
多个时间序列
def prepare_multiple_series(data, date_col, value_col, series_col):
    """Split a long-format frame into one Prophet-ready frame per series.

    Each distinct value of ``series_col`` selects its rows, which are then
    passed through ``prepare_prophet_data`` with that function's default
    frequency.

    Returns:
        dict mapping series identifier to its prepared DataFrame.
    """
    return {
        key: prepare_prophet_data(data[data[series_col] == key], date_col, value_col)
        for key in data[series_col].unique()
    }
# Train separate models for each series
def train_multiple_prophet_models(series_dict):
    """Fit one default Prophet model per entry of ``series_dict``.

    A parent MLflow run wraps one nested run per series; every nested run
    records the series name, the number of rows, and the fitted model.

    Args:
        series_dict: mapping of series name to a frame accepted by Prophet.fit.

    Returns:
        dict mapping series name to its fitted Prophet model.
    """
    fitted = {}
    parent_run = mlflow.start_run(run_name="Multiple Series Forecasting")
    with parent_run:
        for key, frame in series_dict.items():
            child_name = f"Series_{key}"
            with mlflow.start_run(run_name=child_name, nested=True):
                forecaster = Prophet()
                forecaster.fit(frame)

                # Series-specific bookkeeping
                mlflow.log_param("series_name", key)
                mlflow.log_param("data_points", len(frame))
                fitted[key] = forecaster

                # Persist this series' model under its own artifact name
                mlflow.prophet.log_model(pr_model=forecaster, name=f"model_{key}")
    return fitted
不规则时间序列
def handle_irregular_timeseries(df, min_frequency="W"):
    """Aggregate an irregular series onto a regular time grid.

    The input frame is left untouched; all work happens on a copy.

    Args:
        df: DataFrame with a 'ds' column (anything ``pd.to_datetime`` accepts)
            and a numeric 'y' column.
        min_frequency: pandas resample rule. "W" and "M" behave as before
            (week-ending and month-end bins); any other valid rule is passed
            to ``resample`` as-is instead of failing with UnboundLocalError.

    Returns:
        DataFrame with columns 'ds' and 'y', where 'y' is the per-bin sum
        ('mean' may suit other use cases) and rows with NaN are dropped.
    """
    working = df.copy()
    working["ds"] = pd.to_datetime(working["ds"])

    # Newer pandas deprecates the "M" alias, so use the explicit month-end
    # offset object, which works regardless of the alias spelling.
    rule = pd.offsets.MonthEnd() if min_frequency == "M" else min_frequency

    df_regular = (
        working.set_index("ds")
        .resample(rule)
        .agg({"y": "sum"})
        .reset_index()
    )

    # Remove any remaining NaN values
    return df_regular.dropna()
高级 Prophet 配置
季节性和趋势配置
def advanced_prophet_configuration():
    """Fit a Prophet model with every major option spelled out explicitly.

    Reads the module-level ``df``. Because the model uses logistic growth,
    'cap' and 'floor' columns are written onto that frame before fitting.
    A handful of the resulting model settings are logged to MLflow.

    Returns:
        The fitted Prophet model.
    """
    settings = dict(
        # Trend configuration
        growth="logistic",  # or 'linear'
        changepoints=None,  # Let Prophet auto-detect, or specify dates
        n_changepoints=25,  # Number of potential changepoints
        changepoint_range=0.8,  # Proportion of history for changepoints
        changepoint_prior_scale=0.05,  # Flexibility of trend changes
        # Seasonality configuration
        yearly_seasonality="auto",  # or True/False/number
        weekly_seasonality="auto",
        daily_seasonality="auto",
        seasonality_mode="additive",  # or 'multiplicative'
        seasonality_prior_scale=10,
        # Holiday configuration
        holidays_prior_scale=10,
        # Uncertainty configuration
        interval_width=0.80,  # Width of uncertainty intervals
        uncertainty_samples=1000,  # Monte Carlo samples for uncertainty
        # Stan configuration
        mcmc_samples=0,  # Use MAP instead of MCMC
        stan_backend="CMDSTANPY",  # Stan backend
    )
    logged_attrs = (
        "growth",
        "n_changepoints",
        "changepoint_range",
        "seasonality_mode",
        "interval_width",
    )

    with mlflow.start_run(run_name="Advanced Prophet Configuration"):
        model = Prophet(**settings)

        # Logistic growth needs a saturation level on the training frame
        if model.growth == "logistic":
            df["cap"] = df["y"].max() * 1.2  # Capacity 20% above max observed
            df["floor"] = 0  # Optional floor

        model.fit(df)

        # Record the configuration as read back from the model
        mlflow.log_params({attr: getattr(model, attr) for attr in logged_attrs})

    return model


# Usage
advanced_model = advanced_prophet_configuration()
自定义季节性和事件
def add_custom_components(model, df):
    """Attach extra seasonalities and regressors to ``model``, then fit it.

    ``df`` is modified in place: a boolean 'weekend' column plus two synthetic
    regressor columns ('marketing_spend', 'economic_indicator', drawn from a
    seeded normal distribution) are added before fitting.

    Args:
        model: an unfitted Prophet instance.
        df: training frame with 'ds' and 'y' columns.

    Returns:
        Tuple of (fitted model, the enriched frame).
    """
    seasonality_names = ["monthly", "quarterly", "weekend_seasonality"]
    regressor_names = ["marketing_spend", "economic_indicator"]

    with mlflow.start_run(run_name="Custom Prophet Components"):
        # Unconditional custom seasonalities
        model.add_seasonality(name="monthly", period=30.5, fourier_order=5)
        model.add_seasonality(name="quarterly", period=91.25, fourier_order=8)

        # Conditional seasonality: a weekly pattern that applies on weekends only
        df["weekend"] = pd.to_datetime(df["ds"]).dt.weekday >= 5
        model.add_seasonality(
            name="weekend_seasonality",
            period=7,
            fourier_order=3,
            condition_name="weekend",
        )

        # Synthetic external regressors (stand-ins for real business signals)
        np.random.seed(42)
        n_rows = len(df)
        df["marketing_spend"] = np.random.normal(1000, 200, n_rows)
        df["economic_indicator"] = np.random.normal(50, 10, n_rows)
        model.add_regressor("marketing_spend", prior_scale=0.5)
        model.add_regressor("economic_indicator", prior_scale=0.3)

        # Fit model with custom components
        model.fit(df)

        # Log custom component information
        component_count = len(model.extra_seasonalities) + len(model.extra_regressors)
        mlflow.log_params(
            {
                "custom_seasonalities": seasonality_names,
                "external_regressors": regressor_names,
                "total_components": component_count,
            }
        )

    return model, df


# Usage
model_with_custom = Prophet()
model_with_custom, enhanced_df = add_custom_components(model_with_custom, df.copy())
假日和事件建模
内置假日支持
from prophet.make_holidays import make_holidays_df
def add_holiday_effects():
    """Fit a Prophet model that combines US holidays with Black Friday events.

    Reads the module-level ``df`` as training data and logs a summary of the
    holiday setup to MLflow.

    Returns:
        The fitted Prophet model.
    """
    black_friday_dates = ["2020-11-27", "2021-11-26", "2022-11-25", "2023-11-24"]
    holiday_scale = 15  # Increase for stronger holiday effects

    with mlflow.start_run(run_name="Holiday Modeling"):
        # Built-in country calendar
        us_holidays = make_holidays_df(year_list=range(2010, 2025), country="US")

        # Custom event: window covers one day before through two days after
        black_friday = pd.DataFrame(
            {
                "holiday": "black_friday",
                "ds": pd.to_datetime(black_friday_dates),
                "lower_window": -1,
                "upper_window": 2,
            }
        )

        # Combine built-in and custom holidays
        combined = pd.concat([us_holidays, black_friday])

        model = Prophet(holidays=combined, holidays_prior_scale=holiday_scale)
        model.fit(df)

        # Log holiday information
        mlflow.log_params(
            {
                "country_holidays": "US",
                "custom_events": ["black_friday"],
                "total_holidays": len(combined),
                "holidays_prior_scale": 15,
            }
        )

    return model
业务日历集成
def create_business_calendar(start_date, end_date):
    """Build a Prophet holidays frame of recurring business events.

    For every calendar year from ``start_date.year`` through ``end_date.year``
    (inclusive) the frame holds four 'quarterly_review' rows dated at the
    quarter ends, followed after all reviews by one 'annual_planning' row per
    year dated November 15.

    Args:
        start_date: object exposing a ``year`` attribute.
        end_date: object exposing a ``year`` attribute.

    Returns:
        DataFrame with columns holiday, ds, lower_window, upper_window.
    """
    years = range(start_date.year, end_date.year + 1)
    quarter_ends = ("03-31", "06-30", "09-30", "12-31")

    # Quarterly reviews: the effect covers the week leading up to quarter end
    reviews = [
        {
            "holiday": "quarterly_review",
            "ds": pd.to_datetime(f"{year}-{month_day}"),
            "lower_window": -7,
            "upper_window": 0,
        }
        for year in years
        for month_day in quarter_ends
    ]

    # Annual planning: two weeks either side of mid-November
    planning = [
        {
            "holiday": "annual_planning",
            "ds": pd.to_datetime(f"{year}-11-15"),
            "lower_window": -14,
            "upper_window": 14,
        }
        for year in years
    ]

    return pd.DataFrame(reviews + planning)
模型验证和性能评估
交叉验证最佳实践
def comprehensive_model_validation(model, df):
    """Perform comprehensive Prophet model validation.

    Runs Prophet cross-validation under three horizon configurations and logs
    the averaged metrics to MLflow, each prefixed with the configuration name.

    Args:
        model: a fitted Prophet model (cross_validation refits from its history).
        df: unused by the body; kept for interface compatibility.

    Returns:
        dict mapping "<config>_<metric>" to the mean metric value for every
        configuration that completed. Failed configurations are reported via
        a "<config>_error" param and are absent from the dict.
    """
    with mlflow.start_run(run_name="Comprehensive Model Validation"):
        # Multiple cross-validation configurations
        cv_configs = [
            {
                "name": "short_horizon",
                "initial": "365 days",
                "period": "90 days",
                "horizon": "90 days",
            },
            {
                "name": "medium_horizon",
                "initial": "730 days",
                "period": "180 days",
                "horizon": "180 days",
            },
            {
                "name": "long_horizon",
                "initial": "1095 days",
                "period": "180 days",
                "horizon": "365 days",
            },
        ]

        all_metrics = {}

        for config in cv_configs:
            try:
                # Perform cross-validation
                cv_results = cross_validation(
                    model,
                    initial=config["initial"],
                    period=config["period"],
                    horizon=config["horizon"],
                    parallel="threads",
                )

                # Calculate metrics
                metrics = performance_metrics(cv_results)
                avg_metrics = metrics[["mse", "rmse", "mae", "mape", "coverage"]].mean()

                # Store metrics with configuration prefix
                for metric, value in avg_metrics.items():
                    metric_name = f"{config['name']}_{metric}"
                    all_metrics[metric_name] = value
                    mlflow.log_metric(metric_name, value)

                # Log additional statistics. cv_results has one row per
                # forecasted point, so the fold count is the number of
                # distinct cutoffs, not the row count.
                mlflow.log_metrics(
                    {
                        f"{config['name']}_cv_folds": cv_results["cutoff"].nunique(),
                        f"{config['name']}_mape_std": metrics["mape"].std(),
                    }
                )

            except Exception as e:
                # Best effort: one failing configuration (e.g. not enough
                # history for its 'initial' window) must not abort the others.
                print(f"Cross-validation failed for {config['name']}: {e}")
                mlflow.log_param(f"{config['name']}_error", str(e))

        return all_metrics


# Usage
validation_metrics = comprehensive_model_validation(model, df)
预测质量评估
import matplotlib.pyplot as plt
import seaborn as sns
def _ljung_box_pvalue(values, lags=10):
    """Return the Ljung-Box p-value at ``lags`` for a 1-D sample.

    Q = n(n+2) * sum_{k=1..lags} acf_k^2 / (n-k), compared against a
    chi-square distribution with ``lags`` degrees of freedom. Returns NaN when
    the sample has no more than ``lags`` points or has zero variance.
    """
    from scipy import stats

    x = np.asarray(values, dtype=float)
    n = x.size
    if n <= lags:
        return float("nan")
    x = x - x.mean()
    denom = float(np.dot(x, x))
    if denom == 0.0:
        return float("nan")
    ks = np.arange(1, lags + 1)
    acf = np.array([np.dot(x[k:], x[:-k]) for k in ks]) / denom
    q_stat = n * (n + 2) * np.sum(acf**2 / (n - ks))
    return float(stats.chi2.sf(q_stat, df=lags))


def analyze_forecast_quality(model, df):
    """Analyze forecast quality with visualizations.

    Produces a 365-period forecast, saves component, forecast and residual
    diagnostic plots as PNG files in the working directory, logs them as
    MLflow artifacts, and logs summary statistics of the in-sample residuals.

    Args:
        model: fitted Prophet model.
        df: the training frame with 'ds' and 'y' (plus 'cap', and optionally
            'floor', when the model uses logistic growth).

    Returns:
        Tuple of (forecast DataFrame, dict of residual statistics).
    """
    from scipy import stats

    with mlflow.start_run(run_name="Forecast Quality Analysis"):
        # Generate forecast
        future = model.make_future_dataframe(periods=365)
        if model.growth == "logistic":
            future["cap"] = df["cap"].iloc[-1]  # Use last known capacity
            future["floor"] = df["floor"].iloc[-1] if "floor" in df.columns else 0
        forecast = model.predict(future)

        # Component analysis
        model.plot_components(forecast, figsize=(12, 8))
        plt.tight_layout()
        plt.savefig("forecast_components.png", dpi=300, bbox_inches="tight")
        mlflow.log_artifact("forecast_components.png")
        plt.close()

        # Forecast plot
        model.plot(forecast, figsize=(12, 6))
        plt.title("Prophet Forecast")
        plt.tight_layout()
        plt.savefig("forecast_plot.png", dpi=300, bbox_inches="tight")
        mlflow.log_artifact("forecast_plot.png")
        plt.close()

        # Residual analysis. 'ds' in df may still be strings (e.g. straight
        # from read_csv) while the forecast uses datetimes, so normalise the
        # dtype and join on the date instead of relying on index alignment.
        actuals = df[["ds", "y"]].copy()
        actuals["ds"] = pd.to_datetime(actuals["ds"])
        aligned = (
            actuals.merge(forecast[["ds", "yhat"]], on="ds", how="inner")
            .dropna(subset=["y", "yhat"])
            .set_index("ds")
        )
        residuals = aligned["y"] - aligned["yhat"]

        # Plot residuals
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Residuals over time
        axes[0, 0].plot(residuals.index, residuals.values)
        axes[0, 0].set_title("Residuals Over Time")
        axes[0, 0].set_xlabel("Date")
        axes[0, 0].set_ylabel("Residual")

        # Residual distribution
        axes[0, 1].hist(residuals.values, bins=30, alpha=0.7)
        axes[0, 1].set_title("Residual Distribution")
        axes[0, 1].set_xlabel("Residual")
        axes[0, 1].set_ylabel("Frequency")

        # Q-Q plot
        stats.probplot(residuals.values, dist="norm", plot=axes[1, 0])
        axes[1, 0].set_title("Q-Q Plot")

        # Residuals vs fitted
        axes[1, 1].scatter(aligned["yhat"], residuals.values, alpha=0.6)
        axes[1, 1].set_title("Residuals vs Fitted")
        axes[1, 1].set_xlabel("Fitted Values")
        axes[1, 1].set_ylabel("Residuals")

        plt.tight_layout()
        plt.savefig("residual_analysis.png", dpi=300, bbox_inches="tight")
        mlflow.log_artifact("residual_analysis.png")
        plt.close()

        # Calculate residual statistics. scipy.stats has no 'diagnostic'
        # submodule, so the Ljung-Box p-value is computed by the helper above.
        residual_stats = {
            "residual_mean": float(residuals.mean()),
            "residual_std": float(residuals.std()),
            "residual_skewness": float(stats.skew(residuals.values)),
            "residual_kurtosis": float(stats.kurtosis(residuals.values)),
            "ljung_box_pvalue": _ljung_box_pvalue(residuals.values, lags=10),
        }
        mlflow.log_metrics(residual_stats)

        return forecast, residual_stats


# Usage
forecast_analysis, residual_stats = analyze_forecast_quality(model, df)
超参数优化
系统参数调整
import itertools
from sklearn.metrics import mean_absolute_percentage_error
def optimize_prophet_hyperparameters(df, param_grid=None):
    """Systematic hyperparameter optimization for Prophet.

    Every combination in ``param_grid`` is fitted on the first 80% of ``df``
    and scored on the remaining 20%. Each combination gets its own nested
    MLflow run; the best one (lowest hold-out MAPE) is refitted on all of
    ``df`` and logged as "best_model".

    Args:
        df: training frame with 'ds' and 'y', ordered by time (the split is
            positional). 'cap'/'floor' columns are forwarded when present.
        param_grid: mapping of Prophet keyword to the list of values to try.
            Defaults to a grid over the prior scales and the seasonality mode.

    Returns:
        Tuple of (final fitted model, best result dict, list of all results).

    Raises:
        ValueError: if no configuration could be evaluated.
    """
    if param_grid is None:
        param_grid = {
            "changepoint_prior_scale": [0.001, 0.01, 0.1, 0.5],
            "seasonality_prior_scale": [0.01, 0.1, 1.0, 10.0],
            "holidays_prior_scale": [0.01, 0.1, 1.0, 10.0],
            "seasonality_mode": ["additive", "multiplicative"],
        }

    # Generate all parameter combinations
    param_names = list(param_grid)
    param_combinations = list(itertools.product(*param_grid.values()))

    # The split does not depend on the parameters, so do it once up front.
    train_size = int(len(df) * 0.8)
    train_df = df.iloc[:train_size]
    test_df = df.iloc[train_size:]
    # Predict on the hold-out's own dates. Building a daily future frame and
    # slicing by position misaligns forecasts whenever the history has gaps.
    future_cols = ["ds"] + [c for c in ("cap", "floor") if c in test_df.columns]
    y_true = test_df["y"].to_numpy()

    results = []

    with mlflow.start_run(run_name="Prophet Hyperparameter Optimization"):
        mlflow.log_param("total_combinations", len(param_combinations))

        for i, param_combo in enumerate(param_combinations, start=1):
            param_dict = dict(zip(param_names, param_combo))

            with mlflow.start_run(run_name=f"Config_{i}", nested=True):
                try:
                    # Create and fit model with current parameters
                    model = Prophet(**param_dict)
                    model.fit(train_df)

                    # Predict on test set
                    forecast = model.predict(test_df[future_cols])
                    y_pred = forecast["yhat"].to_numpy()

                    # Calculate metrics
                    mape = mean_absolute_percentage_error(y_true, y_pred)
                    mae = np.mean(np.abs(y_true - y_pred))
                    rmse = np.sqrt(np.mean((y_true - y_pred) ** 2))

                    # Log parameters and metrics
                    mlflow.log_params(param_dict)
                    mlflow.log_metrics(
                        {"test_mape": mape, "test_mae": mae, "test_rmse": rmse}
                    )

                    # Store results
                    results.append({**param_dict, "mape": mape, "mae": mae, "rmse": rmse})
                    print(f"Config {i}/{len(param_combinations)}: MAPE = {mape:.4f}")

                except Exception as e:
                    # Best effort: a failing configuration is recorded and skipped.
                    print(f"Error in configuration {i}: {e}")
                    mlflow.log_param("error", str(e))

        if not results:
            raise ValueError(
                "No hyperparameter configuration could be evaluated; "
                "see the nested runs for the recorded errors"
            )

        # Find best configuration
        best_result = min(results, key=lambda x: x["mape"])

        # Log best configuration
        mlflow.log_params({f"best_{k}": v for k, v in best_result.items()})

        # Train final model with best parameters
        best_params = {
            k: v for k, v in best_result.items() if k not in ["mape", "mae", "rmse"]
        }
        final_model = Prophet(**best_params)
        final_model.fit(df)

        # Log final model
        mlflow.prophet.log_model(
            pr_model=final_model,
            name="best_model",
            input_example=df[["ds"]].head(),
        )

        return final_model, best_result, results


# Usage
best_model, best_config, all_results = optimize_prophet_hyperparameters(df)
使用 Optuna 进行高级优化
贝叶斯超参数优化
import optuna
def objective(trial, df):
    """Optuna objective function for Prophet optimization.

    Samples a Prophet configuration from ``trial``, fits it on ``df`` and
    scores it with Prophet cross-validation inside a nested MLflow run.

    Returns:
        Mean cross-validated MAPE, or ``float("inf")`` when the trial fails.
    """
    # Search space: three log-uniform prior scales ...
    prior_scale_bounds = (
        ("changepoint_prior_scale", 0.001, 1.0),
        ("seasonality_prior_scale", 0.01, 50.0),
        ("holidays_prior_scale", 0.01, 50.0),
    )
    params = {
        name: trial.suggest_float(name, low, high, log=True)
        for name, low, high in prior_scale_bounds
    }
    # ... the seasonality mode ...
    params["seasonality_mode"] = trial.suggest_categorical(
        "seasonality_mode", ["additive", "multiplicative"]
    )
    # ... and an on/off/auto switch per built-in seasonality.
    for name in ("yearly_seasonality", "weekly_seasonality", "daily_seasonality"):
        params[name] = trial.suggest_categorical(name, [True, False, "auto"])

    with mlflow.start_run(nested=True):
        try:
            candidate = Prophet(**params)
            fitted = candidate.fit(df)

            # Time series cross-validation
            cv_frame = cross_validation(
                fitted,
                initial="730 days",
                period="180 days",
                horizon="90 days",
                parallel="threads",
            )
            score = performance_metrics(cv_frame)["mape"].mean()

            # Log trial results
            mlflow.log_params(params)
            mlflow.log_metric("cv_mape", score)
            return score

        except Exception as e:
            print(f"Trial failed: {e}")
            return float("inf")
def optuna_prophet_optimization(df, n_trials=100):
    """Run Optuna optimization for Prophet.

    Minimises ``objective`` with a seeded TPE sampler, logs the winning
    parameters and score, then refits a model with those parameters on all of
    ``df`` and logs it as "optuna_best_model".

    Returns:
        Tuple of (final fitted model, the Optuna study).
    """
    with mlflow.start_run(run_name="Optuna Prophet Optimization"):
        sampler = optuna.samplers.TPESampler(seed=42)
        study = optuna.create_study(direction="minimize", sampler=sampler)

        def run_trial(trial):
            return objective(trial, df)

        study.optimize(run_trial, n_trials=n_trials, show_progress_bar=True)

        # Log best results
        winning_params = study.best_params
        mlflow.log_params({f"best_{key}": val for key, val in winning_params.items()})
        mlflow.log_metric("best_mape", study.best_value)

        # Train final model
        final_model = Prophet(**winning_params)
        final_model.fit(df)
        mlflow.prophet.log_model(pr_model=final_model, name="optuna_best_model")

        return final_model, study
# Usage
# optimized_model, study = optuna_prophet_optimization(df, n_trials=50)
模型部署和服务
模型加载和预测
def load_and_predict_prophet_model(model_uri, future_periods=30):
    """Load Prophet model and generate predictions.

    Args:
        model_uri: MLflow URI of a logged Prophet model.
        future_periods: number of periods to extend beyond the model history.

    Returns:
        The forecast DataFrame produced by the loaded model.
    """
    loaded_model = mlflow.prophet.load_model(model_uri)
    future = loaded_model.make_future_dataframe(periods=future_periods)

    # Simplified example: real deployments must supply actual regressor values;
    # here each extra regressor is filled with random draws.
    for regressor in getattr(loaded_model, "extra_regressors", None) or ():
        future[regressor] = np.random.normal(1000, 100, len(future))

    if loaded_model.growth == "logistic":
        future["cap"] = 10000  # Set appropriate capacity

    return loaded_model.predict(future)
# Usage
# run_id = "your_run_id_here"
# model_uri = f"runs:/{run_id}/prophet_model"
# predictions = load_and_predict_prophet_model(model_uri, future_periods=365)
生产部署模式
class ProphetForecaster:
    """Production-ready Prophet forecaster class.

    Wraps a Prophet model stored in MLflow: the model is loaded lazily on the
    first prediction, and the latest date in its training history is kept so
    the model's age can be checked.
    """

    def __init__(self, model_uri, default_cap=10000, freshness_threshold_days=30):
        """
        Args:
            model_uri: MLflow URI of the Prophet model to serve.
            default_cap: capacity written to the future frame for logistic
                growth models.
            freshness_threshold_days: a model whose training data ends at
                least this many days ago is reported as stale.
        """
        self.model_uri = model_uri
        self.default_cap = default_cap
        self.freshness_threshold_days = freshness_threshold_days
        self.model = None
        self.last_training_date = None

    def load_model(self):
        """Load the Prophet model."""
        self.model = mlflow.prophet.load_model(self.model_uri)
        if hasattr(self.model, "history"):
            self.last_training_date = self.model.history["ds"].max()

    def _make_future(self, periods, frequency="D", include_history=True):
        """Build the frame to predict on, loading the model if necessary."""
        if self.model is None:
            self.load_model()

        future = self.model.make_future_dataframe(
            periods=periods, freq=frequency, include_history=include_history
        )

        # Logistic growth cannot predict without a capacity column
        if self.model.growth == "logistic" and "cap" not in future.columns:
            future["cap"] = self.default_cap
        return future

    def predict(self, periods=30, frequency="D", include_history=False):
        """Generate predictions.

        Returns:
            DataFrame with ds, yhat, yhat_lower and yhat_upper. Without
            ``include_history`` only the last ``periods`` rows are returned.
        """
        future = self._make_future(periods, frequency, include_history)
        forecast = self.model.predict(future)

        if not include_history:
            # Return only future predictions
            forecast = forecast.tail(periods)

        return forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]]

    def get_components(self, periods=30):
        """Get forecast components.

        Returns:
            dict with an entry for each of 'trend', 'yearly' and 'weekly'
            present in the forecast, holding the last ``periods`` rows of
            ['ds', component].
        """
        future = self._make_future(periods)
        forecast = self.model.predict(future)

        return {
            component: forecast[["ds", component]].tail(periods)
            for component in ("trend", "yearly", "weekly")
            if component in forecast.columns
        }

    def check_model_freshness(self, current_date=None):
        """Check if model needs retraining.

        Args:
            current_date: reference timestamp; defaults to ``pd.Timestamp.now()``.

        Returns:
            Tuple of (is_fresh, message). ``is_fresh`` is False when no
            training date is known.
        """
        if current_date is None:
            current_date = pd.Timestamp.now()

        if self.last_training_date is None:
            return False, "No training date available"

        days_since_training = (current_date - self.last_training_date).days
        is_fresh = days_since_training < self.freshness_threshold_days
        return is_fresh, f"Model is {days_since_training} days old"
# Usage
# predict() loads the model from the registry URI on first use, so this
# snippet needs a model reachable at that URI.
forecaster = ProphetForecaster("models:/ProphetForecastModel/Production")
predictions = forecaster.predict(periods=90)
components = forecaster.get_components(periods=90)
is_fresh, message = forecaster.check_model_freshness()
批量预测管道
def batch_prophet_predictions(model_registry_name, stage="Production"):
    """Run batch predictions over several forecast horizons.

    Loads the registered model at the given stage, forecasts 30, 90 and 365
    periods ahead, logs a mean and range per horizon, and stores each
    horizon's predictions as a CSV artifact.

    Returns:
        dict mapping "<horizon>_days" to a frame with ds, yhat, yhat_lower and
        yhat_upper for that horizon.
    """
    output_columns = ["ds", "yhat", "yhat_lower", "yhat_upper"]

    with mlflow.start_run(run_name="Batch Prophet Predictions"):
        # Load production model
        model_uri = f"models:/{model_registry_name}/{stage}"
        model = mlflow.prophet.load_model(model_uri)

        horizons = [30, 90, 365]  # Days
        predictions = {}

        for horizon in horizons:
            future = model.make_future_dataframe(periods=horizon)
            if model.growth == "logistic":
                future["cap"] = 10000  # Set capacity

            # Only the trailing `horizon` rows are beyond the history
            window = model.predict(future).tail(horizon)
            predictions[f"{horizon}_days"] = window[output_columns]

            # Log prediction summary
            point_forecast = window["yhat"]
            mlflow.log_metrics(
                {
                    f"{horizon}d_mean_forecast": point_forecast.mean(),
                    f"{horizon}d_forecast_range": point_forecast.max()
                    - point_forecast.min(),
                }
            )

        # Save predictions as artifacts
        for label, frame in predictions.items():
            filename = f"predictions_{label}.csv"
            frame.to_csv(filename, index=False)
            mlflow.log_artifact(filename)

        # Log batch prediction metadata
        mlflow.log_params(
            {
                "model_uri": model_uri,
                "prediction_date": pd.Timestamp.now().isoformat(),
                "horizons": horizons,
            }
        )

        return predictions
模型监控和维护
预测准确性监控
def monitor_forecast_accuracy(model_uri, actuals_df, prediction_horizon_days=30):
    """Monitor Prophet model accuracy against actual values.

    Holds out the last ``prediction_horizon_days`` of ``actuals_df``, fits a
    default Prophet model on the earlier rows, forecasts the held-out window
    and compares it with the actuals. Metrics and a comparison plot are
    logged to MLflow.

    Args:
        model_uri: MLflow URI of the model being monitored.
        actuals_df: frame with 'ds' and 'y'; 'ds' may be strings or datetimes.
        prediction_horizon_days: length of the held-out window in days.

    Returns:
        dict with monitoring_mae, monitoring_mape (percent), monitoring_rmse
        and prediction_coverage (percent), or None when no forecast date
        matches an actual date.
    """
    with mlflow.start_run(run_name="Forecast Accuracy Monitoring"):
        # Load model
        # NOTE(review): the loaded model is not used below; the comparison
        # refits a fresh default Prophet. Confirm whether the loaded model's
        # configuration should be reused instead.
        model = mlflow.prophet.load_model(model_uri)

        # Work on a copy with real datetimes: date arithmetic and the merge
        # below both fail or silently mismatch when 'ds' holds strings.
        actuals_df = actuals_df.copy()
        actuals_df["ds"] = pd.to_datetime(actuals_df["ds"])

        # Generate historical predictions for comparison
        cutoff_date = actuals_df["ds"].max() - pd.Timedelta(
            days=prediction_horizon_days
        )
        historical_data = actuals_df[actuals_df["ds"] <= cutoff_date]

        # Refit model on historical data
        temp_model = Prophet()
        temp_model.fit(historical_data)

        # Generate predictions for the monitoring period
        future = temp_model.make_future_dataframe(periods=prediction_horizon_days)
        if temp_model.growth == "logistic":
            future["cap"] = (
                historical_data["cap"].iloc[-1]
                if "cap" in historical_data.columns
                else 10000
            )
        forecast = temp_model.predict(future)

        # Get actual values for the prediction period
        actual_values = actuals_df[actuals_df["ds"] > cutoff_date]
        forecast_values = forecast[forecast["ds"] > cutoff_date]

        # Align dates
        merged = actual_values.merge(
            forecast_values[["ds", "yhat", "yhat_lower", "yhat_upper"]], on="ds"
        )

        if len(merged) == 0:
            print("No overlapping dates found for accuracy assessment")
            return None

        # Calculate accuracy metrics
        errors = merged["y"] - merged["yhat"]
        mae = np.mean(np.abs(errors))
        rmse = np.sqrt(np.mean(errors**2))

        # Percentage error is undefined where the actual is zero; skip those
        # rows rather than letting a division by zero turn MAPE into inf.
        nonzero = merged["y"] != 0
        if nonzero.any():
            mape = np.mean(np.abs(errors[nonzero] / merged.loc[nonzero, "y"])) * 100
        else:
            mape = float("nan")

        # Coverage (percentage of actuals within prediction intervals)
        coverage = (
            np.mean(
                (merged["y"] >= merged["yhat_lower"])
                & (merged["y"] <= merged["yhat_upper"])
            )
            * 100
        )

        # Log metrics
        accuracy_metrics = {
            "monitoring_mae": mae,
            "monitoring_mape": mape,
            "monitoring_rmse": rmse,
            "prediction_coverage": coverage,
        }
        mlflow.log_metrics(accuracy_metrics)

        # Create accuracy visualization
        plt.figure(figsize=(12, 6))
        plt.plot(merged["ds"], merged["y"], label="Actual", marker="o")
        plt.plot(merged["ds"], merged["yhat"], label="Predicted", marker="s")
        plt.fill_between(
            merged["ds"],
            merged["yhat_lower"],
            merged["yhat_upper"],
            alpha=0.3,
            label="Prediction Interval",
        )
        plt.title(f"Forecast Accuracy Monitoring (MAPE: {mape:.2f}%)")
        plt.xlabel("Date")
        plt.ylabel("Value")
        plt.legend()
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.savefig("accuracy_monitoring.png", dpi=300, bbox_inches="tight")
        mlflow.log_artifact("accuracy_monitoring.png")
        plt.close()

        return accuracy_metrics
# Usage
# accuracy_metrics = monitor_forecast_accuracy(model_uri, new_actuals_df, prediction_horizon_days=30)
自动模型重新训练
生产模型更新管道
def automated_prophet_retraining(
    current_model_name, new_data, performance_threshold_mape=10.0, min_data_points=100
):
    """Automated Prophet model retraining pipeline.

    Validates ``new_data``, trains a new Prophet model on it, cross-validates
    it, compares it with the current Production model when one can be loaded,
    and registers/promotes the new model when it qualifies.

    Args:
        current_model_name: registered model name in the MLflow registry.
        new_data: frame with 'ds' and 'y' columns.
        performance_threshold_mape: maximum acceptable MAPE, in percent, used
            when the current model is unavailable or cannot be evaluated.
        min_data_points: minimum number of rows required to retrain.

    Returns:
        Tuple of (new model, should_deploy), or None when the data quality
        check fails and retraining is skipped.
    """
    with mlflow.start_run(run_name="Automated Prophet Retraining"):
        # Load current production model
        current_model_uri = f"models:/{current_model_name}/Production"

        try:
            current_model = mlflow.prophet.load_model(current_model_uri)
            mlflow.log_param("current_model_loaded", True)
        except Exception as e:
            # A missing Production model is expected on the first deployment.
            print(f"Could not load current model: {e}")
            current_model = None
            mlflow.log_param("current_model_loaded", False)

        # Data quality checks
        data_quality_passed = True
        quality_issues = []

        # Check data quantity (the only check that blocks retraining)
        if len(new_data) < min_data_points:
            data_quality_passed = False
            quality_issues.append(
                f"Insufficient data: {len(new_data)} < {min_data_points}"
            )

        # Check for missing values (recorded, not blocking)
        missing_values = new_data[["ds", "y"]].isnull().sum().sum()
        if missing_values > 0:
            quality_issues.append(f"Missing values found: {missing_values}")

        # Check date continuity (recorded, not blocking)
        new_data = new_data.sort_values("ds")
        date_gaps = pd.to_datetime(new_data["ds"]).diff().dt.days
        large_gaps = (date_gaps > 7).sum()  # Gaps larger than 7 days
        if large_gaps > 0:
            quality_issues.append(f"Large date gaps found: {large_gaps}")

        mlflow.log_params(
            {
                "data_quality_passed": data_quality_passed,
                "data_points": len(new_data),
                "quality_issues": "; ".join(quality_issues),
            }
        )

        if not data_quality_passed:
            print("Data quality checks failed. Skipping retraining.")
            return None

        # Train new model
        new_model = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=False,
            changepoint_prior_scale=0.05,
        )
        new_model.fit(new_data)

        # Evaluate new model performance
        cv_results = cross_validation(
            new_model,
            initial="365 days",
            period="90 days",
            horizon="30 days",
            parallel="threads",
        )

        metrics = performance_metrics(cv_results)
        # Prophet reports MAPE as a fraction; convert to percent so it is
        # comparable with performance_threshold_mape and prints correctly.
        new_mape = metrics["mape"].mean() * 100

        mlflow.log_metric("new_model_mape", new_mape)

        # Compare with current model if available
        should_deploy = new_mape < performance_threshold_mape

        if current_model is not None:
            try:
                # Cross-validate the current model with the same settings
                current_cv = cross_validation(
                    current_model,
                    initial="365 days",
                    period="90 days",
                    horizon="30 days",
                )
                current_metrics = performance_metrics(current_cv)
                current_mape = current_metrics["mape"].mean() * 100

                mlflow.log_metric("current_model_mape", current_mape)

                # A zero baseline leaves no room for relative improvement and
                # would divide by zero; keep the absolute-threshold decision.
                if current_mape > 0:
                    improvement = (current_mape - new_mape) / current_mape * 100
                    mlflow.log_metric("performance_improvement_percent", improvement)

                    # Deploy if new model is significantly better (>5%)
                    should_deploy = improvement > 5.0

            except Exception as e:
                print(f"Could not evaluate current model: {e}")
                should_deploy = new_mape < performance_threshold_mape

        mlflow.log_params(
            {
                "should_deploy": should_deploy,
                "performance_threshold": performance_threshold_mape,
            }
        )

        # Log and potentially deploy new model
        model_info = mlflow.prophet.log_model(
            pr_model=new_model,
            name="retrained_model",
            registered_model_name=current_model_name if should_deploy else None,
        )

        if should_deploy:
            # Transition to production
            client = mlflow.MlflowClient()
            latest_version = client.get_latest_versions(
                current_model_name, stages=["None"]
            )[0]

            client.transition_model_version_stage(
                name=current_model_name,
                version=latest_version.version,
                stage="Production",
            )

            print(f"New model deployed to production with MAPE: {new_mape:.2f}%")
        else:
            print(
                f"New model not deployed. MAPE: {new_mape:.2f}% did not meet criteria."
            )

        return new_model, should_deploy
最佳实践和技巧
数据准备最佳实践
def prophet_data_best_practices():
    """Print and return a checklist of Prophet data preparation guidelines.

    Returns:
        dict mapping a short practice key to its one-line description.
    """
    best_practices = dict(
        data_frequency="Use consistent frequency (daily, weekly, monthly)",
        missing_values="Prophet handles missing values, but document them",
        outliers="Consider outlier detection and handling",
        data_volume="Minimum 2-3 seasonal cycles (2-3 years for yearly seasonality)",
        column_names="Always use 'ds' for dates and 'y' for values",
        date_format="Ensure dates are properly parsed as datetime",
        timezone_handling="Be consistent with timezone handling",
    )

    # One header line, then one bullet per practice
    report = ["Prophet Data Preparation Best Practices:"]
    report.extend(f"- {key}: {text}" for key, text in best_practices.items())
    print("\n".join(report))

    return best_practices
# Data validation function
def validate_prophet_data(df):
    """Validate data for Prophet modeling.

    Problems are reported rather than raised: a missing 'y' column or a
    non-datetime 'ds' column ends up in ``issues`` instead of aborting the
    remaining checks.

    Args:
        df: candidate training frame.

    Returns:
        dict with keys:
            issues: list of blocking problems found.
            recommendations: list of non-blocking suggestions.
            data_points: number of rows.
            date_range_days: days between first and last parseable 'ds'
                value, or 0 when that cannot be determined.
    """
    issues = []
    recommendations = []
    has_ds = "ds" in df.columns
    has_y = "y" in df.columns

    # Check required columns
    if not (has_ds and has_y):
        issues.append("Missing required columns 'ds' and/or 'y'")

    # Check data types
    if has_ds and not pd.api.types.is_datetime64_any_dtype(df["ds"]):
        issues.append("Column 'ds' is not datetime type")
        recommendations.append(
            "Convert 'ds' to datetime: df['ds'] = pd.to_datetime(df['ds'])"
        )

    # Check for sufficient data
    if len(df) < 100:
        issues.append(f"Insufficient data points: {len(df)} (recommend >100)")

    # Check for missing values (only possible when the column exists)
    if has_y:
        missing_y = df["y"].isnull().sum()
        if missing_y > 0:
            recommendations.append(
                f"Consider handling {missing_y} missing values in 'y'"
            )

    # Check for duplicate dates
    if has_ds:
        duplicates = df["ds"].duplicated().sum()
        if duplicates > 0:
            issues.append(f"Found {duplicates} duplicate dates")

    # Check data range. Parse leniently so string dates (already flagged
    # above) still yield a span instead of a TypeError on str - str.
    date_range = 0
    if has_ds and len(df) > 0:
        parsed = pd.to_datetime(df["ds"], errors="coerce")
        span = parsed.max() - parsed.min()
        if pd.notna(span):
            date_range = span.days
            if date_range < 365:
                recommendations.append(
                    "Less than 1 year of data may limit seasonality detection"
                )

    return {
        "issues": issues,
        "recommendations": recommendations,
        "data_points": len(df),
        "date_range_days": date_range,
    }
# Usage
# Run the checks on the frame loaded earlier and show the resulting report dict.
validation_results = validate_prophet_data(df)
print("Validation Results:", validation_results)
性能优化
Prophet 性能技巧
def optimize_prophet_performance():
    """Return Prophet performance tips grouped by topic.

    Returns:
        Nested dict: topic name -> {tip key -> one-line advice}.
    """
    parallel_tips = dict(
        cross_validation="Use parallel='threads' or 'processes' in cross_validation()",
        multiple_models="Use joblib.Parallel for training multiple models",
    )
    configuration_tips = dict(
        mcmc_samples="Set mcmc_samples=0 for faster MAP estimation",
        uncertainty_samples="Reduce uncertainty_samples for faster predictions",
        stan_backend="Use 'CMDSTANPY' backend for better performance",
    )
    preprocessing_tips = dict(
        frequency="Aggregate to appropriate frequency (daily vs hourly)",
        outliers="Remove extreme outliers before training",
        data_size="Consider sampling for very large datasets",
    )

    return dict(
        parallel_processing=parallel_tips,
        model_configuration=configuration_tips,
        data_preprocessing=preprocessing_tips,
    )
# Example of parallel cross-validation
def parallel_prophet_evaluation(models_dict, df):
    """Evaluate multiple Prophet models in parallel.

    Each model is fitted on ``df`` and scored by the mean cross-validated
    MAPE; the work is fanned out with joblib across all available cores.

    Args:
        models_dict: mapping of model name to an unfitted Prophet model.
        df: training frame shared by all models.

    Returns:
        dict mapping model name to its mean MAPE, or ``float("inf")`` for a
        model whose fit or cross-validation raised.
    """
    from joblib import Parallel, delayed

    def score_model(label, candidate):
        try:
            cv_frame = cross_validation(
                candidate.fit(df),
                initial="365 days",
                period="90 days",
                horizon="30 days",
                parallel="threads",
            )
            return label, performance_metrics(cv_frame)["mape"].mean()
        except Exception:
            # A failing model is ranked last rather than aborting the batch
            return label, float("inf")

    tasks = (delayed(score_model)(label, candidate) for label, candidate in models_dict.items())
    scored_pairs = Parallel(n_jobs=-1)(tasks)
    return dict(scored_pairs)
结论
MLflow 的 Prophet 集成为时间序列预测、实验跟踪和模型部署提供了全面的解决方案。 无论您是预测业务指标、规划资源还是预测未来趋势,Prophet 直观的预测功能与 MLflow 强大的实验管理相结合,都为专业时间序列分析创建了一个强大的平台。
将 MLflow 与 Prophet 结合使用的主要优势包括：
- 简化的预测工作流程: 为时间序列项目提供便捷的模型记录和实验跟踪
- 全面验证: 内置的交叉验证和性能评估工具
- 业务就绪功能: 假日建模、自定义季节性和可解释的组件
- 生产部署: 模型注册表与自动重新训练功能集成
- 协作开发: 团队友好的实验共享和模型治理
本指南中的模式和示例为构建可扩展、可靠的时间序列预测系统奠定了坚实的基础。 从基本的 Prophet 模型开始以获得直接的见解,然后随着预测需求的演变,逐步采用高级功能,如自定义季节性、自动超参数调整和生产监控。
Prophet 让业务用户也能轻松进行预测的理念，与 MLflow 的企业级实验管理相结合，为借助准确、可解释的时间序列预测做出数据驱动的决策提供了理想的平台。