跳到主要内容

MLflow 中的 TensorFlow

TensorFlow 是一个强大的端到端开源机器学习平台,彻底改变了开发者构建和部署 ML 解决方案的方式。凭借其全面的工具和库生态系统,TensorFlow 使从初学者到专家的每个人都能为各种应用程序创建复杂的模型。

TensorFlow 的 Keras API 提供了一个直观的界面,用于构建和训练深度学习模型,而其强大的后端能够在 CPU、GPU 和 TPU 上进行高效计算。

为什么 TensorFlow 引领行业

完整的 ML 生态系统

  • 🏗️ 可用于生产环境:从实验到部署的端到端平台
  • 📱 多平台部署:在浏览器、移动设备、边缘硬件和服务器上运行模型
  • 🔬 研究灵活性:为初学者和专家提供高级和低级 API
  • 📊 TensorBoard 集成:对模型架构和训练指标进行丰富的可视化

强大的核心功能

  • 图执行:优化的执行以获得最佳性能
  • 🔄 Eager Execution:立即评估以进行直观的调试
  • 🧩 模块化设计:自定义 ML 管道的任何部分
  • 🌐 全球社区:丰富的资源、教程和预训练模型

为什么选择 MLflow + TensorFlow?

MLflow 与 TensorFlow 的集成为机器学习从业者创建了一个强大的工作流程

  • 📊 一键式自动日志记录:只需 mlflow.tensorflow.autolog() 即可启用全面的跟踪
  • ⚙️ 零代码集成:您现有的 TensorFlow 训练代码可以不变地工作
  • 🔄 完全可重现性:每个参数、指标和模型都会自动捕获
  • 📈 训练可视化:通过 MLflow UI 监控性能
  • 👥 协作开发:与团队成员分享实验和结果
  • 🚀 简化的部署:打包模型以跨不同环境进行部署

自动记录 TensorFlow 实验

MLflow 可以自动记录 TensorFlow 训练运行中的指标、参数和模型。只需在训练代码之前调用 mlflow.tensorflow.autolog()mlflow.autolog()

import mlflow
import numpy as np
import tensorflow as tf
from tensorflow import keras

# Enable autologging
mlflow.tensorflow.autolog()

# Prepare sample data
data = np.random.uniform(size=[20, 28, 28, 3])
label = np.random.randint(2, size=20)

# Define model
model = keras.Sequential(
[
keras.Input([28, 28, 3]),
keras.layers.Conv2D(8, 2),
keras.layers.MaxPool2D(2),
keras.layers.Flatten(),
keras.layers.Dense(2),
keras.layers.Softmax(),
]
)

model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(),
optimizer=keras.optimizers.Adam(0.001),
metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

# Training with automatic logging
with mlflow.start_run():
model.fit(data, label, batch_size=5, epochs=2)
自动日志记录的要求和限制

要求

  • TensorFlow 版本:仅支持 TensorFlow >= 2.3.0
  • 训练 API:必须使用 model.fit() Keras API
  • 运行上下文:可以与活动 MLflow 运行一起使用,也可以不使用

限制

  • 自定义训练循环:不支持(请改用手动日志记录)
  • 旧版本的 TensorFlow:不支持(请改用手动日志记录)
  • 非 Keras TensorFlow:不支持(请改用手动日志记录)
注意

只有在使用 model.fit() Keras API 训练模型时才支持自动日志记录。此外,仅支持 TensorFlow >= 2.3.0。如果您使用的是旧版本的 TensorFlow 或没有 Keras 的 TensorFlow,请使用手动日志记录。

自动记录的内容

全面的自动日志记录详细信息

模型信息

  • 📋 模型摘要:由 model.summary() 返回的完整架构概述
  • 🧩 层配置:模型中每一层的详细信息
  • 📐 参数计数:可训练和不可训练参数的总数

训练参数

  • ⚙️ 批次大小:每次梯度更新的样本数
  • 🔢 Epochs:通过训练数据集的完整遍数
  • 🧮 每个 Epoch 的步数:每个 epoch 的批次迭代次数
  • 📏 验证步数:用于验证的批次迭代次数

