跳到主要内容

MLflow 中的 Keras

在本指南中,我们将向您介绍如何将 Keras 与 MLflow 一起使用。我们将演示如何跟踪您的 Keras 实验,并使用自动日志记录和手动日志记录方法将您的 Keras 模型记录到 MLflow 中。

设置 Keras 后端

Keras 3.0 本质上是多后端的,支持 TensorFlow、JAX 和 PyTorch。您必须在导入 Keras 之前设置后端环境变量

import os

# You can use 'tensorflow', 'torch', or 'jax' as backend
# Make sure to set the environment variable before importing Keras
os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
import numpy as np
import mlflow
backend-selection

必须在导入 Keras 之前设置后端。导入 Keras 后,无法在同一 Python 会话中更改后端。

这种多后端架构意味着您的 MLflow 跟踪代码可以始终如一地工作,无论您选择哪个后端,这使您可以灵活地针对您的特定用例进行优化,而无需更改您的实验跟踪设置。

将 Keras 实验记录到 MLflow

自动记录 Keras 实验

MLflow 提供与 Keras/TensorFlow 的无缝自动日志记录集成。要启用指标、参数和模型的自动日志记录,只需在您的训练代码之前调用 mlflow.tensorflow.autolog()mlflow.autolog()

import mlflow
import mlflow.tensorflow

# Enable autologging for TensorFlow/Keras
mlflow.tensorflow.autolog()

# Your existing Keras training code works unchanged
model.fit(x_train, y_train, validation_data=(x_val, y_val), epochs=10)
version-support

仅支持 tensorflow>=2.3 版本。自动日志记录功能从 tf.kerastf.keras.callbacks.EarlyStopping 捕获指标。

自动记录的内容

自动日志记录捕获有关您的 Keras 训练的全面信息

框架指标参数工件
tf.keras训练损失;验证损失;用户指定的指标fit() 参数;优化器名称;学习率;epsilon训练开始时的模型摘要;MLflow 模型(Keras 模型);训练结束时的 TensorBoard 日志
tf.keras.callbacks.EarlyStopping来自 EarlyStopping 回调的指标:stopped_epochrestored_epochrestore_best_weight来自 EarlyStopping 的 fit() 参数:min_deltapatiencebaselinerestore_best_weights--

自动运行管理

使用自动日志记录时,MLflow 智能地管理运行

  • 没有活动的运行:如果 autolog() 捕获数据时不存在活动的运行,MLflow 会自动创建一个运行,并在通过 tf.keras.fit() 完成训练后结束它
  • 现有运行:如果运行已存在,MLflow 会记录到该运行,但不会自动结束它 - 如果需要,您必须手动停止该运行

手动记录 Keras 实验

为了更好地控制记录的内容,您可以使用 MLflow 的日志记录 API 手动检测您的 Keras 训练代码

手动日志记录的最佳实践

手动记录 Keras 实验时,请遵循以下最佳实践

完整的手动日志记录示例

这是一个手动记录 Keras 实验的端到端示例

import mlflow
import mlflow.keras
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

# Load and prepare data
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255
x_test = x_test.reshape(10000, 784).astype("float32") / 255
y_train = keras.utils.to_categorical(y_train, 10)
y_test = keras.utils.to_categorical(y_test, 10)


# Define model architecture
def create_model():
model = keras.Sequential(
[
layers.Dense(512, activation="relu", input_shape=(784,)),
layers.Dropout(0.2),
layers.Dense(256, activation="relu"),
layers.Dropout(0.2),
layers.Dense(10, activation="softmax"),
]
)
return model


# Training parameters
params = {
"epochs": 10,
"batch_size": 128,
"learning_rate": 0.001,
"optimizer": "adam",
"loss_function": "categorical_crossentropy",
"dropout_rate": 0.2,
"hidden_units": [512, 256],
}

with mlflow.start_run():
# Log training parameters
mlflow.log_params(params)

# Create and compile model
model = create_model()
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=params["learning_rate"]),
loss=params["loss_function"],
metrics=["accuracy"],
)

# Log model architecture
with open("model_summary.txt", "w") as f:
model.summary(print_fn=lambda x: f.write(x + "\n"))
mlflow.log_artifact("model_summary.txt")

# Custom callback for logging metrics
class MLflowCallback(keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None):
if logs:
mlflow.log_metrics(
{
"train_loss": logs.get("loss"),
"train_accuracy": logs.get("accuracy"),
"val_loss": logs.get("val_loss"),
"val_accuracy": logs.get("val_accuracy"),
},
step=epoch,
)

# Train model with custom callback
history = model.fit(
x_train,
y_train,
batch_size=params["batch_size"],
epochs=params["epochs"],
validation_data=(x_test, y_test),
callbacks=[MLflowCallback()],
verbose=1,
)

