mlflow.spark

mlflow.spark 模块提供了用于记录和加载 Spark MLlib 模型的 API。该模块导出了以下风格的 Spark MLlib 模型:

Spark MLlib(原生)格式

允许模型被加载为 Spark Transformer,以便在 Spark 会话中进行评分。具有此风格的模型可以在 Python 中加载为 PySpark PipelineModel 对象。这是主要的风格,并且始终会生成。

mlflow.pyfunc

通过实例化 SparkContext 并将输入数据读取为 Spark DataFrame 来支持在 Spark 外部进行部署以进行评分。还支持在 Spark 中作为 Spark UDF 进行部署。具有此风格的模型可以加载为 Python 函数以执行推理。此风格始终会生成。

mlflow.spark.autolog(disable=False, silent=False)[source]

注意

自动日志记录已知与以下包版本兼容:3.3.0 <= pyspark <= 4.0.1。当使用此范围外的包版本时,自动日志记录可能不会成功。

启用(或禁用)并配置当 Spark 数据源路径被读取时,记录其路径、版本(如果适用)和格式。此方法不是线程安全的,并假定已存在一个带有 mlflow-spark JAR 附件的 SparkSession。它应该在 Spark Driver 上调用,而不是在 Executor 上调用(即,不要在 Spark 并行化的函数内调用此方法)。使用的 mlflow-spark JAR 必须与 Spark 的 Scala 版本匹配。请参阅 Maven Repository 获取可用版本。此 API 需要 Spark 3.0 或更高版本。

数据源信息被缓存在内存中,并被记录到所有后续的 MLflow 运行中,包括活动的 MLflow 运行(如果读取数据时存在活动运行)。请注意,通过此 API 目前不支持 Spark ML(MLlib)模型的自动日志记录。数据源自动日志记录是尽力而为的,这意味着如果 Spark 负载很重或 MLflow 日志记录因任何原因失败(例如,如果 MLflow 服务器不可用),则可能会丢失日志。

对于自动日志记录的任何意外问题,除了您的 MLflow 代码生成的 stderr 和 stdout 外,还请检查 Spark Driver 和 Executor 的日志 - 数据源信息是从 Spark 中提取的,因此用于调试的相关日志可能会出现在 Spark 日志中。

注意

Spark 数据源自动日志记录仅支持在单线程中记录到 MLflow 运行。

Example
import mlflow.spark
import os
import shutil
from pyspark.sql import SparkSession

