跳到主要内容

MLflow 中的 PyTorch

PyTorch 已成为领先的深度学习框架之一,以其直观的设计、动态计算图和无缝调试能力而闻名。通过将 PyTorch 的灵活性与 MLflow 的实验跟踪相结合,您可以获得一个强大的工作流,用于开发、监控和部署机器学习模型。

为什么 PyTorch 是研究人员的最爱

动态计算设计

  • 🔄 动态计算图:即时构建和修改神经网络
  • 🐞 直观调试:像普通 Python 代码一样逐步执行代码
  • 🔬 研究优先理念:以实验和快速原型设计为核心理念
  • 🧩 Python 化接口:对 Python 开发者来说自然而熟悉

强大的生态系统

  • 🛠️ 丰富的库支持:从计算机视觉 (torchvision) 到 NLP (transformers)
  • 🚀 优化性能:支持 CUDA 的 C++ 后端用于 GPU 加速
  • 👥 活跃社区:丰富的文档、教程和预训练模型
  • 🏢 行业采用:广泛应用于学术研究和生产环境

为什么选择 MLflow + PyTorch?

MLflow 与 PyTorch 的集成创建了一个简化的深度学习实践工作流

  • 📊 全面跟踪:在一个地方捕获参数、指标、模型架构和工件
  • 🔄 可复现实验:每次训练运行都完整记录,并可精确复现
  • 📈 可视化性能分析:比较不同架构和超参数下的模型性能
  • 🏗️ 模型版本控制:在整个开发生命周期中跟踪模型的沿袭和演变
  • 👥 协作开发:通过 MLflow 直观的 UI 与团队成员共享实验和结果
  • 🚀 简化部署:打包模型以便在各种生产环境中轻松部署

将 PyTorch 实验记录到 MLflow

了解 PyTorch 自动日志记录的局限性

与其他深度学习框架不同,MLflow 不为原始 PyTorch 提供自动日志记录,因为它采用自定义训练循环范式。

替代方案:PyTorch Lightning 自动日志记录

如果您想使用 PyTorch 进行自动日志记录,Lightning 提供了一个结构化框架,可与 MLflow 的自动日志记录功能无缝协作

import mlflow
import pytorch_lightning as pl

# Enable autologging with Lightning
mlflow.pytorch.autolog()

# Define your Lightning module and train as usual
trainer = pl.Trainer(max_epochs=10)
trainer.fit(model, train_dataloader, val_dataloader)

使用 Lightning + MLflow,您可以获得

  • 🔄 自动指标日志记录:在每个 epoch 捕获训练/验证指标
  • ⚙️ 超参数跟踪:自动记录模型参数和训练配置
  • 📦 模型检查点:最佳模型保存并记录到 MLflow
  • 📊 TensorBoard 集成:通过 MLflow 访问 TensorBoard 日志

有关 Lightning 集成的更多详细信息,请参阅 MLflow Lightning 开发者指南。

手动记录 PyTorch 实验

对于标准 PyTorch 工作流,您可以使用以下关键 API 轻松地将 MLflow 日志记录集成到您的训练循环中

PyTorch 日志记录的最佳实践

初始化阶段

  • 📋 记录配置参数:在训练开始时使用 mlflow.log_params() 记录学习率、批量大小、优化器配置等。
  • 🏗️ 记录模型架构:使用 torchinfo 生成模型摘要并通过 mlflow.log_artifact() 记录
  • ⚙️ 记录依赖项:记录 PyTorch 版本和关键包以确保可复现性

训练阶段

  • 📊 批次与 Epoch 日志记录:对于长 epoch,每 N 个批次记录关键指标;否则,每个 epoch 记录
  • 📈 使用批处理日志记录:为了更好的性能,优先使用 mlflow.log_metrics() 而不是多次调用 mlflow.log_metric()
  • 🔄 跟踪训练动态:不仅记录最终指标,还记录它们在整个训练过程中的演变

完成阶段

  • 💾 保存最终模型:使用 mlflow.pytorch.log_model() 保存训练好的模型
  • 📊 记录性能可视化:创建并保存训练曲线、混淆矩阵等的图表
  • 📝 添加模型签名:包含输入/输出签名以更好地理解模型

完整的 PyTorch 日志记录示例

这是使用 MLflow 跟踪 PyTorch 实验的端到端示例

import mlflow
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchinfo import summary

# Define device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load and prepare data
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)
train_dataset = datasets.FashionMNIST(
"data", train=True, download=True, transform=transform
)
test_dataset = datasets.FashionMNIST("data", train=False, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000)