优化器配置

  • 🧠 优化器名称:使用的优化器类型(Adam、SGD 等)
  • 📉 学习率:梯度更新的步长
  • 🎯 Epsilon:用于数值稳定性的常数
  • 🔄 其他优化器参数:Beta 值、动量等

数据集信息

  • 📊 数据集形状:输入和输出维度
  • 🔢 样本计数:训练和验证样本的数量

训练指标

  • 📉 训练损失:每个 epoch 的损失值
  • 📈 验证损失:验证数据上的损失
  • 🎯 自定义指标model.compile() 中指定的任何指标
  • 🔄 提前停止指标stopped_epochrestored_epoch

工件

  • 🤖 已保存的模型:TensorFlow SavedModel 格式的完整模型
  • 📊 TensorBoard 日志:训练和验证指标

您可以通过将参数传递给 mlflow.tensorflow.autolog() 来自定义自动日志记录行为

mlflow.tensorflow.autolog(
log_models=True,
log_input_examples=True,
log_model_signatures=True,
log_dataset_info=True,
log_every_n_steps=1,
)
TensorFlow 自动日志记录的工作原理

MLflow 的 TensorFlow 自动日志记录使用通过猴子补丁附加到您的模型的自定义 Keras 回调。此回调

  1. 捕获初始状态:在训练开始时,记录模型架构、超参数和优化器设置
  2. 监控训练:跟踪每个 epoch 或指定时间间隔的指标
  3. 记录完成:在训练完成时保存最终训练的模型

此方法与 TensorFlow 现有的回调系统无缝集成,确保与您的其他回调(例如提前停止或学习率调度)兼容。

使用 Keras 回调记录到 MLflow

为了更好地控制记录的内容,您可以使用 MLflow 内置的 Keras 回调或创建自己的自定义回调。

使用预定义的回调

MLflow 提供 mlflow.tensorflow.MlflowCallback,它提供与自动日志记录相同的功能,但具有更明确的控制

import mlflow
from tensorflow import keras

# Define and compile your model
model = keras.Sequential([...])
model.compile(...)

# Create an MLflow run and add the callback
with mlflow.start_run() as run:
model.fit(
data,
labels,
batch_size=32,
epochs=10,
callbacks=[mlflow.tensorflow.MlflowCallback(run)],
)
回调配置选项

MlflowCallback 接受多个参数来自定义日志记录行为

mlflow.tensorflow.MlflowCallback(
log_every_epoch=True, # Log metrics at the end of each epoch
log_every_n_steps=None, # Log metrics every N steps (overrides log_every_epoch)
)
  • 基于 Epoch 的日志记录:设置 log_every_epoch=True(默认)以在每个 epoch 结束时记录
  • 基于批次的日志记录:设置 log_every_n_steps=N 以每 N 个批次记录一次
  • 选择性模型日志记录:设置 log_models=False 以禁用模型保存

自定义 MLflow 日志记录

您可以通过继承 keras.callbacks.Callback 来创建自己的回调,以实现自定义日志记录逻辑

from tensorflow import keras
import math
import mlflow


class CustomMlflowCallback(keras.callbacks.Callback):
def on_epoch_begin(self, epoch, logs=None):
mlflow.log_metric("current_epoch", epoch)

def on_epoch_end(self, epoch, logs=None):
logs = logs or {}
# Log metrics in log scale
for k, v in logs.items():
if v > 0: # Avoid log(0) or log(negative)
mlflow.log_metric(f"log_{k}", math.log(v), step=epoch)
mlflow.log_metric(k, v, step=epoch)

def on_train_end(self, logs=None):
# Log final model weights statistics
weights = self.model.get_weights()
mlflow.log_metric("total_parameters", sum(w.size for w in weights))
mlflow.log_metric(
"average_weight",
sum(w.sum() for w in weights) / sum(w.size for w in weights),
)
Keras 回调生命周期挂钩

Keras 回调为训练过程提供各种挂钩

  • 训练设置on_train_beginon_train_end
  • Epoch 进度on_epoch_beginon_epoch_end
  • 批次进度on_batch_beginon_batch_end
  • 验证on_test_beginon_test_end
  • 预测on_predict_beginon_predict_end