# Create and persist some dummy data
# Note: the 2.12 in 'org.mlflow:mlflow-spark_2.12:2.16.2' below indicates the Scala
# version, please match this with that of Spark. The 2.16.2 indicates the mlflow version.
# Note: On environments like Databricks with pre-created SparkSessions,
# ensure the org.mlflow:mlflow-spark_2.12:2.16.2 is attached as a library to
# your cluster
spark = (
    SparkSession.builder.config(
        "spark.jars.packages",
        "org.mlflow:mlflow-spark_2.12:2.16.2",
    )
    .master("local[*]")
    .getOrCreate()
)
df = spark.createDataFrame(
    [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")],
    ["id", "text"],
)
import tempfile

tempdir = tempfile.mkdtemp()
df.write.csv(os.path.join(tempdir, "my-data-path"), header=True)
# Enable Spark datasource autologging.
mlflow.spark.autolog()
loaded_df = spark.read.csv(
    os.path.join(tempdir, "my-data-path"), header=True, inferSchema=True
)
# Call toPandas() to trigger a read of the Spark datasource. Datasource info
# (path and format) is logged to the current active run, or the
# next-created MLflow run if no run is currently active
with mlflow.start_run() as active_run:
    pandas_df = loaded_df.toPandas()
参数
  • disable – 如果为 True,则禁用 Spark 数据源自动日志记录集成。如果为 False,则启用 Spark 数据源自动日志记录集成。

  • silent – 如果为 True,则在 Spark 数据源自动日志记录期间抑制 MLflow 的所有事件日志和警告。如果为 False,则在 Spark 数据源自动日志记录期间显示所有事件和警告。

mlflow.spark.get_default_conda_env(is_spark_connect_model=False)[source]
返回

通过调用 save_model()log_model() 生成的 MLflow Models 的默认 Conda 环境。此 Conda 环境包含调用者系统上安装的当前 PySpark 版本。PySpark 的 dev 版本在生成的 Conda 环境中被替换为稳定版本(例如,如果您运行的 PySpark 版本是 2.4.5.dev0,调用此方法会生成一个依赖于 PySpark 版本 2.4.5 的 Conda 环境)。

mlflow.spark.get_default_pip_requirements(is_spark_connect_model=False)[source]
返回

此风格生成的 MLflow Models 的默认 pip 需求列表。调用 save_model()log_model() 会生成一个 pip 环境,其中至少包含这些需求。

mlflow.spark.load_model(model_uri, dfs_tmpdir=None, dst_path=None)[source]

从路径加载 Spark MLlib 模型。

参数
  • model_uri

    The location, in URI format, of the MLflow model, for example

    • /Users/me/path/to/local/model

    • relative/path/to/local/model

    • s3://my_bucket/path/to/model

    • runs:/<mlflow_run_id>/run-relative/path/to/model

    • models:/<model_name>/<model_version>

    • models:/<model_name>/<stage>

    For more information about supported URI schemes, see Referencing Artifacts.

  • dfs_tmpdir – 分布式(Hadoop)文件系统(DFS)或本地文件系统(在本地模式下运行)上的临时目录路径。模型将从该目标加载。默认为 /tmp/mlflow

  • dst_path – The local filesystem path to which to download the model artifact. This directory must already exist. If unspecified, a local output path will be created.

返回

pyspark.ml.pipeline.PipelineModel

示例
import mlflow

model = mlflow.spark.load_model("spark-model")
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame(
    [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")],
    ["id", "text"],
)
# Make predictions on test documents
prediction = model.transform(test)
mlflow.spark.log_model(spark_model, artifact_path, conda_env=None, code_paths=None, dfs_tmpdir=None, registered_model_name=None, signature: mlflow.models.signature.ModelSignature = None, input_example: Union[pandas.core.frame.DataFrame, numpy.ndarray, dict, list, csr_matrix, csc_matrix, str, bytes, tuple] = None, await_registration_for=300, pip_requirements=None, extra_pip_requirements=None, metadata=None)[source]

将 Spark MLlib 模型记录为当前运行的 MLflow 工件。这使用了 MLlib 持久化格式,并生成了一个带有 Spark 风格的 MLflow 模型。

注意:如果没有活动的运行,它将实例化一个运行以获取 run_id。

参数
  • spark_model

    要保存的 Spark 模型 - MLflow 只能保存实现 MLReadable 和 MLWritable 的 pyspark.ml.Model 或 pyspark.ml.Transformer 的后代。

    注意

    提供的 Spark 模型的 transform 方法必须生成一个名为“prediction”的列,该列用作 MLflow pyfunc 模型的输出。大多数 Spark 模型默认生成名为“prediction”的输出列,其中包含预测标签。要将概率分类模型的概率列设置为输出列,您需要将“probabilityCol”参数设置为“prediction”,并将“predictionCol”参数设置为空字符串(例如,model.setProbabilityCol(“prediction”).setPredictionCol(“”))。

  • artifact_path – 运行相对工件路径。

  • conda_env

    Conda 环境的字典表示形式或 conda 环境 yaml 文件的路径。如果提供,则描述了模型应运行的环境。至少,它应该指定 get_default_conda_env() 中包含的依赖项。如果为 None,则将具有通过 mlflow.models.infer_pip_requirements() 推断出的 pip 需求的 Conda 环境添加到模型中。如果推断失败,则回退使用 get_default_pip_requirements。来自 conda_env 的 pip 需求被写入 pip requirements.txt 文件,完整的 Conda 环境被写入 conda.yaml。以下是 Conda 环境的字典表示的 *示例*:

    {
        "name": "mlflow-env",
        "channels": ["conda-forge"],
        "dependencies": [
            "python=3.8.15",
            {
                "pip": [
                    "pyspark==x.y.z"
                ],
            },
        ],
    }
    

  • code_paths

    A list of local filesystem paths to Python file dependencies (or directories containing file dependencies). These files are prepended to the system path when the model is loaded. Files declared as dependencies for a given model should have relative imports declared from a common root path if multiple files are defined with import dependencies between them to avoid import errors when loading the model.

    For a detailed explanation of code_paths functionality, recommended usage patterns and limitations, see the code_paths usage guide.

  • dfs_tmpdir – 分布式(Hadoop)文件系统(DFS)或本地文件系统(在本地模式下运行)上的临时目录路径。模型将被写入此目标,然后复制到模型的工件目录中。这是必需的,因为 Spark ML 模型在集群上运行时会读取和写入 DFS。如果此操作成功完成,则删除在 DFS 上创建的所有临时文件。默认为 /tmp/mlflow。对于在 pyspark.ml.connect 模块中定义的模型,此参数将被忽略。

  • registered_model_name – 如果提供,则在 registered_model_name 下创建一个模型版本,如果给定名称的注册模型不存在,也会创建该注册模型。

  • signature

    一个 Model Signature 对象,描述模型的输入和输出 Schema。可以使用 mlflow.models.signatureinfer_signature 函数推断模型签名。注意,如果您的 Spark 模型包含 Spark ML 向量类型的输入或输出列,您应该为该列创建 SparkMLVector 向量类型,infer_signature 函数也可以正确地从 Spark DataFrame 输入/输出推断 SparkMLVector 向量类型。在将具有 SparkMLVector 向量类型输入的 Spark ML 模型加载为 MLflow pyfunc 模型时,它接受 Array[double] 类型输入。MLflow 内部将数组转换为 Spark ML 向量,然后调用 Spark 模型进行推理。同样,如果模型具有向量类型输出,MLflow 会将 Spark ML 向量输出数据内部转换为 Array[double] 类型推理结果。

    from mlflow.models import infer_signature
    from pyspark.sql.functions import col
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.functions import array_to_vector
    import pandas as pd
    import mlflow
    
    train_df = spark.createDataFrame(
        [([3.0, 4.0], 0), ([5.0, 6.0], 1)], schema="features array<double>, label long"
    ).select(array_to_vector("features").alias("features"), col("label"))
    lor = LogisticRegression(maxIter=2)
    lor.setPredictionCol("").setProbabilityCol("prediction")
    lor_model = lor.fit(train_df)
    
    test_df = train_df.select("features")
    prediction_df = lor_model.transform(train_df).select("prediction")
    
    signature = infer_signature(test_df, prediction_df)
    
    with mlflow.start_run() as run:
        model_info = mlflow.spark.log_model(
            lor_model,
            "model",
            signature=signature,
        )
    
    # The following signature is outputted:
    # inputs:
    #   ['features': SparkML vector (required)]
    # outputs:
    #   ['prediction': SparkML vector (required)]
    print(model_info.signature)
    
    loaded = mlflow.pyfunc.load_model(model_info.model_uri)
    
    test_dataset = pd.DataFrame({"features": [[1.0, 2.0]]})
    
    # `loaded.predict` accepts `Array[double]` type input column,
    # and generates `Array[double]` type output column.
    print(loaded.predict(test_dataset))
    

  • input_example – 一个或多个有效的模型输入实例。输入示例用作要馈送给模型的数据的提示。它将被转换为 Pandas DataFrame,然后使用 Pandas 的面向拆分(split-oriented)格式序列化为 json,或者转换为 numpy 数组,其中示例将通过转换为列表来序列化为 json。字节将进行 base64 编码。当 signature 参数为 None 时,输入示例用于推断模型签名。

  • await_registration_for – 等待模型版本完成创建并处于 READY 状态的秒数。默认情况下,函数等待五分钟。指定 0 或 None 可跳过等待。

  • pip_requirements – pip 需求字符串的可迭代对象(例如 ["pyspark", "-r requirements.txt", "-c constraints.txt"])或本地文件系统上的 pip 需求文件的字符串路径(例如 "requirements.txt")。如果提供,则描述了模型应运行的环境。如果为 None,则通过 mlflow.models.infer_pip_requirements() 从当前软件环境中推断出默认需求列表。如果需求推断失败,则回退使用 get_default_pip_requirements。需求和约束都会被自动解析并分别写入模型中的 requirements.txtconstraints.txt 文件。需求也会被写入模型 Conda 环境(conda.yaml)文件的 pip 部分。

  • extra_pip_requirements

    pip 需求字符串的可迭代对象(例如 ["pandas", "-r requirements.txt", "-c constraints.txt"])或本地文件系统上的 pip 需求文件的字符串路径(例如 "requirements.txt")。如果提供,则描述了将追加到基于用户当前软件环境自动生成的默认 pip 需求集中的额外 pip 需求。需求和约束都会被自动解析并分别写入模型中的 requirements.txtconstraints.txt 文件。需求也会被写入模型 Conda 环境(conda.yaml)文件的 pip 部分。

    警告

    以下参数不能同时指定

    • conda_env

    • pip_requirements

    • extra_pip_requirements

    此示例演示了如何使用 pip_requirementsextra_pip_requirements 指定 pip requirements。

  • metadata – 传递给模型并存储在 MLmodel 文件中的自定义元数据字典。

返回

一个 ModelInfo 实例,其中包含已记录模型的元数据。

Example
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

training = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    ["id", "text", "label"],
)
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
mlflow.spark.log_model(model, "spark-model")
mlflow.spark.save_model(spark_model, path, mlflow_model=None, conda_env=None, code_paths=None, dfs_tmpdir=None, signature: mlflow.models.signature.ModelSignature = None, input_example: Union[pandas.core.frame.DataFrame, numpy.ndarray, dict, list, csr_matrix, csc_matrix, str, bytes, tuple] = None, pip_requirements=None, extra_pip_requirements=None, metadata=None)[source]

将 Spark MLlib 模型保存到本地路径。

默认情况下,此函数使用 Spark MLlib 持久化机制保存模型。

参数
  • spark_model – 要保存的 Spark 模型 - MLflow 只能保存实现 MLReadable 和 MLWritable 的 pyspark.ml.Model 或 pyspark.ml.Transformer 的后代。

  • path – 要保存模型的本地路径。

  • mlflow_model – 正在添加到此风格的 MLflow 模型配置。

  • conda_env

    Conda 环境的字典表示形式或 conda 环境 yaml 文件的路径。如果提供,则描述了模型应运行的环境。至少,它应该指定 get_default_conda_env() 中包含的依赖项。如果为 None,则将具有通过 mlflow.models.infer_pip_requirements() 推断出的 pip 需求的 Conda 环境添加到模型中。如果推断失败,则回退使用 get_default_pip_requirements。来自 conda_env 的 pip 需求被写入 pip requirements.txt 文件,完整的 Conda 环境被写入 conda.yaml。以下是 Conda 环境的字典表示的 *示例*:

    {
        "name": "mlflow-env",
        "channels": ["conda-forge"],
        "dependencies": [
            "python=3.8.15",
            {
                "pip": [
                    "pyspark==x.y.z"
                ],
            },
        ],
    }
    

  • code_paths

    A list of local filesystem paths to Python file dependencies (or directories containing file dependencies). These files are prepended to the system path when the model is loaded. Files declared as dependencies for a given model should have relative imports declared from a common root path if multiple files are defined with import dependencies between them to avoid import errors when loading the model.

    For a detailed explanation of code_paths functionality, recommended usage patterns and limitations, see the code_paths usage guide.

  • dfs_tmpdir – 分布式(Hadoop)文件系统(DFS)或本地文件系统(在本地模式下运行)上的临时目录路径。模型将被写入此目标,然后复制到请求的本地路径。这是必需的,因为 Spark ML 模型在集群上运行时会读取和写入 DFS。如果此操作成功完成,则删除在 DFS 上创建的所有临时文件。默认为 /tmp/mlflow

  • signature – 请参阅 mlflow.spark.log_model()signature 参数的文档。

  • input_example – 一个或多个有效的模型输入实例。输入示例用作要馈送给模型的数据的提示。它将被转换为 Pandas DataFrame,然后使用 Pandas 的面向拆分(split-oriented)格式序列化为 json,或者转换为 numpy 数组,其中示例将通过转换为列表来序列化为 json。字节将进行 base64 编码。当 signature 参数为 None 时,输入示例用于推断模型签名。

  • pip_requirements – pip 需求字符串的可迭代对象(例如 ["pyspark", "-r requirements.txt", "-c constraints.txt"])或本地文件系统上的 pip 需求文件的字符串路径(例如 "requirements.txt")。如果提供,则描述了模型应运行的环境。如果为 None,则通过 mlflow.models.infer_pip_requirements() 从当前软件环境中推断出默认需求列表。如果需求推断失败,则回退使用 get_default_pip_requirements。需求和约束都会被自动解析并分别写入模型中的 requirements.txtconstraints.txt 文件。需求也会被写入模型 Conda 环境(conda.yaml)文件的 pip 部分。

  • extra_pip_requirements

    pip 需求字符串的可迭代对象(例如 ["pandas", "-r requirements.txt", "-c constraints.txt"])或本地文件系统上的 pip 需求文件的字符串路径(例如 "requirements.txt")。如果提供,则描述了将追加到基于用户当前软件环境自动生成的默认 pip 需求集中的额外 pip 需求。需求和约束都会被自动解析并分别写入模型中的 requirements.txtconstraints.txt 文件。需求也会被写入模型 Conda 环境(conda.yaml)文件的 pip 部分。

    警告

    以下参数不能同时指定

    • conda_env

    • pip_requirements

    • extra_pip_requirements

    此示例演示了如何使用 pip_requirementsextra_pip_requirements 指定 pip requirements。

  • metadata – 传递给模型并存储在 MLmodel 文件中的自定义元数据字典。

示例
from mlflow import spark
from pyspark.ml.pipeline import PipelineModel

# your pyspark.ml.pipeline.PipelineModel type
model = ...
mlflow.spark.save_model(model, "spark-model")