# Evaluate model
test_loss, test_accuracy = model.evaluate(x_test, y_test, verbose=0)
mlflow.log_metrics({"test_loss": test_loss, "test_accuracy": test_accuracy})

# Log the trained model
mlflow.keras.log_model(model, name="model")

print(f"Test accuracy: {test_accuracy:.4f}")

如果您使用本地 MLflow 服务器运行此代码,您将在 MLflow UI 中看到全面的跟踪。

使用 MLflow 的 Keras 回调

MLflow 为 Keras 提供了一个内置回调,简化了实验跟踪。mlflow.keras.MlflowCallback() 与您的 Keras 训练循环无缝集成

import mlflow
import mlflow.keras
from mlflow.keras import MlflowCallback

# Create model and prepare data (same as above)
model = create_model()
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

with mlflow.start_run() as run:
# Use MLflow's built-in callback
mlflow_callback = MlflowCallback(run)

history = model.fit(
x_train,
y_train,
batch_size=128,
epochs=10,
validation_data=(x_test, y_test),
callbacks=[mlflow_callback],
verbose=1,
)

高级回调用法

MlflowCallback 支持各种配置选项

# Log metrics every 5 batches instead of every epoch
mlflow_callback = MlflowCallback(run, log_every_epoch=False, log_every_n_steps=5)


# Custom callback subclass for specialized logging
class CustomMlflowCallback(MlflowCallback):
def on_epoch_end(self, epoch, logs=None):
# Call parent method
super().on_epoch_end(epoch, logs)

# Add custom logging
if logs and epoch % 5 == 0: # Log every 5 epochs
mlflow.log_metric(
"learning_rate", self.model.optimizer.learning_rate.numpy()
)

def on_train_end(self, logs=None):
# Log final model weights distribution
weights = self.model.get_weights()
avg_weight = np.mean([np.mean(w) for w in weights])
mlflow.log_metric("avg_final_weight", avg_weight)

将您的 Keras 模型保存到 MLflow

基本模型保存

使用 mlflow.keras.log_model() 保存您训练的 Keras 模型

import mlflow
import mlflow.keras
import numpy as np

# Train your model
model = create_model()
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
model.fit(x_train, y_train, epochs=5)

model_info = mlflow.keras.log_model(model, name="model")

# Load and use the model
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)

# Make predictions
predictions = loaded_model.predict(x_test[:5])
print("Predictions:", predictions)

模型签名

模型签名描述了模型的输入和输出模式。虽然不是必需的,但它是更好理解和验证模型的最佳实践

import mlflow
from mlflow.models import infer_signature
import numpy as np

# Prepare sample data for signature inference
sample_input = x_test[:100]
sample_predictions = model.predict(sample_input)

# Infer signature from sample data
signature = infer_signature(sample_input, sample_predictions)

# Log model with signature
model_info = mlflow.keras.log_model(model, name="model", signature=signature)

您还可以手动创建签名以进行更多控制

from mlflow.types import Schema, TensorSpec
from mlflow.models import ModelSignature
import numpy as np

# Define input and output schemas
input_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 784))])
output_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 10))])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

model_info = mlflow.keras.log_model(model, name="model", signature=signature)

Keras 3.0 的多后端支持

正如本指南开头提到的,Keras 3.0 的多后端支持是其最强大的功能之一。MLflow 的跟踪可以在所有支持的后端上无缝工作

import os
import mlflow

# Switch backends easily - MLflow tracking code remains identical
os.environ["KERAS_BACKEND"] = "jax" # or "torch" or "tensorflow"

import keras
import mlflow.keras

# Enable autologging (works with any backend)
mlflow.tensorflow.autolog()

# Your training code is backend-agnostic
model = keras.Sequential(
[
keras.layers.Dense(64, activation="relu"),
keras.layers.Dense(10, activation="softmax"),
]
)

model.compile(
optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

with mlflow.start_run():
model.fit(x_train, y_train, epochs=5, validation_split=0.2)

这种一致性意味着您可以

  • 在不更改您的 MLflow 跟踪代码的情况下尝试不同的后端
  • 通过为您的硬件选择最佳后端(TPU 的 JAX、研究灵活性的 PyTorch、生产的 TensorFlow)来优化性能
  • 在不同的计算环境中保持可重复性

高级功能

使用 Keras 和 MLflow 进行超参数调整

将 Keras 与超参数调整库结合使用,同时在 MLflow 中跟踪所有内容

import mlflow
import optuna
from sklearn.model_selection import train_test_split


def objective(trial):
# Suggest hyperparameters
lr = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
batch_size = trial.suggest_categorical("batch_size", [32, 64, 128])
hidden_units = trial.suggest_int("hidden_units", 64, 512)
dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5)

