
Spark MLlib with MLflow

In this comprehensive guide, we walk through how to use Spark MLlib with MLflow for experiment tracking, model management, and production deployment. We cover basic model logging, pipeline tracking, and deployment patterns that will get you up and running with distributed machine learning quickly and efficiently.

Quick Start: Basic Model Logging

The simplest way to get started is to log your Spark MLlib model directly to MLflow:

import mlflow
import mlflow.spark
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("MLflowSparkExample").getOrCreate()

# Prepare training data
training = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    ["id", "text", "label"],
)

# Create ML Pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Train and log the model
with mlflow.start_run():
    model = pipeline.fit(training)

    # Log the entire pipeline
    model_info = mlflow.spark.log_model(spark_model=model, name="spark-pipeline")

    # Log parameters manually
    mlflow.log_params(
        {
            "max_iter": lr.getMaxIter(),
            "reg_param": lr.getRegParam(),
            "num_features": hashingTF.getNumFeatures(),
        }
    )

    print(f"Model logged with URI: {model_info.model_uri}")

This simple example automatically captures:

  • The complete Spark ML pipeline with all of its stages
  • Model parameters from each pipeline stage
  • The trained model in both native Spark and PyFunc formats

Model Formats and Loading

The native Spark format preserves the full functionality of the Spark ML pipeline:

# Load as native Spark model (requires Spark session)
spark_model = mlflow.spark.load_model(model_info.model_uri)

# Use for distributed batch scoring
test_data = spark.createDataFrame(
    [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")],
    ["id", "text"],
)

predictions = spark_model.transform(test_data)
predictions.show()

Best for: large-scale batch processing and existing Spark infrastructure.
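Because the model is also logged in the PyFunc flavor, it can be loaded without writing Spark-specific scoring code. A minimal sketch, reusing `model_info` from the quick-start example above (the input pandas DataFrame must provide the columns the pipeline expects, here `text`):

import mlflow.pyfunc
import pandas as pd

# Load the same logged pipeline through the generic PyFunc interface
pyfunc_model = mlflow.pyfunc.load_model(model_info.model_uri)

# Score a small pandas DataFrame; predictions come back as a pandas object
sample = pd.DataFrame({"text": ["spark i j k", "l m n"]})
print(pyfunc_model.predict(sample))

Note that scoring a Spark model through PyFunc still requires pyspark to be installed; MLflow handles the pandas-to-Spark conversion internally.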

Pipeline Tracking and Management

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier

# Load your dataset
data = spark.read.csv("path/to/dataset.csv", header=True, inferSchema=True)

with mlflow.start_run(run_name="Feature Pipeline"):
# Create feature engineering pipeline
feature_cols = ["age", "income", "credit_score"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")
scaler = StandardScaler(inputCol="raw_features", outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100)

# Create complete pipeline
pipeline = Pipeline(stages=[assembler, scaler, rf])

# Train pipeline
model = pipeline.fit(data)

# Log pipeline parameters
mlflow.log_params(
{
"num_features": len(feature_cols),
"num_trees": rf.getNumTrees(),
"max_depth": rf.getMaxDepth(),
}
)

# Log the complete pipeline
mlflow.spark.log_model(spark_model=model, artifact_path="feature_pipeline")

Spark Datasource Autologging

MLflow provides automatic logging of Spark datasource information:

import mlflow.spark

# Enable Spark datasource autologging
mlflow.spark.autolog()

# Now all datasource reads are automatically logged
with mlflow.start_run():
    # These datasource operations are automatically tracked
    raw_data = spark.read.parquet("s3://my-bucket/training-data/")
    processed_data = spark.read.csv(
        "hdfs://cluster/processed/features.csv", header=True
    )

    # Train your model - datasource info is logged automatically
    model = pipeline.fit(processed_data)

    # Model training and datasource information both captured
    mlflow.spark.log_model(model, artifact_path="model_with_datasource_tracking")

Model Signatures and Schema Management

from mlflow.models import infer_signature
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import array_to_vector

# Create data with vector features
vector_data = spark.createDataFrame(
    [([3.0, 4.0], 0), ([5.0, 6.0], 1)], ["features_array", "label"]
).select(array_to_vector("features_array").alias("features"), "label")

# Train model
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(vector_data)

# Get predictions for signature
predictions = model.transform(vector_data)

# Infer signature automatically
signature = infer_signature(vector_data, predictions.select("prediction"))

with mlflow.start_run():
    mlflow.spark.log_model(
        spark_model=model,
        artifact_path="vector_model",
        signature=signature,
        input_example=vector_data.limit(2).toPandas(),
    )

Cross-Platform Deployment

Convert Spark MLlib models to ONNX format for cross-platform deployment:

# Note: This requires onnxmltools (Spark ML support is experimental)
# pip install onnxmltools

import onnxmltools

with mlflow.start_run(run_name="ONNX Conversion"):
# Train your Spark ML model
model = pipeline.fit(training_data)

# Log original Spark model
spark_model_info = mlflow.spark.log_model(
spark_model=model, artifact_path="spark_model"
)

try:
# Convert to ONNX using onnxmltools
# Note: Spark ML conversion is experimental and may have limitations
onnx_model = onnxmltools.convert_sparkml(
model, name="SparkMLPipeline", target_opset=None # Use default opset
)

# Save ONNX model as artifact
onnx_model_path = "model.onnx"
onnxmltools.utils.save_model(onnx_model, onnx_model_path)

mlflow.log_artifact(onnx_model_path)
mlflow.log_param("onnx_conversion_successful", True)

# Log ONNX model info
opset_version = onnx_model.opset_import[0].version
mlflow.log_param("onnx_opset_version", opset_version)

except Exception as e:
mlflow.log_param("onnx_conversion_error", str(e))
mlflow.log_param("onnx_conversion_successful", False)

# ONNX conversion for Spark ML is experimental and may not work
# for all model types. Consider using PyFunc format instead.

Note: Spark ML to ONNX conversion is experimental in onnxmltools and may not support all Spark ML operators. For production deployments, consider MLflow's PyFunc format for broader compatibility.
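If the conversion succeeds, one option (not shown above) is to log the converted model with MLflow's built-in ONNX flavor rather than as a plain artifact, so it can later be loaded through a standard model URI. A minimal sketch, assuming the `onnx_model` object produced above and that the `onnx` package is installed:

import mlflow.onnx

with mlflow.start_run(run_name="ONNX Flavor Logging"):
    # Log the converted model with the dedicated ONNX flavor
    onnx_model_info = mlflow.onnx.log_model(
        onnx_model=onnx_model,
        artifact_path="onnx_model",
    )
    print(f"ONNX model logged with URI: {onnx_model_info.model_uri}")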

Production Deployment

from pyspark.sql import functions as F


def production_batch_scoring(model_uri, input_path, output_path):
    """Simple production batch scoring pipeline."""

    with mlflow.start_run(run_name="Batch_Scoring"):
        # Load production model
        model = mlflow.spark.load_model(model_uri)

        # Load input data
        input_data = spark.read.parquet(input_path)

        # Generate predictions
        predictions = model.transform(input_data)

        # Add metadata
        predictions_with_metadata = predictions.withColumn(
            "prediction_timestamp", F.current_timestamp()
        )

        # Write predictions
        predictions_with_metadata.write.mode("overwrite").parquet(output_path)

        # Log job metrics
        record_count = predictions.count()
        mlflow.log_metrics({"records_processed": record_count, "job_success": 1})

        return output_path


# Usage
production_batch_scoring(
    model_uri="models:/CustomerSegmentationModel/Production",
    input_path="s3://data-lake/daily-customers/",
    output_path="s3://predictions/customer-segments/",
)
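The `models:/CustomerSegmentationModel/Production` URI above assumes a model version has already been registered under that name in the MLflow Model Registry. A minimal sketch of how such a version might be registered and promoted, reusing a `model_info` returned by an earlier `log_model` call (stage-based promotion is deprecated in favor of aliases in recent MLflow releases):

from mlflow import MlflowClient

# Register a previously logged model under the registry name used above
result = mlflow.register_model(
    model_uri=model_info.model_uri,
    name="CustomerSegmentationModel",
)

# Promote the new version to the Production stage
client = MlflowClient()
client.transition_model_version_stage(
    name="CustomerSegmentationModel",
    version=result.version,
    stage="Production",
)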

Error Handling and Best Practices

def train_spark_model_with_error_handling(data_path, model_config):
    """Production-ready model training with error handling."""

    with mlflow.start_run(run_name="Robust_Training"):
        try:
            # Load and validate data
            data = spark.read.parquet(data_path)
            record_count = data.count()

            if record_count == 0:
                raise ValueError("Input dataset is empty")

            mlflow.log_metric("input_record_count", record_count)

            # Create and train pipeline
            pipeline = create_pipeline(model_config)
            model = pipeline.fit(data)

            # Validate model can make predictions
            test_sample = data.limit(10)
            predictions = model.transform(test_sample)
            prediction_count = predictions.count()

            if prediction_count != 10:
                raise ValueError("Model validation failed")

            # Log successful model
            model_info = mlflow.spark.log_model(
                spark_model=model, artifact_path="robust_model"
            )

            mlflow.log_param("training_status", "success")
            return model_info

        except Exception as e:
            # Log error information
            mlflow.log_param("training_status", "failed")
            mlflow.log_param("error_message", str(e))
            raise


def create_pipeline(config):
    """Create ML pipeline from configuration."""

    # Simple pipeline creation logic
    feature_cols = config.get("feature_columns", [])
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    algorithm = config.get("algorithm", "logistic_regression")
    if algorithm == "logistic_regression":
        classifier = LogisticRegression(featuresCol="features", labelCol="label")
    elif algorithm == "random_forest":
        classifier = RandomForestClassifier(featuresCol="features", labelCol="label")
    else:
        raise ValueError(f"Unsupported algorithm: {algorithm}")

    return Pipeline(stages=[assembler, classifier])
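A hypothetical invocation of the helper above, with an illustrative configuration and data path (the dataset is assumed to contain the listed feature columns plus a "label" column):

# Illustrative configuration and data path
config = {
    "feature_columns": ["age", "income", "credit_score"],
    "algorithm": "random_forest",
}

model_info = train_spark_model_with_error_handling(
    data_path="s3://data-lake/training/customers.parquet",
    model_config=config,
)
print(f"Trained model URI: {model_info.model_uri}")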

Conclusion

MLflow's Spark MLlib integration provides a comprehensive solution for tracking and managing distributed machine learning workflows. Whether you are building a simple classification model or a complex multi-stage pipeline, MLflow helps you maintain reproducibility and deploy models efficiently.

Key benefits of combining MLflow with Spark MLlib include:

  • Complete pipeline tracking: automatic logging of multi-stage ML pipelines with all parameters and artifacts
  • Flexible deployment: deploy as a native Spark model for batch processing or as a PyFunc wrapper for universal compatibility
  • Data lineage: automatic tracking of data sources through Spark datasource autologging
  • Cross-platform support: ONNX conversion enables deployment across different environments
  • Production readiness: Model Registry integration and robust error handling for enterprise deployments

The patterns shown in this guide provide a solid foundation for building scalable, reproducible distributed machine learning systems. Start with basic model logging for immediate experiment tracking benefits, then adopt advanced features such as datasource autologging and Model Registry integration as your needs grow.