# Define the model
class NeuralNetwork(nn.Module):
def __init__(self):
super().__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(28 * 28, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10),
)

def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits


# Training parameters
params = {
"epochs": 3,
"learning_rate": 1e-3,
"batch_size": 64,
"optimizer": "SGD",
"model_type": "MLP",
"hidden_units": [512, 512],
}

# Training and logging
with mlflow.start_run():
# 1. Log parameters
mlflow.log_params(params)

# 2. Create and prepare model
model = NeuralNetwork().to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=params["learning_rate"])

# 3. Log model architecture
with open("model_summary.txt", "w") as f:
f.write(str(summary(model, input_size=(1, 1, 28, 28))))
mlflow.log_artifact("model_summary.txt")

# 4. Training loop with metric logging
for epoch in range(params["epochs"]):
model.train()
train_loss = 0
correct = 0
total = 0

for batch_idx, (data, target) in enumerate(train_loader):
data = data.to(device)
target = target.to(device)

# Forward pass
optimizer.zero_grad()
output = model(data)
loss = loss_fn(output, target)

# Backward pass
loss.backward()
optimizer.step()

# Calculate metrics
train_loss += loss.item()
_, predicted = output.max(1)
total += target.size(0)
correct += predicted.eq(target).sum().item()

# Log batch metrics (every 100 batches)
if batch_idx % 100 == 0:
batch_loss = train_loss / (batch_idx + 1)
batch_acc = 100.0 * correct / total
mlflow.log_metrics(
{"batch_loss": batch_loss, "batch_accuracy": batch_acc},
step=epoch * len(train_loader) + batch_idx,
)

# Calculate epoch metrics
epoch_loss = train_loss / len(train_loader)
epoch_acc = 100.0 * correct / total

# Validation
model.eval()
val_loss = 0
val_correct = 0
val_total = 0

with torch.no_grad():
for data, target in test_loader:
data = data.to(device)
target = target.to(device)
output = model(data)
loss = loss_fn(output, target)

val_loss += loss.item()
_, predicted = output.max(1)
val_total += target.size(0)
val_correct += predicted.eq(target).sum().item()

# Calculate and log epoch validation metrics
val_loss = val_loss / len(test_loader)
val_acc = 100.0 * val_correct / val_total

# Log epoch metrics
mlflow.log_metrics(
{
"train_loss": epoch_loss,
"train_accuracy": epoch_acc,
"val_loss": val_loss,
"val_accuracy": val_acc,
},
step=epoch,
)

print(
f"Epoch {epoch+1}/{params['epochs']}, "
f"Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.2f}%, "
f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%"
)

# 5. Log the trained model
model_info = mlflow.pytorch.log_model(model, name="model")

# 6. Final evaluation
model.eval()
test_loss = 0
test_correct = 0
test_total = 0

with torch.no_grad():
for data, target in test_loader:
data = data.to(device)
target = target.to(device)
output = model(data)
loss = loss_fn(output, target)

test_loss += loss.item()
_, predicted = output.max(1)
test_total += target.size(0)
test_correct += predicted.eq(target).sum().item()

# Calculate and log final test metrics
test_loss = test_loss / len(test_loader)
test_acc = 100.0 * test_correct / test_total

mlflow.log_metrics({"test_loss": test_loss, "test_accuracy": test_acc})

print(f"Final Test Accuracy: {test_acc:.2f}%")

如果您在本地 MLflow 服务器上运行此代码,您将在 MLflow UI 中看到全面的跟踪

Basic Example of PyTorch with MLflow

将 PyTorch 模型保存到 MLflow

基本模型保存

MLflow 可以轻松地保存和加载 PyTorch 模型以实现可复现的推理

import mlflow
import numpy as np
import torch
import torch.nn as nn


# Define model
class NeuralNetwork(nn.Module):
def __init__(self):
super().__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(28 * 28, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10),
)

def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits


model = NeuralNetwork()

# Train your model (code omitted for brevity)

model_info = mlflow.pytorch.log_model(model, name="model")

# Load and use the model
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)

# Make predictions
sample_input = np.random.uniform(size=[1, 28, 28]).astype(np.float32)
predictions = loaded_model.predict(sample_input)
print("Predictions:", predictions)
TorchScript 兼容性

MLflow 与 TorchScript 无缝协作,TorchScript 可以优化您的模型以进行生产

