Dataset Evaluation

Dataset evaluation lets you assess model performance on pre-computed predictions without re-running the model. This is especially useful for evaluating large-scale batch inference results or historical predictions, and whenever you want to keep the prediction and evaluation stages separate.

Quick Start: Evaluating Static Predictions

The simplest dataset evaluation works on a DataFrame that contains both predictions and targets:

import mlflow
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Generate sample data and train a model
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42
)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Generate predictions (this could be from a batch job, stored results, etc.)
predictions = model.predict(X_test)
prediction_probabilities = model.predict_proba(X_test)[:, 1]

# Create evaluation dataset with predictions already computed
eval_dataset = pd.DataFrame(
    {
        "prediction": predictions,
        "prediction_proba": prediction_probabilities,
        "target": y_test,
    }
)

# Add original features for analysis (optional)
feature_names = [f"feature_{i}" for i in range(X_test.shape[1])]
for i, feature_name in enumerate(feature_names):
    eval_dataset[feature_name] = X_test[:, i]

with mlflow.start_run():
    # Evaluate static dataset - no model needed!
    result = mlflow.evaluate(
        data=eval_dataset,
        predictions="prediction",  # Column containing predictions
        targets="target",  # Column containing true labels
        model_type="classifier",
    )

    print(f"Accuracy: {result.metrics['accuracy_score']:.3f}")
    print(f"F1 Score: {result.metrics['f1_score']:.3f}")
    print(f"ROC AUC: {result.metrics['roc_auc']:.3f}")

This approach is a good fit when:

  • You have batch prediction results from production systems
  • You want to evaluate historical predictions
  • You are comparing the outputs of different versions of the same model (see the sketch after this list)
  • You need to separate computationally expensive prediction from evaluation
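
For the model-comparison case, here is a minimal sketch building on the quick-start variables above. The column names prediction_v1 and prediction_v2 are hypothetical placeholders for the stored outputs of two versions of the same model; each version is evaluated in its own run so the logged metrics can be compared side by side.

# Minimal sketch: comparing stored outputs of two model versions
# (prediction_v1 / prediction_v2 are hypothetical column names).
comparison_df = eval_dataset.copy()
comparison_df["prediction_v1"] = predictions
comparison_df["prediction_v2"] = (prediction_probabilities >= 0.4).astype(int)  # stand-in for a second version's output

for version_col in ["prediction_v1", "prediction_v2"]:
    with mlflow.start_run(run_name=f"eval_{version_col}"):
        result = mlflow.evaluate(
            data=comparison_df,
            predictions=version_col,
            targets="target",
            model_type="classifier",
        )
        print(version_col, round(result.metrics["f1_score"], 3))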

Dataset Management

For more structured dataset management, use MLflow's PandasDataset:

import mlflow.data

# Create MLflow dataset with prediction column specified
dataset = mlflow.data.from_pandas(
    eval_dataset,
    predictions="prediction",  # Specify prediction column
    targets="target",  # Specify target column
)

with mlflow.start_run():
    # Log the dataset
    mlflow.log_input(dataset, context="evaluation")

    # Evaluate using the dataset (predictions=None since specified in dataset)
    result = mlflow.evaluate(
        data=dataset,
        predictions=None,  # Already specified in dataset creation
        targets="target",
        model_type="classifier",
    )

    print("Evaluation completed using MLflow PandasDataset")

Batch Evaluation Workflows

For batch inference results from production systems:

def evaluate_batch_predictions(batch_results_path, batch_size=10000):
    """Evaluate large batch prediction results efficiently."""

    # Read batch results (could be from S3, database, etc.)
    batch_df = pd.read_parquet(batch_results_path)

    print(f"Evaluating {len(batch_df)} batch predictions")

    with mlflow.start_run(run_name="Batch_Evaluation"):
        # Log batch metadata (fall back to "unknown" if the column is absent or empty)
        mlflow.log_params(
            {
                "batch_size": len(batch_df),
                "batch_date": batch_df["prediction_date"].iloc[0]
                if "prediction_date" in batch_df.columns and len(batch_df) > 0
                else "unknown",
                "data_source": batch_results_path,
            }
        )

        # Evaluate full batch
        result = mlflow.evaluate(
            data=batch_df,
            predictions="model_prediction",
            targets="true_label",
            model_type="classifier",
        )

        # Additional batch-specific analysis
        if "prediction_timestamp" in batch_df.columns:
            # Analyze performance over time
            batch_df["hour"] = pd.to_datetime(batch_df["prediction_timestamp"]).dt.hour
            hourly_accuracy = batch_df.groupby("hour").apply(
                lambda x: (x["model_prediction"] == x["true_label"]).mean()
            )

            # Log time-based metrics
            for hour, accuracy in hourly_accuracy.items():
                mlflow.log_metric(f"accuracy_hour_{hour}", accuracy)

        return result


# Usage
# result = evaluate_batch_predictions("s3://my-bucket/batch-predictions/2024-01-15.parquet")

Handling Large Datasets

For datasets too large to fit in memory:

import pyarrow.parquet as pq


def evaluate_large_dataset_in_chunks(data_path, chunk_size=50000):
    """Evaluate very large datasets by processing in chunks."""

    # pandas' read_parquet has no chunksize option, so iterate over
    # record batches with pyarrow and convert each batch to a DataFrame
    chunk_results = []
    total_samples = 0

    with mlflow.start_run(run_name="Large_Dataset_Evaluation"):
        parquet_file = pq.ParquetFile(data_path)
        for chunk_idx, record_batch in enumerate(
            parquet_file.iter_batches(batch_size=chunk_size)
        ):
            chunk = record_batch.to_pandas()
            chunk_size_actual = len(chunk)
            total_samples += chunk_size_actual

            # Evaluate chunk
            with mlflow.start_run(run_name=f"Chunk_{chunk_idx}", nested=True):
                chunk_result = mlflow.evaluate(
                    data=chunk,
                    predictions="prediction",
                    targets="target",
                    model_type="classifier",
                )

                # Weight metrics by chunk size for aggregation
                weighted_metrics = {
                    f"{k}_weighted": v * chunk_size_actual
                    for k, v in chunk_result.metrics.items()
                    if isinstance(v, (int, float))
                }

                chunk_results.append(
                    {
                        "chunk_idx": chunk_idx,
                        "chunk_size": chunk_size_actual,
                        "metrics": chunk_result.metrics,
                        "weighted_metrics": weighted_metrics,
                    }
                )

                mlflow.log_param("chunk_size", chunk_size_actual)

        # Aggregate results across chunks
        if chunk_results:
            # Calculate weighted averages
            total_weighted = {}
            for chunk in chunk_results:
                for metric, value in chunk["weighted_metrics"].items():
                    total_weighted[metric] = total_weighted.get(metric, 0) + value

            # Log aggregated metrics
            aggregated_metrics = {
                k.replace("_weighted", "_aggregate"): v / total_samples
                for k, v in total_weighted.items()
            }

            mlflow.log_metrics(aggregated_metrics)
            mlflow.log_params(
                {
                    "total_samples": total_samples,
                    "chunks_processed": len(chunk_results),
                    "avg_chunk_size": total_samples / len(chunk_results),
                }
            )

    return chunk_results


# Usage
# results = evaluate_large_dataset_in_chunks("large_predictions.parquet")

Key Use Cases and Benefits

Dataset evaluation in MLflow is particularly valuable in the following scenarios:

Batch Processing - Perfect for evaluating large-scale batch prediction results from production systems without re-running expensive inference.

Historical Analysis - Ideal for analyzing how model performance changes over time using previously computed predictions and ground-truth labels.

Model Comparison - Great for comparing the outputs of different model versions on the same dataset without retraining or re-running inference.

Production Monitoring - Essential for automated evaluation pipelines that assess model performance on new batches of predictions (a minimal monitoring sketch follows below).

Cost Optimization - Reduces compute costs by separating prediction generation from performance evaluation, allowing evaluation without re-executing the model.
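
As a rough illustration of the production-monitoring case, the sketch below evaluates one day's stored predictions and flags the run when accuracy falls below a threshold. The file path, column names, and the 0.85 threshold are all hypothetical.

# Minimal monitoring sketch -- path, column names, and threshold are hypothetical.
daily_df = pd.read_parquet("batch-predictions/2024-01-15.parquet")

with mlflow.start_run(run_name="Daily_Monitoring"):
    result = mlflow.evaluate(
        data=daily_df,
        predictions="model_prediction",
        targets="true_label",
        model_type="classifier",
    )

    accuracy = result.metrics["accuracy_score"]
    mlflow.log_metric("daily_accuracy", accuracy)

    # Flag the run if accuracy drops below the agreed threshold
    mlflow.set_tag("performance_alert", str(accuracy < 0.85))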

Best Practices

When working with dataset evaluation, consider the following best practices:

  • Data Validation: Always verify that prediction and target columns contain the expected data types and value ranges (see the sketch after this list)
  • Missing Values: Handle missing predictions or targets appropriately before evaluation
  • Memory Management: Use chunked processing or sampling for very large datasets
  • Metadata Logging: Record dataset characteristics, processing parameters, and evaluation context
  • Storage Format: Use efficient formats such as Parquet for large prediction datasets
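
For the first two points, a small helper along these lines can be run before calling mlflow.evaluate. The function name and specific checks are illustrative, not part of the MLflow API.

def validate_eval_dataset(df, prediction_col="prediction", target_col="target"):
    """Illustrative pre-evaluation checks: required columns, missing values, dtypes."""
    missing_cols = [c for c in (prediction_col, target_col) if c not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")

    # Drop rows where either the prediction or the target is missing
    clean_df = df.dropna(subset=[prediction_col, target_col])
    dropped = len(df) - len(clean_df)
    if dropped:
        print(f"Dropped {dropped} rows with missing predictions or targets")

    # Warn if labels are not directly comparable (e.g. int vs. string codes)
    if clean_df[prediction_col].dtype != clean_df[target_col].dtype:
        print(
            "Warning: prediction and target dtypes differ "
            f"({clean_df[prediction_col].dtype} vs {clean_df[target_col].dtype})"
        )

    return clean_df


# Usage
# eval_ready_df = validate_eval_dataset(eval_dataset)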

Conclusion

Dataset evaluation in MLflow provides powerful capabilities for assessing pre-computed predictions. This approach is essential for production ML systems where prediction generation needs to be decoupled from performance evaluation.

Key advantages of dataset evaluation include:

  • Flexibility: Evaluate predictions from any source without re-running models
  • Efficiency: Skip expensive model inference when predictions are already available
  • Scalability: Handle large-scale batch predictions and historical analyses
  • Integration: Works seamlessly with production prediction pipelines

Whether you are analyzing batch predictions, conducting historical performance studies, or building automated evaluation pipelines, MLflow's dataset evaluation capabilities give you the tools for comprehensive model assessment at scale.