Dataset Evaluation
Dataset evaluation lets you assess model performance on pre-computed predictions without re-running the model. This is especially useful for evaluating large-scale batch inference results, historical predictions, or any workflow where you want to separate the prediction and evaluation stages.
Quickstart: Evaluating Static Predictions
The simplest dataset evaluation uses a DataFrame that already contains predictions and targets:
import mlflow
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# Generate sample data and train a model
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42
)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Generate predictions (this could be from a batch job, stored results, etc.)
predictions = model.predict(X_test)
prediction_probabilities = model.predict_proba(X_test)[:, 1]
# Create evaluation dataset with predictions already computed
eval_dataset = pd.DataFrame(
    {
        "prediction": predictions,
        "prediction_proba": prediction_probabilities,
        "target": y_test,
    }
)
# Add original features for analysis (optional)
feature_names = [f"feature_{i}" for i in range(X_test.shape[1])]
for i, feature_name in enumerate(feature_names):
    eval_dataset[feature_name] = X_test[:, i]

with mlflow.start_run():
    # Evaluate static dataset - no model needed!
    result = mlflow.evaluate(
        data=eval_dataset,
        predictions="prediction",  # Column containing predictions
        targets="target",  # Column containing true labels
        model_type="classifier",
    )

    print(f"Accuracy: {result.metrics['accuracy_score']:.3f}")
    print(f"F1 Score: {result.metrics['f1_score']:.3f}")
    print(f"ROC AUC: {result.metrics['roc_auc']:.3f}")
This approach is a good fit when:
- You have batch prediction results from a production system
- You want to evaluate historical predictions
- You are comparing the outputs of different versions of the same model
- You need to decouple computationally expensive prediction from evaluation
Dataset Management
- MLflow PandasDataset
- Multi-Output Models
For more structured dataset management, use MLflow's PandasDataset:
import mlflow.data
# Create MLflow dataset with prediction column specified
dataset = mlflow.data.from_pandas(
    eval_dataset,
    predictions="prediction",  # Specify prediction column
    targets="target",  # Specify target column
)

with mlflow.start_run():
    # Log the dataset
    mlflow.log_input(dataset, context="evaluation")

    # Evaluate using the dataset (predictions=None since specified in dataset)
    result = mlflow.evaluate(
        data=dataset,
        predictions=None,  # Already specified in dataset creation
        targets="target",
        model_type="classifier",
    )

    print("Evaluation completed using MLflow PandasDataset")
When your model produces multiple outputs, you can evaluate different output columns:
# Simulate multi-output model results
multi_output_data = pd.DataFrame(
    {
        "primary_prediction": predictions,
        "confidence_score": prediction_probabilities,
        "auxiliary_output": np.random.random(len(predictions)),  # Additional model output
        "target": y_test,
    }
)

with mlflow.start_run():
    # Evaluate primary prediction
    result = mlflow.evaluate(
        data=multi_output_data,
        predictions="primary_prediction",
        targets="target",
        model_type="classifier",
    )

    # Access other outputs for custom analysis
    confidence_scores = multi_output_data["confidence_score"]
    auxiliary_outputs = multi_output_data["auxiliary_output"]

    # Log additional analysis
    mlflow.log_metrics(
        {
            "avg_confidence": confidence_scores.mean(),
            "confidence_std": confidence_scores.std(),
            "avg_auxiliary": auxiliary_outputs.mean(),
        }
    )
Batch Evaluation Workflows
- Large-Scale Batch Results
- Historical Performance Analysis
- Comparative Dataset Evaluation
For production batch inference results:
def evaluate_batch_predictions(batch_results_path, batch_size=10000):
    """Evaluate large batch prediction results efficiently."""
    # Read batch results (could be from S3, database, etc.)
    batch_df = pd.read_parquet(batch_results_path)

    print(f"Evaluating {len(batch_df)} batch predictions")

    with mlflow.start_run(run_name="Batch_Evaluation"):
        # Log batch metadata
        mlflow.log_params(
            {
                "batch_size": len(batch_df),
                "batch_date": batch_df["prediction_date"].iloc[0]
                if "prediction_date" in batch_df.columns and len(batch_df) > 0
                else "unknown",
                "data_source": batch_results_path,
            }
        )

        # Evaluate full batch
        result = mlflow.evaluate(
            data=batch_df,
            predictions="model_prediction",
            targets="true_label",
            model_type="classifier",
        )

        # Additional batch-specific analysis
        if "prediction_timestamp" in batch_df.columns:
            # Analyze performance over time
            batch_df["hour"] = pd.to_datetime(batch_df["prediction_timestamp"]).dt.hour
            hourly_accuracy = batch_df.groupby("hour").apply(
                lambda x: (x["model_prediction"] == x["true_label"]).mean()
            )

            # Log time-based metrics
            for hour, accuracy in hourly_accuracy.items():
                mlflow.log_metric(f"accuracy_hour_{hour}", accuracy)

        return result
# Usage
# result = evaluate_batch_predictions("s3://my-bucket/batch-predictions/2024-01-15.parquet")
Analyzing model performance trends over time:
def analyze_historical_performance(historical_data, time_column="prediction_date"):
    """Analyze model performance trends over time."""
    historical_data[time_column] = pd.to_datetime(historical_data[time_column])

    with mlflow.start_run(run_name="Historical_Performance_Analysis"):
        # Overall historical evaluation
        overall_result = mlflow.evaluate(
            data=historical_data,
            predictions="prediction",
            targets="actual",
            model_type="classifier",
        )

        # Time-based analysis
        historical_data["month"] = historical_data[time_column].dt.to_period("M")

        monthly_performance = []
        # Iterate in chronological order so the fitted trend is meaningful
        for month in sorted(historical_data["month"].unique()):
            month_data = historical_data[historical_data["month"] == month]

            if len(month_data) > 50:  # Minimum samples for reliable metrics
                with mlflow.start_run(run_name=f"Month_{month}", nested=True):
                    month_result = mlflow.evaluate(
                        data=month_data,
                        predictions="prediction",
                        targets="actual",
                        model_type="classifier",
                    )

                    monthly_performance.append(
                        {
                            "month": str(month),
                            "accuracy": month_result.metrics["accuracy_score"],
                            "f1": month_result.metrics["f1_score"],
                            "sample_count": len(month_data),
                        }
                    )

        # Log trend analysis
        if monthly_performance:
            perf_df = pd.DataFrame(monthly_performance)

            # Calculate trends (slope of a linear fit across months)
            accuracy_trend = np.polyfit(range(len(perf_df)), perf_df["accuracy"], 1)[0]
            f1_trend = np.polyfit(range(len(perf_df)), perf_df["f1"], 1)[0]

            mlflow.log_metrics(
                {
                    "accuracy_trend_slope": accuracy_trend,
                    "f1_trend_slope": f1_trend,
                    "performance_variance": perf_df["accuracy"].var(),
                    "months_analyzed": len(monthly_performance),
                }
            )

            # Save trend data
            perf_df.to_csv("monthly_performance.csv", index=False)
            mlflow.log_artifact("monthly_performance.csv")

    return overall_result, monthly_performance
# Usage example
# historical_result, trends = analyze_historical_performance(historical_predictions_df)
Comparing model performance across different datasets or data slices:
def compare_datasets(datasets_dict, model_type="classifier"):
    """Compare model performance across multiple datasets."""
    comparison_results = {}

    with mlflow.start_run(run_name="Dataset_Comparison"):
        for dataset_name, dataset in datasets_dict.items():
            with mlflow.start_run(run_name=f"Dataset_{dataset_name}", nested=True):
                result = mlflow.evaluate(
                    data=dataset,
                    predictions="prediction",
                    targets="target",
                    model_type=model_type,
                )

                comparison_results[dataset_name] = result.metrics

                # Log dataset characteristics
                mlflow.log_params(
                    {
                        "dataset_name": dataset_name,
                        "dataset_size": len(dataset),
                        "positive_rate": dataset["target"].mean()
                        if model_type == "classifier"
                        else None,
                    }
                )

        # Log comparison metrics
        if comparison_results:
            accuracy_values = [
                r.get("accuracy_score", 0) for r in comparison_results.values()
            ]

            mlflow.log_metrics(
                {
                    "max_accuracy": max(accuracy_values),
                    "min_accuracy": min(accuracy_values),
                    "accuracy_range": max(accuracy_values) - min(accuracy_values),
                    "datasets_compared": len(comparison_results),
                }
            )

    return comparison_results
# Usage
datasets = {
    "validation_set": validation_predictions_df,
    "test_set": test_predictions_df,
    "holdout_set": holdout_predictions_df,
}
comparison = compare_datasets(datasets)
Working with Large Datasets
- Chunked Processing
- Sampling-Based Evaluation
- Memory Optimization
For large datasets that do not fit in memory:
import pyarrow.parquet as pq


def evaluate_large_dataset_in_chunks(data_path, chunk_size=50000):
    """Evaluate very large datasets by processing in chunks."""
    chunk_results = []
    total_samples = 0

    with mlflow.start_run(run_name="Large_Dataset_Evaluation"):
        # pandas' read_parquet has no chunksize option, so stream record
        # batches with pyarrow and convert each one to a DataFrame
        parquet_file = pq.ParquetFile(data_path)
        for chunk_idx, record_batch in enumerate(
            parquet_file.iter_batches(batch_size=chunk_size)
        ):
            chunk = record_batch.to_pandas()
            chunk_size_actual = len(chunk)
            total_samples += chunk_size_actual

            # Evaluate chunk
            with mlflow.start_run(run_name=f"Chunk_{chunk_idx}", nested=True):
                chunk_result = mlflow.evaluate(
                    data=chunk,
                    predictions="prediction",
                    targets="target",
                    model_type="classifier",
                )

                # Weight metrics by chunk size for aggregation
                weighted_metrics = {
                    f"{k}_weighted": v * chunk_size_actual
                    for k, v in chunk_result.metrics.items()
                    if isinstance(v, (int, float))
                }

                chunk_results.append(
                    {
                        "chunk_idx": chunk_idx,
                        "chunk_size": chunk_size_actual,
                        "metrics": chunk_result.metrics,
                        "weighted_metrics": weighted_metrics,
                    }
                )

                mlflow.log_param("chunk_size", chunk_size_actual)

        # Aggregate results across chunks
        if chunk_results:
            # Calculate weighted averages
            total_weighted = {}
            for chunk_summary in chunk_results:
                for metric, value in chunk_summary["weighted_metrics"].items():
                    total_weighted[metric] = total_weighted.get(metric, 0) + value

            # Log aggregated metrics
            aggregated_metrics = {
                k.replace("_weighted", "_aggregate"): v / total_samples
                for k, v in total_weighted.items()
            }

            mlflow.log_metrics(aggregated_metrics)
            mlflow.log_params(
                {
                    "total_samples": total_samples,
                    "chunks_processed": len(chunk_results),
                    "avg_chunk_size": total_samples / len(chunk_results),
                }
            )

    return chunk_results
# Usage
# results = evaluate_large_dataset_in_chunks("large_predictions.parquet")
For very large datasets, use statistical sampling:
def evaluate_with_sampling(large_dataset, sample_size=10000, n_samples=5):
    """Evaluate large dataset using multiple random samples."""
    sample_results = []
    aggregated_stats = {}

    with mlflow.start_run(run_name="Sampling_Based_Evaluation"):
        for sample_idx in range(n_samples):
            # Create random sample
            if len(large_dataset) > sample_size:
                sample_data = large_dataset.sample(
                    n=sample_size, random_state=sample_idx
                )
            else:
                sample_data = large_dataset.copy()

            with mlflow.start_run(run_name=f"Sample_{sample_idx}", nested=True):
                sample_result = mlflow.evaluate(
                    data=sample_data,
                    predictions="prediction",
                    targets="target",
                    model_type="classifier",
                )

                sample_results.append(sample_result.metrics)

                mlflow.log_params(
                    {
                        "sample_idx": sample_idx,
                        "sample_size": len(sample_data),
                        "random_seed": sample_idx,
                    }
                )

        # Aggregate sample results
        if sample_results:
            # Calculate statistics across samples
            metrics_df = pd.DataFrame(sample_results)

            for metric in metrics_df.columns:
                if metrics_df[metric].dtype in ["float64", "int64"]:
                    aggregated_stats.update(
                        {
                            f"{metric}_mean": metrics_df[metric].mean(),
                            f"{metric}_std": metrics_df[metric].std(),
                            f"{metric}_min": metrics_df[metric].min(),
                            f"{metric}_max": metrics_df[metric].max(),
                        }
                    )

            mlflow.log_metrics(aggregated_stats)
            mlflow.log_params(
                {
                    "sampling_strategy": "random",
                    "samples_taken": n_samples,
                    "target_sample_size": sample_size,
                }
            )

            # Save detailed results
            metrics_df.to_csv("sample_results.csv", index=False)
            mlflow.log_artifact("sample_results.csv")

    return sample_results, aggregated_stats
# Usage
# samples, stats = evaluate_with_sampling(very_large_dataset, sample_size=5000, n_samples=10)
Best practices for memory-efficient evaluation:
import pyarrow.parquet as pq


def memory_efficient_evaluation(large_dataset_path, chunk_size=10000):
    """Memory-efficient evaluation of large datasets."""
    chunk_metrics = []

    with mlflow.start_run(run_name="Memory_Efficient_Evaluation"):
        # Stream record batches with pyarrow (pandas' read_parquet has no chunksize option)
        parquet_file = pq.ParquetFile(large_dataset_path)
        for record_batch in parquet_file.iter_batches(batch_size=chunk_size):
            chunk = record_batch.to_pandas()

            # Process chunk
            chunk_result = mlflow.evaluate(
                data=chunk,
                predictions="prediction",
                targets="target",
                model_type="classifier",
            )

            # Store only essential metrics
            chunk_metrics.append(
                {
                    "size": len(chunk),
                    "accuracy": chunk_result.metrics["accuracy_score"],
                    "f1_score": chunk_result.metrics["f1_score"],
                }
            )

            # Clear chunk from memory
            del chunk
            del chunk_result

        # Compute weighted averages
        total_samples = sum(cm["size"] for cm in chunk_metrics)
        weighted_accuracy = (
            sum(cm["accuracy"] * cm["size"] for cm in chunk_metrics) / total_samples
        )
        weighted_f1 = (
            sum(cm["f1_score"] * cm["size"] for cm in chunk_metrics) / total_samples
        )

        # Log final metrics
        mlflow.log_metrics(
            {
                "final_weighted_accuracy": weighted_accuracy,
                "final_weighted_f1": weighted_f1,
                "total_samples_processed": total_samples,
                "chunks_processed": len(chunk_metrics),
            }
        )

    return weighted_accuracy, chunk_metrics
# Usage
# accuracy, metrics = memory_efficient_evaluation("very_large_predictions.parquet")
Memory Management Tips
- Chunked processing: use chunked processing for datasets larger than about 1 GB
- Data types: use appropriately sized dtypes (e.g., int32 instead of int64) to reduce memory usage (see the sketch after this list)
- Garbage collection: explicitly delete large variables once processing is complete
- Streaming: process data in a streaming fashion whenever possible
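A minimal sketch of the data type and garbage collection tips, assuming the batch predictions live in a Parquet file; the helper name and path handling are illustrative, not part of the MLflow API:
import gc
import numpy as np
import pandas as pd


def load_predictions_compactly(path):
    """Load a prediction file with smaller dtypes to cut memory usage (illustrative sketch)."""
    df = pd.read_parquet(path)

    # Downcast 64-bit numeric columns to smaller dtypes where possible
    for col in df.select_dtypes(include=["int64"]).columns:
        df[col] = pd.to_numeric(df[col], downcast="integer")
    for col in df.select_dtypes(include=["float64"]).columns:
        df[col] = df[col].astype(np.float32)

    return df


# After evaluating, drop large intermediates and force a garbage collection pass, e.g.:
# del eval_df
# gc.collect()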
Key Use Cases and Benefits
Dataset evaluation in MLflow is particularly valuable in several scenarios:
Batch processing - ideal for evaluating large-scale batch prediction results from production systems without re-running expensive inference.
Historical analysis - well suited to analyzing model performance trends over time using pre-computed predictions and ground truth.
Model comparison - perfect for comparing the outputs of different model versions on the same dataset without retraining or re-running inference.
Production monitoring - essential for automated evaluation pipelines that assess model performance on incoming batch predictions.
Cost optimization - reduces compute costs by separating prediction generation from performance evaluation, so evaluation never requires re-executing the model.
Best Practices
When using dataset evaluation, keep these best practices in mind:
- Data validation: always verify that the prediction and target columns contain the expected data types and value ranges (a sketch follows this list)
- Missing values: handle missing predictions or targets appropriately before evaluating
- Memory management: use chunked processing or sampling for very large datasets
- Metadata logging: record dataset characteristics, processing parameters, and evaluation context
- Storage format: use an efficient format such as Parquet for large prediction datasets
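As an example of the first two practices, a lightweight check like the sketch below can run before calling mlflow.evaluate; the column names "prediction" and "target" are assumptions that should match your own evaluation DataFrame:
def validate_eval_dataframe(df, prediction_col="prediction", target_col="target"):
    """Basic sanity checks before dataset evaluation (illustrative sketch)."""
    # Required columns must exist
    missing = [c for c in (prediction_col, target_col) if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Drop rows with missing predictions or targets instead of evaluating them
    clean_df = df.dropna(subset=[prediction_col, target_col])
    dropped = len(df) - len(clean_df)
    if dropped:
        print(f"Dropped {dropped} rows with missing predictions or targets")

    # For a classifier, flag predicted labels that never appear among the targets
    unexpected = set(clean_df[prediction_col].unique()) - set(clean_df[target_col].unique())
    if unexpected:
        print(f"Warning: predictions contain labels not present in targets: {unexpected}")

    return clean_df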
Conclusion
Dataset evaluation in MLflow provides a powerful way to assess model performance on pre-computed predictions. This approach is essential for production ML systems where prediction generation and performance evaluation need to be decoupled.
Key advantages of dataset evaluation include:
- Flexibility: evaluate predictions from any source without re-running models
- Efficiency: skip expensive model inference when predictions are already available
- Scale: handle large-scale batch predictions and historical analysis
- Integration: work seamlessly with production prediction pipelines
Whether you are analyzing batch predictions, conducting historical performance studies, or building automated evaluation pipelines, MLflow's dataset evaluation capabilities give you the tools for comprehensive model assessment at scale.