传递给这些方法的 logs 字典包含诸如

  • loss:训练损失
  • val_loss:验证损失
  • model.compile() 中定义的任何自定义指标

有关完整文档,请参阅 keras.callbacks.Callback

将您的 TensorFlow 模型保存到 MLflow

基本模型保存

如果您尚未启用自动日志记录(自动保存模型),则可以使用 mlflow.tensorflow.log_model() 手动保存您的 TensorFlow 模型

import mlflow
import tensorflow as tf
from tensorflow import keras

# Define model
model = keras.Sequential(
[
keras.Input([28, 28, 3]),
keras.layers.Conv2D(8, 2),
keras.layers.MaxPool2D(2),
keras.layers.Flatten(),
keras.layers.Dense(2),
keras.layers.Softmax(),
]
)

# Train model (code omitted for brevity)

# Log the model to MLflow
model_info = mlflow.tensorflow.log_model(model, name="model")

# Later, load the model for inference
loaded_model = mlflow.tensorflow.load_model(model_info.model_uri)
predictions = loaded_model.predict(tf.random.uniform([1, 28, 28, 3]))
了解 MLflow 模型保存

当您使用 MLflow 保存 TensorFlow 模型时

  1. 格式转换:该模型将转换为通用的 MLflow pyfunc 模型以支持部署,通过 mlflow.pyfunc.load_model() 加载
  2. 保留原始格式:该模型仍然能够作为本机 TensorFlow 对象通过 mlflow.tensorflow.load_model() 加载
  3. 元数据创建:存储模型元数据,包括依赖项和签名
  4. 工件存储:该模型被保存到 MLflow 工件存储中
  5. 加载能力:该模型可以作为本机 TensorFlow 模型或通用的 pyfunc 模型加载

此方法能够实现一致的模型管理,而与使用的框架无关。

模型格式

默认情况下,MLflow 将 TensorFlow 模型保存在 TensorFlow SavedModel 格式(编译图)中,这是部署的理想选择。您也可以保存为其他格式

# Save in H5 format (weights only)
mlflow.tensorflow.log_model(
model, name="model", keras_model_kwargs={"save_format": "h5"}
)

# Save in native Keras format
mlflow.tensorflow.log_model(
model, name="model", keras_model_kwargs={"save_format": "keras"}
)
比较模型格式

TensorFlow SavedModel(默认)

  • 完整的序列化:包括模型架构、权重和编译信息
  • 已准备好部署:针对生产环境进行了优化
  • TensorFlow Serving:与 TensorFlow Serving 兼容
  • 跨平台:可以跨不同平台部署

H5 格式

  • 权重存储:高效存储模型权重
  • 更小尺寸:通常比 SavedModel 格式小
  • 有限的信息:不包括完整的计算图
  • 部署限制:不适合某些部署方案

Keras 格式

  • 本机 Keras:使用 Keras 的本机序列化
  • 兼容性:与较新的 Keras 版本配合良好
  • 部署:可能需要额外的部署步骤

对于大多数生产用例,建议使用默认的 SavedModel 格式。有关更多详细信息,请参阅 TensorFlow 保存和加载指南

模型签名

模型签名描述了模型的预期输入和输出格式。虽然是可选的,但它是更好地理解和验证模型的最佳实践。添加签名的最简单方法是使用自动推断

import mlflow
from mlflow.models import infer_signature
import tensorflow as tf
import numpy as np

# Sample input data
sample_input = np.random.uniform(size=[2, 28, 28, 3])

# Get predictions
sample_output = model.predict(sample_input)

# Infer signature from data
signature = infer_signature(sample_input, sample_output)

# Log model with inferred signature
model_info = mlflow.tensorflow.log_model(model, name="model", signature=signature)

当启用 log_input_examples=Truelog_model_signatures=True 的自动日志记录时,MLflow 会自动从您的训练数据中推断并记录签名。

签名出现在 MLflow UI 中

TensorFlow Model Signature

手动签名定义

为了完全控制您的模型签名,您可以手动定义输入和输出架构

