跳到主要内容

MLflow TensorFlow 集成

简介

TensorFlow 是由 Google 开发的端到端开源机器学习平台。它提供了一个全面的生态系统,用于构建和部署机器学习模型,从研究原型到生产系统。TensorFlow 的 Keras API 提供了用于构建神经网络的直观界面,而其强大的后端则支持在 CPU、GPU 和 TPU 上进行高效计算。

MLflow 的 TensorFlow 集成提供了深度学习工作流的实验跟踪、模型版本管理和部署功能。

为什么选择 MLflow + TensorFlow?

自动日志记录

用一行代码实现全面的实验跟踪:mlflow.tensorflow.autolog() 自动记录指标、参数和模型。

实验跟踪

跟踪所有 TensorFlow 实验的训练指标、超参数、模型架构和工件。

模型注册表

使用 MLflow 的模型注册表和模型服务基础设施,对 TensorFlow 模型进行版本管理、分阶段和部署。

可复现性

捕获模型状态、训练配置和环境,以实现可复现的实验。

自动日志记录

通过一行代码实现全面的自动日志记录

python
import mlflow
import numpy as np
import tensorflow as tf
from tensorflow import keras

# One line enables autologging: metrics, parameters, optimizer config and
# the trained model are captured automatically from model.fit().
mlflow.tensorflow.autolog()

# Prepare sample data: 20 random 28x28 RGB "images" with binary labels.
data = np.random.uniform(size=[20, 28, 28, 3])
label = np.random.randint(2, size=20)

# Define a minimal CNN classifier.
model = keras.Sequential(
    [
        keras.Input([28, 28, 3]),
        keras.layers.Conv2D(8, 2),
        keras.layers.MaxPool2D(2),
        keras.layers.Flatten(),
        keras.layers.Dense(2),
        keras.layers.Softmax(),
    ]
)

model.compile(
    loss=keras.losses.SparseCategoricalCrossentropy(),
    optimizer=keras.optimizers.Adam(0.001),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

# Training with automatic logging; the fit call must be indented inside
# the run context so autologging attaches to this run.
with mlflow.start_run():
    model.fit(data, label, batch_size=5, epochs=2)

自动日志记录会自动捕获训练指标、模型参数、优化器配置和模型工件。需要 TensorFlow >= 2.3.0 和 model.fit() Keras API。

配置自动日志记录行为

python
# Fine-grained control over what autologging records.
mlflow.tensorflow.autolog(
    log_models=True,            # save the trained model as a run artifact
    log_input_examples=True,    # record a sample of the training input
    log_model_signatures=True,  # infer and store the model's I/O signature
    log_every_n_steps=1,        # how often (in steps) metrics are logged
)

使用 Keras 回调进行手动日志记录

为了获得更多控制,请使用 mlflow.tensorflow.MlflowCallback()

python
import mlflow
import numpy as np
from tensorflow import keras

# Prepare sample data: 100 random 28x28 RGB "images" with binary labels.
data = np.random.uniform(size=[100, 28, 28, 3])
labels = np.random.randint(2, size=100)

# Define and compile the model.
model = keras.Sequential(
    [
        keras.Input([28, 28, 3]),
        keras.layers.Conv2D(8, 3),
        keras.layers.MaxPool2D(2),
        keras.layers.Flatten(),
        keras.layers.Dense(2, activation="softmax"),
    ]
)

model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=keras.optimizers.Adam(0.001),
    metrics=["accuracy"],
)

# Create an MLflow run and attach MlflowCallback so every epoch's metrics
# are logged to that run. The fit call must be indented inside the context.
with mlflow.start_run() as run:
    model.fit(
        data,
        labels,
        batch_size=32,
        epochs=10,
        callbacks=[mlflow.tensorflow.MlflowCallback(run)],
    )

自定义回调

通过继承 keras.callbacks.Callback 来创建自定义日志记录逻辑

python
from tensorflow import keras
import math
import mlflow


class CustomMlflowCallback(keras.callbacks.Callback):
    """Keras callback that mirrors training progress into MLflow.

    Per epoch it logs every metric both raw and in log scale, and at the
    end of training it logs the model's total weight count.
    """

    def on_epoch_begin(self, epoch, logs=None):
        # Record which epoch is currently running.
        mlflow.log_metric("current_epoch", epoch)

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        for name, value in logs.items():
            # Only strictly positive values have a defined logarithm.
            if value > 0:
                mlflow.log_metric(f"log_{name}", math.log(value), step=epoch)
            mlflow.log_metric(name, value, step=epoch)

    def on_train_end(self, logs=None):
        # Log the total number of parameters across all weight tensors.
        weights = self.model.get_weights()
        mlflow.log_metric("total_parameters", sum(w.size for w in weights))