# Create a scripted version of your model
scripted_model = torch.jit.script(model)

# Log the scripted model to MLflow
model_info = mlflow.pytorch.log_model(scripted_model, name="scripted_model")

# The logged model will contain the compiled graph

使用 TorchScript 和 MLflow 的优势

  • 性能优化:编译图以实现更快的推理
  • 🔒 部署安全性:受保护的模型架构用于安全部署
  • 🌉 语言互操作性:在 C++ 环境中使用模型
  • 📱 移动部署:针对资源受限设备进行优化

您可以在 MLflow UI 中查看保存的模型

Basic PyTorch Saving

模型签名

模型签名定义了输入和输出的模式,增强了模型理解并实现了验证。添加签名的最简单方法是使用自动推理

import mlflow
from mlflow.models import infer_signature
import numpy as np
import torch

# Create sample input and predictions
sample_input = np.random.uniform(size=[1, 28, 28]).astype(np.float32)

# Get model output - convert tensor to numpy
with torch.no_grad():
output = model(torch.tensor(sample_input))
sample_output = output.numpy()

# Infer signature automatically
signature = infer_signature(sample_input, sample_output)

# Log model with signature
model_info = mlflow.pytorch.log_model(model, name="model", signature=signature)
注意

截至 MLflow 2.9.1,mlflow.models.infer_signature() 的输入和输出必须是 numpy.ndarray,而不是 torch.Tensor。请务必先将张量转换为 numpy 数组。

签名将显示在 MLflow UI 中

PyTorch Model Signature

手动签名定义

为了完全控制您的模型签名,您可以手动定义输入和输出模式

import mlflow
import numpy as np
from mlflow.types import Schema, TensorSpec
from mlflow.models import ModelSignature

# Manually define input and output schemas
input_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 28, 28))])
output_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 10))])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

# Log model with signature
model_info = mlflow.pytorch.log_model(model, name="model", signature=signature)

手动定义在以下情况下非常有用

  • 您需要精确控制张量规范
  • 处理复杂的输入/输出结构
  • 自动推理无法捕获您期望的模式
  • 您想预先指定确切的数据类型和形状

高级 PyTorch 跟踪

带详细指标的自定义训练循环

对于更复杂的跟踪,您可以实现自定义回调和可视化

全面跟踪与可视化
import mlflow
import torch
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.metrics import confusion_matrix


class MLflowTracker:
def __init__(self, model, classes):
self.model = model
self.classes = classes
self.train_losses = []
self.val_losses = []
self.train_accs = []
self.val_accs = []

def log_epoch(self, epoch, train_loss, train_acc, val_loss, val_acc):
"""Log metrics for an epoch."""
self.train_losses.append(train_loss)
self.val_losses.append(val_loss)
self.train_accs.append(train_acc)
self.val_accs.append(val_acc)

mlflow.log_metrics(
{
"train_loss": train_loss,
"train_accuracy": train_acc,
"val_loss": val_loss,
"val_accuracy": val_acc,
},
step=epoch,
)

def log_confusion_matrix(self, val_loader, device):
"""Generate and log confusion matrix."""
self.model.eval()
all_preds = []
all_targets = []

with torch.no_grad():
for inputs, targets in val_loader:
inputs = inputs.to(device)
targets = targets.to(device)
outputs = self.model(inputs)
_, preds = torch.max(outputs, 1)

all_preds.extend(preds.cpu().numpy())
all_targets.extend(targets.cpu().numpy())

# Create confusion matrix
cm = confusion_matrix(all_targets, all_preds)

# Plot
plt.figure(figsize=(10, 8))
sns.heatmap(
cm,
annot=True,
fmt="d",
cmap="Blues",
xticklabels=self.classes,
yticklabels=self.classes,
)
plt.title("Confusion Matrix")
plt.ylabel("True Label")
plt.xlabel("Predicted Label")
plt.tight_layout()

# Save and log
plt.savefig("confusion_matrix.png")
mlflow.log_artifact("confusion_matrix.png")
plt.close()

def log_training_curves(self):
"""Generate and log training curves."""
plt.figure(figsize=(12, 5))

# Loss subplot
plt.subplot(1, 2, 1)
plt.plot(self.train_losses, label="Train Loss")
plt.plot(self.val_losses, label="Validation Loss")
plt.title("Loss Curves")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()

# Accuracy subplot
plt.subplot(1, 2, 2)
plt.plot(self.train_accs, label="Train Accuracy")
plt.plot(self.val_accs, label="Validation Accuracy")
plt.title("Accuracy Curves")
plt.xlabel("Epoch")
plt.ylabel("Accuracy (%)")
plt.legend()

