MLflow 中的 TensorFlow
TensorFlow 是一个强大的端到端开源机器学习平台,彻底改变了开发者构建和部署 ML 解决方案的方式。凭借其全面的工具和库生态系统,TensorFlow 使从初学者到专家的每个人都能为各种应用程序创建复杂的模型。
TensorFlow 的 Keras API 提供了一个直观的界面,用于构建和训练深度学习模型,而其强大的后端能够在 CPU、GPU 和 TPU 上进行高效计算。
为什么 TensorFlow 引领行业
为什么选择 MLflow + TensorFlow?
MLflow 与 TensorFlow 的集成为机器学习从业者创建了一个强大的工作流程
- 📊 一键式自动日志记录:只需
mlflow.tensorflow.autolog()
即可启用全面的跟踪 - ⚙️ 零代码集成:您现有的 TensorFlow 训练代码可以不变地工作
- 🔄 完全可重现性:每个参数、指标和模型都会自动捕获
- 📈 训练可视化:通过 MLflow UI 监控性能
- 👥 协作开发:与团队成员分享实验和结果
- 🚀 简化的部署:打包模型以跨不同环境进行部署
自动记录 TensorFlow 实验
MLflow 可以自动记录 TensorFlow 训练运行中的指标、参数和模型。只需在训练代码之前调用 mlflow.tensorflow.autolog()
或 mlflow.autolog()
import mlflow
import numpy as np
import tensorflow as tf
from tensorflow import keras
# Enable autologging
mlflow.tensorflow.autolog()
# Prepare sample data
data = np.random.uniform(size=[20, 28, 28, 3])
label = np.random.randint(2, size=20)
# Define model
model = keras.Sequential(
[
keras.Input([28, 28, 3]),
keras.layers.Conv2D(8, 2),
keras.layers.MaxPool2D(2),
keras.layers.Flatten(),
keras.layers.Dense(2),
keras.layers.Softmax(),
]
)
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(),
optimizer=keras.optimizers.Adam(0.001),
metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
# Training with automatic logging
with mlflow.start_run():
model.fit(data, label, batch_size=5, epochs=2)
自动日志记录的要求和限制
要求
- ✅ TensorFlow 版本:仅支持 TensorFlow >= 2.3.0
- ✅ 训练 API:必须使用
model.fit()
Keras API - ✅ 运行上下文:可以与活动 MLflow 运行一起使用,也可以不使用
限制
- ❌ 自定义训练循环:不支持(请改用手动日志记录)
- ❌ 旧版本的 TensorFlow:不支持(请改用手动日志记录)
- ❌ 非 Keras TensorFlow:不支持(请改用手动日志记录)
只有在使用 model.fit()
Keras API 训练模型时才支持自动日志记录。此外,仅支持 TensorFlow >= 2.3.0。如果您使用的是旧版本的 TensorFlow 或没有 Keras 的 TensorFlow,请使用手动日志记录。
自动记录的内容
全面的自动日志记录详细信息
模型信息
- 📋 模型摘要:由
model.summary()
返回的完整架构概述 - 🧩 层配置:模型中每一层的详细信息
- 📐 参数计数:可训练和不可训练参数的总数
训练参数
- ⚙️ 批次大小:每次梯度更新的样本数
- 🔢 Epochs:通过训练数据集的完整遍数
- 🧮 每个 Epoch 的步数:每个 epoch 的批次迭代次数
- 📏 验证步数:用于验证的批次迭代次数
优化器配置
- 🧠 优化器名称:使用的优化器类型(Adam、SGD 等)
- 📉 学习率:梯度更新的步长
- 🎯 Epsilon:用于数值稳定性的常数
- 🔄 其他优化器参数:Beta 值、动量等
数据集信息
- 📊 数据集形状:输入和输出维度
- 🔢 样本计数:训练和验证样本的数量
训练指标
- 📉 训练损失:每个 epoch 的损失值
- 📈 验证损失:验证数据上的损失
- 🎯 自定义指标:
model.compile()
中指定的任何指标 - 🔄 提前停止指标:
stopped_epoch
、restored_epoch
等
工件
- 🤖 已保存的模型:TensorFlow SavedModel 格式的完整模型
- 📊 TensorBoard 日志:训练和验证指标
您可以通过将参数传递给 mlflow.tensorflow.autolog()
来自定义自动日志记录行为
mlflow.tensorflow.autolog(
log_models=True,
log_input_examples=True,
log_model_signatures=True,
log_dataset_info=True,
log_every_n_steps=1,
)
TensorFlow 自动日志记录的工作原理
MLflow 的 TensorFlow 自动日志记录使用通过猴子补丁附加到您的模型的自定义 Keras 回调。此回调
- 捕获初始状态:在训练开始时,记录模型架构、超参数和优化器设置
- 监控训练:跟踪每个 epoch 或指定时间间隔的指标
- 记录完成:在训练完成时保存最终训练的模型
此方法与 TensorFlow 现有的回调系统无缝集成,确保与您的其他回调(例如提前停止或学习率调度)兼容。
使用 Keras 回调记录到 MLflow
为了更好地控制记录的内容,您可以使用 MLflow 内置的 Keras 回调或创建自己的自定义回调。
使用预定义的回调
MLflow 提供 mlflow.tensorflow.MlflowCallback
,它提供与自动日志记录相同的功能,但具有更明确的控制
import mlflow
from tensorflow import keras
# Define and compile your model
model = keras.Sequential([...])
model.compile(...)
# Create an MLflow run and add the callback
with mlflow.start_run() as run:
model.fit(
data,
labels,
batch_size=32,
epochs=10,
callbacks=[mlflow.tensorflow.MlflowCallback(run)],
)
回调配置选项
MlflowCallback
接受多个参数来自定义日志记录行为
mlflow.tensorflow.MlflowCallback(
log_every_epoch=True, # Log metrics at the end of each epoch
log_every_n_steps=None, # Log metrics every N steps (overrides log_every_epoch)
)
- 基于 Epoch 的日志记录:设置
log_every_epoch=True
(默认)以在每个 epoch 结束时记录 - 基于批次的日志记录:设置
log_every_n_steps=N
以每 N 个批次记录一次 - 选择性模型日志记录:设置
log_models=False
以禁用模型保存
自定义 MLflow 日志记录
您可以通过继承 keras.callbacks.Callback
来创建自己的回调,以实现自定义日志记录逻辑
from tensorflow import keras
import math
import mlflow
class CustomMlflowCallback(keras.callbacks.Callback):
def on_epoch_begin(self, epoch, logs=None):
mlflow.log_metric("current_epoch", epoch)
def on_epoch_end(self, epoch, logs=None):
logs = logs or {}
# Log metrics in log scale
for k, v in logs.items():
if v > 0: # Avoid log(0) or log(negative)
mlflow.log_metric(f"log_{k}", math.log(v), step=epoch)
mlflow.log_metric(k, v, step=epoch)
def on_train_end(self, logs=None):
# Log final model weights statistics
weights = self.model.get_weights()
mlflow.log_metric("total_parameters", sum(w.size for w in weights))
mlflow.log_metric(
"average_weight",
sum(w.sum() for w in weights) / sum(w.size for w in weights),
)
Keras 回调生命周期挂钩
Keras 回调为训练过程提供各种挂钩
- 训练设置:
on_train_begin
、on_train_end
- Epoch 进度:
on_epoch_begin
、on_epoch_end
- 批次进度:
on_batch_begin
、on_batch_end
- 验证:
on_test_begin
、on_test_end
- 预测:
on_predict_begin
、on_predict_end
传递给这些方法的 logs
字典包含诸如
loss
:训练损失val_loss
:验证损失model.compile()
中定义的任何自定义指标
有关完整文档,请参阅 keras.callbacks.Callback。
将您的 TensorFlow 模型保存到 MLflow
基本模型保存
如果您尚未启用自动日志记录(自动保存模型),则可以使用 mlflow.tensorflow.log_model()
手动保存您的 TensorFlow 模型
import mlflow
import tensorflow as tf
from tensorflow import keras
# Define model
model = keras.Sequential(
[
keras.Input([28, 28, 3]),
keras.layers.Conv2D(8, 2),
keras.layers.MaxPool2D(2),
keras.layers.Flatten(),
keras.layers.Dense(2),
keras.layers.Softmax(),
]
)
# Train model (code omitted for brevity)
# Log the model to MLflow
model_info = mlflow.tensorflow.log_model(model, name="model")
# Later, load the model for inference
loaded_model = mlflow.tensorflow.load_model(model_info.model_uri)
predictions = loaded_model.predict(tf.random.uniform([1, 28, 28, 3]))
了解 MLflow 模型保存
当您使用 MLflow 保存 TensorFlow 模型时
- 格式转换:该模型将转换为通用的 MLflow
pyfunc
模型以支持部署,通过mlflow.pyfunc.load_model()
加载 - 保留原始格式:该模型仍然能够作为本机 TensorFlow 对象通过
mlflow.tensorflow.load_model()
加载 - 元数据创建:存储模型元数据,包括依赖项和签名
- 工件存储:该模型被保存到 MLflow 工件存储中
- 加载能力:该模型可以作为本机 TensorFlow 模型或通用的
pyfunc
模型加载
此方法能够实现一致的模型管理,而与使用的框架无关。
模型格式
默认情况下,MLflow 将 TensorFlow 模型保存在 TensorFlow SavedModel 格式(编译图)中,这是部署的理想选择。您也可以保存为其他格式
# Save in H5 format (weights only)
mlflow.tensorflow.log_model(
model, name="model", keras_model_kwargs={"save_format": "h5"}
)
# Save in native Keras format
mlflow.tensorflow.log_model(
model, name="model", keras_model_kwargs={"save_format": "keras"}
)
比较模型格式
TensorFlow SavedModel(默认)
- ✅ 完整的序列化:包括模型架构、权重和编译信息
- ✅ 已准备好部署:针对生产环境进行了优化
- ✅ TensorFlow Serving:与 TensorFlow Serving 兼容
- ✅ 跨平台:可以跨不同平台部署
H5 格式
- ✅ 权重存储:高效存储模型权重
- ✅ 更小尺寸:通常比 SavedModel 格式小
- ❌ 有限的信息:不包括完整的计算图
- ❌ 部署限制:不适合某些部署方案
Keras 格式
- ✅ 本机 Keras:使用 Keras 的本机序列化
- ✅ 兼容性:与较新的 Keras 版本配合良好
- ❌ 部署:可能需要额外的部署步骤
对于大多数生产用例,建议使用默认的 SavedModel 格式。有关更多详细信息,请参阅 TensorFlow 保存和加载指南。
模型签名
模型签名描述了模型的预期输入和输出格式。虽然是可选的,但它是更好地理解和验证模型的最佳实践。添加签名的最简单方法是使用自动推断
import mlflow
from mlflow.models import infer_signature
import tensorflow as tf
import numpy as np
# Sample input data
sample_input = np.random.uniform(size=[2, 28, 28, 3])
# Get predictions
sample_output = model.predict(sample_input)
# Infer signature from data
signature = infer_signature(sample_input, sample_output)
# Log model with inferred signature
model_info = mlflow.tensorflow.log_model(model, name="model", signature=signature)
当启用 log_input_examples=True
和 log_model_signatures=True
的自动日志记录时,MLflow 会自动从您的训练数据中推断并记录签名。
签名出现在 MLflow UI 中
手动签名定义
为了完全控制您的模型签名,您可以手动定义输入和输出架构
import mlflow
import tensorflow as tf
import numpy as np
from tensorflow import keras
from mlflow.types import Schema, TensorSpec
from mlflow.models import ModelSignature
# Define model
model = keras.Sequential(
[
keras.Input([28, 28, 3]),
keras.layers.Conv2D(8, 2),
keras.layers.MaxPool2D(2),
keras.layers.Flatten(),
keras.layers.Dense(2),
keras.layers.Softmax(),
]
)
# Define input schema
input_schema = Schema(
[
TensorSpec(np.dtype(np.float32), (-1, 28, 28, 3), "input"),
]
)
# Create signature with input schema
signature = ModelSignature(inputs=input_schema)
# Log model with signature
model_info = mlflow.tensorflow.log_model(model, name="model", signature=signature)
手动定义在以下情况下很有用
- 您需要精确控制张量规范
- 处理复杂的输入/输出结构
- 自动推理无法捕获您期望的模式
- 您想预先指定确切的数据类型和形状
高级 TensorFlow 集成
复杂模型跟踪
对于更复杂的模型,您可能希望跟踪其他信息
跟踪迁移学习
import mlflow
import tensorflow as tf
import matplotlib.pyplot as plt
# Load pre-trained model
base_model = tf.keras.applications.MobileNetV2(
input_shape=(224, 224, 3), include_top=False, weights="imagenet"
)
# Freeze base model
base_model.trainable = False
# Create new model head
inputs = tf.keras.Input(shape=(224, 224, 3))
x = base_model(inputs, training=False)
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dense(256, activation="relu")(x)
outputs = tf.keras.layers.Dense(10, activation="softmax")(x)
model = tf.keras.Model(inputs, outputs)
with mlflow.start_run() as run:
# Log base model information
mlflow.log_param("base_model", "MobileNetV2")
mlflow.log_param("base_model_trainable", False)
mlflow.log_param("new_layers", "GlobalAveragePooling2D, Dense(256), Dense(10)")
# Log base model summary
with open("base_model_summary.txt", "w") as f:
base_model.summary(print_fn=lambda x: f.write(x + "\n"))
mlflow.log_artifact("base_model_summary.txt")
# Log model visualization
tf.keras.utils.plot_model(model, to_file="model_architecture.png", show_shapes=True)
mlflow.log_artifact("model_architecture.png")
# Continue with normal training...
跟踪多模型实验
import mlflow
# Main experiment run
with mlflow.start_run(run_name="ensemble_experiment") as parent_run:
mlflow.log_param("experiment_type", "ensemble")
# Train first model
with mlflow.start_run(run_name="model_1", nested=True) as child_run_1:
model_1 = create_model_1()
# Train model_1
mlflow.tensorflow.log_model(model_1, name="model_1")
mlflow.log_metric("accuracy", accuracy_1)
# Train second model
with mlflow.start_run(run_name="model_2", nested=True) as child_run_2:
model_2 = create_model_2()
# Train model_2
mlflow.tensorflow.log_model(model_2, name="model_2")
mlflow.log_metric("accuracy", accuracy_2)
# Create and log ensemble model
ensemble_model = create_ensemble([model_1, model_2])
mlflow.tensorflow.log_model(ensemble_model, name="ensemble_model")
mlflow.log_metric("ensemble_accuracy", ensemble_accuracy)
超参数优化
将 TensorFlow 与超参数调整工具结合使用,同时跟踪 MLflow 中的所有内容
import mlflow
import tensorflow as tf
from tensorflow import keras
import optuna
def create_model(trial):
# Define hyperparameters to tune
learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
units = trial.suggest_int("units", 32, 512)
dropout = trial.suggest_float("dropout", 0.1, 0.5)
# Create model with suggested hyperparameters
model = keras.Sequential(
[
keras.layers.Input(shape=(28, 28, 3)),
keras.layers.Flatten(),
keras.layers.Dense(units, activation="relu"),
keras.layers.Dropout(dropout),
keras.layers.Dense(10, activation="softmax"),
]
)
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
loss="sparse_categorical_crossentropy",
metrics=["accuracy"],
)
return model
def objective(trial):
# Start nested run for this trial
with mlflow.start_run(nested=True):
# Log hyperparameters
params = {
"learning_rate": trial.params["learning_rate"],
"units": trial.params["units"],
"dropout": trial.params["dropout"],
}
mlflow.log_params(params)
# Create and train model
model = create_model(trial)
history = model.fit(
x_train, y_train, validation_data=(x_val, y_val), epochs=5, verbose=0
)
# Get validation accuracy
val_accuracy = max(history.history["val_accuracy"])
mlflow.log_metric("val_accuracy", val_accuracy)
# Log model
mlflow.tensorflow.log_model(model, name="model")
return val_accuracy
# Main experiment run
with mlflow.start_run(run_name="hyperparameter_optimization"):
# Log study parameters
mlflow.log_params(
{
"optimization_framework": "optuna",
"n_trials": 20,
"direction": "maximize",
"metric": "val_accuracy",
}
)
# Create and run study
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)
# Log best parameters and score
mlflow.log_params({f"best_{k}": v for k, v in study.best_params.items()})
mlflow.log_metric("best_val_accuracy", study.best_value)
# Train final model with best parameters
final_model = create_model(study.best_trial)
final_model.fit(x_train, y_train, epochs=10)
mlflow.tensorflow.log_model(final_model, name="best_model")
部署准备
一旦您使用 MLflow 训练和记录了您的 TensorFlow 模型,就可以通过使用 MLflow CLI 的单个命令在本地轻松部署它
mlflow models serve -m models:/<model_id> -p 5000
测试您部署的模型
import requests
import json
# Prepare test data
test_data = {"inputs": sample_input.numpy().tolist()}
# Make prediction request
response = requests.post(
"https://:5000/invocations",
data=json.dumps(test_data),
headers={"Content-Type": "application/json"},
)
predictions = response.json()
print("Predictions:", predictions)
高级部署选项
mlflow models serve
命令支持多个自定义选项
# Specify environment manager
mlflow models serve -m models:/<model_id> -p 5000 --env-manager conda
# Enable MLServer for enhanced inference capabilities
mlflow models serve -m models:/<model_id> -p 5000 --enable-mlserver
# Set custom host
mlflow models serve -m models:/<model_id> -p 5000 --host 0.0.0.0
对于生产部署,请考虑
- 使用 MLServer (
--enable-mlserver
) 以获得更好的性能和其他功能 - 使用
mlflow models build-docker
构建 Docker 镜像 - 部署到 Azure ML 或 Amazon SageMaker 等云平台
- 设置适当的环境管理和依赖项隔离
实际应用
MLflow-TensorFlow 集成在以下情况下表现出色
- 🖼️ 计算机视觉:跟踪 CNN 架构、数据增强策略以及用于图像分类、对象检测和分割的训练动态
- 📝 自然语言处理:监控 Transformer 模型、嵌入以及用于语言理解、翻译和生成的微调
- 📊 时间序列分析:记录用于预测、异常检测和序列预测的 RNN 和 LSTM 模型
- 🏭 生产 ML 系统:对从开发到部署的模型进行版本控制,并具有完整的沿袭跟踪
- 🎓 教育项目:记录从简单到复杂模型的学习进度
- 🧪 实验研究:将新型架构和训练技术与已建立的基线进行比较
结论
MLflow-TensorFlow 集成为跟踪、管理和部署机器学习实验提供了一个全面的解决方案。通过将 TensorFlow 的强大计算能力与 MLflow 的实验跟踪相结合,您可以创建一个工作流程,该工作流程是
- 🔍 透明:模型训练的每个方面都有记录
- 🔄 可重现:实验可以完全重新创建
- 📊 可比较:可以并排评估不同的方法
- 📈 可扩展:从简单的原型到复杂的生产模型
- 👥 协作:团队成员可以分享和建立彼此的工作
无论您是探索新型模型架构的研究人员,还是将模型部署到生产环境的工程师,MLflow-TensorFlow 集成都为有组织的、可重现和可扩展的机器学习开发奠定了基础。