
MLflow PyTorch Integration

Introduction

PyTorch is an open-source deep learning framework developed by Meta AI's research lab. It provides dynamic computation graphs and a Pythonic API for building neural networks, which has made it popular for both research and production deep learning applications.

MLflow's PyTorch integration provides experiment tracking, model versioning, and deployment capabilities to support deep learning workflows.

Why MLflow + PyTorch?

Autologging

Comprehensive experiment tracking with one line of code: mlflow.pytorch.autolog() automatically logs metrics, parameters, and models.

Experiment Tracking

Track training metrics, hyperparameters, model architectures, and artifacts across all of your PyTorch experiments.

Model Registry

Version, stage, and deploy PyTorch models using MLflow's model registry and deployment infrastructure.

Reproducibility

Capture model state, random seeds, and environments for reproducible deep learning experiments.

Getting Started

Comprehensive experiment tracking with a single line of code:

python
import mlflow
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# Enable autologging
mlflow.pytorch.autolog()

# Create synthetic data
X = torch.randn(1000, 784)
y = torch.randint(0, 10, (1000,))
train_loader = DataLoader(TensorDataset(X, y), batch_size=32, shuffle=True)

# Your existing PyTorch code works unchanged
model = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Training loop - metrics, parameters, and models logged automatically
for epoch in range(10):
    for data, target in train_loader:
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

Autologging automatically captures training metrics, model parameters, optimizer configuration, and model checkpoints.

PyTorch Lightning Support

MLflow's autologging works seamlessly with PyTorch Lightning. For native PyTorch with custom training loops, use the manual logging shown in the section below.
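
For reference, here is a minimal Lightning setup (a sketch assuming the lightning package is installed; the module and synthetic data are illustrative). Autologging hooks into the Lightning Trainer, so trainer.fit() is tracked automatically:

python
import mlflow
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import lightning as L


class LitClassifier(L.LightningModule):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))

    def training_step(self, batch, batch_idx):
        x, y = batch
        loss = F.cross_entropy(self.net(x), y)
        self.log("train_loss", loss)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)


# Autologging patches the Lightning Trainer
mlflow.pytorch.autolog()

X = torch.randn(1000, 784)
y = torch.randint(0, 10, (1000,))
train_loader = DataLoader(TensorDataset(X, y), batch_size=32)

trainer = L.Trainer(max_epochs=5)
trainer.fit(LitClassifier(), train_loader)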

Manual Logging

For standard PyTorch workflows, integrate MLflow logging into your training loop:

python
import mlflow
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset


# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        return self.linear_relu_stack(x)


# Training parameters
params = {
    "epochs": 5,
    "learning_rate": 1e-3,
    "batch_size": 64,
}

# Synthetic data so the example runs end-to-end
X = torch.randn(1000, 28 * 28)
y = torch.randint(0, 10, (1000,))
train_loader = DataLoader(
    TensorDataset(X, y), batch_size=params["batch_size"], shuffle=True
)

# Training with MLflow logging
with mlflow.start_run():
    # Log parameters
    mlflow.log_params(params)

    # Initialize model and optimizer
    model = NeuralNetwork()
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=params["learning_rate"])

    # Training loop
    for epoch in range(params["epochs"]):
        model.train()
        train_loss = 0
        correct = 0
        total = 0

        for data, target in train_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = loss_fn(output, target)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()

        # Log metrics per epoch
        avg_loss = train_loss / len(train_loader)
        accuracy = 100.0 * correct / total

        mlflow.log_metrics(
            {"train_loss": avg_loss, "train_accuracy": accuracy}, step=epoch
        )

    # Log final model
    mlflow.pytorch.log_model(model, name="model")

System Metrics Tracking

Track hardware resource utilization during training to monitor GPU usage, memory consumption, and overall system performance:

python
import mlflow
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# Create data and model
X = torch.randn(1000, 784)
y = torch.randint(0, 10, (1000,))
train_loader = DataLoader(TensorDataset(X, y), batch_size=32, shuffle=True)

model = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Enable system metrics logging
mlflow.enable_system_metrics_logging()

with mlflow.start_run():
    mlflow.log_params({"learning_rate": 0.001, "batch_size": 32, "epochs": 10})

    # Training loop - system metrics logged automatically
    for epoch in range(10):
        for data, target in train_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

        mlflow.log_metric("loss", loss.item(), step=epoch)

    mlflow.pytorch.log_model(model, name="model")

System metrics logging automatically captures:

  • GPU metrics: utilization percentage, memory usage, temperature, and power consumption
  • CPU metrics: utilization percentage and memory usage
  • Disk I/O: read/write throughput and utilization
  • Network I/O: network traffic statistics

For PyTorch training, monitoring GPU metrics is especially valuable for:

  • Spotting GPU underutilization that may indicate data-loading bottlenecks
  • Detecting memory pressure before out-of-memory errors occur
  • Tuning batch sizes based on GPU memory usage
  • Comparing resource efficiency across model architectures

Advanced Configuration

You can customize how often and how system metrics are collected. See the system metrics documentation for detailed configuration options.
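
For example (a minimal sketch; these helpers exist in recent MLflow releases, but check the system metrics docs for your version):

python
import mlflow

# Sample system metrics every 5 seconds instead of the default interval
mlflow.set_system_metrics_sampling_interval(5)

# Aggregate 3 samples into each logged data point
mlflow.set_system_metrics_samples_before_logging(3)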

Model Logging with Signatures

Log PyTorch models with input/output signatures for better model understanding:

python
import mlflow
import torch
import torch.nn as nn
from mlflow.models import infer_signature

model = nn.Sequential(nn.Linear(10, 50), nn.ReLU(), nn.Linear(50, 1))

# Create sample input and output for signature
input_example = torch.randn(1, 10)
predictions = model(input_example)

# Infer signature from input/output
signature = infer_signature(input_example.numpy(), predictions.detach().numpy())

with mlflow.start_run():
    # Log model with signature and input example
    mlflow.pytorch.log_model(
        model,
        name="pytorch_model",
        signature=signature,
        input_example=input_example.numpy(),
    )
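
With a signature attached, MLflow records the expected input and output schema alongside the model, and the pyfunc flavor can validate inference inputs against that schema when the model is later loaded and served.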

Checkpoint Tracking

Track model checkpoints during training using MLflow 3's checkpoint versioning. Use the step parameter to version checkpoints and link metrics to specific model versions:

python
import mlflow
import torch
import torch.nn as nn
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split


# Helper function to prepare data
def prepare_data(df):
    X = torch.tensor(df.iloc[:, :-1].values, dtype=torch.float32)
    y = torch.tensor(df.iloc[:, -1].values, dtype=torch.long)
    return X, y


# Helper function to compute accuracy
def compute_accuracy(model, X, y):
    with torch.no_grad():
        outputs = model(X)
        _, predicted = torch.max(outputs, 1)
        accuracy = (predicted == y).sum().item() / y.size(0)
    return accuracy


# Define a basic PyTorch classifier
class IrisClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x


# Load Iris dataset and prepare the DataFrame
iris = load_iris()
iris_df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
iris_df["target"] = iris.target

# Split into training and testing datasets
train_df, test_df = train_test_split(iris_df, test_size=0.2, random_state=42)

# Prepare training data
train_dataset = mlflow.data.from_pandas(train_df, name="iris_train")
X_train, y_train = prepare_data(train_dataset.df)

# Initialize model
input_size = X_train.shape[1]
hidden_size = 16
output_size = len(iris.target_names)
model = IrisClassifier(input_size, hidden_size, output_size)

# Training configuration
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

with mlflow.start_run() as run:
    # Log parameters once at the start
    mlflow.log_params(
        {
            "n_layers": 3,
            "activation": "ReLU",
            "criterion": "CrossEntropyLoss",
            "optimizer": "Adam",
            "learning_rate": 0.01,
        }
    )

    for epoch in range(101):
        # Training step
        out = model(X_train)
        loss = criterion(out, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Log a checkpoint every 10 epochs
        if epoch % 10 == 0:
            # Log model checkpoint with step parameter
            model_info = mlflow.pytorch.log_model(
                pytorch_model=model,
                name=f"iris-checkpoint-{epoch}",
                step=epoch,
                input_example=X_train[:5].numpy(),
            )

            # Log metrics linked to this checkpoint and dataset
            accuracy = compute_accuracy(model, X_train, y_train)
            mlflow.log_metric(
                key="train_accuracy",
                value=accuracy,
                step=epoch,
                model_id=model_info.model_id,
                dataset=train_dataset,
            )

# Search and rank checkpoints by performance
ranked_checkpoints = mlflow.search_logged_models(
    filter_string=f"source_run_id='{run.info.run_id}'",
    order_by=[{"field_name": "metrics.train_accuracy", "ascending": False}],
    output_format="list",
)

best_checkpoint = ranked_checkpoints[0]
print(f"Best checkpoint: {best_checkpoint.name}")
print(f"Accuracy: {best_checkpoint.metrics[0].value}")

This approach enables you to:

  • Version checkpoints systematically using the step parameter
  • Link metrics to specific models via the model_id argument of log_metric()
  • Associate metrics with datasets for better lineage tracking
  • Rank and compare checkpoints with mlflow.search_logged_models(), and reload the winner as sketched below
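
As a follow-up, the top-ranked checkpoint can be reloaded for evaluation. This is a sketch assuming MLflow 3's models:/<model_id> URI scheme:

python
# Reload the top-ranked checkpoint by its model ID
best_model = mlflow.pytorch.load_model(f"models:/{best_checkpoint.model_id}")

# Sanity-check the restored weights on the training split
print(compute_accuracy(best_model, X_train, y_train))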

Model Loading

Load logged PyTorch models for inference:

python
import mlflow
import torch

# Load as PyTorch model
model_uri = "runs:/<run_id>/pytorch_model"
loaded_model = mlflow.pytorch.load_model(model_uri)

# Make predictions
input_tensor = torch.randn(5, 10)
predictions = loaded_model(input_tensor)

# Load as PyFunc for generic inference
pyfunc_model = mlflow.pyfunc.load_model(model_uri)
predictions = pyfunc_model.predict(input_tensor.numpy())

Hyperparameter Optimization

Track hyperparameter tuning experiments with MLflow:

python
import mlflow
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Create synthetic dataset for demonstration
input_size = 784 # e.g., flattened 28x28 images
output_size = 10 # e.g., 10 classes

X_train = torch.randn(1000, input_size)
y_train = torch.randint(0, output_size, (1000,))
X_val = torch.randn(200, input_size)
y_val = torch.randint(0, output_size, (200,))

train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=32)


def train_and_evaluate(model, optimizer, train_loader, val_loader, epochs=5):
    """Simple training loop for demonstration."""
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        model.train()
        for data, target in train_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

    # Validation
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for data, target in val_loader:
            output = model(data)
            val_loss += criterion(output, target).item()

    return val_loss / len(val_loader)


def objective(trial):
    """Optuna objective for hyperparameter tuning."""

    with mlflow.start_run(nested=True):
        # Define hyperparameter search space
        params = {
            "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
            "hidden_size": trial.suggest_int("hidden_size", 32, 512),
            "dropout": trial.suggest_float("dropout", 0.1, 0.5),
        }

        # Log parameters
        mlflow.log_params(params)

        # Create model
        model = nn.Sequential(
            nn.Linear(input_size, params["hidden_size"]),
            nn.ReLU(),
            nn.Dropout(params["dropout"]),
            nn.Linear(params["hidden_size"], output_size),
        )

        # Train model
        optimizer = optim.Adam(model.parameters(), lr=params["learning_rate"])
        val_loss = train_and_evaluate(model, optimizer, train_loader, val_loader)

        # Log validation loss
        mlflow.log_metric("val_loss", val_loss)

        return val_loss


# Run optimization
with mlflow.start_run(run_name="PyTorch HPO"):
    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=50)

    # Log best parameters
    mlflow.log_params({f"best_{k}": v for k, v in study.best_params.items()})
    mlflow.log_metric("best_val_loss", study.best_value)

Model Registry Integration

Register PyTorch models for versioning and deployment:

python
import mlflow
import torch.nn as nn
from mlflow import MlflowClient

client = MlflowClient()

with mlflow.start_run():
    # Create a simple model for demonstration
    model = nn.Sequential(
        nn.Conv2d(3, 32, 3),
        nn.ReLU(),
        nn.MaxPool2d(2),
        nn.Flatten(),
        nn.Linear(32 * 15 * 15, 10),
    )

    # Log model to registry
    model_info = mlflow.pytorch.log_model(
        model, name="pytorch_model", registered_model_name="ImageClassifier"
    )

    # Tag for tracking
    mlflow.set_tags(
        {"model_type": "cnn", "dataset": "imagenet", "framework": "pytorch"}
    )

# Set alias for production deployment
client.set_registered_model_alias(
    name="ImageClassifier",
    alias="champion",
    version=model_info.registered_model_version,
)
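
Downstream code can then resolve the alias instead of hard-coding a version number. A minimal sketch using the models:/<name>@<alias> URI scheme:

python
import mlflow

# Resolve the "champion" alias to whichever version it currently points at
champion = mlflow.pyfunc.load_model("models:/ImageClassifier@champion")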

Distributed Training

Track distributed PyTorch training experiments:

python
import mlflow
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset

# Create synthetic dataset
X_train = torch.randn(1000, 784)
y_train = torch.randint(0, 10, (1000,))
train_dataset = TensorDataset(X_train, y_train)

# Simple model
model = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))


def train_epoch(model, train_loader):
    """Simple training epoch for demonstration."""
    model.train()
    device = next(model.parameters()).device
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    total_loss = 0

    for data, target in train_loader:
        # Move the batch to the same device as the model
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / len(train_loader)


def train_distributed():
    # Initialize distributed training
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()

    # Wrap model with DDP
    model_ddp = DDP(model.to(rank), device_ids=[rank])

    # Create distributed sampler
    from torch.utils.data.distributed import DistributedSampler

    sampler = DistributedSampler(
        train_dataset, num_replicas=dist.get_world_size(), rank=rank
    )
    train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler)

    # Only log from rank 0
    if rank == 0:
        mlflow.start_run()
        mlflow.log_params({"world_size": dist.get_world_size(), "backend": "nccl"})

    # Training loop
    epochs = 10
    for epoch in range(epochs):
        sampler.set_epoch(epoch)  # Shuffle data differently each epoch
        train_loss = train_epoch(model_ddp, train_loader)

        # Log metrics from rank 0 only
        if rank == 0:
            mlflow.log_metric("train_loss", train_loss, step=epoch)

    # Save model from rank 0
    if rank == 0:
        mlflow.pytorch.log_model(model, name="distributed_model")
        mlflow.end_run()
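
A script like this is typically launched with a distributed launcher such as torchrun (for example, torchrun --nproc_per_node=4 train.py), which sets the environment variables that dist.init_process_group() reads to establish the process group.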

Learn More