模型日志记录

使用 mlflow.tensorflow.log_model() 保存 TensorFlow 模型

python
import mlflow
import tensorflow as tf
from tensorflow import keras

# Build a minimal CNN classifier.
layers = [
    keras.Input([28, 28, 3]),
    keras.layers.Conv2D(8, 2),
    keras.layers.MaxPool2D(2),
    keras.layers.Flatten(),
    keras.layers.Dense(2),
    keras.layers.Softmax(),
]
model = keras.Sequential(layers)

# Train model (code omitted for brevity)

# Persist the model to MLflow as an artifact named "model".
model_info = mlflow.tensorflow.log_model(model, name="model")

# Later, reload the logged model and run a single-sample prediction.
loaded_model = mlflow.tensorflow.load_model(model_info.model_uri)
predictions = loaded_model.predict(tf.random.uniform([1, 28, 28, 3]))

超参数优化

通过 MLflow 跟踪超参数调优

python
import mlflow
import tensorflow as tf
from tensorflow import keras
import optuna


def objective(trial, x_train, y_train, x_val, y_val):
    """Optuna objective for TensorFlow hyperparameter tuning.

    Each trial runs as a nested MLflow run so the parent run groups the
    whole study. Returns the trial's best validation accuracy, which
    Optuna maximizes.
    """
    with mlflow.start_run(nested=True):
        # Define the hyperparameter search space.
        params = {
            "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
            "units": trial.suggest_int("units", 32, 512),
            "dropout": trial.suggest_float("dropout", 0.1, 0.5),
        }
        # Record the sampled values on the nested run; otherwise the run
        # would hold only the metric with no record of its configuration.
        mlflow.log_params(params)

        # Create the model from the sampled hyperparameters.
        model = keras.Sequential(
            [
                keras.layers.Input(shape=(28, 28, 3)),
                keras.layers.Flatten(),
                keras.layers.Dense(params["units"], activation="relu"),
                keras.layers.Dropout(params["dropout"]),
                keras.layers.Dense(10, activation="softmax"),
            ]
        )

        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=params["learning_rate"]),
            loss="sparse_categorical_crossentropy",
            metrics=["accuracy"],
        )

        # Train briefly and evaluate on the validation split.
        history = model.fit(
            x_train, y_train, validation_data=(x_val, y_val), epochs=5, verbose=0
        )

        # Use the best (not final) epoch's accuracy as the objective value.
        val_accuracy = max(history.history["val_accuracy"])
        mlflow.log_metric("val_accuracy", val_accuracy)

        return val_accuracy


# Main experiment run. x_train/y_train/x_val/y_val are assumed to be
# defined by the surrounding script (dataset loading is omitted here).
with mlflow.start_run(run_name="tensorflow_hyperparameter_optimization"):
    study = optuna.create_study(direction="maximize")
    study.optimize(
        lambda trial: objective(trial, x_train, y_train, x_val, y_val), n_trials=20
    )

    # Log the study's best parameters and score on the parent run.
    mlflow.log_params({f"best_{k}": v for k, v in study.best_params.items()})
    mlflow.log_metric("best_val_accuracy", study.best_value)

模型注册表集成

注册 TensorFlow 模型以进行版本控制和部署

python
import mlflow
from tensorflow import keras
from mlflow import MlflowClient

client = MlflowClient()

with mlflow.start_run():
    # Create a demonstration CNN for 224x224 RGB inputs, 10 classes.
    model = keras.Sequential(
        [
            keras.layers.Conv2D(32, 3, activation="relu", input_shape=(224, 224, 3)),
            keras.layers.MaxPooling2D(2),
            keras.layers.Flatten(),
            keras.layers.Dense(10, activation="softmax"),
        ]
    )

    # Log the model and register it under "ImageClassifier"; registration
    # creates a new version in the Model Registry.
    model_info = mlflow.tensorflow.log_model(
        model, name="tensorflow_model", registered_model_name="ImageClassifier"
    )

    # Tag the run so it can be found and filtered later.
    mlflow.set_tags(
        {"model_type": "cnn", "dataset": "imagenet", "framework": "tensorflow"}
    )

    # Point the "champion" alias at the newly registered version so
    # deployment tooling can resolve it by alias instead of version number.
    client.set_registered_model_alias(
        name="ImageClassifier",
        alias="champion",
        version=model_info.registered_model_version,
    )

了解更多