import mlflow
import tensorflow as tf
import numpy as np
from tensorflow import keras
from mlflow.types import Schema, TensorSpec
from mlflow.models import ModelSignature

# Define model
model = keras.Sequential(
[
keras.Input([28, 28, 3]),
keras.layers.Conv2D(8, 2),
keras.layers.MaxPool2D(2),
keras.layers.Flatten(),
keras.layers.Dense(2),
keras.layers.Softmax(),
]
)

# Define input schema
input_schema = Schema(
[
TensorSpec(np.dtype(np.float32), (-1, 28, 28, 3), "input"),
]
)

# Create signature with input schema
signature = ModelSignature(inputs=input_schema)

# Log model with signature
model_info = mlflow.tensorflow.log_model(model, name="model", signature=signature)

手动定义在以下情况下很有用

  • 您需要精确控制张量规范
  • 处理复杂的输入/输出结构
  • 自动推理无法捕获您期望的模式
  • 您想预先指定确切的数据类型和形状

高级 TensorFlow 集成

复杂模型跟踪

对于更复杂的模型,您可能希望跟踪其他信息

跟踪迁移学习
import mlflow
import tensorflow as tf
import matplotlib.pyplot as plt

# Load pre-trained model
base_model = tf.keras.applications.MobileNetV2(
input_shape=(224, 224, 3), include_top=False, weights="imagenet"
)

# Freeze base model
base_model.trainable = False

# Create new model head
inputs = tf.keras.Input(shape=(224, 224, 3))
x = base_model(inputs, training=False)
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dense(256, activation="relu")(x)
outputs = tf.keras.layers.Dense(10, activation="softmax")(x)
model = tf.keras.Model(inputs, outputs)

with mlflow.start_run() as run:
# Log base model information
mlflow.log_param("base_model", "MobileNetV2")
mlflow.log_param("base_model_trainable", False)
mlflow.log_param("new_layers", "GlobalAveragePooling2D, Dense(256), Dense(10)")

# Log base model summary
with open("base_model_summary.txt", "w") as f:
base_model.summary(print_fn=lambda x: f.write(x + "\n"))
mlflow.log_artifact("base_model_summary.txt")

# Log model visualization
tf.keras.utils.plot_model(model, to_file="model_architecture.png", show_shapes=True)
mlflow.log_artifact("model_architecture.png")

# Continue with normal training...
跟踪多模型实验
import mlflow

# Main experiment run
with mlflow.start_run(run_name="ensemble_experiment") as parent_run:
mlflow.log_param("experiment_type", "ensemble")

# Train first model
with mlflow.start_run(run_name="model_1", nested=True) as child_run_1:
model_1 = create_model_1()
# Train model_1
mlflow.tensorflow.log_model(model_1, name="model_1")
mlflow.log_metric("accuracy", accuracy_1)

# Train second model
with mlflow.start_run(run_name="model_2", nested=True) as child_run_2:
model_2 = create_model_2()
# Train model_2
mlflow.tensorflow.log_model(model_2, name="model_2")
mlflow.log_metric("accuracy", accuracy_2)

# Create and log ensemble model
ensemble_model = create_ensemble([model_1, model_2])
mlflow.tensorflow.log_model(ensemble_model, name="ensemble_model")
mlflow.log_metric("ensemble_accuracy", ensemble_accuracy)

超参数优化

将 TensorFlow 与超参数调整工具结合使用,同时跟踪 MLflow 中的所有内容

import mlflow
import tensorflow as tf
from tensorflow import keras
import optuna


def create_model(trial):
# Define hyperparameters to tune
learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
units = trial.suggest_int("units", 32, 512)
dropout = trial.suggest_float("dropout", 0.1, 0.5)

# Create model with suggested hyperparameters
model = keras.Sequential(
[
keras.layers.Input(shape=(28, 28, 3)),
keras.layers.Flatten(),
keras.layers.Dense(units, activation="relu"),
keras.layers.Dropout(dropout),
keras.layers.Dense(10, activation="softmax"),
]
)

model.compile(
optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
loss="sparse_categorical_crossentropy",
metrics=["accuracy"],
)