with mlflow.start_run(nested=True):
# Log trial parameters
mlflow.log_params(
{
"learning_rate": lr,
"batch_size": batch_size,
"hidden_units": hidden_units,
"dropout_rate": dropout_rate,
}
)

# Create model with suggested parameters
model = keras.Sequential(
[
keras.layers.Dense(hidden_units, activation="relu", input_shape=(784,)),
keras.layers.Dropout(dropout_rate),
keras.layers.Dense(10, activation="softmax"),
]
)

model.compile(
optimizer=keras.optimizers.Adam(learning_rate=lr),
loss="categorical_crossentropy",
metrics=["accuracy"],
)

# Train model
history = model.fit(
x_train,
y_train,
batch_size=batch_size,
epochs=10,
validation_split=0.2,
verbose=0,
)

# Get validation accuracy
val_accuracy = max(history.history["val_accuracy"])
mlflow.log_metric("val_accuracy", val_accuracy)

return val_accuracy


# Run hyperparameter optimization
with mlflow.start_run():
mlflow.set_tag("optimization", "optuna")
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)

# Log best parameters
mlflow.log_params(study.best_params)
mlflow.log_metric("best_val_accuracy", study.best_value)

自定义指标和工件

记录特定于您的用例的自定义可视化和指标

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report


def log_training_plots(history, run_id):
"""Log training history plots to MLflow."""

# Plot training history
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

ax1.plot(history.history["loss"], label="Training Loss")
ax1.plot(history.history["val_loss"], label="Validation Loss")
ax1.set_title("Model Loss")
ax1.set_xlabel("Epoch")
ax1.set_ylabel("Loss")
ax1.legend()

ax2.plot(history.history["accuracy"], label="Training Accuracy")
ax2.plot(history.history["val_accuracy"], label="Validation Accuracy")
ax2.set_title("Model Accuracy")
ax2.set_xlabel("Epoch")
ax2.set_ylabel("Accuracy")
ax2.legend()

plt.tight_layout()
plt.savefig("training_history.png", dpi=300, bbox_inches="tight")
mlflow.log_artifact("training_history.png")
plt.close()


def log_evaluation_metrics(model, x_test, y_test, class_names):
"""Log comprehensive evaluation metrics."""

# Get predictions
y_pred = model.predict(x_test)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_test, axis=1)

# Confusion matrix
cm = confusion_matrix(y_true_classes, y_pred_classes)
plt.figure(figsize=(10, 8))
sns.heatmap(
cm,
annot=True,
fmt="d",
cmap="Blues",
xticklabels=class_names,
yticklabels=class_names,
)
plt.title("Confusion Matrix")
plt.ylabel("True Label")
plt.xlabel("Predicted Label")
plt.savefig("confusion_matrix.png", dpi=300, bbox_inches="tight")
mlflow.log_artifact("confusion_matrix.png")
plt.close()

# Classification report
report = classification_report(
y_true_classes, y_pred_classes, target_names=class_names, output_dict=True
)

# Log per-class metrics
for class_name in class_names:
if class_name in report:
mlflow.log_metrics(
{
f"{class_name}_precision": report[class_name]["precision"],
f"{class_name}_recall": report[class_name]["recall"],
f"{class_name}_f1": report[class_name]["f1-score"],
}
)


# Usage example
with mlflow.start_run():
# Train model
history = model.fit(
x_train, y_train, validation_data=(x_test, y_test), epochs=10, verbose=1
)

# Log comprehensive results
log_training_plots(history, mlflow.active_run().info.run_id)
log_evaluation_metrics(
model, x_test, y_test, class_names=[str(i) for i in range(10)]
)

结论

MLflow 与 Keras 的集成为深度学习工作流程中的实验跟踪和模型管理提供了全面的解决方案。无论您选择自动日志记录以实现简单性,还是选择手动日志记录以实现精细控制,MLflow 都会捕获可重现的机器学习研究和生产部署所需的所有基本信息。

将 MLflow 与 Keras 一起使用的主要优势包括

  • 无缝集成:一行的自动日志记录设置,具有全面的跟踪
  • 多后端支持:跨 TensorFlow、JAX 和 PyTorch 后端的一致跟踪
  • 灵活的日志记录:在自动和手动日志记录方法之间进行选择
  • 生产就绪:内置的模型服务和部署功能
  • 协作开发:通过 MLflow 的直观 UI 共享实验和模型

无论您是进行研究实验还是构建生产 ML 系统,MLflow-Keras 集成都为有组织、可重现且可扩展的深度学习工作流程奠定了基础。