跳到主要内容

MLflow 中的 Keras

在本指南中,我们将引导您完成如何在 MLflow 中使用 Keras。我们将演示如何跟踪您的 Keras 实验,并使用自动日志记录和手动日志记录方法将 Keras 模型记录到 MLflow。

设置 Keras 后端

Keras 3.0 本身就支持多后端,支持 TensorFlow、JAX 和 PyTorch。您必须在导入 Keras 之前设置后端环境变量。

import os

# You can use 'tensorflow', 'torch', or 'jax' as backend
# Make sure to set the environment variable before importing Keras
os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
import numpy as np
import mlflow
backend-selection

必须在导入 Keras 之前设置后端。一旦导入 Keras,在同一个 Python 会话中就无法更改后端。

这种多后端架构意味着无论您选择哪个后端,您的 MLflow 跟踪代码都能一致运行,让您无需更改实验跟踪设置即可灵活地针对特定用例进行优化。

将 Keras 实验记录到 MLflow

自动记录 Keras 实验

MLflow 为 Keras/TensorFlow 提供了无缝的自动日志记录集成。要启用指标、参数和模型的自动记录,只需在训练代码之前调用 mlflow.tensorflow.autolog()mlflow.autolog()

import mlflow
import mlflow.tensorflow

# Enable autologging for TensorFlow/Keras
mlflow.tensorflow.autolog()

# Your existing Keras training code works unchanged
model.fit(x_train, y_train, validation_data=(x_val, y_val), epochs=10)
version-support

仅支持 tensorflow>=2.3 版本。自动日志记录功能可捕获 tf.kerastf.keras.callbacks.EarlyStopping 的指标。

自动记录的内容

自动日志记录可捕获有关 Keras 训练的全面信息

框架指标参数工件
tf.keras训练损失;验证损失;用户指定的指标fit() 参数;优化器名称;学习率;epsilon训练开始时的模型摘要;MLflow 模型(Keras 模型);训练结束时的 TensorBoard 日志
tf.keras.callbacks.EarlyStoppingEarlyStopping 回调中的指标:stopped_epochrestored_epochrestore_best_weightEarlyStopping 的 fit() 参数:min_deltapatiencebaselinerestore_best_weights--

自动运行管理

使用自动日志记录时,MLflow 会智能地管理运行

  • 无活动运行:如果在 autolog() 捕获数据时没有活动运行,MLflow 会自动创建一个运行,并在训练通过 tf.keras.fit() 完成后结束该运行。
  • 现有运行:如果已有运行,MLflow 会记录到该运行,但不会自动结束它——如果需要,您必须手动停止该运行。

手动记录 Keras 实验

为了更精细地控制记录的内容,您可以使用 MLflow 的日志记录 API 手动检测您的 Keras 训练代码。

手动日志记录的最佳实践

在手动记录 Keras 实验时,请遵循以下最佳实践:

完整的代码手动日志记录示例

以下是手动记录 Keras 实验的端到端示例。

import mlflow
import mlflow.keras
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

# Load and prepare data
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255
x_test = x_test.reshape(10000, 784).astype("float32") / 255
y_train = keras.utils.to_categorical(y_train, 10)
y_test = keras.utils.to_categorical(y_test, 10)


# Define model architecture
def create_model():
model = keras.Sequential(
[
layers.Dense(512, activation="relu", input_shape=(784,)),
layers.Dropout(0.2),
layers.Dense(256, activation="relu"),
layers.Dropout(0.2),
layers.Dense(10, activation="softmax"),
]
)
return model


# Training parameters
params = {
"epochs": 10,
"batch_size": 128,
"learning_rate": 0.001,
"optimizer": "adam",
"loss_function": "categorical_crossentropy",
"dropout_rate": 0.2,
"hidden_units": [512, 256],
}

with mlflow.start_run():
# Log training parameters
mlflow.log_params(params)

# Create and compile model
model = create_model()
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=params["learning_rate"]),
loss=params["loss_function"],
metrics=["accuracy"],
)

# Log model architecture
with open("model_summary.txt", "w") as f:
model.summary(print_fn=lambda x: f.write(x + "\n"))
mlflow.log_artifact("model_summary.txt")

# Custom callback for logging metrics
class MLflowCallback(keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None):
if logs:
mlflow.log_metrics(
{
"train_loss": logs.get("loss"),
"train_accuracy": logs.get("accuracy"),
"val_loss": logs.get("val_loss"),
"val_accuracy": logs.get("val_accuracy"),
},
step=epoch,
)

# Train model with custom callback
history = model.fit(
x_train,
y_train,
batch_size=params["batch_size"],
epochs=params["epochs"],
validation_data=(x_test, y_test),
callbacks=[MLflowCallback()],
verbose=1,
)