return model


def objective(trial):
# Start nested run for this trial
with mlflow.start_run(nested=True):
# Log hyperparameters
params = {
"learning_rate": trial.params["learning_rate"],
"units": trial.params["units"],
"dropout": trial.params["dropout"],
}
mlflow.log_params(params)

# Create and train model
model = create_model(trial)
history = model.fit(
x_train, y_train, validation_data=(x_val, y_val), epochs=5, verbose=0
)

# Get validation accuracy
val_accuracy = max(history.history["val_accuracy"])
mlflow.log_metric("val_accuracy", val_accuracy)

# Log model
mlflow.tensorflow.log_model(model, name="model")

return val_accuracy


# Main experiment run
with mlflow.start_run(run_name="hyperparameter_optimization"):
# Log study parameters
mlflow.log_params(
{
"optimization_framework": "optuna",
"n_trials": 20,
"direction": "maximize",
"metric": "val_accuracy",
}
)

# Create and run study
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)

# Log best parameters and score
mlflow.log_params({f"best_{k}": v for k, v in study.best_params.items()})
mlflow.log_metric("best_val_accuracy", study.best_value)

# Train final model with best parameters
final_model = create_model(study.best_trial)
final_model.fit(x_train, y_train, epochs=10)
mlflow.tensorflow.log_model(final_model, name="best_model")

部署准备

一旦您使用 MLflow 训练和记录了您的 TensorFlow 模型,就可以通过使用 MLflow CLI 的单个命令在本地轻松部署它

mlflow models serve -m models:/<model_id> -p 5000

测试您部署的模型

import requests
import json

# Prepare test data
test_data = {"inputs": sample_input.numpy().tolist()}

# Make prediction request
response = requests.post(
"https://:5000/invocations",
data=json.dumps(test_data),
headers={"Content-Type": "application/json"},
)

predictions = response.json()
print("Predictions:", predictions)
高级部署选项

mlflow models serve 命令支持多个自定义选项

# Specify environment manager
mlflow models serve -m models:/<model_id> -p 5000 --env-manager conda

# Enable MLServer for enhanced inference capabilities
mlflow models serve -m models:/<model_id> -p 5000 --enable-mlserver

# Set custom host
mlflow models serve -m models:/<model_id> -p 5000 --host 0.0.0.0

对于生产部署,请考虑

  • 使用 MLServer (--enable-mlserver) 以获得更好的性能和其他功能
  • 使用 mlflow models build-docker 构建 Docker 镜像
  • 部署到 Azure ML 或 Amazon SageMaker 等云平台
  • 设置适当的环境管理和依赖项隔离

实际应用

MLflow-TensorFlow 集成在以下情况下表现出色

  • 🖼️ 计算机视觉:跟踪 CNN 架构、数据增强策略以及用于图像分类、对象检测和分割的训练动态
  • 📝 自然语言处理:监控 Transformer 模型、嵌入以及用于语言理解、翻译和生成的微调
  • 📊 时间序列分析:记录用于预测、异常检测和序列预测的 RNN 和 LSTM 模型
  • 🏭 生产 ML 系统:对从开发到部署的模型进行版本控制,并具有完整的沿袭跟踪
  • 🎓 教育项目:记录从简单到复杂模型的学习进度
  • 🧪 实验研究:将新型架构和训练技术与已建立的基线进行比较

结论

MLflow-TensorFlow 集成为跟踪、管理和部署机器学习实验提供了一个全面的解决方案。通过将 TensorFlow 的强大计算能力与 MLflow 的实验跟踪相结合,您可以创建一个工作流程,该工作流程是

  • 🔍 透明:模型训练的每个方面都有记录
  • 🔄 可重现:实验可以完全重新创建
  • 📊 可比较:可以并排评估不同的方法
  • 📈 可扩展:从简单的原型到复杂的生产模型
  • 👥 协作:团队成员可以分享和建立彼此的工作

无论您是探索新型模型架构的研究人员,还是将模型部署到生产环境的工程师,MLflow-TensorFlow 集成都为有组织的、可重现和可扩展的机器学习开发奠定了基础。