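Keras with MLflow

In this guide, we walk through how to use Keras with MLflow. We demonstrate how to track your Keras experiments and log your Keras models to MLflow, using both autologging and manual logging.

Setting the Keras Backend

Keras 3.0 is natively multi-backend and supports TensorFlow, JAX, and PyTorch. You must set the backend environment variable before importing Keras: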

import os

# You can use 'tensorflow', 'torch', or 'jax' as backend
# Make sure to set the environment variable before importing Keras
os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
import numpy as np
import mlflow
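
Backend Selection

The backend must be set before Keras is imported. Once Keras has been imported, the backend cannot be changed within the same Python session.

This multi-backend architecture means that your MLflow tracking code works the same way whichever backend you choose, so you can optimize for your specific use case without changing your experiment tracking setup.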
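
Because the backend has to be fixed before the first Keras import, it can help to route the choice through one small guard. The sketch below is one way to do that; `set_keras_backend` and `SUPPORTED_BACKENDS` are hypothetical names introduced here for illustration and are not part of Keras or MLflow.

```python
import os
import sys

# Backend names taken from the guide; the tuple itself is a hypothetical helper.
SUPPORTED_BACKENDS = ("tensorflow", "jax", "torch")


def set_keras_backend(name):
    """Set KERAS_BACKEND, refusing to do so once Keras is already imported."""
    if name not in SUPPORTED_BACKENDS:
        raise ValueError(
            f"Unknown backend {name!r}; expected one of {SUPPORTED_BACKENDS}"
        )
    if "keras" in sys.modules:
        # The backend is fixed at import time, so a later change has no effect.
        raise RuntimeError(
            "Keras is already imported; set the backend in a fresh session"
        )
    os.environ["KERAS_BACKEND"] = name
    return name


# Call this before the first `import keras` in the process.
set_keras_backend("jax")
```

Failing loudly when Keras is already loaded is a deliberate choice: a silent no-op would leave the run tagged with a backend it never used.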

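Logging Keras Experiments to MLflow

Autologging Keras Experiments

MLflow provides autologging integration with Keras/TensorFlow. To enable automatic logging of metrics, parameters, and models, call mlflow.tensorflow.autolog() or mlflow.autolog() before your training code: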

import mlflow
import mlflow.tensorflow

# Enable autologging for TensorFlow/Keras
mlflow.tensorflow.autolog()

# Your existing Keras training code works unchanged
model.fit(x_train, y_train, validation_data=(x_val, y_val), epochs=10)

Version Support

Only tensorflow>=2.3 is supported. Autologging captures metrics from tf.keras and from tf.keras.callbacks.EarlyStopping.
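
A quick pre-flight check can confirm that the installed TensorFlow meets the 2.3 minimum before autologging is enabled. This is a minimal sketch using only the standard library; `parse_major_minor` and `tensorflow_supports_autolog` are hypothetical helpers, and the lookup assumes the package is installed under the distribution name `tensorflow` (variant distributions would need their own name).

```python
import re
from importlib import metadata

MIN_TF_VERSION = (2, 3)  # minimum stated in the guide


def parse_major_minor(version):
    """Extract (major, minor) from a version string such as '2.16.0rc0'."""
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    return int(match.group(1)), int(match.group(2))


def tensorflow_supports_autolog():
    """Return True/False for the installed TensorFlow, or None if it is absent."""
    try:
        installed = metadata.version("tensorflow")
    except metadata.PackageNotFoundError:
        return None
    return parse_major_minor(installed) >= MIN_TF_VERSION
```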

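What Gets Automatically Logged

Autologging captures comprehensive information about Keras training:

| Framework | Metrics | Parameters | Artifacts |
| --- | --- | --- | --- |
| tf.keras | Training loss; validation loss; user-specified metrics | fit() parameters; optimizer name; learning rate; epsilon | Model summary at training start; MLflow Model (Keras model); TensorBoard logs at training end |
| tf.keras.callbacks.EarlyStopping | Metrics from the EarlyStopping callback: stopped_epoch, restored_epoch, restore_best_weight | fit() parameters from EarlyStopping: min_delta, patience, baseline, restore_best_weights | -- |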

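Automatic Run Management

MLflow manages runs for you when autologging is enabled:

  • No active run: if no run is active when autolog() captures data, MLflow automatically creates a run and ends it once the call to fit() on the tf.keras model completes
  • Existing run: if a run is already active, MLflow logs to that run but does not end it automatically; you must end the run yourself when you are done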
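
The two rules above can be pictured with a small toy model. The sketch below does not call MLflow at all: `RunRegistry` and `autolog_scope` are hypothetical names invented here, and the code only illustrates the ownership rule (whoever starts a run is the one who ends it), not how MLflow implements it.

```python
from contextlib import contextmanager


class RunRegistry:
    """Toy stand-in for a tracking client; NOT the MLflow API."""

    def __init__(self):
        self.active = None
        self.finished = []
        self._counter = 0

    def start_run(self):
        self._counter += 1
        self.active = f"run-{self._counter}"
        return self.active

    def end_run(self):
        self.finished.append(self.active)
        self.active = None


@contextmanager
def autolog_scope(registry):
    """Mimic the rule: end only the runs that autologging itself created."""
    created_here = registry.active is None
    if created_here:
        registry.start_run()
    try:
        yield registry.active
    finally:
        if created_here:
            registry.end_run()


registry = RunRegistry()

# Case 1: no active run, so the scope creates one and ends it afterwards.
with autolog_scope(registry) as run_id:
    first_run = run_id
state_after_auto_run = registry.active

# Case 2: a run already exists, so the scope reuses it and leaves it open.
user_run = registry.start_run()
with autolog_scope(registry) as run_id:
    reused_run = run_id
state_after_user_run = registry.active
registry.end_run()  # the caller is responsible for closing its own run
```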

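Manually Logging Keras Experiments

For finer control over what gets logged, you can instrument your Keras training code manually with MLflow's logging APIs.

Best Practices for Manual Logging

When logging Keras experiments manually, follow the practices that the end-to-end example below walks through: log the training parameters up front, save the model summary as an artifact, log metrics at the end of every epoch, and log the evaluation results and the trained model once training finishes.

Complete Manual Logging Example

Here is an end-to-end example of manually logging a Keras experiment: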

import mlflow
import mlflow.keras
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

# Load and prepare data
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255
x_test = x_test.reshape(10000, 784).astype("float32") / 255
y_train = keras.utils.to_categorical(y_train, 10)
y_test = keras.utils.to_categorical(y_test, 10)


# Define model architecture
def create_model():
    model = keras.Sequential(
        [
            layers.Dense(512, activation="relu", input_shape=(784,)),
            layers.Dropout(0.2),
            layers.Dense(256, activation="relu"),
            layers.Dropout(0.2),
            layers.Dense(10, activation="softmax"),
        ]
    )
    return model


# Training parameters
params = {
    "epochs": 10,
    "batch_size": 128,
    "learning_rate": 0.001,
    "optimizer": "adam",
    "loss_function": "categorical_crossentropy",
    "dropout_rate": 0.2,
    "hidden_units": [512, 256],
}

with mlflow.start_run():
    # Log training parameters
    mlflow.log_params(params)

    # Create and compile model
    model = create_model()
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=params["learning_rate"]),
        loss=params["loss_function"],
        metrics=["accuracy"],
    )

    # Log model architecture
    with open("model_summary.txt", "w") as f:
        model.summary(print_fn=lambda x: f.write(x + "\n"))
    mlflow.log_artifact("model_summary.txt")

    # Custom callback for logging metrics
    class MLflowCallback(keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            if logs:
                mlflow.log_metrics(
                    {
                        "train_loss": logs.get("loss"),
                        "train_accuracy": logs.get("accuracy"),
                        "val_loss": logs.get("val_loss"),
                        "val_accuracy": logs.get("val_accuracy"),
                    },
                    step=epoch,
                )

    # Train model with custom callback
    history = model.fit(
        x_train,
        y_train,
        batch_size=params["batch_size"],
        epochs=params["epochs"],
        validation_data=(x_test, y_test),
        callbacks=[MLflowCallback()],
        verbose=1,
    )

    # Evaluate model
    test_loss, test_accuracy = model.evaluate(x_test, y_test, verbose=0)
    mlflow.log_metrics({"test_loss": test_loss, "test_accuracy": test_accuracy})

    # Log the trained model
    mlflow.keras.log_model(model, name="model")

    print(f"Test accuracy: {test_accuracy:.4f}")

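If you run this code against a local MLflow server, the parameters, per-epoch metrics, model summary, and logged model all appear in the MLflow UI.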
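
The custom callback in the example hard-codes four keys and passes whatever logs.get() returns, which is None whenever a key is missing (for instance when no validation data is supplied). A small filter in front of mlflow.log_metrics() avoids sending non-numeric values. The sketch below is one possible shape for it; `to_mlflow_metrics` is a hypothetical helper, and dropping NaN or infinite values is a design choice made here, not an MLflow requirement stated in this guide.

```python
import math


def to_mlflow_metrics(logs, prefix_train="train_"):
    """Rename Keras log keys and keep only finite numeric values."""
    metrics = {}
    for key, value in (logs or {}).items():
        if value is None:
            continue
        value = float(value)
        if not math.isfinite(value):
            continue  # skip NaN/inf, e.g. from a diverged loss
        # Keras prefixes validation metrics with "val_"; label the rest as training.
        name = key if key.startswith("val_") else prefix_train + key
        metrics[name] = value
    return metrics


epoch_logs = {
    "loss": 0.42,
    "accuracy": 0.88,
    "val_loss": None,
    "val_accuracy": float("nan"),
}
print(to_mlflow_metrics(epoch_logs))
# prints {'train_loss': 0.42, 'train_accuracy': 0.88}
```

Inside on_epoch_end, the result could then be passed as mlflow.log_metrics(to_mlflow_metrics(logs), step=epoch).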

Using MLflow's Keras Callback

MLflow provides a built-in callback for Keras that simplifies experiment tracking. mlflow.keras.MlflowCallback() plugs directly into your Keras training loop:

import mlflow
import mlflow.keras
from mlflow.keras import MlflowCallback

# Create model and prepare data (same as above)
model = create_model()
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

with mlflow.start_run() as run:
    # Use MLflow's built-in callback
    mlflow_callback = MlflowCallback(run)

    history = model.fit(
        x_train,
        y_train,
        batch_size=128,
        epochs=10,
        validation_data=(x_test, y_test),
        callbacks=[mlflow_callback],
        verbose=1,
    )

Advanced Callback Usage

MlflowCallback supports several configuration options, and it can be subclassed for specialized logging:

# Log metrics every 5 batches instead of every epoch
mlflow_callback = MlflowCallback(run, log_every_epoch=False, log_every_n_steps=5)


# Custom callback subclass for specialized logging
class CustomMlflowCallback(MlflowCallback):
    def on_epoch_end(self, epoch, logs=None):
        # Call parent method
        super().on_epoch_end(epoch, logs)

        # Add custom logging
        if logs and epoch % 5 == 0:  # Log every 5 epochs
            mlflow.log_metric(
                "learning_rate", self.model.optimizer.learning_rate.numpy()
            )

    def on_train_end(self, logs=None):
        # Log the mean of the final model weights
        weights = self.model.get_weights()
        avg_weight = np.mean([np.mean(w) for w in weights])
        mlflow.log_metric("avg_final_weight", avg_weight)

Saving Keras Models to MLflow

Basic Model Saving

Use mlflow.keras.log_model() to save your trained Keras model:

import mlflow
import mlflow.keras
import numpy as np

# Train your model
model = create_model()
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
model.fit(x_train, y_train, epochs=5)

model_info = mlflow.keras.log_model(model, name="model")

# Load and use the model
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)

# Make predictions
predictions = loaded_model.predict(x_test[:5])
print("Predictions:", predictions)

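Model Signatures

A model signature describes the schema of the model's inputs and outputs. It is not required, but it is a best practice because it documents the model and lets its inputs be validated: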

import mlflow
from mlflow.models import infer_signature
import numpy as np

# Prepare sample data for signature inference
sample_input = x_test[:100]
sample_predictions = model.predict(sample_input)

# Infer signature from sample data
signature = infer_signature(sample_input, sample_predictions)

# Log model with signature
model_info = mlflow.keras.log_model(model, name="model", signature=signature)

You can also create the signature manually for more control:

from mlflow.types import Schema, TensorSpec
from mlflow.models import ModelSignature
import numpy as np

# Define input and output schemas
input_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 784))])
output_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 10))])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

model_info = mlflow.keras.log_model(model, name="model", signature=signature)
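
In the schemas above, -1 marks a dimension whose size may vary (here the batch axis), while 784 and 10 are fixed. The sketch below shows what that means for incoming data. It is a plain-Python illustration, not MLflow's own validation code, and `shape_matches` is a hypothetical helper.

```python
def shape_matches(spec_shape, actual_shape):
    """Check a concrete array shape against a spec where -1 means 'any size'."""
    if len(spec_shape) != len(actual_shape):
        return False
    return all(
        expected == -1 or expected == actual
        for expected, actual in zip(spec_shape, actual_shape)
    )


input_spec = (-1, 784)  # same shape as the input TensorSpec above

print(shape_matches(input_spec, (5, 784)))     # True: any batch size is fine
print(shape_matches(input_spec, (5, 28, 28)))  # False: images were not flattened
print(shape_matches(input_spec, (5, 10)))      # False: wrong feature count
```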

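Multi-Backend Support in Keras 3.0

As noted at the start of this guide, multi-backend support is one of the most powerful features of Keras 3.0. MLflow tracking works across all supported backends: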

import os
import mlflow

# Switch backends easily - MLflow tracking code remains identical
os.environ["KERAS_BACKEND"] = "jax" # or "torch" or "tensorflow"

import keras
import mlflow.keras

# Enable autologging (works with any backend)
mlflow.tensorflow.autolog()

# Your training code is backend-agnostic
model = keras.Sequential(
    [
        keras.layers.Dense(64, activation="relu"),
        keras.layers.Dense(10, activation="softmax"),
    ]
)

model.compile(
    optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

with mlflow.start_run():
    model.fit(x_train, y_train, epochs=5, validation_split=0.2)

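This consistency means you can:

  • Experiment with different backends without changing your MLflow tracking code
  • Optimize performance by choosing the best backend for your hardware (JAX for TPUs, PyTorch for research flexibility, TensorFlow for production)
  • Maintain reproducibility across different compute environments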
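
Since the backend cannot be changed within one Python session, comparing backends in practice means one process per backend. The sketch below shows that pattern with the standard library only. `run_with_backend` is a hypothetical helper, and the child code is a stand-in that just echoes the variable it received; a real comparison would launch the training script instead.

```python
import os
import subprocess
import sys

BACKENDS = ("tensorflow", "jax", "torch")

# Stand-in for a real training script; it only reports the backend it was given.
CHILD_CODE = "import os; print(os.environ['KERAS_BACKEND'])"


def run_with_backend(backend, code=CHILD_CODE):
    """Run `code` in a fresh interpreter with KERAS_BACKEND set for that process only."""
    env = dict(os.environ, KERAS_BACKEND=backend)
    result = subprocess.run(
        [sys.executable, "-c", code],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


reported = {backend: run_with_backend(backend) for backend in BACKENDS}
print(reported)
# prints {'tensorflow': 'tensorflow', 'jax': 'jax', 'torch': 'torch'}
```

Passing the variable through the child's environment, rather than mutating os.environ in the parent, keeps each run isolated from the others.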

Advanced Features

Hyperparameter Tuning with Keras and MLflow

Combine Keras with a hyperparameter tuning library while tracking every trial in MLflow:

import keras
import mlflow
import optuna
from sklearn.model_selection import train_test_split


def objective(trial):
    # Suggest hyperparameters
    lr = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [32, 64, 128])
    hidden_units = trial.suggest_int("hidden_units", 64, 512)
    dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5)

    # Each trial is logged as a child of the enclosing parent run
    with mlflow.start_run(nested=True):
        # Log trial parameters
        mlflow.log_params(
            {
                "learning_rate": lr,
                "batch_size": batch_size,
                "hidden_units": hidden_units,
                "dropout_rate": dropout_rate,
            }
        )

        # Create model with suggested parameters
        model = keras.Sequential(
            [
                keras.layers.Dense(
                    hidden_units, activation="relu", input_shape=(784,)
                ),
                keras.layers.Dropout(dropout_rate),
                keras.layers.Dense(10, activation="softmax"),
            ]
        )

        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=lr),
            loss="categorical_crossentropy",
            metrics=["accuracy"],
        )

        # Train model
        history = model.fit(
            x_train,
            y_train,
            batch_size=batch_size,
            epochs=10,
            validation_split=0.2,
            verbose=0,
        )

        # Get validation accuracy
        val_accuracy = max(history.history["val_accuracy"])
        mlflow.log_metric("val_accuracy", val_accuracy)

        return val_accuracy


# Run hyperparameter optimization
with mlflow.start_run():
    mlflow.set_tag("optimization", "optuna")
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=20)

    # Log best parameters
    mlflow.log_params(study.best_params)
    mlflow.log_metric("best_val_accuracy", study.best_value)
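
The learning rate above is declared with log=True, which tells Optuna to treat the range in the log domain, so that 1e-5 to 1e-4 gets as much attention as 1e-2 to 1e-1. The sketch below illustrates that idea with plain log-uniform random sampling. It is not Optuna's sampler (which, by default, is more sophisticated than independent random draws), and `sample_log_uniform` is a hypothetical helper.

```python
import math
import random


def sample_log_uniform(rng, low, high):
    """Draw a value whose logarithm is uniform on [log(low), log(high)]."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))


rng = random.Random(0)  # fixed seed so the sketch is repeatable
samples = [sample_log_uniform(rng, 1e-5, 1e-1) for _ in range(10_000)]

# The range spans four decades, so about half of the draws should fall
# in the lower two (below 1e-3).
below_1e_3 = sum(s < 1e-3 for s in samples) / len(samples)
print(f"share below 1e-3: {below_1e_3:.2f}")
```

With a plain uniform draw over the same range, only about 1% of samples would fall below 1e-3, which is why a log scale is the usual choice for learning rates.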

Custom Metrics and Artifacts

Log custom visualizations and metrics that are specific to your use case:

import matplotlib.pyplot as plt
import mlflow
import numpy as np
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report


def log_training_plots(history, run_id):
    """Log training history plots to MLflow."""

    # Plot training history
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

    ax1.plot(history.history["loss"], label="Training Loss")
    ax1.plot(history.history["val_loss"], label="Validation Loss")
    ax1.set_title("Model Loss")
    ax1.set_xlabel("Epoch")
    ax1.set_ylabel("Loss")
    ax1.legend()

    ax2.plot(history.history["accuracy"], label="Training Accuracy")
    ax2.plot(history.history["val_accuracy"], label="Validation Accuracy")
    ax2.set_title("Model Accuracy")
    ax2.set_xlabel("Epoch")
    ax2.set_ylabel("Accuracy")
    ax2.legend()

    plt.tight_layout()
    plt.savefig("training_history.png", dpi=300, bbox_inches="tight")
    mlflow.log_artifact("training_history.png")
    plt.close()


def log_evaluation_metrics(model, x_test, y_test, class_names):
    """Log comprehensive evaluation metrics."""

    # Get predictions and convert one-hot vectors to class indices
    y_pred = model.predict(x_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true_classes = np.argmax(y_test, axis=1)

    # Confusion matrix
    cm = confusion_matrix(y_true_classes, y_pred_classes)
    plt.figure(figsize=(10, 8))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.title("Confusion Matrix")
    plt.ylabel("True Label")
    plt.xlabel("Predicted Label")
    plt.savefig("confusion_matrix.png", dpi=300, bbox_inches="tight")
    mlflow.log_artifact("confusion_matrix.png")
    plt.close()

    # Classification report
    report = classification_report(
        y_true_classes, y_pred_classes, target_names=class_names, output_dict=True
    )

    # Log per-class metrics
    for class_name in class_names:
        if class_name in report:
            mlflow.log_metrics(
                {
                    f"{class_name}_precision": report[class_name]["precision"],
                    f"{class_name}_recall": report[class_name]["recall"],
                    f"{class_name}_f1": report[class_name]["f1-score"],
                }
            )


# Usage example
with mlflow.start_run():
    # Train model
    history = model.fit(
        x_train, y_train, validation_data=(x_test, y_test), epochs=10, verbose=1
    )

    # Log comprehensive results
    log_training_plots(history, mlflow.active_run().info.run_id)
    log_evaluation_metrics(
        model, x_test, y_test, class_names=[str(i) for i in range(10)]
    )
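
The per-class loop above issues one mlflow.log_metrics() call per class. An alternative is to flatten the report into a single dictionary first and log it once. The sketch below shows only the flattening step, so it runs without MLflow or scikit-learn; `flatten_report` is a hypothetical helper, and the sample report uses invented numbers in the nested shape that classification_report(output_dict=True) returns.

```python
def flatten_report(report, class_names):
    """Turn a nested classification report into one flat metrics dict."""
    metrics = {}
    for class_name in class_names:
        scores = report.get(class_name)
        if not isinstance(scores, dict):
            continue  # class absent from the report
        metrics[f"{class_name}_precision"] = scores["precision"]
        metrics[f"{class_name}_recall"] = scores["recall"]
        metrics[f"{class_name}_f1"] = scores["f1-score"]
    return metrics


# Invented sample values, for illustration only.
sample_report = {
    "0": {"precision": 0.9, "recall": 0.8, "f1-score": 0.85, "support": 10},
    "1": {"precision": 0.7, "recall": 0.75, "f1-score": 0.72, "support": 12},
    "accuracy": 0.77,
}

flat = flatten_report(sample_report, ["0", "1", "2"])
print(sorted(flat))
# prints ['0_f1', '0_precision', '0_recall', '1_f1', '1_precision', '1_recall']
```

The resulting dictionary can be handed to mlflow.log_metrics(flat) in one call inside an active run.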

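Conclusion

The MLflow Keras integration covers experiment tracking and model management for deep learning workflows. Whether you choose autologging for simplicity or manual logging for fine-grained control, MLflow records the parameters, metrics, artifacts, and models needed for reproducible machine learning research and production deployment.

The main benefits of using MLflow with Keras include:

  • Seamless integration: one-line autologging setup with comprehensive tracking
  • Multi-backend support: consistent tracking across the TensorFlow, JAX, and PyTorch backends
  • Flexible logging: choose between autologging and manual logging
  • Production ready: built-in model serving and deployment capabilities
  • Collaborative development: share experiments and models through the MLflow UI

Whether you are running research experiments or building production ML systems, the MLflow Keras integration provides a foundation for organized, reproducible, and scalable deep learning workflows.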