Function Evaluation

Function evaluation lets you evaluate Python functions directly, without the overhead of logging a model to MLflow. This lightweight approach is ideal for rapid prototyping, testing custom prediction logic, and evaluating complex business rules that may not fit the traditional model paradigm.

Quickstart: Evaluating a Simple Function

The simplest form of function evaluation involves a callable that takes data and returns predictions:
import mlflow
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Generate sample data
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42
)

# Train a model (we'll use this in our function)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Define a prediction function
def predict_function(input_data):
    """Custom prediction function that can include business logic."""
    # Get base model predictions
    base_predictions = model.predict(input_data)

    # Add custom business logic
    # Example: Override predictions for specific conditions
    feature_sum = input_data.sum(axis=1)
    high_feature_mask = feature_sum > feature_sum.quantile(0.9)

    # Custom rule: high feature sum values are always class 1
    final_predictions = base_predictions.copy()
    final_predictions[high_feature_mask] = 1

    return final_predictions

# Create evaluation dataset
eval_data = pd.DataFrame(X_test)
eval_data["target"] = y_test

with mlflow.start_run():
    # Evaluate function directly - no model logging needed!
    result = mlflow.evaluate(
        predict_function,  # Function to evaluate
        eval_data,  # Evaluation data
        targets="target",  # Target column
        model_type="classifier",  # Task type
    )

    print(f"Function Accuracy: {result.metrics['accuracy_score']:.3f}")
    print(f"Function F1 Score: {result.metrics['f1_score']:.3f}")
This approach is ideal when:
- You want to test a function quickly without model persistence
- Your prediction logic includes complex business rules
- You are prototyping custom algorithms or ensemble methods
- You need to evaluate a preprocessing + prediction pipeline
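The same call pattern extends to other task types via the model_type argument. Below is a minimal regression sketch; the toy dataset and regression_function are illustrative, and the metric key follows MLflow's default regressor evaluator:

from sklearn.datasets import make_regression
from sklearn.linear_model import LinearRegression

# Toy regression setup, just to show model_type="regressor"
X_reg, y_reg = make_regression(n_samples=500, n_features=5, noise=0.1, random_state=42)
reg_model = LinearRegression().fit(X_reg, y_reg)

def regression_function(input_data):
    """Return continuous predictions from the pre-fitted model."""
    return reg_model.predict(input_data)

reg_eval = pd.DataFrame(X_reg)
reg_eval["target"] = y_reg

with mlflow.start_run():
    reg_result = mlflow.evaluate(
        regression_function, reg_eval, targets="target", model_type="regressor"
    )
    print(f"RMSE: {reg_result.metrics['root_mean_squared_error']:.3f}")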
Advanced Function Patterns
- Pipeline functions
- Ensemble functions
- Business logic integration

Evaluating a complete data processing and prediction pipeline:
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

def complete_pipeline_function(input_data):
    """Function that includes preprocessing, feature engineering, and prediction."""
    # Preprocessing
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(input_data)

    # Feature engineering
    pca = PCA(n_components=10)
    pca_features = pca.fit_transform(scaled_data)

    # Create additional features
    feature_interactions = pca_features[:, 0] * pca_features[:, 1]
    feature_ratios = pca_features[:, 0] / (pca_features[:, 1] + 1e-8)

    # Combine features
    enhanced_features = np.column_stack(
        [
            pca_features,
            feature_interactions.reshape(-1, 1),
            feature_ratios.reshape(-1, 1),
        ]
    )

    # Model prediction
    model = RandomForestClassifier(n_estimators=50, random_state=42)
    # Note: In practice, you'd fit this on training data separately
    model.fit(enhanced_features, np.random.choice([0, 1], size=len(enhanced_features)))
    predictions = model.predict(enhanced_features)

    return predictions

with mlflow.start_run(run_name="Complete_Pipeline_Function"):
    # Log pipeline configuration
    mlflow.log_params(
        {
            "preprocessing": "StandardScaler + PCA",
            "pca_components": 10,
            "feature_engineering": "interactions + ratios",
            "model": "RandomForest",
        }
    )

    result = mlflow.evaluate(
        complete_pipeline_function, eval_data, targets="target", model_type="classifier"
    )

    print(f"Pipeline Function Performance: {result.metrics['accuracy_score']:.3f}")
Evaluating an ensemble method that combines multiple models:
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB

def ensemble_prediction_function(input_data):
    """Ensemble function combining multiple base models."""
    # Initialize base models
    models = {
        "rf": RandomForestClassifier(n_estimators=50, random_state=42),
        "lr": LogisticRegression(random_state=42),
        "svm": SVC(probability=True, random_state=42),
        "nb": GaussianNB(),
    }

    # Train base models (in practice, these would be pre-trained)
    predictions = {}
    probabilities = {}

    for name, model in models.items():
        # Note: This is for demonstration - in practice, models would be pre-trained
        model.fit(X_train, y_train)
        predictions[name] = model.predict(input_data)
        if hasattr(model, "predict_proba"):
            probabilities[name] = model.predict_proba(input_data)[:, 1]
        else:
            probabilities[name] = predictions[name].astype(float)

    # Ensemble strategies
    # 1. Majority voting
    pred_array = np.array(list(predictions.values()))
    majority_vote = np.apply_along_axis(
        lambda x: np.bincount(x).argmax(), axis=0, arr=pred_array
    )

    # 2. Weighted average of probabilities
    prob_array = np.array(list(probabilities.values()))
    weights = np.array([0.4, 0.3, 0.2, 0.1])  # RF, LR, SVM, NB
    weighted_probs = np.average(prob_array, axis=0, weights=weights)
    weighted_predictions = (weighted_probs > 0.5).astype(int)

    # 3. Dynamic ensemble: where the base models agree (low spread across
    # their probabilities), trust the weighted average; otherwise fall
    # back to majority voting
    confidence_scores = np.std(prob_array, axis=0)
    high_confidence_mask = confidence_scores < 0.2

    final_predictions = majority_vote.copy()
    final_predictions[high_confidence_mask] = weighted_predictions[high_confidence_mask]

    return final_predictions

with mlflow.start_run(run_name="Ensemble_Function_Evaluation"):
    # Log ensemble configuration
    mlflow.log_params(
        {
            "base_models": "RF, LR, SVM, NB",
            "ensemble_strategy": "dynamic_confidence_based",
            "confidence_threshold": 0.2,
            "weights": [0.4, 0.3, 0.2, 0.1],
        }
    )

    result = mlflow.evaluate(
        ensemble_prediction_function,
        eval_data,
        targets="target",
        model_type="classifier",
    )

    print(f"Ensemble Function Accuracy: {result.metrics['accuracy_score']:.3f}")
Evaluating a function that combines machine learning predictions with business rules:
def business_rule_function(input_data):
    """Function combining ML predictions with business rules."""
    # Work on a plain array so positional indexing works both for the
    # DataFrame that mlflow.evaluate passes and for NumPy arrays
    input_array = np.asarray(input_data)

    # Get base ML predictions
    ml_model = RandomForestClassifier(n_estimators=100, random_state=42)
    ml_model.fit(X_train, y_train)
    ml_predictions = ml_model.predict(input_array)
    ml_probabilities = ml_model.predict_proba(input_array)[:, 1]

    # Business rules (example domain: loan approval)
    # Rule 1: High-risk features override ML prediction
    high_risk_mask = input_array[:, 0] < -2  # Example: low credit score

    # Rule 2: Low confidence ML predictions get conservative treatment
    low_confidence_mask = np.abs(ml_probabilities - 0.5) < 0.1

    # Rule 3: Combination rules for edge cases
    edge_case_mask = (input_array[:, 1] > 2) & (input_array[:, 2] < -1)

    # Apply business logic
    final_predictions = ml_predictions.copy()

    # Override high-risk cases
    final_predictions[high_risk_mask] = 0  # Reject

    # Conservative approach for low confidence
    final_predictions[low_confidence_mask & (ml_probabilities < 0.6)] = 0

    # Special handling for edge cases
    final_predictions[edge_case_mask] = 1  # Approve with conditions

    return final_predictions

with mlflow.start_run(run_name="Business_Rule_Function"):
    # Log business rule configuration
    mlflow.log_params(
        {
            "base_model": "RandomForest",
            "business_rules": "high_risk_override, confidence_threshold, edge_cases",
            "confidence_threshold": 0.6,
            "high_risk_feature": "feature_0 < -2",
        }
    )

    result = mlflow.evaluate(
        business_rule_function, eval_data, targets="target", model_type="classifier"
    )

    # Calculate rule impact
    ml_only_predictions = (
        RandomForestClassifier(n_estimators=100, random_state=42)
        .fit(X_train, y_train)
        .predict(X_test)
    )
    rule_based_predictions = business_rule_function(X_test)

    rule_changes = np.sum(ml_only_predictions != rule_based_predictions)
    change_rate = rule_changes / len(ml_only_predictions)

    mlflow.log_metrics(
        {
            "rule_changes": int(rule_changes),
            "rule_change_rate": change_rate,
            "ml_only_accuracy": (ml_only_predictions == y_test).mean(),
        }
    )

    print(f"Business Rule Function Accuracy: {result.metrics['accuracy_score']:.3f}")
    print(f"Rule Changes: {rule_changes} ({change_rate:.1%})")
Function Testing and Validation
- Parameterized testing
- Production monitoring

Testing a function with different parameter configurations:
def create_parameterized_function(threshold=0.5, use_scaling=True, feature_selection=None):
    """Factory function that creates prediction functions with different parameters."""

    def parameterized_prediction_function(input_data):
        # Apply identical preprocessing to the training and evaluation
        # data, and fit only on the training split
        train_data = np.asarray(X_train)
        eval_features = np.asarray(input_data)

        # Optional scaling (fit on training data, transform both)
        if use_scaling:
            from sklearn.preprocessing import StandardScaler

            scaler = StandardScaler().fit(train_data)
            train_data = scaler.transform(train_data)
            eval_features = scaler.transform(eval_features)

        # Optional feature selection (keep the first n features)
        if feature_selection:
            n_features = min(feature_selection, eval_features.shape[1])
            train_data = train_data[:, :n_features]
            eval_features = eval_features[:, :n_features]

        # Model prediction
        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(train_data, y_train)

        probabilities = model.predict_proba(eval_features)[:, 1]
        predictions = (probabilities > threshold).astype(int)

        return predictions

    return parameterized_prediction_function

# Test different parameter combinations
parameter_configs = [
    {"threshold": 0.3, "use_scaling": True, "feature_selection": None},
    {"threshold": 0.5, "use_scaling": True, "feature_selection": 10},
    {"threshold": 0.7, "use_scaling": False, "feature_selection": None},
    {"threshold": 0.5, "use_scaling": False, "feature_selection": 15},
]

results = {}

for i, config in enumerate(parameter_configs):
    with mlflow.start_run(run_name=f"Param_Config_{i+1}"):
        # Log configuration
        mlflow.log_params(config)

        # Create and evaluate function
        param_function = create_parameterized_function(**config)

        result = mlflow.evaluate(
            param_function, eval_data, targets="target", model_type="classifier"
        )

        results[f"config_{i+1}"] = {
            "config": config,
            "accuracy": result.metrics["accuracy_score"],
            "f1_score": result.metrics["f1_score"],
        }

        print(f"Config {i+1}: Accuracy = {result.metrics['accuracy_score']:.3f}")

# Find best configuration
best_config = max(results.keys(), key=lambda k: results[k]["accuracy"])
print(f"Best configuration: {results[best_config]['config']}")
print(f"Best accuracy: {results[best_config]['accuracy']:.3f}")
Creating a monitoring wrapper for a production function:
import time

def create_production_function_monitor(prediction_function):
    """Create monitoring wrapper for production functions."""

    def monitored_function(input_data):
        """Wrapper that adds monitoring and health checks."""
        start_time = time.time()

        try:
            # Input validation
            if input_data is None or len(input_data) == 0:
                raise ValueError("Empty input data")

            # Health checks (use a plain array so the checks behave the
            # same for DataFrames and NumPy arrays)
            input_array = np.asarray(input_data, dtype=float)
            if np.isnan(input_array).all():
                raise ValueError("All input values are NaN")
            if np.isinf(input_array).any():
                raise ValueError("Input contains infinite values")

            # Make prediction
            predictions = prediction_function(input_data)

            # Output validation
            if predictions is None or len(predictions) == 0:
                raise ValueError("Function returned empty predictions")
            if len(predictions) != len(input_data):
                raise ValueError(
                    f"Prediction count mismatch: {len(predictions)} vs {len(input_data)}"
                )

            # Log successful execution (nested=True so the wrapper can be
            # called while an evaluation run is already active)
            execution_time = time.time() - start_time
            with mlflow.start_run(run_name="Function_Health_Check", nested=True):
                mlflow.log_metrics(
                    {
                        "execution_time": execution_time,
                        "input_samples": len(input_data),
                        "predictions_generated": len(predictions),
                        "nan_inputs": int(np.isnan(input_array).sum()),
                        "success": 1,
                    }
                )
                mlflow.log_params(
                    {"function_status": "healthy", "timestamp": time.time()}
                )

            return predictions

        except Exception as e:
            # Log error
            execution_time = time.time() - start_time
            with mlflow.start_run(run_name="Function_Error", nested=True):
                mlflow.log_metrics({"execution_time": execution_time, "success": 0})
                mlflow.log_params(
                    {
                        "error": str(e),
                        "function_status": "error",
                        "timestamp": time.time(),
                    }
                )
            raise

    return monitored_function

# Usage
monitored_function = create_production_function_monitor(predict_function)

# Evaluate monitored function
with mlflow.start_run(run_name="Production_Function_Evaluation"):
    result = mlflow.evaluate(
        monitored_function, eval_data, targets="target", model_type="classifier"
    )
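One design note on the wrapper above: every invocation starts its own nested MLflow run, so calling it per request in a high-traffic production setting would create a large number of runs. In practice you might accumulate the health metrics in memory and log a single summary per batch; the per-call runs are kept here for simplicity.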
Key Use Cases and Advantages

Function evaluation in MLflow is especially valuable in several scenarios:

Rapid prototyping - ideal for testing prediction logic immediately, without the overhead of model persistence. Developers can iterate on algorithms quickly and see results right away.

Custom business logic - well suited to evaluating functions that combine machine learning predictions with domain-specific business rules, regulatory requirements, or operational constraints.

Ensemble methods - a good fit for testing custom ensembles that may not conform to standard model frameworks, including dynamic voting strategies and confidence-based combinations.

Pipeline development - supports evaluating complete data processing pipelines, with preprocessing, feature engineering, and prediction in a single function.
Best Practices

When using function evaluation, consider the following best practices:

- Stateless functions: design functions to be stateless where possible to ensure reproducible results (see the sketch after this list)
- Parameter logging: always log function parameters and configuration for reproducibility
- Input validation: include input validation and error handling in production functions
- Performance monitoring: track execution time and resource usage for production functions
- Version control: use MLflow tags and naming conventions to track function versions
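For example, the stateless and version-tracking recommendations can be combined by closing over an already-fitted model and tagging each evaluation run. A minimal sketch, reusing the quickstart's model and eval_data; the factory name, version string, and tag keys are illustrative:

def make_prediction_function(fitted_model, threshold=0.5):
    """Build a stateless prediction function around an already-fitted model."""

    def fn(input_data):
        probabilities = fitted_model.predict_proba(input_data)[:, 1]
        return (probabilities > threshold).astype(int)

    return fn

versioned_function = make_prediction_function(model, threshold=0.5)

with mlflow.start_run(run_name="Versioned_Function_Evaluation"):
    # Log configuration and a version tag for reproducibility and traceability
    mlflow.log_params({"threshold": 0.5, "base_model": "RandomForest"})
    mlflow.set_tags({"function_version": "v1.0", "function_name": "versioned_function"})
    result = mlflow.evaluate(
        versioned_function, eval_data, targets="target", model_type="classifier"
    )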
Conclusion

Function evaluation in MLflow provides a lightweight, flexible way to assess prediction functions without the overhead of model persistence. This capability is essential for rapid prototyping, testing complex business logic, and evaluating custom algorithms that don't fit the traditional model paradigm.

The key advantages of function evaluation include:

- Rapid prototyping: test prediction logic immediately, without model logging
- Business logic integration: evaluate functions that combine machine learning with business rules
- Custom algorithms: assess non-standard prediction approaches and ensemble methods
- Development workflow: integrate seamlessly with iterative development processes
- Performance testing: benchmark and optimize function performance

Whether you are developing custom ensembles, integrating business rules with machine learning predictions, or rapidly prototyping new approaches, MLflow's function evaluation provides the flexibility and insight needed for effective model development and testing.

Its lightweight nature makes function evaluation a great fit for exploratory analysis, A/B testing different approaches, and validating custom prediction logic before committing to a full model deployment workflow.