# Evaluate model
test_loss, test_accuracy = model.evaluate(x_test, y_test, verbose=0)
mlflow.log_metrics({"test_loss": test_loss, "test_accuracy": test_accuracy})

# Log the trained model
mlflow.keras.log_model(model, name="model")

print(f"Test accuracy: {test_accuracy:.4f}")

如果使用本地 MLflow 服务器运行此代码,您将在 MLflow UI 中看到全面的跟踪。

使用 MLflow 的 Keras 回调

MLflow 提供了一个内置的 Keras 回调,可简化实验跟踪。 mlflow.keras.MlflowCallback() 可与您的 Keras 训练循环无缝集成。

import mlflow
import mlflow.keras
from mlflow.keras import MlflowCallback

# Create model and prepare data (same as above)
model = create_model()
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

with mlflow.start_run() as run:
# Use MLflow's built-in callback
mlflow_callback = MlflowCallback(run)

history = model.fit(
x_train,
y_train,
batch_size=128,
epochs=10,
validation_data=(x_test, y_test),
callbacks=[mlflow_callback],
verbose=1,
)

高级回调用法

MlflowCallback 支持各种配置选项。

# Log metrics every 5 batches instead of every epoch
mlflow_callback = MlflowCallback(run, log_every_epoch=False, log_every_n_steps=5)


# Custom callback subclass for specialized logging
class CustomMlflowCallback(MlflowCallback):
def on_epoch_end(self, epoch, logs=None):
# Call parent method
super().on_epoch_end(epoch, logs)

# Add custom logging
if logs and epoch % 5 == 0: # Log every 5 epochs
mlflow.log_metric(
"learning_rate", self.model.optimizer.learning_rate.numpy()
)

def on_train_end(self, logs=None):
# Log final model weights distribution
weights = self.model.get_weights()
avg_weight = np.mean([np.mean(w) for w in weights])
mlflow.log_metric("avg_final_weight", avg_weight)

将 Keras 模型保存到 MLflow

基本模型保存

使用 mlflow.keras.log_model() 保存您训练好的 Keras 模型。

import mlflow
import mlflow.keras
import numpy as np

# Train your model
model = create_model()
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
model.fit(x_train, y_train, epochs=5)

model_info = mlflow.keras.log_model(model, name="model")

# Load and use the model
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)

# Make predictions
predictions = loaded_model.predict(x_test[:5])
print("Predictions:", predictions)

模型签名

模型签名描述了模型的输入和输出架构。虽然不是必需的,但为了更好地理解和验证模型,它是一个最佳实践。

import mlflow
from mlflow.models import infer_signature
import numpy as np

# Prepare sample data for signature inference
sample_input = x_test[:100]
sample_predictions = model.predict(sample_input)

# Infer signature from sample data
signature = infer_signature(sample_input, sample_predictions)

# Log model with signature
model_info = mlflow.keras.log_model(model, name="model", signature=signature)

您也可以手动创建签名以获得更多控制权。

from mlflow.types import Schema, TensorSpec
from mlflow.models import ModelSignature
import numpy as np

# Define input and output schemas
input_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 784))])
output_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 10))])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

model_info = mlflow.keras.log_model(model, name="model", signature=signature)

Keras 3.0 的多后端支持

正如本指南开头所述,Keras 3.0 的多后端支持是其最强大的功能之一。MLflow 的跟踪在所有支持的后端之间都能无缝工作。

import os
import mlflow

# Switch backends easily - MLflow tracking code remains identical
os.environ["KERAS_BACKEND"] = "jax" # or "torch" or "tensorflow"

import keras
import mlflow.keras

# Enable autologging (works with any backend)
mlflow.tensorflow.autolog()

# Your training code is backend-agnostic
model = keras.Sequential(
[
keras.layers.Dense(64, activation="relu"),
keras.layers.Dense(10, activation="softmax"),
]
)

model.compile(
optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

with mlflow.start_run():
model.fit(x_train, y_train, epochs=5, validation_split=0.2)

这种一致性意味着您可以

  • 尝试不同的后端,而无需更改您的 MLflow 跟踪代码。
  • 优化性能,选择最适合您硬件的后端(TPU 使用 JAX,研究灵活性使用 PyTorch,生产环境使用 TensorFlow)。
  • 跨不同计算环境维护可重现性

高级功能

使用 Keras 和 MLflow 进行超参数调优

将 Keras 与超参数调优库结合使用,同时在 MLflow 中跟踪所有内容。

import mlflow
import optuna
from sklearn.model_selection import train_test_split


def objective(trial):
# Suggest hyperparameters
lr = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
batch_size = trial.suggest_categorical("batch_size", [32, 64, 128])
hidden_units = trial.suggest_int("hidden_units", 64, 512)
dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5)