plt.tight_layout()
plt.savefig("training_curves.png")
mlflow.log_artifact("training_curves.png")
plt.close()

在训练循环中的使用

# Initialize tracker
tracker = MLflowTracker(
model,
classes=[
"T-shirt",
"Trouser",
"Pullover",
"Dress",
"Coat",
"Sandal",
"Shirt",
"Sneaker",
"Bag",
"Ankle boot",
],
)

with mlflow.start_run():
mlflow.log_params(params)

for epoch in range(params["epochs"]):
# Training code...

# Log epoch metrics
tracker.log_epoch(epoch, train_loss, train_acc, val_loss, val_acc)

# Log final visualizations
tracker.log_confusion_matrix(test_loader, device)
tracker.log_training_curves()

# Log model
model_info = mlflow.pytorch.log_model(model, name="model")

超参数优化

将 PyTorch 与超参数优化工具结合使用,同时在 MLflow 中跟踪一切

import mlflow
import optuna
from functools import partial


def objective(trial, train_loader, val_loader, device):
# Suggest hyperparameters
lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "SGD"])
hidden_size = trial.suggest_categorical("hidden_size", [128, 256, 512])

with mlflow.start_run(nested=True):
# Log hyperparameters
params = {
"lr": lr,
"optimizer": optimizer_name,
"hidden_size": hidden_size,
"batch_size": 64,
"epochs": 3,
}
mlflow.log_params(params)

# Create model
model = NeuralNetwork(hidden_size=hidden_size).to(device)
loss_fn = nn.CrossEntropyLoss()

# Configure optimizer
if optimizer_name == "Adam":
optimizer = optim.Adam(model.parameters(), lr=lr)
else:
optimizer = optim.SGD(model.parameters(), lr=lr)

# Train for a few epochs
best_val_acc = 0
for epoch in range(params["epochs"]):
# Training code (abbreviated)...
train_loss, train_acc = train_epoch(
model, train_loader, loss_fn, optimizer, device
)
val_loss, val_acc = evaluate(model, val_loader, loss_fn, device)

mlflow.log_metrics(
{
"train_loss": train_loss,
"train_acc": train_acc,
"val_loss": val_loss,
"val_acc": val_acc,
},
step=epoch,
)

best_val_acc = max(best_val_acc, val_acc)

# Final logging
mlflow.log_metric("best_val_acc", best_val_acc)
mlflow.pytorch.log_model(model, name="model")

return best_val_acc


# Execute hyperparameter search
with mlflow.start_run(run_name="hyperparam_optimization"):
study = optuna.create_study(direction="maximize")
objective_func = partial(
objective, train_loader=train_loader, val_loader=val_loader, device=device
)
study.optimize(objective_func, n_trials=20)

# Log best parameters and score
mlflow.log_params(study.best_params)
mlflow.log_metric("best_val_accuracy", study.best_value)

实际应用

MLflow-PyTorch 集成在以下场景中表现出色

  • 🖼️ 计算机视觉:跟踪 CNN 架构、数据增强策略以及图像分类、对象检测和分割的性能
  • 📝 自然语言处理:监控 Transformer 模型、嵌入和生成质量,用于语言理解和文本生成
  • 🔊 音频处理:记录声谱图、模型性能和音频样本,用于语音识别和音乐生成
  • 🎮 强化学习:跟踪代理性能、奖励优化和环境交互
  • 🧬 科学研究:监控复杂科学应用的模式收敛和验证指标
  • 🏭 工业应用:通过完整血缘跟踪,对从开发到部署的模型进行版本控制

结论

MLflow-PyTorch 集成提供了一个全面的解决方案,用于跟踪、管理和部署深度学习实验。通过将 PyTorch 的灵活性与 MLflow 的实验跟踪功能相结合,您将创建一个工作流,它具有以下特点:

  • 🔍 透明:训练的每个方面都可见且可跟踪
  • 🔄 可复现:实验可以精确复现
  • 📊 可比较:不同的方法可以并排评估
  • 📈 可扩展:从简单的原型到复杂的生产模型
  • 👥 协作:团队成员可以分享和借鉴彼此的工作

无论您是探索新架构的研究人员,还是将模型部署到生产环境的工程师,MLflow-PyTorch 集成都为有组织、可复现和可扩展的深度学习开发奠定了基础。