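MLflow Tracking APIs
MLflow Tracking provides comprehensive APIs across multiple programming languages for capturing your machine learning experiments. Whether you prefer automatic instrumentation or fine-grained control, MLflow adapts to your workflow.
Choose Your Approach
MLflow offers two primary methods for experiment tracking, each optimized for different use cases:
🤖 Auto Logging - Zero Setup, Maximum Coverage
Well suited to getting started quickly or to working with a supported ML library. Add one line of code and MLflow captures parameters, metrics, and models automatically.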
import mlflow
mlflow.autolog() # That's it!
# Your existing training code works unchanged
model.fit(X_train, y_train)
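What gets logged automatically
- Model parameters and hyperparameters
- Training and validation metrics
- Model artifacts and checkpoints
- Training plots and visualizations
- Framework-specific metadata
Supported libraries: Scikit-learn, XGBoost, LightGBM, PyTorch, Keras/TensorFlow, Spark, and more.
🛠️ Manual Logging - Complete Control, Custom Workflows
Suited to custom training loops, advanced experimentation, or cases where you need precise control over what is tracked.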
The examples below show manual logging in Python, Java, and R, in that order.
import mlflow
with mlflow.start_run():
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("batch_size", 32)
    # Your training logic here
    for epoch in range(num_epochs):
        train_loss = train_model()
        val_loss = validate_model()
        # Log metrics with step tracking
        mlflow.log_metrics({"train_loss": train_loss, "val_loss": val_loss}, step=epoch)
    # Log final model
    mlflow.sklearn.log_model(model, name="model")
MlflowClient client = new MlflowClient();
RunInfo run = client.createRun();
// Log parameters
client.logParam(run.getRunId(), "learning_rate", "0.01");
client.logParam(run.getRunId(), "batch_size", "32");
// Log metrics with a timestamp and a step for each epoch
for (int epoch = 0; epoch < numEpochs; epoch++) {
    double trainLoss = trainModel();
    client.logMetric(run.getRunId(), "train_loss", trainLoss,
        System.currentTimeMillis(), epoch);
}
library(mlflow)
with(mlflow_start_run(), {
  # Log parameters
  mlflow_log_param("learning_rate", 0.01)
  mlflow_log_param("batch_size", 32)
  # Training loop
  for (epoch in 1:num_epochs) {
    train_loss <- train_model()
    mlflow_log_metric("train_loss", train_loss, step = epoch)
  }
})
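Training configurations are often nested, while `mlflow.log_params` takes a flat dictionary. The following is a minimal sketch of one way to bridge the two, not part of the MLflow API: `flatten_params` and `log_config` are hypothetical helper names, and the dotted-key naming scheme is an assumption chosen for illustration.

```python
def flatten_params(config, prefix=""):
    """Flatten a nested dict into dotted keys, e.g. {"opt": {"lr": 0.01}} -> {"opt.lr": 0.01}."""
    flat = {}
    for key, value in config.items():
        name = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict):
            # Recurse into nested sections, carrying the dotted prefix along
            flat.update(flatten_params(value, prefix=name))
        else:
            flat[name] = value
    return flat


def log_config(config):
    """Log a nested config to the active run (hypothetical helper)."""
    import mlflow  # imported lazily so flatten_params works without MLflow installed

    mlflow.log_params(flatten_params(config))


config = {"optimizer": {"name": "adam", "learning_rate": 0.01}, "batch_size": 32}
flat = flatten_params(config)
```

Calling `log_config(config)` inside a `with mlflow.start_run():` block would then record one parameter per flattened key.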
Core Logging Functions
Setup & Configuration
Function | Purpose | Example |
---|---|---|
mlflow.set_tracking_uri() | Connect to a tracking server or database | mlflow.set_tracking_uri("http://localhost:5000") |
mlflow.get_tracking_uri() | Get the current tracking URI | uri = mlflow.get_tracking_uri() |
mlflow.create_experiment() | Create a new experiment | exp_id = mlflow.create_experiment("my-experiment") |
mlflow.set_experiment() | Set the active experiment | mlflow.set_experiment("fraud-detection") |
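MLflow also reads the tracking URI from the `MLFLOW_TRACKING_URI` environment variable when none is set in code. The sketch below shows one possible precedence order (explicit argument, then environment, then a default). The helper names `resolve_tracking_uri` and `configure_tracking`, and the default of a local server on port 5000, are assumptions for illustration.

```python
import os

DEFAULT_TRACKING_URI = "http://localhost:5000"  # assumption: a local tracking server


def resolve_tracking_uri(explicit_uri=None, env=None):
    """Pick a tracking URI: explicit argument first, then MLFLOW_TRACKING_URI, then the default."""
    env = os.environ if env is None else env
    return explicit_uri or env.get("MLFLOW_TRACKING_URI") or DEFAULT_TRACKING_URI


def configure_tracking(experiment_name, explicit_uri=None):
    """Point MLflow at the resolved URI and select the experiment (hypothetical helper)."""
    import mlflow  # imported lazily so resolve_tracking_uri works without MLflow installed

    mlflow.set_tracking_uri(resolve_tracking_uri(explicit_uri))
    mlflow.set_experiment(experiment_name)
```

Keeping the resolution logic in a pure function makes it easy to test without a running server.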
Run Management
Function | Purpose | Example |
---|---|---|
mlflow.start_run() | Start a new run (usable as a context manager) | with mlflow.start_run(): ... |
mlflow.end_run() | End the current run | mlflow.end_run(status="FINISHED") |
mlflow.active_run() | Get the currently active run | run = mlflow.active_run() |
mlflow.last_active_run() | Get the last completed run | last_run = mlflow.last_active_run() |
Data Logging
Function | Purpose | Example |
---|---|---|
mlflow.log_param() / mlflow.log_params() | Log hyperparameters | mlflow.log_param("lr", 0.01) |
mlflow.log_metric() / mlflow.log_metrics() | Log performance metrics | mlflow.log_metric("accuracy", 0.95, step=10) |
mlflow.log_input() | Log dataset information | mlflow.log_input(dataset) |
mlflow.set_tag() / mlflow.set_tags() | Add metadata tags | mlflow.set_tag("model_type", "CNN") |
Artifact Management
Function | Purpose | Example |
---|---|---|
mlflow.log_artifact() | Log a single file or directory | mlflow.log_artifact("model.pkl") |
mlflow.log_artifacts() | Log the contents of a directory | mlflow.log_artifacts("./plots/") |
mlflow.get_artifact_uri() | Get the artifact storage location | uri = mlflow.get_artifact_uri() |
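A common pattern is to write several report files into one directory and upload them together with `mlflow.log_artifacts`. The sketch below assumes a JSON metrics file and a plain-text notes file. The helper names `write_report_files` and `log_report`, the file names, and the `reports` artifact path are all hypothetical.

```python
import json
import tempfile
from pathlib import Path


def write_report_files(output_dir, metrics, notes):
    """Write metrics.json and notes.txt into output_dir and return their paths."""
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    metrics_path = output_dir / "metrics.json"
    notes_path = output_dir / "notes.txt"
    metrics_path.write_text(json.dumps(metrics, indent=2))
    notes_path.write_text(notes)
    return [metrics_path, notes_path]


def log_report(metrics, notes):
    """Write the report to a temporary directory and upload it to the active run."""
    import mlflow  # imported lazily so write_report_files works without MLflow installed

    with tempfile.TemporaryDirectory() as tmp_dir:
        write_report_files(tmp_dir, metrics, notes)
        # artifact_path places the files under "reports/" in the run's artifact store
        mlflow.log_artifacts(tmp_dir, artifact_path="reports")
```

Uploading from a temporary directory avoids leaving report files behind in the working directory.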
Model Management (New in MLflow 3)
Function | Purpose | Example |
---|---|---|
mlflow.initialize_logged_model() | Initialize a logged model in the PENDING state | model = mlflow.initialize_logged_model(name="my_model") |
mlflow.create_external_model() | Create an external model (artifacts stored outside MLflow) | model = mlflow.create_external_model(name="agent") |
mlflow.finalize_logged_model() | Update the model status to READY or FAILED | mlflow.finalize_logged_model(model_id, "READY") |
mlflow.get_logged_model() | Retrieve a logged model by ID | model = mlflow.get_logged_model(model_id) |
mlflow.last_logged_model() | Get the most recently logged model | model = mlflow.last_logged_model() |
mlflow.search_logged_models() | Search for logged models | models = mlflow.search_logged_models(filter_string="name='my_model'") |
mlflow.log_model_params() | Log parameters to a specific model | mlflow.log_model_params({"param": "value"}, model_id) |
mlflow.set_logged_model_tags() | Set tags on a logged model | mlflow.set_logged_model_tags(model_id, {"key": "value"}) |
mlflow.delete_logged_model_tag() | Delete a tag from a logged model | mlflow.delete_logged_model_tag(model_id, "key") |
Active Model Management (New in MLflow 3)
Function | Purpose | Example |
---|---|---|
mlflow.set_active_model() | Set the active model for trace linking | mlflow.set_active_model(name="my_model") |
mlflow.get_active_model_id() | Get the current active model ID | model_id = mlflow.get_active_model_id() |
mlflow.clear_active_model() | Clear the active model | mlflow.clear_active_model() |
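Language-Specific API Coverage
Capability | Python | Java | R | REST API |
---|---|---|---|---|
Basic Logging | ✅ Full | ✅ Full | ✅ Full | ✅ Full |
Auto Logging | ✅ 15+ libraries | ❌ Not available | ✅ Limited | ❌ Not available |
Model Logging | ✅ 20+ flavors | ✅ Basic support | ✅ Basic support | ✅ Via artifacts |
Logged Model Management | ✅ Full (MLflow 3) | ❌ Not available | ❌ Not available | ✅ Basic |
Dataset Tracking | ✅ Full | ✅ Basic | ✅ Basic | ✅ Basic |
Search & Query | ✅ Advanced | ✅ Basic | ✅ Basic | ✅ Full |
API Parity
The Python API provides the most comprehensive feature set. The Java and R APIs cover core functionality, with new features added in each release.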
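For languages without a dedicated client, basic logging goes through the REST API. As a rough sketch, the snippet below builds (but does not send) a request for the `runs/log-metric` endpoint using only the Python standard library. The helper name `build_log_metric_request`, the base URL, and the run ID are placeholders, and the payload fields should be checked against the REST API reference for your MLflow version.

```python
import json
import time
import urllib.request


def build_log_metric_request(base_url, run_id, key, value, step=0, timestamp_ms=None):
    """Build a POST request for the REST log-metric endpoint without sending it."""
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)  # milliseconds since the Unix epoch
    payload = {
        "run_id": run_id,
        "key": key,
        "value": value,
        "timestamp": timestamp_ms,
        "step": step,
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/2.0/mlflow/runs/log-metric",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder server address and run ID, for illustration only
request = build_log_metric_request(
    "http://localhost:5000",
    run_id="abc123",
    key="train_loss",
    value=0.42,
    step=3,
    timestamp_ms=1700000000000,
)
```

Passing `request` to `urllib.request.urlopen` would send it to a running tracking server.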
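Advanced Tracking Patterns
Working with Logged Models (New in MLflow 3)
MLflow 3 introduces logged model management, which lets you track models independently of runs.
Creating and Managing External Models
For models stored outside MLflow (such as deployed agents or external model artifacts):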
import mlflow
# Create an external model for tracking without storing artifacts in MLflow
model = mlflow.create_external_model(
    name="chatbot_agent",
    model_type="agent",
    tags={"version": "v1.0", "environment": "production"},
)
# Log parameters specific to this model
mlflow.log_model_params(
    {"temperature": "0.7", "max_tokens": "1000"}, model_id=model.model_id
)
# Set as active model for automatic trace linking
mlflow.set_active_model(model_id=model.model_id)
@mlflow.trace
def chat_with_agent(message):
    # This trace will be automatically linked to the active model
    return agent.chat(message)
# Traces are now linked to your external model
traces = mlflow.search_traces(model_id=model.model_id)
Advanced Model Lifecycle Management
For models that need custom preparation or validation:
import mlflow
from mlflow.entities import LoggedModelStatus
# Initialize model in PENDING state
model = mlflow.initialize_logged_model(
    name="custom_neural_network",
    model_type="neural_network",
    tags={"architecture": "transformer", "dataset": "custom"},
)
try:
    # Custom model preparation logic
    train_model()
    validate_model()
    # Save model artifacts using standard MLflow model logging
    mlflow.pytorch.log_model(
        pytorch_model=model_instance,
        name="model",
        model_id=model.model_id,  # Link to the logged model
    )
    # Finalize model as READY
    mlflow.finalize_logged_model(model.model_id, LoggedModelStatus.READY)
except Exception as e:
    # Mark model as FAILED if issues occur
    mlflow.finalize_logged_model(model.model_id, LoggedModelStatus.FAILED)
    raise
# Retrieve and work with the logged model
final_model = mlflow.get_logged_model(model.model_id)
print(f"Model {final_model.name} is {final_model.status}")
Searching and Querying Logged Models
# Find all production-ready transformer models
production_models = mlflow.search_logged_models(
    filter_string="tags.environment = 'production' AND model_type = 'transformer'",
    order_by=[{"field_name": "creation_time", "ascending": False}],
    output_format="pandas",
)
# Search for models with specific performance metrics
high_accuracy_models = mlflow.search_logged_models(
    filter_string="metrics.accuracy > 0.95",
    datasets=[{"dataset_name": "test_set"}],  # Only consider test set metrics
    max_results=10,
)
# Get the most recently logged model in current session
latest_model = mlflow.last_logged_model()
if latest_model:
    print(f"Latest model: {latest_model.name} (ID: {latest_model.model_id})")
Precise Metric Tracking
Control exactly when and how metrics are recorded by supplying custom timestamps and steps:
import time
import mlflow
# Log with custom step (training iteration/epoch)
for epoch in range(100):
    loss = train_epoch()
    mlflow.log_metric("train_loss", loss, step=epoch)
# Log with custom timestamp
now = int(time.time() * 1000)  # MLflow expects milliseconds
mlflow.log_metric("inference_latency", latency, timestamp=now)
# Log with both step and timestamp
mlflow.log_metric("gpu_utilization", gpu_usage, step=epoch, timestamp=now)
Step Requirements
- Must be a valid 64-bit integer
- Can be negative or out of order
- Gaps in the sequence are supported (e.g., 1, 5, 75, -20)
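The timestamp and step rules above can be checked before calling `mlflow.log_metric`. The following sketch converts a timezone-aware `datetime` to epoch milliseconds and validates that a step fits in 64 bits. The helper names are hypothetical, and treating the range as signed 64-bit is an assumption based on steps being allowed to be negative.

```python
from datetime import datetime, timezone

INT64_MIN = -(2**63)
INT64_MAX = 2**63 - 1


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to integer milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return round(moment.timestamp() * 1000)


def check_step(step):
    """Return step unchanged if it is an integer within the signed 64-bit range."""
    if isinstance(step, bool) or not isinstance(step, int):
        raise ValueError(f"step must be an integer, got {type(step).__name__}")
    if not INT64_MIN <= step <= INT64_MAX:
        raise ValueError(f"step {step} does not fit in a signed 64-bit integer")
    return step


def log_backdated_metric(key, value, step, moment):
    """Log a metric with an explicit step and timestamp (hypothetical helper)."""
    import mlflow  # imported lazily so the validators work without MLflow installed

    mlflow.log_metric(key, value, step=check_step(step), timestamp=to_epoch_millis(moment))
```

For example, `to_epoch_millis(datetime(2024, 1, 1, tzinfo=timezone.utc))` yields the millisecond timestamp for midnight UTC on that date, and `check_step(-20)` passes because negative steps are allowed.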
Experiment Organization
Organize your experiments for easy comparison and analysis:
# Method 1: Environment variables
import os
os.environ["MLFLOW_EXPERIMENT_NAME"] = "fraud-detection-v2"
# Method 2: Explicit experiment setting
mlflow.set_experiment("hyperparameter-tuning")
# Method 3: Create with custom configuration
experiment_id = mlflow.create_experiment(
    "production-models",
    artifact_location="s3://my-bucket/experiments/",
    tags={"team": "data-science", "environment": "prod"},
)
Hierarchical Runs with Parent-Child Relationships
Organize complex experiments such as hyperparameter sweeps or cross-validation:
# Parent run for the entire experiment
with mlflow.start_run(run_name="hyperparameter_sweep") as parent_run:
    mlflow.log_param("search_strategy", "random")
    best_score = 0
    best_params = {}
    # Child runs for each parameter combination
    for lr in [0.001, 0.01, 0.1]:
        for batch_size in [16, 32, 64]:
            with mlflow.start_run(
                nested=True, run_name=f"lr_{lr}_bs_{batch_size}"
            ) as child_run:
                mlflow.log_params({"learning_rate": lr, "batch_size": batch_size})
                # Train and evaluate
                model = train_model(lr, batch_size)
                score = evaluate_model(model)
                mlflow.log_metric("accuracy", score)
                # Track best configuration in parent
                if score > best_score:
                    best_score = score
                    best_params = {"learning_rate": lr, "batch_size": batch_size}
    # Log best results to parent run
    mlflow.log_params(best_params)
    mlflow.log_metric("best_accuracy", best_score)
# Query child runs
child_runs = mlflow.search_runs(
    filter_string=f"tags.mlflow.parentRunId = '{parent_run.info.run_id}'"
)
print("Child run results:")
print(child_runs[["run_id", "params.learning_rate", "metrics.accuracy"]])
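When a sweep covers more than two hyperparameters, nested `for` loops become unwieldy. One option, sketched here with the standard library only, is to expand the search space with `itertools.product` and derive a run name from each combination. `expand_grid` and `run_name_for` are hypothetical helpers, and the naming scheme is an assumption.

```python
import itertools


def expand_grid(search_space):
    """Yield one dict per combination of the values listed in search_space."""
    names = list(search_space)
    for values in itertools.product(*(search_space[name] for name in names)):
        yield dict(zip(names, values))


def run_name_for(params):
    """Build a readable run name such as 'learning_rate_0.01_batch_size_32'."""
    return "_".join(f"{name}_{value}" for name, value in params.items())


search_space = {"learning_rate": [0.001, 0.01, 0.1], "batch_size": [16, 32, 64]}
combinations = list(expand_grid(search_space))
```

Each dictionary in `combinations` can be passed to `mlflow.log_params` inside a child run started with `mlflow.start_run(nested=True, run_name=run_name_for(params))`.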
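Parallel Execution Strategies
Handle multiple runs efficiently with one of three execution approaches, shown below in order: sequential runs, multiprocessing, and multithreading.
Sequential runs: well suited to simple hyperparameter sweeps or A/B tests.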
configs = [
    {"model": "RandomForest", "n_estimators": 100},
    {"model": "XGBoost", "max_depth": 6},
    {"model": "LogisticRegression", "C": 1.0},
]
for config in configs:
    with mlflow.start_run(run_name=config["model"]):
        mlflow.log_params(config)
        model = train_model(config)
        score = evaluate_model(model)
        mlflow.log_metric("f1_score", score)
Multiprocessing: scale training across multiple CPU cores.
import multiprocessing as mp
import mlflow
def train_with_config(config):
    # Set tracking URI in each process (required for spawn method)
    mlflow.set_tracking_uri("http://localhost:5000")
    mlflow.set_experiment("parallel-training")
    with mlflow.start_run():
        mlflow.log_params(config)
        model = train_model(config)
        score = evaluate_model(model)
        mlflow.log_metric("accuracy", score)
        return score
if __name__ == "__main__":
    configs = [{"lr": lr, "bs": bs} for lr in [0.01, 0.1] for bs in [16, 32]]
    with mp.Pool(processes=4) as pool:
        results = pool.map(train_with_config, configs)
    print(f"Completed {len(results)} experiments")
Multithreading: thread-safe parallel execution using child runs.
from concurrent.futures import ThreadPoolExecutor
import mlflow
def train_worker(config, parent_run_id):
    # Pass the parent run ID explicitly so the child run is linked to the
    # parent even though it starts in a different thread
    with mlflow.start_run(nested=True, parent_run_id=parent_run_id):
        mlflow.log_params(config)
        model = train_model(config)
        score = evaluate_model(model)
        mlflow.log_metric("accuracy", score)
        return score
# Start parent run
with mlflow.start_run(run_name="threaded_experiment") as parent_run:
    configs = [{"lr": 0.01, "epochs": e} for e in range(10, 101, 10)]
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(train_worker, config, parent_run.info.run_id)
            for config in configs
        ]
        results = [future.result() for future in futures]
    # Log summary to parent run
    mlflow.log_metric("avg_accuracy", sum(results) / len(results))
    mlflow.log_metric("max_accuracy", max(results))
Smart Tagging for Organization
Use tags strategically to organize and filter experiments:
with mlflow.start_run():
    # Descriptive tags for filtering
    mlflow.set_tags(
        {
            "model_family": "transformer",
            "dataset_version": "v2.1",
            "environment": "production",
            "team": "nlp-research",
            "gpu_type": "V100",
            "experiment_phase": "hyperparameter_tuning",
        }
    )
    # Special notes tag for documentation
    mlflow.set_tag(
        "mlflow.note.content",
        "Baseline transformer model with attention dropout. "
        "Testing different learning rate schedules.",
    )
    # Training code here...
Search runs by tags:
# Find all transformer experiments
transformer_runs = mlflow.search_runs(filter_string="tags.model_family = 'transformer'")
# Find production-ready models
prod_models = mlflow.search_runs(
    filter_string="tags.environment = 'production' AND metrics.accuracy > 0.95"
)
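Filter strings like the ones above can be assembled from dictionaries rather than written by hand. The sketch below covers only tag equality and metric lower bounds joined with `AND`. `build_run_filter` is a hypothetical helper, and it rejects values containing a single quote instead of guessing at an escaping rule.

```python
def build_run_filter(tags=None, min_metrics=None):
    """Combine tag-equality and metric lower-bound clauses into one filter string."""
    clauses = []
    for key, value in (tags or {}).items():
        if "'" in str(value):
            # Refuse rather than guess how quotes should be escaped
            raise ValueError(f"tag value for {key!r} contains a single quote")
        clauses.append(f"tags.{key} = '{value}'")
    for key, threshold in (min_metrics or {}).items():
        clauses.append(f"metrics.{key} > {threshold}")
    return " AND ".join(clauses)


prod_filter = build_run_filter(
    tags={"environment": "production"},
    min_metrics={"accuracy": 0.95},
)
```

The result can be passed directly as `mlflow.search_runs(filter_string=prod_filter)`.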
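System Tags Reference
MLflow automatically sets several system tags to capture execution context:
Tag | Description | When Set |
---|---|---|
mlflow.source.name | Source file or notebook name | Always |
mlflow.source.type | Source type (NOTEBOOK, JOB, LOCAL, etc.) | Always |
mlflow.user | User who created the run | Always |
mlflow.source.git.commit | Git commit hash | When run from a Git repository |
mlflow.source.git.branch | Git branch name | MLflow Projects only |
mlflow.parentRunId | Parent run ID for nested runs | Child runs only |
mlflow.docker.image.name | Docker image used | Docker environments |
mlflow.note.content | User-editable description | Manual only |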
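Because system tags share the `mlflow.` prefix, they are easy to separate from user-defined tags when inspecting a run. The following sketch operates on a plain dictionary. `split_tags` is a hypothetical helper and the example tag values are made up.

```python
SYSTEM_TAG_PREFIX = "mlflow."


def split_tags(tags):
    """Separate tags whose keys start with 'mlflow.' from user-defined tags."""
    system = {k: v for k, v in tags.items() if k.startswith(SYSTEM_TAG_PREFIX)}
    user = {k: v for k, v in tags.items() if not k.startswith(SYSTEM_TAG_PREFIX)}
    return system, user


# Made-up tag values for illustration
example_tags = {
    "mlflow.user": "alice",
    "mlflow.source.type": "LOCAL",
    "model_family": "transformer",
}
system_tags, user_tags = split_tags(example_tags)
```

With a real run, the input dictionary would come from `mlflow.get_run(run_id).data.tags`.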
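Pro Tip
Use mlflow.note.content to document experiment insights, hypotheses, or results directly in the MLflow UI. This tag appears in a dedicated Notes section on the run page.
Integration with Auto Logging
Combine auto logging with manual tracking to get the benefits of both: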
import mlflow
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
# Enable auto logging
mlflow.autolog()
with mlflow.start_run():
    # Auto logging captures model training automatically
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    # Add custom metrics and artifacts
    predictions = model.predict(X_test)
    # Log custom evaluation metrics
    report = classification_report(y_test, predictions, output_dict=True)
    mlflow.log_metrics(
        {
            "precision_macro": report["macro avg"]["precision"],
            "recall_macro": report["macro avg"]["recall"],
            "f1_macro": report["macro avg"]["f1-score"],
        }
    )
    # Log custom artifacts
    feature_importance = pd.DataFrame(
        {"feature": feature_names, "importance": model.feature_importances_}
    )
    feature_importance.to_csv("feature_importance.csv")
    mlflow.log_artifact("feature_importance.csv")
    # Access the auto-logged run for additional processing
    current_run = mlflow.active_run()
    print(f"Auto-logged run ID: {current_run.info.run_id}")
# Access the completed run
last_run = mlflow.last_active_run()
print(f"Final run status: {last_run.info.status}")
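Language-Specific Guides
- Python: Complete Python API reference
- Java: Java API documentation
- R: R API documentation
- REST: REST API reference
Next Steps
- Set up an MLflow Tracking Server for team collaboration
- Explore auto logging for supported frameworks
- Learn advanced search patterns for experiment analysis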