with mlflow.start_run(nested=True):
# Log trial parameters
mlflow.log_params(
{
"learning_rate": lr,
"batch_size": batch_size,
"hidden_units": hidden_units,
"dropout_rate": dropout_rate,
}
)

# Create model with suggested parameters
model = keras.Sequential(
[
keras.layers.Dense(hidden_units, activation="relu", input_shape=(784,)),
keras.layers.Dropout(dropout_rate),
keras.layers.Dense(10, activation="softmax"),
]
)

model.compile(
optimizer=keras.optimizers.Adam(learning_rate=lr),
loss="categorical_crossentropy",
metrics=["accuracy"],
)

# Train model
history = model.fit(
x_train,
y_train,
batch_size=batch_size,
epochs=10,
validation_split=0.2,
verbose=0,
)

# Get validation accuracy
val_accuracy = max(history.history["val_accuracy"])
mlflow.log_metric("val_accuracy", val_accuracy)

return val_accuracy


# Run hyperparameter optimization
with mlflow.start_run():
mlflow.set_tag("optimization", "optuna")
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)

# Log best parameters
mlflow.log_params(study.best_params)
mlflow.log_metric("best_val_accuracy", study.best_value)

自定义指标和伪影

记录特定于用例的自定义可视化和指标。

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report


def log_training_plots(history, run_id):
"""Log training history plots to MLflow."""

# Plot training history
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

ax1.plot(history.history["loss"], label="Training Loss")
ax1.plot(history.history["val_loss"], label="Validation Loss")
ax1.set_title("Model Loss")
ax1.set_xlabel("Epoch")
ax1.set_ylabel("Loss")
ax1.legend()

ax2.plot(history.history["accuracy"], label="Training Accuracy")
ax2.plot(history.history["val_accuracy"], label="Validation Accuracy")
ax2.set_title("Model Accuracy")
ax2.set_xlabel("Epoch")
ax2.set_ylabel("Accuracy")
ax2.legend()

plt.tight_layout()
plt.savefig("training_history.png", dpi=300, bbox_inches="tight")
mlflow.log_artifact("training_history.png")
plt.close()


def log_evaluation_metrics(model, x_test, y_test, class_names):
"""Log comprehensive evaluation metrics."""

# Get predictions
y_pred = model.predict(x_test)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_test, axis=1)

# Confusion matrix
cm = confusion_matrix(y_true_classes, y_pred_classes)
plt.figure(figsize=(10, 8))
sns.heatmap(
cm,
annot=True,
fmt="d",
cmap="Blues",
xticklabels=class_names,
yticklabels=class_names,
)
plt.title("Confusion Matrix")
plt.ylabel("True Label")
plt.xlabel("Predicted Label")
plt.savefig("confusion_matrix.png", dpi=300, bbox_inches="tight")
mlflow.log_artifact("confusion_matrix.png")
plt.close()

# Classification report
report = classification_report(
y_true_classes, y_pred_classes, target_names=class_names, output_dict=True
)

# Log per-class metrics
for class_name in class_names:
if class_name in report:
mlflow.log_metrics(
{
f"{class_name}_precision": report[class_name]["precision"],
f"{class_name}_recall": report[class_name]["recall"],
f"{class_name}_f1": report[class_name]["f1-score"],
}
)


# Usage example
with mlflow.start_run():
# Train model
history = model.fit(
x_train, y_train, validation_data=(x_test, y_test), epochs=10, verbose=1
)

# Log comprehensive results
log_training_plots(history, mlflow.active_run().info.run_id)
log_evaluation_metrics(
model, x_test, y_test, class_names=[str(i) for i in range(10)]
)

结论

MLflow 与 Keras 的集成提供了一个全面的解决方案,用于在深度学习工作流程中进行实验跟踪和模型管理。无论您是选择简化的自动日志记录,还是选择更精细控制的手动日志记录,MLflow 都能捕获可重现的机器学习研究和生产部署所需的所有关键信息。

使用 MLflow 结合 Keras 的主要好处包括:

  • 无缝集成:一行代码即可实现自动日志记录设置,并进行全面跟踪。
  • 多后端支持:在 TensorFlow、JAX 和 PyTorch 后端之间保持跟踪的一致性。
  • 灵活的日志记录:在自动日志记录和手动日志记录方法之间进行选择。
  • 生产就绪:内置模型服务和部署功能。
  • 协作开发:通过 MLflow 直观的 UI 共享实验和模型。

无论您是进行研究实验还是构建生产 ML 系统,MLflow-Keras 集成都为有组织、可重现且可扩展的深度学习工作流程提供了基础。