
MLflow PyTorch 快速入门


This quickstart guide walks you through logging PyTorch experiments to MLflow. After reading it, you will understand the basics of logging PyTorch experiments to MLflow and how to view the experiment results in the MLflow UI.

This quickstart is compatible with cloud-based notebooks such as Google Colab and Databricks Notebooks, and you can also run it locally.

Install Required Packages

%pip install -q mlflow torchmetrics torchinfo
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchinfo import summary
from torchmetrics import Accuracy
from torchvision import datasets
from torchvision.transforms import ToTensor

import mlflow

Task Overview

In this guide, we will demonstrate MLflow's PyTorch integration on a simple MNIST image classification task. We will build a convolutional neural network as the image classifier and log the following information to MLflow:

  • Training metrics: training loss and accuracy.
  • Evaluation metrics: evaluation loss and accuracy.
  • Training configuration: learning rate, batch size, and so on.
  • Model information: the model architecture.
  • Saved model: the trained model instance.

Now let's dive into the details!

Prepare the Data

Let's load the FashionMNIST training data from torchvision; it is already preprocessed and scaled to the [0, 1) range. We will then wrap the dataset in an instance of torch.utils.data.DataLoader.

training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)

test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)

Let's take a look at our data.

print(f"Image size: {training_data[0][0].shape}")
print(f"Size of training dataset: {len(training_data)}")
print(f"Size of test dataset: {len(test_data)}")
Image size: torch.Size([1, 28, 28])
Size of training dataset: 60000
Size of test dataset: 10000

We wrap the datasets in DataLoader instances for batching. DataLoader is a useful data-preprocessing utility; for more details, please refer to PyTorch's developer guide.

train_dataloader = DataLoader(training_data, batch_size=64)
test_dataloader = DataLoader(test_data, batch_size=64)
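To see what these loaders yield per iteration, here is a minimal sketch using a hypothetical stand-in dataset of random tensors with the same shape as FashionMNIST (the random data is an assumption for illustration, not the real dataset):

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Hypothetical stand-in for FashionMNIST: 256 random 1x28x28 images with labels in [0, 10).
images = torch.rand(256, 1, 28, 28)
labels = torch.randint(0, 10, (256,))
loader = DataLoader(TensorDataset(images, labels), batch_size=64)

# Each iteration yields one batch of images and labels.
X, y = next(iter(loader))
print(X.shape)  # torch.Size([64, 1, 28, 28])
print(y.shape)  # torch.Size([64])
```

With batch_size=64 and 256 samples, the loader produces 4 batches per epoch.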

Define the Model

Now, let's define our model. To define a PyTorch model, you subclass torch.nn.Module, override the __init__ method to declare the model components, and implement the forward() method with the forward-pass logic.

We will build a simple convolutional neural network (CNN) with 2 convolutional layers as the image classifier. CNNs are a common architecture for image classification tasks; for more details on CNNs, please read this document. Our model outputs the logits for each class (10 classes in total); applying softmax to the logits yields a probability distribution across the classes.

class ImageClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=3),
            nn.ReLU(),
            nn.Conv2d(8, 16, kernel_size=3),
            nn.ReLU(),
            nn.Flatten(),
            nn.LazyLinear(10),  # 10 classes in total.
        )

    def forward(self, x):
        return self.model(x)
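The logits-to-probabilities relationship mentioned above can be sketched on a hypothetical batch of logits (random values here, not real model output):

```python
import torch

# Hypothetical logits for a batch of 2 images over the 10 classes.
logits = torch.randn(2, 10)

# Softmax turns each row of logits into a probability distribution over classes.
probs = torch.softmax(logits, dim=1)

print(probs.sum(dim=1))    # each row sums to 1
print(probs.argmax(dim=1)) # the most likely class for each image
```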

Connect to the MLflow Tracking Server

Before implementing the training loop, we need to configure the MLflow tracking server, because we will log data to MLflow during training.

In this guide, we will use the Databricks Free Trial as the MLflow tracking server. For other options, such as using your local MLflow server, please read the tracking server overview.

If you have not already done so, please follow this guide to set up your Databricks Free Trial account and access token; signing up takes no more than 5 minutes. The Databricks Free Trial is a way for users to try out Databricks features for free. For this guide, we need the ML experiment dashboard to track our training progress.

After successfully signing up for a Databricks Free Trial account, let's connect MLflow to the Databricks Workspace. You will need to enter the following information:

  • Databricks Host: https://<your workspace host>.cloud.databricks.com/
  • Token: your personal access token
mlflow.login()

Now that you are successfully connected to the MLflow tracking server on your Databricks Workspace, let's give our experiment a name.

mlflow.set_experiment("/Users/<your email>/mlflow-pytorch-quickstart")
<Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/1078557169589361', creation_time=1703121702068, experiment_id='1078557169589361', last_update_time=1703194525608, lifecycle_stage='active', name='/mlflow-pytorch-quickstart', tags={'mlflow.experiment.sourceName': '/mlflow-pytorch-quickstart',
'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
'mlflow.ownerEmail': 'qianchen94era@gmail.com',
'mlflow.ownerId': '3209978630771139'}>

Implement the Training Loop

Now let's define the training loop, which essentially iterates over the dataset and applies the forward and backward passes to each batch of data.

Get the device information, since PyTorch requires manual device management.

# Get cpu or gpu for training.
device = "cuda" if torch.cuda.is_available() else "cpu"
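The snippet above only distinguishes CUDA from CPU. As a side note, on Apple-silicon machines you could additionally check PyTorch's MPS backend; this extended check is an addition not in the original guide, sketched defensively so it also works on older PyTorch builds:

```python
import torch

# Extended device pick (a sketch): prefer CUDA, then Apple-silicon MPS, else CPU.
# The `mps` branch is an addition; torch.backends.mps exists in PyTorch >= 1.12.
if torch.cuda.is_available():
    device = "cuda"
elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(device)
```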

Define the training function.

def train(dataloader, model, loss_fn, metrics_fn, optimizer, epoch):
    """Train the model on a single pass of the dataloader.

    Args:
        dataloader: an instance of `torch.utils.data.DataLoader`, containing the training data.
        model: an instance of `torch.nn.Module`, the model to be trained.
        loss_fn: a callable, the loss function.
        metrics_fn: a callable, the metrics function.
        optimizer: an instance of `torch.optim.Optimizer`, the optimizer used for training.
        epoch: an integer, the current epoch number.
    """
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        pred = model(X)
        loss = loss_fn(pred, y)
        accuracy = metrics_fn(pred, y)

        # Backpropagation.
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), batch
            # Use a globally increasing step so metrics from different epochs don't overwrite each other.
            step = epoch * len(dataloader) + batch
            mlflow.log_metric("loss", f"{loss:2f}", step=step)
            mlflow.log_metric("accuracy", f"{accuracy:2f}", step=step)
            print(f"loss: {loss:2f} accuracy: {accuracy:2f} [{current} / {len(dataloader)}]")

Define the evaluation function, which runs at the end of every epoch.

def evaluate(dataloader, model, loss_fn, metrics_fn, epoch):
    """Evaluate the model on a single pass of the dataloader.

    Args:
        dataloader: an instance of `torch.utils.data.DataLoader`, containing the eval data.
        model: an instance of `torch.nn.Module`, the model to be trained.
        loss_fn: a callable, the loss function.
        metrics_fn: a callable, the metrics function.
        epoch: an integer, the current epoch number.
    """
    num_batches = len(dataloader)
    model.eval()
    eval_loss, eval_accuracy = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            eval_loss += loss_fn(pred, y).item()
            eval_accuracy += metrics_fn(pred, y)

    eval_loss /= num_batches
    eval_accuracy /= num_batches
    mlflow.log_metric("eval_loss", f"{eval_loss:2f}", step=epoch)
    mlflow.log_metric("eval_accuracy", f"{eval_accuracy:2f}", step=epoch)

    print(f"Eval metrics: \nAccuracy: {eval_accuracy:.2f}, Avg loss: {eval_loss:2f} \n")

Start Training

It's time to start training! First, let's define the training hyperparameters, create the model, declare the loss function, and instantiate the optimizer.

epochs = 3
loss_fn = nn.CrossEntropyLoss()
metric_fn = Accuracy(task="multiclass", num_classes=10).to(device)
model = ImageClassifier().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
/usr/local/lib/python3.10/dist-packages/torch/nn/modules/lazy.py:180: UserWarning: Lazy modules are a new feature under heavy development so changes to the API or functionality can happen at any moment.
warnings.warn('Lazy modules are a new feature under heavy development '

Putting everything together, let's start training and log the information to MLflow. At the beginning of training, we log the training and model information to MLflow; during training, we log the training and evaluation metrics; once everything is done, we log the trained model.

with mlflow.start_run() as run:
    params = {
        "epochs": epochs,
        "learning_rate": 1e-3,
        "batch_size": 64,
        "loss_function": loss_fn.__class__.__name__,
        "metric_function": metric_fn.__class__.__name__,
        "optimizer": "SGD",
    }
    # Log training parameters.
    mlflow.log_params(params)

    # Log model summary.
    with open("model_summary.txt", "w") as f:
        f.write(str(summary(model)))
    mlflow.log_artifact("model_summary.txt")

    for t in range(epochs):
        print(f"Epoch {t + 1}\n-------------------------------")
        train(train_dataloader, model, loss_fn, metric_fn, optimizer, epoch=t)
        evaluate(test_dataloader, model, loss_fn, metric_fn, epoch=t)

    # Save the trained model to MLflow.
    mlflow.pytorch.log_model(model, "model")
Epoch 1
-------------------------------
loss: 2.294313 accuracy: 0.046875 [0 / 938]
loss: 2.151955 accuracy: 0.515625 [100 / 938]
loss: 1.825312 accuracy: 0.640625 [200 / 938]
loss: 1.513407 accuracy: 0.593750 [300 / 938]
loss: 1.059044 accuracy: 0.718750 [400 / 938]
loss: 0.931140 accuracy: 0.687500 [500 / 938]
loss: 0.889886 accuracy: 0.703125 [600 / 938]
loss: 0.742625 accuracy: 0.765625 [700 / 938]
loss: 0.786106 accuracy: 0.734375 [800 / 938]
loss: 0.788444 accuracy: 0.781250 [900 / 938]
Eval metrics: 
Accuracy: 0.75, Avg loss: 0.719401 

Epoch 2
-------------------------------
loss: 0.649325 accuracy: 0.796875 [0 / 938]
loss: 0.756684 accuracy: 0.718750 [100 / 938]
loss: 0.488664 accuracy: 0.828125 [200 / 938]
loss: 0.780433 accuracy: 0.718750 [300 / 938]
loss: 0.691777 accuracy: 0.656250 [400 / 938]
loss: 0.670005 accuracy: 0.750000 [500 / 938]
loss: 0.712286 accuracy: 0.687500 [600 / 938]
loss: 0.644150 accuracy: 0.765625 [700 / 938]
loss: 0.683426 accuracy: 0.750000 [800 / 938]
loss: 0.659378 accuracy: 0.781250 [900 / 938]
Eval metrics: 
Accuracy: 0.77, Avg loss: 0.636072 

Epoch 3
-------------------------------
loss: 0.528523 accuracy: 0.781250 [0 / 938]
loss: 0.634942 accuracy: 0.750000 [100 / 938]
loss: 0.420757 accuracy: 0.843750 [200 / 938]
loss: 0.701463 accuracy: 0.703125 [300 / 938]
loss: 0.649267 accuracy: 0.656250 [400 / 938]
loss: 0.624556 accuracy: 0.812500 [500 / 938]
loss: 0.648762 accuracy: 0.718750 [600 / 938]
loss: 0.630074 accuracy: 0.781250 [700 / 938]
loss: 0.682306 accuracy: 0.718750 [800 / 938]
loss: 0.587403 accuracy: 0.750000 [900 / 938]
2023/12/21 21:39:55 WARNING mlflow.models.model: Model logged without a signature. Signatures will be required for upcoming model registry features as they validate model inputs and denote the expected schema of model outputs. Please visit https://www.mlflow.org/docs/2.9.2/models.html#set-signature-on-logged-model for instructions on setting a model signature on your logged model.
2023/12/21 21:39:56 WARNING mlflow.utils.requirements_utils: Found torch version (2.1.0+cu121) contains a local version label (+cu121). MLflow logged a pip requirement for this package as 'torch==2.1.0' without the local version label to make it installable from PyPI. To specify pip requirements containing local version labels, please use `conda_env` or `pip_requirements`.
Eval metrics: 
Accuracy: 0.77, Avg loss: 0.616615
2023/12/21 21:40:02 WARNING mlflow.utils.requirements_utils: Found torch version (2.1.0+cu121) contains a local version label (+cu121). MLflow logged a pip requirement for this package as 'torch==2.1.0' without the local version label to make it installable from PyPI. To specify pip requirements containing local version labels, please use `conda_env` or `pip_requirements`.
/usr/local/lib/python3.10/dist-packages/_distutils_hack/__init__.py:33: UserWarning: Setuptools is replacing distutils.
warnings.warn("Setuptools is replacing distutils.")
Uploading artifacts:   0%|          | 0/6 [00:00<?, ?it/s]

While training is in progress, you can find the run in the dashboard. Log in to your Databricks Workspace and click the Experiments tab. See the screenshot below: (screenshot: landing page)

Clicking the Experiments tab takes you to the experiments page, where you can find your runs. Click into the most recent experiment and run, and you will find your metrics there, similar to: (screenshot: experiment page)

In the artifacts section, you can see that our model was logged successfully: (screenshot: saved model)

As the final step, let's load the model back and run inference on it.

logged_model = f"runs:/{run.info.run_id}/model"
loaded_model = mlflow.pyfunc.load_model(logged_model)
Downloading artifacts:   0%|          | 0/6 [00:00<?, ?it/s]

Note that the input to the loaded model must be a numpy array or a pandas DataFrame, so we need to explicitly convert the tensor to numpy format.

outputs = loaded_model.predict(training_data[0][0][None, :].numpy())
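The pyfunc model returns the raw logits, so to obtain a class prediction you would take the argmax over the class axis. A sketch on hypothetical logits (fixed values for illustration, not real model output):

```python
import numpy as np

# Hypothetical logits for one image over the 10 classes (not real model output).
outputs = np.array([[0.1, 2.3, -0.5, 0.0, 1.1, -1.2, 0.4, 0.9, -0.3, 0.2]])

# The predicted class is the index of the largest logit in each row.
predicted_class = outputs.argmax(axis=1)
print(predicted_class)  # → [1]
```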