Initializing the MLflow Client
Depending on where you are running this notebook, your configuration for initializing the MLflow client may vary.
For this example, we're using a locally running tracking server, but other options are available (the easiest being the free managed service in the Databricks Free Trial).
For more information on setting the tracking server URI and configuring access to either a managed or self-managed MLflow tracking server, see the guide to running notebooks here.
# NOTE: review the links mentioned above for guidance on connecting to a managed tracking server, such as the Databricks Managed MLflow
from mlflow import MlflowClient
client = MlflowClient(tracking_uri="http://127.0.0.1:8080")
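If you're connecting to a managed tracking server instead of a local one, the client can also pick up the server address from the environment. The lines below are a minimal sketch of those alternatives (not part of the original walkthrough); the "databricks" URI assumes Databricks authentication has already been configured.
# Sketch of alternative client configurations; assumes the corresponding
# server and credentials are already set up
import os
from mlflow import MlflowClient

# Option 1: let the client read the address from the MLFLOW_TRACKING_URI environment variable
os.environ["MLFLOW_TRACKING_URI"] = "http://127.0.0.1:8080"
client_from_env = MlflowClient()

# Option 2: point the client at Databricks Managed MLflow
# (requires Databricks credentials to be configured beforehand)
# databricks_client = MlflowClient(tracking_uri="databricks")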
Searching Experiments with the MLflow Client API
Let's take a look at the Default Experiment that is created for us.
This safe "fallback" experiment will store Runs that we create if we don't specify a new experiment.
# Search experiments without providing query terms behaves effectively as a 'list' action
all_experiments = client.search_experiments()
print(all_experiments)
[<Experiment: artifact_location='./mlruns/0', creation_time=None, experiment_id='0', last_update_time=None, lifecycle_stage='active', name='Default', tags={}>]
from pprint import pprint
# Extract the experiment name and lifecycle_stage
default_experiment = [
{"name": experiment.name, "lifecycle_stage": experiment.lifecycle_stage}
for experiment in all_experiments
if experiment.name == "Default"
][0]
pprint(default_experiment)
{'lifecycle_stage': 'active', 'name': 'Default'}
Creating a New Experiment
In this section, we'll:
- Create a new MLflow Experiment
- Apply metadata in the form of Experiment Tags
experiment_description = (
"This is the grocery forecasting project. "
"This experiment contains the produce models for apples."
)
experiment_tags = {
"project_name": "grocery-forecasting",
"store_dept": "produce",
"team": "stores-ml",
"project_quarter": "Q3-2023",
"mlflow.note.content": experiment_description,
}
produce_apples_experiment = client.create_experiment(name="Apple_Models", tags=experiment_tags)
# Use search_experiments() to search on the project_name tag key
apples_experiment = client.search_experiments(
filter_string="tags.`project_name` = 'grocery-forecasting'"
)
pprint(apples_experiment[0])
<Experiment: artifact_location='mlflow-artifacts:/977454266300166282', creation_time=1696346036899, experiment_id='977454266300166282', last_update_time=1696346036899, lifecycle_stage='active', name='Apple_Models', tags={'mlflow.note.content': 'This is the grocery forecasting project. This ' 'experiment contains the produce models for apples.', 'project_name': 'grocery-forecasting', 'project_quarter': 'Q3-2023', 'store_dept': 'produce', 'team': 'stores-ml'}>
# Access individual tag data
print(apples_experiment[0].tags["team"])
stores-ml
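As an aside, a known experiment can also be fetched directly by name rather than through a tag search, and tags can be added or updated after creation. This is a small illustrative sketch; the "project_status" tag key is hypothetical.
# Fetch the experiment directly by its name (returns None if it doesn't exist)
by_name = client.get_experiment_by_name("Apple_Models")
print(by_name.experiment_id)

# Add or update a tag after creation ("project_status" is a hypothetical key)
client.set_experiment_tag(by_name.experiment_id, "project_status", "active")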
Running our first model training
In this section, we'll:
- Create a synthetic dataset that is relevant to a simple demand forecasting task
- Start an MLflow run
- Log metrics, parameters, and tags to the run
- Save the model to the run
- Register the model during model logging
Synthetic data generator for apple demand
Keep in mind that this is purely for demonstration purposes.
The demand values are purely artificial and deliberately covary with the features. This isn't a particularly realistic real-world scenario (if it were, we wouldn't need data scientists!).
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
def generate_apple_sales_data_with_promo_adjustment(base_demand: int = 1000, n_rows: int = 5000):
"""
Generates a synthetic dataset for predicting apple sales demand with seasonality and inflation.
This function creates a pandas DataFrame with features relevant to apple sales.
The features include date, average_temperature, rainfall, weekend flag, holiday flag,
promotional flag, price_per_kg, and the previous day's demand. The target variable,
'demand', is generated based on a combination of these features with some added noise.
Args:
base_demand (int, optional): Base demand for apples. Defaults to 1000.
n_rows (int, optional): Number of rows (days) of data to generate. Defaults to 5000.
Returns:
pd.DataFrame: DataFrame with features and target variable for apple sales prediction.
Example:
>>> df = generate_apple_sales_data_with_promo_adjustment(base_demand=1200, n_rows=6000)
>>> df.head()
"""
# Set seed for reproducibility
np.random.seed(9999)
# Create date range
dates = [datetime.now() - timedelta(days=i) for i in range(n_rows)]
dates.reverse()
# Generate features
df = pd.DataFrame(
{
"date": dates,
"average_temperature": np.random.uniform(10, 35, n_rows),
"rainfall": np.random.exponential(5, n_rows),
"weekend": [(date.weekday() >= 5) * 1 for date in dates],
"holiday": np.random.choice([0, 1], n_rows, p=[0.97, 0.03]),
"price_per_kg": np.random.uniform(0.5, 3, n_rows),
"month": [date.month for date in dates],
}
)
# Introduce inflation over time (years)
df["inflation_multiplier"] = 1 + (df["date"].dt.year - df["date"].dt.year.min()) * 0.03
# Incorporate seasonality due to apple harvests
df["harvest_effect"] = np.sin(2 * np.pi * (df["month"] - 3) / 12) + np.sin(
2 * np.pi * (df["month"] - 9) / 12
)
# Modify the price_per_kg based on harvest effect
df["price_per_kg"] = df["price_per_kg"] - df["harvest_effect"] * 0.5
# Adjust promo periods to coincide with periods lagging peak harvest by 1 month
peak_months = [4, 10] # months following the peak availability
df["promo"] = np.where(
df["month"].isin(peak_months),
1,
np.random.choice([0, 1], n_rows, p=[0.85, 0.15]),
)
# Generate target variable based on features
base_price_effect = -df["price_per_kg"] * 50
seasonality_effect = df["harvest_effect"] * 50
promo_effect = df["promo"] * 200
df["demand"] = (
base_demand
+ base_price_effect
+ seasonality_effect
+ promo_effect
+ df["weekend"] * 300
+ np.random.normal(0, 50, n_rows)
) * df["inflation_multiplier"] # adding random noise
# Add previous day's demand
df["previous_days_demand"] = df["demand"].shift(1)
df["previous_days_demand"].fillna(method="bfill", inplace=True) # fill the first row
# Drop temporary columns
df.drop(columns=["inflation_multiplier", "harvest_effect", "month"], inplace=True)
return df
# Generate the dataset!
data = generate_apple_sales_data_with_promo_adjustment(base_demand=1_000, n_rows=1_000)
data[-20:]
 | date | average_temperature | rainfall | weekend | holiday | price_per_kg | promo | demand | previous_days_demand |
---|---|---|---|---|---|---|---|---|---|
980 | 2023-09-14 11:13:56.948267 | 34.130183 | 1.454065 | 0 | 0 | 1.449177 | 0 | 971.802447 | 1001.085782 |
981 | 2023-09-15 11:13:56.948267 | 32.353643 | 9.462859 | 0 | 0 | 2.856503 | 0 | 818.951553 | 971.802447 |
982 | 2023-09-16 11:13:56.948266 | 18.816833 | 0.391470 | 1 | 0 | 1.326429 | 0 | 1281.352029 | 818.951553 |
983 | 2023-09-17 11:13:56.948265 | 34.533012 | 2.120477 | 1 | 0 | 0.970131 | 0 | 1357.385504 | 1281.352029 |
984 | 2023-09-18 11:13:56.948265 | 23.057202 | 2.365705 | 0 | 0 | 1.049931 | 0 | 991.427049 | 1357.385504 |
985 | 2023-09-19 11:13:56.948264 | 34.810165 | 3.089005 | 0 | 0 | 2.035149 | 0 | 974.971149 | 991.427049 |
986 | 2023-09-20 11:13:56.948263 | 29.208905 | 3.673292 | 0 | 0 | 2.518098 | 0 | 1056.249547 | 974.971149 |
987 | 2023-09-21 11:13:56.948263 | 16.428676 | 4.077782 | 0 | 0 | 1.268979 | 0 | 1063.118915 | 1056.249547 |
988 | 2023-09-22 11:13:56.948262 | 32.067512 | 2.734454 | 0 | 0 | 0.762317 | 0 | 1040.492007 | 1063.118915 |
989 | 2023-09-23 11:13:56.948261 | 31.938203 | 13.883486 | 1 | 0 | 1.153301 | 0 | 1285.040470 | 1040.492007 |
990 | 2023-09-24 11:13:56.948261 | 18.024055 | 7.544061 | 1 | 0 | 0.610703 | 0 | 1366.644564 | 1285.040470 |
991 | 2023-09-25 11:13:56.948260 | 20.681067 | 18.820490 | 0 | 0 | 1.533488 | 0 | 973.934924 | 1366.644564 |
992 | 2023-09-26 11:13:56.948259 | 16.010132 | 7.705941 | 0 | 0 | 1.632498 | 1 | 1188.291256 | 973.934924 |
993 | 2023-09-27 11:13:56.948259 | 18.766455 | 6.274840 | 0 | 0 | 2.806554 | 0 | 930.089438 | 1188.291256 |
994 | 2023-09-28 11:13:56.948258 | 27.948793 | 23.705246 | 0 | 0 | 0.829464 | 0 | 1060.576311 | 930.089438 |
995 | 2023-09-29 11:13:56.948257 | 28.661072 | 10.329865 | 0 | 0 | 2.290591 | 0 | 910.690776 | 1060.576311 |
996 | 2023-09-30 11:13:56.948256 | 10.821693 | 3.575645 | 1 | 0 | 0.897473 | 0 | 1306.363801 | 910.690776 |
997 | 2023-10-01 11:13:56.948256 | 21.108560 | 6.221089 | 1 | 0 | 1.093864 | 1 | 1564.422372 | 1306.363801 |
998 | 2023-10-02 11:13:56.948254 | 29.451301 | 5.021463 | 0 | 0 | 2.493085 | 1 | 1164.303256 | 1564.422372 |
999 | 2023-10-03 11:13:56.948248 | 19.261458 | 0.438381 | 0 | 0 | 2.610422 | 1 | 1067.963448 | 1164.303256 |
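Before training, it can be worth a quick, optional sanity check that the generated frame has the expected shape and no missing values; this is a sketch added for illustration, not part of the original walkthrough.
# Optional sanity checks on the synthetic dataset
print(data.shape)               # expect (1000, 9)
print(data.isna().sum().sum())  # expect 0 after the backfill above
print(data["demand"].describe())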
Train and log the model
We're now ready to import our model class and train a RandomForestRegressor.
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
import mlflow
# Use the fluent API to set the tracking uri and the active experiment
mlflow.set_tracking_uri("http://127.0.0.1:8080")
# Sets the current active experiment to the "Apple_Models" experiment and returns the Experiment metadata
apple_experiment = mlflow.set_experiment("Apple_Models")
# Define a run name for this iteration of training.
# If this is not set, a unique name will be auto-generated for your run.
run_name = "apples_rf_test"
# Define an artifact path that the model will be saved to.
artifact_path = "rf_apples"
# Split the data into features and target and drop irrelevant date field and target field
X = data.drop(columns=["date", "demand"])
y = data["demand"]
# Split the data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
params = {
"n_estimators": 100,
"max_depth": 6,
"min_samples_split": 10,
"min_samples_leaf": 4,
"bootstrap": True,
"oob_score": False,
"random_state": 888,
}
# Train the RandomForestRegressor
rf = RandomForestRegressor(**params)
# Fit the model on the training data
rf.fit(X_train, y_train)
# Predict on the validation set
y_pred = rf.predict(X_val)
# Calculate error metrics
mae = mean_absolute_error(y_val, y_pred)
mse = mean_squared_error(y_val, y_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_val, y_pred)
# Assemble the metrics we're going to write into a collection
metrics = {"mae": mae, "mse": mse, "rmse": rmse, "r2": r2}
# Initiate the MLflow run context
with mlflow.start_run(run_name=run_name) as run:
# Log the parameters used for the model fit
mlflow.log_params(params)
# Log the error metrics that were calculated during validation
mlflow.log_metrics(metrics)
# Log an instance of the trained model for later use
mlflow.sklearn.log_model(sk_model=rf, input_example=X_val, name=artifact_path)
/Users/benjamin.wilson/miniconda3/envs/mlflow-dev-env/lib/python3.8/site-packages/mlflow/models/signature.py:333: UserWarning: Hint: Inferred schema contains integer column(s). Integer columns in Python cannot represent missing values. If your input data contains missing values at inference time, it will be encoded as floats and will cause a schema enforcement error. The best way to avoid this problem is to infer the model schema based on a realistic data sample (training dataset) that includes missing values. Alternatively, you can declare integer columns as doubles (float64) whenever these columns may have missing values. See `Handling Integers With Missing Values <https://www.mlflow.org/docs/latest/models.html#handling-integers-with-missing-values>`_ for more details.
  input_schema = _infer_schema(input_ex)
/Users/benjamin.wilson/miniconda3/envs/mlflow-dev-env/lib/python3.8/site-packages/_distutils_hack/__init__.py:30: UserWarning: Setuptools is replacing distutils.
  warnings.warn("Setuptools is replacing distutils.")
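The first warning above is only a hint about schema inference from the integer columns in the input_example. One way to make the schema explicit, in the spirit of that hint, is to infer a signature from a realistic training sample; the snippet below is a hedged sketch using mlflow.models.infer_signature, not something the tutorial requires.
from mlflow.models import infer_signature

# A sketch: infer an explicit model signature from the training data and the
# model's predictions, then pass it to log_model via the `signature` argument
signature = infer_signature(X_train, rf.predict(X_train))
# mlflow.sklearn.log_model(sk_model=rf, name=artifact_path, signature=signature)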
Success!
You've just logged your first MLflow model!
Navigate to the MLflow UI to see the run that was just created (named "apples_rf_test", logged to the Experiment "Apple_Models").
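To verify the logged model, you can load it back from the run and score a few rows. The sketch below reuses the run object and artifact_path defined above; a runs:/ URI is one common way to reference a logged model.
# Load the model back from the run we just logged and sanity-check predictions
loaded_model = mlflow.sklearn.load_model(f"runs:/{run.info.run_id}/{artifact_path}")
print(loaded_model.predict(X_val[:5]))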