Spark MLlib with MLflow
In this comprehensive guide, we'll walk you through how to use Spark MLlib with MLflow for experiment tracking, model management, and production deployment. We cover basic model logging, pipeline tracking, and deployment patterns that will get you productive with distributed machine learning quickly and efficiently.
Quick Start: Basic Model Logging
The simplest way to get started is to log your Spark MLlib models directly to MLflow:
import mlflow
import mlflow.spark
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("MLflowSparkExample").getOrCreate()
# Prepare training data
training = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    ["id", "text", "label"],
)
# Create ML Pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Train and log the model
with mlflow.start_run():
    model = pipeline.fit(training)

    # Log the entire pipeline
    model_info = mlflow.spark.log_model(spark_model=model, name="spark-pipeline")

    # Log parameters manually
    mlflow.log_params(
        {
            "max_iter": lr.getMaxIter(),
            "reg_param": lr.getRegParam(),
            "num_features": hashingTF.getNumFeatures(),
        }
    )

    print(f"Model logged with URI: {model_info.model_uri}")
This simple example automatically logs:
- The complete Spark ML pipeline with all of its stages
- Model parameters from each pipeline stage
- The trained model in both Spark native and PyFunc formats (see the verification sketch below)
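Everything above lands on the active MLflow run, so you can read it back from the tracking server to confirm what was captured. Below is a minimal verification sketch, assuming it runs in the same Python session right after the with block finishes:
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()
run_id = mlflow.last_active_run().info.run_id

# Parameters recorded on the run
print(client.get_run(run_id).data.params)

# Artifacts stored with the run, including the logged pipeline directory
for artifact in client.list_artifacts(run_id):
    print(artifact.path)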
Model Formats and Loading
- Native Spark Format
- PyFunc Format
- Format Conversion
The native Spark format preserves the full functionality of the Spark ML pipeline:
# Load as native Spark model (requires Spark session)
spark_model = mlflow.spark.load_model(model_info.model_uri)
# Use for distributed batch scoring
test_data = spark.createDataFrame(
    [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")],
    ["id", "text"],
)
predictions = spark_model.transform(test_data)
predictions.show()
Best for: large-scale batch processing, existing Spark infrastructure
The PyFunc format enables deployment outside of Spark environments:
import pandas as pd
# Load as PyFunc model (no Spark session required)
pyfunc_model = mlflow.pyfunc.load_model(model_info.model_uri)
# Use with pandas DataFrame
test_data = pd.DataFrame(
    {"text": ["spark machine learning", "hadoop distributed computing"]}
)
# Predictions work seamlessly
predictions = pyfunc_model.predict(test_data)
print(predictions)
Best for: REST API deployment, edge computing, non-Spark environments
Automatic conversion process:
- Input handling: PyFunc automatically converts pandas DataFrames to Spark DataFrames
- Spark context: creates a local Spark session if one does not already exist
- Output handling: converts Spark ML vector outputs to arrays for pandas compatibility
- Performance trade-off: initialization overhead vs. deployment flexibility (see the sketch below)
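Because the PyFunc wrapper bootstraps a local Spark session on first use when none is active, part of that initialization cost can be avoided by creating a lightweight session yourself before loading the model. A minimal sketch; the master and app name settings here are illustrative, not required:
import mlflow
from pyspark.sql import SparkSession

# Create (or reuse) a small local session up front so the PyFunc wrapper
# does not have to start one lazily at prediction time.
spark = (
    SparkSession.builder.master("local[1]")
    .appName("pyfunc-serving")
    .getOrCreate()
)

pyfunc_model = mlflow.pyfunc.load_model(model_info.model_uri)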
When to use each format:
- Native Spark: large-scale batch processing, existing Spark infrastructure
- PyFunc: REST API deployment, edge computing, non-Spark environments
Pipeline Tracking and Management
- Basic Pipelines
- Hyperparameter Tuning
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
# Load your dataset
data = spark.read.csv("path/to/dataset.csv", header=True, inferSchema=True)
with mlflow.start_run(run_name="Feature Pipeline"):
# Create feature engineering pipeline
feature_cols = ["age", "income", "credit_score"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")
scaler = StandardScaler(inputCol="raw_features", outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100)
# Create complete pipeline
pipeline = Pipeline(stages=[assembler, scaler, rf])
# Train pipeline
model = pipeline.fit(data)
# Log pipeline parameters
mlflow.log_params(
{
"num_features": len(feature_cols),
"num_trees": rf.getNumTrees(),
"max_depth": rf.getMaxDepth(),
}
)
# Log the complete pipeline
mlflow.spark.log_model(spark_model=model, artifact_path="feature_pipeline")
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
with mlflow.start_run(run_name="Hyperparameter Tuning"):
# Create base pipeline
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[assembler, scaler, lr])
# Create parameter grid
param_grid = (
ParamGridBuilder()
.addGrid(lr.regParam, [0.01, 0.1, 1.0])
.addGrid(lr.maxIter, [50, 100])
.build()
)
# Create cross-validator
evaluator = BinaryClassificationEvaluator(labelCol="label")
cv = CrossValidator(
estimator=pipeline,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=3,
)
# Fit cross-validator
cv_model = cv.fit(train_data)
# Log best parameters
best_model = cv_model.bestModel
best_lr_stage = best_model.stages[-1]
mlflow.log_params(
{
"best_regParam": best_lr_stage.getRegParam(),
"best_maxIter": best_lr_stage.getMaxIter(),
}
)
# Evaluate on test set
test_predictions = cv_model.transform(test_data)
test_auc = evaluator.evaluate(test_predictions)
mlflow.log_metric("test_auc", test_auc)
# Log the best model
mlflow.spark.log_model(
spark_model=cv_model.bestModel, artifact_path="best_cv_model"
)
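Beyond the best model, CrossValidatorModel also exposes the average evaluator metric for every parameter combination through its avgMetrics attribute. The lines below are a minimal sketch of what you could add inside the tuning run above (hence the indentation) to record the whole search rather than only the winner; param_grid and cv_model are the objects defined in that example:
    # Record the mean cross-validation metric for each candidate in the grid.
    # param_grid and cv_model come from the tuning example above.
    for i, (param_map, avg_metric) in enumerate(zip(param_grid, cv_model.avgMetrics)):
        mlflow.log_metric(f"cv_avg_metric_{i}", avg_metric)
        # Store the candidate's parameter values as a small JSON artifact
        mlflow.log_dict(
            {param.name: value for param, value in param_map.items()},
            f"cv_params/combination_{i}.json",
        )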
Spark Datasource Autologging
- Enabling Autologging
- Setup Requirements
MLflow provides automatic logging of Spark datasource information:
import mlflow.spark
# Enable Spark datasource autologging
mlflow.spark.autolog()
# Now all datasource reads are automatically logged
with mlflow.start_run():
    # These datasource operations are automatically tracked
    raw_data = spark.read.parquet("s3://my-bucket/training-data/")
    processed_data = spark.read.csv(
        "hdfs://cluster/processed/features.csv", header=True
    )

    # Train your model - datasource info is logged automatically
    model = pipeline.fit(processed_data)

    # Model training and datasource information both captured
    mlflow.spark.log_model(model, artifact_path="model_with_datasource_tracking")
Requirements:
- Spark version: requires Spark 3.0 or later
- MLflow-Spark JAR: must be included in the Spark session configuration
- Environment: not supported on Databricks shared/serverless clusters
JAR configuration:
from pyspark.sql import SparkSession
# Configure Spark session with MLflow JAR
spark = (
    SparkSession.builder.appName("MLflowAutologgingApp")
    .config("spark.jars.packages", "org.mlflow:mlflow-spark_2.12:2.16.2")
    .getOrCreate()
)
What gets logged (the sketch after this list shows how to read it back from a run):
- Path information: the full path of each data source
- Format details: the file format (parquet, delta, csv, etc.)
- Version information: for versioned sources such as Delta Lake
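Because the datasource details are stored as tags on the run, they can be read back programmatically. The following is a minimal sketch, assuming a run from the autologging example has just completed in the same session; the exact tag key may differ across MLflow versions, so it simply filters tag names:
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()
run = client.get_run(mlflow.last_active_run().info.run_id)

# Print any run tags that carry Spark datasource information (path, format, version)
for key, value in run.data.tags.items():
    if "datasource" in key.lower():
        print(key, "->", value)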
Model Signatures and Schema Management
- Spark ML Vectors
- Manual Signatures
from mlflow.models import infer_signature
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import array_to_vector
# Create data with vector features
vector_data = spark.createDataFrame(
    [([3.0, 4.0], 0), ([5.0, 6.0], 1)], ["features_array", "label"]
).select(array_to_vector("features_array").alias("features"), "label")
# Train model
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(vector_data)
# Get predictions for signature
predictions = model.transform(vector_data)
# Infer signature automatically
signature = infer_signature(vector_data, predictions.select("prediction"))
with mlflow.start_run():
    mlflow.spark.log_model(
        spark_model=model,
        artifact_path="vector_model",
        signature=signature,
        input_example=vector_data.limit(2).toPandas(),
    )
from mlflow.types import DataType, Schema, ColSpec
from mlflow.types.schema import SparkMLVector
from mlflow.models.signature import ModelSignature
# Create detailed model signature
input_schema = Schema(
    [
        ColSpec(DataType.string, "text"),
        ColSpec(DataType.double, "numeric_feature"),
        ColSpec(SparkMLVector(), "vector_feature"),
    ]
)
output_schema = Schema([ColSpec(DataType.double, "prediction")])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)
# Log model with explicit signature
with mlflow.start_run():
    mlflow.spark.log_model(
        spark_model=model, artifact_path="production_model", signature=signature
    )
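Once a model is logged, its signature travels with it and can be read back without loading the model, which is useful for checking what a serving payload must look like. A small sketch, assuming model_info (or any known model URI) from one of the logging calls above:
from mlflow.models import get_model_info

# Fetch the stored metadata for the logged model and inspect its signature
info = get_model_info(model_info.model_uri)
print(info.signature.inputs)   # expected input schema
print(info.signature.outputs)  # expected output schema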
Cross-Platform Deployment
- ONNX Conversion
- Model Registry
Convert Spark MLlib models to ONNX format for cross-platform deployment:
# Note: This requires onnxmltools (Spark ML support is experimental)
# pip install onnxmltools
import onnxmltools
with mlflow.start_run(run_name="ONNX Conversion"):
# Train your Spark ML model
model = pipeline.fit(training_data)
# Log original Spark model
spark_model_info = mlflow.spark.log_model(
spark_model=model, artifact_path="spark_model"
)
try:
# Convert to ONNX using onnxmltools
# Note: Spark ML conversion is experimental and may have limitations
onnx_model = onnxmltools.convert_sparkml(
model, name="SparkMLPipeline", target_opset=None # Use default opset
)
# Save ONNX model as artifact
onnx_model_path = "model.onnx"
onnxmltools.utils.save_model(onnx_model, onnx_model_path)
mlflow.log_artifact(onnx_model_path)
mlflow.log_param("onnx_conversion_successful", True)
# Log ONNX model info
opset_version = onnx_model.opset_import[0].version
mlflow.log_param("onnx_opset_version", opset_version)
except Exception as e:
mlflow.log_param("onnx_conversion_error", str(e))
mlflow.log_param("onnx_conversion_successful", False)
# ONNX conversion for Spark ML is experimental and may not work
# for all model types. Consider using PyFunc format instead.
Note: Spark ML to ONNX conversion in onnxmltools is experimental and may not support all Spark ML operators. For production deployments, consider MLflow's PyFunc format for broader compatibility.
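If the conversion does succeed, the saved model.onnx artifact can be scored with ONNX Runtime, independent of Spark. The snippet below is a rough sketch under that assumption; the input name and shape depend on what convert_sparkml produced, so inspect the session before calling run:
# pip install onnxruntime
import numpy as np
import onnxruntime as ort

session = ort.InferenceSession("model.onnx")

# Inspect the graph to find the expected input names, shapes, and types
for model_input in session.get_inputs():
    print(model_input.name, model_input.shape, model_input.type)

# Example call -- replace "features" and the array shape with the actual
# input reported above for your converted pipeline.
outputs = session.run(None, {"features": np.array([[3.0, 4.0]], dtype=np.float32)})
print(outputs)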
from mlflow import MlflowClient
client = MlflowClient()
# Register model with production metadata
with mlflow.start_run():
    # Train and evaluate model
    model = pipeline.fit(train_data)

    # Log model with registration
    model_info = mlflow.spark.log_model(
        spark_model=model,
        artifact_path="production_candidate",
        registered_model_name="CustomerSegmentationModel",
    )

    # Add production readiness tags
    mlflow.set_tags(
        {
            "validation_passed": "true",
            "deployment_target": "batch_scoring",
            "model_type": "classification",
        }
    )

# Promote model through stages
model_version = client.get_latest_versions(
    "CustomerSegmentationModel", stages=["None"]
)[0]

# Transition to Staging
client.transition_model_version_stage(
    name="CustomerSegmentationModel", version=model_version.version, stage="Staging"
)
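On recent MLflow versions, registered model aliases are the recommended alternative to the stage-based promotion shown above. A sketch of the same promotion expressed with an alias (the alias name champion is just an example):
# Point an alias at the newly registered version instead of transitioning stages
client.set_registered_model_alias(
    name="CustomerSegmentationModel",
    alias="champion",
    version=model_version.version,
)

# Consumers then resolve the alias rather than a hard-coded stage
champion_model = mlflow.spark.load_model("models:/CustomerSegmentationModel@champion")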
Production Deployment
- Batch Inference
- Model Evaluation
from pyspark.sql import functions as F


def production_batch_scoring(model_uri, input_path, output_path):
    """Simple production batch scoring pipeline."""
    with mlflow.start_run(run_name="Batch_Scoring"):
        # Load production model
        model = mlflow.spark.load_model(model_uri)

        # Load input data
        input_data = spark.read.parquet(input_path)

        # Generate predictions
        predictions = model.transform(input_data)

        # Add metadata
        predictions_with_metadata = predictions.withColumn(
            "prediction_timestamp", F.current_timestamp()
        )

        # Write predictions
        predictions_with_metadata.write.mode("overwrite").parquet(output_path)

        # Log job metrics
        record_count = predictions.count()
        mlflow.log_metrics({"records_processed": record_count, "job_success": 1})

        return output_path


# Usage
production_batch_scoring(
    model_uri="models:/CustomerSegmentationModel/Production",
    input_path="s3://data-lake/daily-customers/",
    output_path="s3://predictions/customer-segments/",
)
def evaluate_spark_model(model, test_data, model_name):
    """Evaluate Spark ML model with comprehensive metrics."""
    with mlflow.start_run(run_name=f"Evaluation_{model_name}"):
        # Generate predictions
        predictions = model.transform(test_data)

        # Calculate metrics based on problem type
        from pyspark.ml.evaluation import (
            BinaryClassificationEvaluator,
            MulticlassClassificationEvaluator,
        )

        # Binary classification metrics
        binary_evaluator = BinaryClassificationEvaluator(labelCol="label")
        auc = binary_evaluator.evaluate(predictions)

        # Multiclass metrics
        mc_evaluator = MulticlassClassificationEvaluator(labelCol="label")
        accuracy = mc_evaluator.evaluate(
            predictions, {mc_evaluator.metricName: "accuracy"}
        )
        f1 = mc_evaluator.evaluate(predictions, {mc_evaluator.metricName: "f1"})

        # Log evaluation metrics
        mlflow.log_metrics({"auc": auc, "accuracy": accuracy, "f1_score": f1})

        # Feature importance (if available)
        if hasattr(model.stages[-1], "featureImportances"):
            feature_importance = model.stages[-1].featureImportances.toArray()
            # Log top 5 feature importances
            for i, importance in enumerate(feature_importance[:5]):
                mlflow.log_metric(f"feature_importance_{i}", importance)

        return {"auc": auc, "accuracy": accuracy, "f1_score": f1}
# Usage
evaluation_results = evaluate_spark_model(model, test_data, "RandomForest")
Error Handling and Best Practices
- Robust Training
- Troubleshooting
- Performance Tips
def train_spark_model_with_error_handling(data_path, model_config):
    """Production-ready model training with error handling."""
    with mlflow.start_run(run_name="Robust_Training"):
        try:
            # Load and validate data
            data = spark.read.parquet(data_path)
            record_count = data.count()

            if record_count == 0:
                raise ValueError("Input dataset is empty")

            mlflow.log_metric("input_record_count", record_count)

            # Create and train pipeline
            pipeline = create_pipeline(model_config)
            model = pipeline.fit(data)

            # Validate model can make predictions
            test_sample = data.limit(10)
            predictions = model.transform(test_sample)
            prediction_count = predictions.count()

            if prediction_count != 10:
                raise ValueError("Model validation failed")

            # Log successful model
            model_info = mlflow.spark.log_model(
                spark_model=model, artifact_path="robust_model"
            )
            mlflow.log_param("training_status", "success")

            return model_info

        except Exception as e:
            # Log error information
            mlflow.log_param("training_status", "failed")
            mlflow.log_param("error_message", str(e))
            raise
def create_pipeline(config):
    """Create ML pipeline from configuration."""
    # Simple pipeline creation logic
    feature_cols = config.get("feature_columns", [])
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    algorithm = config.get("algorithm", "logistic_regression")
    if algorithm == "logistic_regression":
        classifier = LogisticRegression(featuresCol="features", labelCol="label")
    elif algorithm == "random_forest":
        classifier = RandomForestClassifier(featuresCol="features", labelCol="label")
    else:
        raise ValueError(f"Unsupported algorithm: {algorithm}")

    return Pipeline(stages=[assembler, classifier])
Common Issues and Solutions
Serialization issues:
import os
import tempfile


# Test model serialization round-trip to a fresh path
def test_model_serialization(model):
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            model_path = os.path.join(temp_dir, "model")
            mlflow.spark.save_model(model, model_path)
            loaded_model = mlflow.spark.load_model(model_path)
            return True
    except Exception as e:
        print(f"Serialization failed: {e}")
        return False
Memory issues:
# Configure Spark for large models
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Cache strategically
large_dataset.cache() # Only when reused multiple times
PyFunc loading issues:
# Ensure Spark session exists for PyFunc
from mlflow.utils._spark_utils import _get_active_spark_session
if _get_active_spark_session() is None:
    spark = SparkSession.builder.appName("MLflowPyFunc").getOrCreate()
Efficient logging:
def efficient_spark_ml_logging():
    """Configure efficient logging for Spark ML."""
    with mlflow.start_run():
        # Log parameters early (lightweight)
        mlflow.log_params({"algorithm": "random_forest", "num_trees": 100})

        # Train model
        model = pipeline.fit(large_dataset)

        # Log metrics before model (in case model logging fails)
        metrics = {"accuracy": 0.95}
        mlflow.log_metrics(metrics)

        # Log model with minimal examples for large datasets
        mlflow.spark.log_model(
            spark_model=model,
            artifact_path="efficient_model",
            input_example=sample_data.limit(3).toPandas(),  # Small sample only
        )
Spark configuration:
# Optimize Spark for MLflow operations
def configure_spark_for_mlflow():
    """Build a Spark session tuned for MLflow operations."""
    spark_config = {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        # Static configs such as the serializer must be set before the session starts
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    }

    builder = SparkSession.builder.appName("MLflowOptimized")
    for key, value in spark_config.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
Conclusion
MLflow's Spark MLlib integration provides a comprehensive solution for tracking and managing distributed machine learning workflows. Whether you're building simple classification models or complex multi-stage pipelines, MLflow helps you maintain reproducibility and deploy models efficiently.
Key benefits of combining MLflow with Spark MLlib include:
- Complete pipeline tracking: automatically log multi-stage ML pipelines with all of their parameters and artifacts
- Flexible deployment: deploy as native Spark models for batch processing or as PyFunc wrappers for universal compatibility
- Data lineage: automatically track data sources through Spark datasource autologging
- Cross-platform support: ONNX conversion enables deployment across different environments
- Production readiness: Model Registry integration and robust error handling for enterprise deployments
The patterns shown in this guide provide a solid foundation for building scalable, reproducible distributed machine learning systems. Start with basic model logging to get immediate experiment tracking benefits, then adopt advanced features such as datasource autologging and Model Registry integration as your needs grow.