MLflow PyTorch Integration
Introduction
PyTorch is an open-source deep learning framework developed by Meta AI. Its dynamic computation graphs and Pythonic API for building neural networks have made it popular for both research and production deep learning applications.
MLflow's PyTorch integration provides experiment tracking, model versioning, and deployment capabilities to support deep learning workflows.
Why MLflow + PyTorch?
Autologging
Comprehensive experiment tracking with a single line of code: mlflow.pytorch.autolog() automatically logs metrics, parameters, and models.
Experiment Tracking
Track training metrics, hyperparameters, model architectures, and artifacts across all your PyTorch experiments.
Model Registry
Version, stage, and deploy PyTorch models using MLflow's Model Registry and deployment infrastructure.
Reproducibility
Capture model state, random seeds, and environment for reproducible deep learning experiments.
Getting Started
Enable comprehensive experiment tracking with a single line of code:
import mlflow
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
# Enable autologging
mlflow.pytorch.autolog()
# Create synthetic data
X = torch.randn(1000, 784)
y = torch.randint(0, 10, (1000,))
train_loader = DataLoader(TensorDataset(X, y), batch_size=32, shuffle=True)
# Your existing PyTorch code works unchanged
model = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
# Training loop - metrics, parameters, and models logged automatically
for epoch in range(10):
    for data, target in train_loader:
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
Autologging automatically captures training metrics, model parameters, optimizer configuration, and model checkpoints.
MLflow's autologging works seamlessly with PyTorch Lightning. For native PyTorch with custom training loops, use the manual logging approach shown in the section below.
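For the Lightning path, here is a minimal sketch; it assumes a recent MLflow and the lightning package (older setups import pytorch_lightning instead), and the model, metric name, and data are illustrative:

import mlflow
import torch
import torch.nn as nn
import lightning as L
from torch.utils.data import DataLoader, TensorDataset

# Enable autologging before training; Trainer.fit() triggers the automatic logging
mlflow.pytorch.autolog()

class LitClassifier(L.LightningModule):
    def __init__(self, lr=1e-3):
        super().__init__()
        self.save_hyperparameters()  # hyperparameters are picked up by autologging
        self.model = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))
        self.loss_fn = nn.CrossEntropyLoss()

    def training_step(self, batch, batch_idx):
        x, y = batch
        loss = self.loss_fn(self.model(x), y)
        self.log("train_loss", loss)  # logged metrics flow into the MLflow run
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)

# Synthetic data for demonstration
X = torch.randn(1000, 784)
y = torch.randint(0, 10, (1000,))
train_loader = DataLoader(TensorDataset(X, y), batch_size=32, shuffle=True)

trainer = L.Trainer(max_epochs=5)
trainer.fit(LitClassifier(), train_loader)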
Manual Logging
For standard PyTorch workflows, integrate MLflow logging directly into your training loop:
import mlflow
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        return self.linear_relu_stack(x)

# Training parameters
params = {
    "epochs": 5,
    "learning_rate": 1e-3,
    "batch_size": 64,
}

# Synthetic data so the example runs end to end (substitute your own DataLoader)
X = torch.randn(1000, 1, 28, 28)
y = torch.randint(0, 10, (1000,))
train_loader = DataLoader(TensorDataset(X, y), batch_size=params["batch_size"], shuffle=True)

# Training with MLflow logging
with mlflow.start_run():
    # Log parameters
    mlflow.log_params(params)

    # Initialize model and optimizer
    model = NeuralNetwork()
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=params["learning_rate"])

    # Training loop
    for epoch in range(params["epochs"]):
        model.train()
        train_loss = 0
        correct = 0
        total = 0

        for data, target in train_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = loss_fn(output, target)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()

        # Log metrics per epoch
        avg_loss = train_loss / len(train_loader)
        accuracy = 100.0 * correct / total
        mlflow.log_metrics(
            {"train_loss": avg_loss, "train_accuracy": accuracy}, step=epoch
        )

    # Log final model
    mlflow.pytorch.log_model(model, name="model")
System Metrics Tracking
Track hardware resource utilization during training to monitor GPU usage, memory consumption, and overall system performance:
import mlflow
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
# Create data and model
X = torch.randn(1000, 784)
y = torch.randint(0, 10, (1000,))
train_loader = DataLoader(TensorDataset(X, y), batch_size=32, shuffle=True)
model = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
# Enable system metrics logging
mlflow.enable_system_metrics_logging()
with mlflow.start_run():
    mlflow.log_params({"learning_rate": 0.001, "batch_size": 32, "epochs": 10})

    # Training loop - system metrics logged automatically
    for epoch in range(10):
        for data, target in train_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

        mlflow.log_metric("loss", loss.item(), step=epoch)

    mlflow.pytorch.log_model(model, name="model")
System metrics logging automatically captures:
- GPU metrics: utilization percentage, memory usage, temperature, and power consumption
- CPU metrics: utilization percentage and memory usage
- Disk I/O: read/write throughput and utilization
- Network I/O: network traffic statistics
For PyTorch training, monitoring GPU metrics is especially valuable for:
- Identifying GPU underutilization that may indicate data-loading bottlenecks
- Detecting memory pressure before out-of-memory errors occur
- Tuning batch sizes based on GPU memory usage
- Comparing resource efficiency across model architectures
You can customize how often system metrics are collected and how they are aggregated; see the system metrics documentation for the full set of configuration options.
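As one example of such customization, a minimal sketch (function names as in recent MLflow versions; verify against your installed version):

import mlflow

# Sample system metrics every 10 seconds instead of the default interval
mlflow.set_system_metrics_sampling_interval(10)

# Aggregate 5 samples into each logged data point
mlflow.set_system_metrics_samples_before_logging(5)

mlflow.enable_system_metrics_logging()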
Model Logging with Signatures
Log PyTorch models with input/output signatures so consumers of the model know what it expects and returns:
import mlflow
import torch
import torch.nn as nn
from mlflow.models import infer_signature
model = nn.Sequential(nn.Linear(10, 50), nn.ReLU(), nn.Linear(50, 1))
# Create sample input and output for signature
input_example = torch.randn(1, 10)
predictions = model(input_example)
# Infer signature from input/output
signature = infer_signature(input_example.numpy(), predictions.detach().numpy())
with mlflow.start_run():
    # Log model with signature and input example
    mlflow.pytorch.log_model(
        model,
        name="pytorch_model",
        signature=signature,
        input_example=input_example.numpy(),
    )
Checkpoint Tracking
Track model checkpoints during training with MLflow 3's checkpoint versioning. Use the step parameter to version checkpoints and link metrics to specific model versions:
import mlflow
import torch
import torch.nn as nn
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
# Helper function to prepare data
def prepare_data(df):
    X = torch.tensor(df.iloc[:, :-1].values, dtype=torch.float32)
    y = torch.tensor(df.iloc[:, -1].values, dtype=torch.long)
    return X, y

# Helper function to compute accuracy
def compute_accuracy(model, X, y):
    with torch.no_grad():
        outputs = model(X)
        _, predicted = torch.max(outputs, 1)
        accuracy = (predicted == y).sum().item() / y.size(0)
    return accuracy

# Define a basic PyTorch classifier
class IrisClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x
# Load Iris dataset and prepare the DataFrame
iris = load_iris()
iris_df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
iris_df["target"] = iris.target
# Split into training and testing datasets
train_df, test_df = train_test_split(iris_df, test_size=0.2, random_state=42)
# Prepare training data
train_dataset = mlflow.data.from_pandas(train_df, name="iris_train")
X_train, y_train = prepare_data(train_dataset.df)
# Initialize model
input_size = X_train.shape[1]
hidden_size = 16
output_size = len(iris.target_names)
model = IrisClassifier(input_size, hidden_size, output_size)
# Training configuration
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
with mlflow.start_run() as run:
    # Log parameters once at the start
    mlflow.log_params(
        {
            "n_layers": 3,
            "activation": "ReLU",
            "criterion": "CrossEntropyLoss",
            "optimizer": "Adam",
            "learning_rate": 0.01,
        }
    )

    for epoch in range(101):
        # Training step
        out = model(X_train)
        loss = criterion(out, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Log a checkpoint every 10 epochs
        if epoch % 10 == 0:
            # Log model checkpoint with step parameter
            model_info = mlflow.pytorch.log_model(
                pytorch_model=model,
                name=f"iris-checkpoint-{epoch}",
                step=epoch,
                input_example=X_train[:5].numpy(),
            )

            # Log metrics linked to this checkpoint and dataset
            accuracy = compute_accuracy(model, X_train, y_train)
            mlflow.log_metric(
                key="train_accuracy",
                value=accuracy,
                step=epoch,
                model_id=model_info.model_id,
                dataset=train_dataset,
            )

# Search and rank checkpoints by performance
ranked_checkpoints = mlflow.search_logged_models(
    filter_string=f"source_run_id='{run.info.run_id}'",
    order_by=[{"field_name": "metrics.train_accuracy", "ascending": False}],
    output_format="list",
)

best_checkpoint = ranked_checkpoints[0]
print(f"Best checkpoint: {best_checkpoint.name}")
print(f"Accuracy: {best_checkpoint.metrics[0].value}")
This approach enables you to:
- Systematically version checkpoints with the step parameter
- Link metrics to specific models with the model_id argument of log_metric()
- Associate metrics with datasets for better tracking
- Rank and compare checkpoints with mlflow.search_logged_models()
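To reload the best checkpoint found above, a minimal sketch using MLflow 3's models:/<model_id> URI scheme (assuming the best_checkpoint object from the previous snippet):

import mlflow

# Load the best checkpoint by its logged model ID
best_model = mlflow.pytorch.load_model(f"models:/{best_checkpoint.model_id}")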
Model Loading
Load logged PyTorch models for inference:
import mlflow
import torch

# Load as PyTorch model
model_uri = "runs:/<run_id>/pytorch_model"
loaded_model = mlflow.pytorch.load_model(model_uri)

# Make predictions
input_tensor = torch.randn(5, 10)
predictions = loaded_model(input_tensor)

# Load as PyFunc for generic inference
pyfunc_model = mlflow.pyfunc.load_model(model_uri)
predictions = pyfunc_model.predict(input_tensor.numpy())
Hyperparameter Optimization
Track hyperparameter tuning experiments with MLflow:
import mlflow
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
# Create synthetic dataset for demonstration
input_size = 784 # e.g., flattened 28x28 images
output_size = 10 # e.g., 10 classes
X_train = torch.randn(1000, input_size)
y_train = torch.randint(0, output_size, (1000,))
X_val = torch.randn(200, input_size)
y_val = torch.randint(0, output_size, (200,))
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=32)
def train_and_evaluate(model, optimizer, train_loader, val_loader, epochs=5):
    """Simple training loop for demonstration."""
    criterion = nn.CrossEntropyLoss()
    for epoch in range(epochs):
        model.train()
        for data, target in train_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

    # Validation
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for data, target in val_loader:
            output = model(data)
            val_loss += criterion(output, target).item()

    return val_loss / len(val_loader)

def objective(trial):
    """Optuna objective for hyperparameter tuning."""
    with mlflow.start_run(nested=True):
        # Define hyperparameter search space
        params = {
            "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
            "hidden_size": trial.suggest_int("hidden_size", 32, 512),
            "dropout": trial.suggest_float("dropout", 0.1, 0.5),
        }

        # Log parameters
        mlflow.log_params(params)

        # Create model
        model = nn.Sequential(
            nn.Linear(input_size, params["hidden_size"]),
            nn.ReLU(),
            nn.Dropout(params["dropout"]),
            nn.Linear(params["hidden_size"], output_size),
        )

        # Train model
        optimizer = optim.Adam(model.parameters(), lr=params["learning_rate"])
        val_loss = train_and_evaluate(model, optimizer, train_loader, val_loader)

        # Log validation loss
        mlflow.log_metric("val_loss", val_loss)

        return val_loss

# Run optimization
with mlflow.start_run(run_name="PyTorch HPO"):
    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=50)

    # Log best parameters
    mlflow.log_params({f"best_{k}": v for k, v in study.best_params.items()})
    mlflow.log_metric("best_val_loss", study.best_value)
Model Registry Integration
Register PyTorch models for versioning and deployment:
import mlflow
import torch.nn as nn
from mlflow import MlflowClient
client = MlflowClient()
with mlflow.start_run():
    # Create a simple model for demonstration
    model = nn.Sequential(
        nn.Conv2d(3, 32, 3),
        nn.ReLU(),
        nn.MaxPool2d(2),
        nn.Flatten(),
        nn.Linear(32 * 15 * 15, 10),
    )

    # Log model to registry
    model_info = mlflow.pytorch.log_model(
        model, name="pytorch_model", registered_model_name="ImageClassifier"
    )

    # Tag for tracking
    mlflow.set_tags(
        {"model_type": "cnn", "dataset": "imagenet", "framework": "pytorch"}
    )

# Set alias for production deployment
client.set_registered_model_alias(
    name="ImageClassifier",
    alias="champion",
    version=model_info.registered_model_version,
)
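Once the alias is set, downstream code can resolve it without pinning a version number. A minimal sketch using the models:/<name>@<alias> URI scheme:

import mlflow

# Resolve the "champion" alias to whatever version it currently points at
champion_model = mlflow.pytorch.load_model("models:/ImageClassifier@champion")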
Distributed Training
Track distributed PyTorch training experiments:
import mlflow
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
# Create synthetic dataset
X_train = torch.randn(1000, 784)
y_train = torch.randint(0, 10, (1000,))
train_dataset = TensorDataset(X_train, y_train)
# Simple model
model = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))
def train_epoch(model, optimizer, train_loader):
    """Simple training epoch for demonstration."""
    model.train()
    criterion = nn.CrossEntropyLoss()
    device = next(model.parameters()).device
    total_loss = 0

    for data, target in train_loader:
        # Move each batch to the model's device
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / len(train_loader)

def train_distributed():
    # Initialize distributed training
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()

    # Wrap model with DDP
    model_ddp = DDP(model.to(rank), device_ids=[rank])

    # Create the optimizer once, after the model is on its device
    optimizer = torch.optim.Adam(model_ddp.parameters(), lr=0.001)

    # Create distributed sampler
    from torch.utils.data.distributed import DistributedSampler

    sampler = DistributedSampler(
        train_dataset, num_replicas=dist.get_world_size(), rank=rank
    )
    train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler)

    # Only log from rank 0
    if rank == 0:
        mlflow.start_run()
        mlflow.log_params({"world_size": dist.get_world_size(), "backend": "nccl"})

    # Training loop
    epochs = 10
    for epoch in range(epochs):
        sampler.set_epoch(epoch)  # Shuffle data differently each epoch
        train_loss = train_epoch(model_ddp, optimizer, train_loader)

        # Log metrics from rank 0 only
        if rank == 0:
            mlflow.log_metric("train_loss", train_loss, step=epoch)

    # Save model from rank 0
    if rank == 0:
        mlflow.pytorch.log_model(model, name="distributed_model")
        mlflow.end_run()

    dist.destroy_process_group()
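A launch note: dist.init_process_group(backend="nccl") reads the distributed environment variables that a launcher provides, so a script built around train_distributed() is typically started with torchrun, for example torchrun --nproc_per_node=4 train_ddp.py (where train_ddp.py is a placeholder for your script name), giving one process per GPU.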