Dataset Evaluation

Dataset evaluation lets you assess model performance on pre-computed predictions without re-running the model. This is especially useful for evaluating large-scale batch inference results or historical predictions, or whenever you want to keep the prediction and evaluation stages separate.

Quickstart: Evaluating Static Predictions

The simplest form of dataset evaluation works from a DataFrame that already contains predictions and targets:

import mlflow
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Generate sample data and train a model
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42
)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Generate predictions (this could be from a batch job, stored results, etc.)
predictions = model.predict(X_test)
prediction_probabilities = model.predict_proba(X_test)[:, 1]

# Create evaluation dataset with predictions already computed
eval_dataset = pd.DataFrame(
    {
        "prediction": predictions,
        "prediction_proba": prediction_probabilities,
        "target": y_test,
    }
)

# Add original features for analysis (optional)
feature_names = [f"feature_{i}" for i in range(X_test.shape[1])]
for i, feature_name in enumerate(feature_names):
    eval_dataset[feature_name] = X_test[:, i]

with mlflow.start_run():
    # Evaluate static dataset - no model needed!
    result = mlflow.evaluate(
        data=eval_dataset,
        predictions="prediction",  # Column containing predictions
        targets="target",  # Column containing true labels
        model_type="classifier",
    )

    print(f"Accuracy: {result.metrics['accuracy_score']:.3f}")
    print(f"F1 Score: {result.metrics['f1_score']:.3f}")
    print(f"ROC AUC: {result.metrics['roc_auc']:.3f}")

This approach is ideal when:

  • You have batch prediction results from a production system
  • You want to evaluate historical predictions
  • You are comparing the outputs of different versions of the same model (see the sketch below)
  • You need to decouple compute-intensive prediction from evaluation
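The model-comparison case can be handled entirely from stored outputs. Below is a minimal sketch, assuming the two versions' predictions were saved as columns named "pred_v1" and "pred_v2" (hypothetical names); each column is evaluated against the shared targets in its own run:

import mlflow
import pandas as pd

# Toy data standing in for stored predictions from two model versions
comparison_df = pd.DataFrame(
    {
        "pred_v1": [0, 1, 1, 0, 1],  # predictions saved from version 1
        "pred_v2": [0, 1, 0, 0, 1],  # predictions saved from version 2
        "target": [0, 1, 1, 0, 0],  # shared ground-truth labels
    }
)

for version, column in [("v1", "pred_v1"), ("v2", "pred_v2")]:
    with mlflow.start_run(run_name=f"eval_model_{version}"):
        result = mlflow.evaluate(
            data=comparison_df,
            predictions=column,  # evaluate this version's stored predictions
            targets="target",
            model_type="classifier",
        )
        print(f"{version} accuracy: {result.metrics['accuracy_score']:.3f}")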

Dataset Management

For more structured dataset management, use MLflow's PandasDataset:

import mlflow.data

# Create MLflow dataset with prediction column specified
dataset = mlflow.data.from_pandas(
    eval_dataset,
    predictions="prediction",  # Specify prediction column
    targets="target",  # Specify target column
)

with mlflow.start_run():
    # Log the dataset
    mlflow.log_input(dataset, context="evaluation")

    # Evaluate using the dataset (predictions=None since specified in dataset)
    result = mlflow.evaluate(
        data=dataset,
        predictions=None,  # Already specified in dataset creation
        targets="target",
        model_type="classifier",
    )

    print("Evaluation completed using MLflow PandasDataset")

Batch Evaluation Workflows

For production batch inference results:

def evaluate_batch_predictions(batch_results_path, batch_size=10000):
    """Evaluate large batch prediction results efficiently."""

    # Read batch results (could be from S3, database, etc.)
    batch_df = pd.read_parquet(batch_results_path)

    print(f"Evaluating {len(batch_df)} batch predictions")

    with mlflow.start_run(run_name="Batch_Evaluation"):
        # Log batch metadata
        mlflow.log_params(
            {
                "batch_size": len(batch_df),
                "batch_date": (
                    batch_df["prediction_date"].iloc[0]
                    if "prediction_date" in batch_df.columns and len(batch_df) > 0
                    else "unknown"
                ),
                "data_source": batch_results_path,
            }
        )

        # Evaluate full batch
        result = mlflow.evaluate(
            data=batch_df,
            predictions="model_prediction",
            targets="true_label",
            model_type="classifier",
        )

        # Additional batch-specific analysis
        if "prediction_timestamp" in batch_df.columns:
            # Analyze performance over time
            batch_df["hour"] = pd.to_datetime(batch_df["prediction_timestamp"]).dt.hour
            hourly_accuracy = batch_df.groupby("hour").apply(
                lambda x: (x["model_prediction"] == x["true_label"]).mean()
            )

            # Log time-based metrics
            for hour, accuracy in hourly_accuracy.items():
                mlflow.log_metric(f"accuracy_hour_{hour}", accuracy)

    return result


# Usage
# result = evaluate_batch_predictions("s3://my-bucket/batch-predictions/2024-01-15.parquet")

Handling Large Datasets

For datasets too large to fit in memory:

import pyarrow.parquet as pq


def evaluate_large_dataset_in_chunks(data_path, chunk_size=50000):
    """Evaluate very large datasets by processing in chunks."""

    # pandas cannot stream Parquet files, so read record batches via pyarrow
    chunk_results = []
    total_samples = 0

    with mlflow.start_run(run_name="Large_Dataset_Evaluation"):
        parquet_file = pq.ParquetFile(data_path)
        for chunk_idx, batch in enumerate(
            parquet_file.iter_batches(batch_size=chunk_size)
        ):
            chunk = batch.to_pandas()
            chunk_size_actual = len(chunk)
            total_samples += chunk_size_actual

            # Evaluate chunk
            with mlflow.start_run(run_name=f"Chunk_{chunk_idx}", nested=True):
                chunk_result = mlflow.evaluate(
                    data=chunk,
                    predictions="prediction",
                    targets="target",
                    model_type="classifier",
                )

                # Weight metrics by chunk size for aggregation
                weighted_metrics = {
                    f"{k}_weighted": v * chunk_size_actual
                    for k, v in chunk_result.metrics.items()
                    if isinstance(v, (int, float))
                }

                chunk_results.append(
                    {
                        "chunk_idx": chunk_idx,
                        "chunk_size": chunk_size_actual,
                        "metrics": chunk_result.metrics,
                        "weighted_metrics": weighted_metrics,
                    }
                )

                mlflow.log_param("chunk_size", chunk_size_actual)

        # Aggregate results across chunks
        if chunk_results:
            # Calculate weighted averages
            total_weighted = {}
            for chunk in chunk_results:
                for metric, value in chunk["weighted_metrics"].items():
                    total_weighted[metric] = total_weighted.get(metric, 0) + value

            # Log aggregated metrics
            aggregated_metrics = {
                k.replace("_weighted", "_aggregate"): v / total_samples
                for k, v in total_weighted.items()
            }

            mlflow.log_metrics(aggregated_metrics)
            mlflow.log_params(
                {
                    "total_samples": total_samples,
                    "chunks_processed": len(chunk_results),
                    "avg_chunk_size": total_samples / len(chunk_results),
                }
            )

    return chunk_results


# Usage
# results = evaluate_large_dataset_in_chunks("large_predictions.parquet")

Key Use Cases and Benefits

Dataset evaluation in MLflow is especially valuable in the following scenarios:

Batch Processing - Perfect for evaluating large-scale batch prediction results from production systems without re-running expensive inference.

Historical Analysis - Ideal for analyzing how model performance trends over time using pre-computed predictions and ground truth.
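One way to track such trends, sketched below under the assumption that historical predictions are stored as one Parquet file per day with "prediction" and "target" columns (a hypothetical layout), is to evaluate each day in its own MLflow run so the metrics can be compared across runs:

import mlflow
import pandas as pd


def evaluate_history(daily_files):
    """daily_files: dict mapping a date string to that day's stored predictions."""
    for day, path in sorted(daily_files.items()):
        daily_df = pd.read_parquet(path)
        with mlflow.start_run(run_name=f"history_{day}"):
            mlflow.set_tag("prediction_date", day)  # makes runs easy to filter by date
            result = mlflow.evaluate(
                data=daily_df,
                predictions="prediction",
                targets="target",
                model_type="classifier",
            )
            print(day, f"accuracy={result.metrics['accuracy_score']:.3f}")


# Usage
# evaluate_history({"2024-01-14": "preds_2024-01-14.parquet", "2024-01-15": "preds_2024-01-15.parquet"})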

Model Comparison - Great for comparing the outputs of different model versions on the same dataset without retraining or re-running inference.

Production Monitoring - Essential for automated evaluation pipelines that assess model performance on incoming batches of predictions.
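A monitoring pipeline can be as small as a scheduled function that evaluates each incoming batch and flags regressions. The sketch below assumes the batch arrives as a DataFrame with "prediction" and "target" columns and that an accuracy floor of 0.9 is acceptable (both are assumptions, not MLflow requirements):

import mlflow


def monitor_batch(batch_df, accuracy_threshold=0.9):
    with mlflow.start_run(run_name="monitoring_check"):
        result = mlflow.evaluate(
            data=batch_df,
            predictions="prediction",
            targets="target",
            model_type="classifier",
        )
        accuracy = result.metrics["accuracy_score"]
        if accuracy < accuracy_threshold:
            # Hook for alerting (email, Slack, pager, etc.)
            print(f"ALERT: accuracy {accuracy:.3f} is below threshold {accuracy_threshold}")
        return result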

Cost Optimization - Reduces compute cost by separating prediction generation from performance evaluation, so models never need to be re-executed just to be evaluated.

Best Practices

When using dataset evaluation, consider the following best practices:

  • Data validation: Always verify that the prediction and target columns contain the expected data types and value ranges (see the sketch after this list)
  • Missing values: Handle missing predictions or targets appropriately before evaluating
  • Memory management: Use chunked processing or sampling for very large datasets
  • Metadata logging: Record dataset characteristics, processing parameters, and evaluation context
  • Storage format: Use an efficient format such as Parquet for large prediction datasets
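The first two practices can be folded into a small wrapper around mlflow.evaluate. Here is a minimal sketch, assuming the evaluation DataFrame uses "prediction" and "target" as column names (assumed names):

import mlflow


def validate_and_evaluate(eval_df):
    # Data validation: both required columns must be present
    for column in ("prediction", "target"):
        if column not in eval_df.columns:
            raise ValueError(f"Missing required column: {column}")

    # Missing values: drop rows where either the prediction or the target is absent
    clean_df = eval_df.dropna(subset=["prediction", "target"])
    dropped = len(eval_df) - len(clean_df)

    with mlflow.start_run(run_name="validated_evaluation"):
        # Metadata logging: record how much data was removed before evaluation
        mlflow.log_params({"rows_total": len(eval_df), "rows_dropped": dropped})
        return mlflow.evaluate(
            data=clean_df,
            predictions="prediction",
            targets="target",
            model_type="classifier",
        )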

Conclusion

Dataset evaluation in MLflow provides a powerful way to assess model performance on pre-computed predictions. This approach is essential for production-grade ML systems that need to separate prediction generation from performance evaluation.

The key advantages of dataset evaluation include:

  • Flexibility: Evaluate predictions from any source without re-running models
  • Efficiency: Skip expensive model inference when predictions are already available
  • Scale: Handle large batch predictions and historical analyses
  • Integration: Works seamlessly with production prediction pipelines

Whether you are analyzing batch predictions, running historical performance studies, or building automated evaluation pipelines, MLflow's dataset evaluation capabilities provide the tools you need for comprehensive model assessment at scale.