mlflow.spark
The mlflow.spark 模块提供了一个用于记录和加载 Spark MLlib 模型的 API。此模块使用以下风格导出 Spark MLlib 模型:
- Spark MLlib(原生)格式
允许将模型加载为 Spark Transformer,以便在 Spark 会话中进行评分。具有此风格的模型可以作为 PySpark PipelineModel 对象在 Python 中加载。这是主要风格,并且始终生成。
mlflow.pyfunc支持在 Spark 外部进行部署,方法是实例化一个 SparkContext 并将输入数据读取为 Spark DataFrame,然后再进行评分。还支持在 Spark 中作为 Spark UDF 进行部署。具有此风格的模型可以作为 Python 函数加载以执行推理。此风格始终生成。
- mlflow.spark.autolog(disable=False, silent=False)[源]
注意
已知 Autologging 与以下软件包版本兼容:
3.3.0<=pyspark<=4.1.1。在超出此范围的软件包版本中使用时,Autologging 可能不会成功。启用(或禁用)和配置在读取 Spark 数据源路径、版本(如果适用)和格式时的日志记录。此方法不是线程安全的,并且假定已存在一个带有 SparkSession 且已附加 mlflow-spark JAR。应在 Spark 驱动程序上调用此方法,而不是在执行器上调用(即,不要在由 Spark 并行化的函数中调用此方法)。使用的 mlflow-spark JAR 必须与 Spark 的 Scala 版本匹配。请参阅 Maven 仓库以获取可用版本。此 API 需要 Spark 3.0 或更高版本。
数据源信息缓存在内存中,并记录到所有后续的 MLflow 运行中,包括活动的 MLflow 运行(如果读取数据时存在)。请注意,此 API 目前不支持记录 Spark ML (MLlib) 模型。数据源自动记录是尽力而为的,这意味着如果 Spark 负载很重或 MLflow 记录因任何原因失败(例如,如果 MLflow 服务器不可用),记录可能会被丢弃。
对于与自动记录相关的任何意外问题,请检查 Spark 驱动程序和执行器日志,以及从 MLflow 代码生成的 stderr 和 stdout - 数据源信息从 Spark 中拉取,因此与调试相关的日志可能会出现在 Spark 日志中。
注意
Spark 数据源自动记录仅支持在单个线程中记录到 MLflow 运行
import mlflow.spark import os import shutil from pyspark.sql import SparkSession # Create and persist some dummy data # Note: the 2.12 in 'org.mlflow:mlflow-spark_2.12:2.16.2' below indicates the Scala # version, please match this with that of Spark. The 2.16.2 indicates the mlflow version. # Note: On environments like Databricks with pre-created SparkSessions, # ensure the org.mlflow:mlflow-spark_2.12:2.16.2 is attached as a library to # your cluster spark = ( SparkSession.builder.config( "spark.jars.packages", "org.mlflow:mlflow-spark_2.12:2.16.2", ) .master("local[*]") .getOrCreate() ) df = spark.createDataFrame( [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")], ["id", "text"], ) import tempfile tempdir = tempfile.mkdtemp() df.write.csv(os.path.join(tempdir, "my-data-path"), header=True) # Enable Spark datasource autologging. mlflow.spark.autolog() loaded_df = spark.read.csv( os.path.join(tempdir, "my-data-path"), header=True, inferSchema=True ) # Call toPandas() to trigger a read of the Spark datasource. Datasource info # (path and format) is logged to the current active run, or the # next-created MLflow run if no run is currently active with mlflow.start_run() as active_run: pandas_df = loaded_df.toPandas()
- 参数
disable – 如果为
True,则禁用 Spark 数据源自动记录集成。如果为False,则启用 Spark 数据源自动记录集成。silent – 如果为
True,则在 Spark 数据源自动记录期间抑制来自 MLflow 的所有事件日志和警告。如果为False,则在 Spark 数据源自动记录期间显示所有事件和警告。
- mlflow.spark.get_default_conda_env(is_spark_connect_model=False)[源]
- 返回
调用
save_model()和log_model()生成的 MLflow 模型的默认 Conda 环境。此 Conda 环境包含调用者系统上安装的当前版本的 PySpark。dev版本的 PySpark 会在结果的 Conda 环境中替换为稳定版本(例如,如果您正在运行 PySpark 版本2.4.5.dev0,调用此方法会生成一个对 PySpark 版本2.4.5有依赖的 Conda 环境)。
- mlflow.spark.get_default_pip_requirements(is_spark_connect_model=False)[源]
- 返回
此风格生成的 MLflow 模型的默认 pip 要求列表。调用
save_model()和log_model()会生成一个至少包含这些要求的 pip 环境。
- mlflow.spark.load_model(model_uri, dfs_tmpdir=None, dst_path=None)[源]
从路径加载 Spark MLlib 模型。
- 参数
model_uri –
The location, in URI format, of the MLflow model, for example
/Users/me/path/to/local/modelrelative/path/to/local/models3://my_bucket/path/to/modelruns:/<mlflow_run_id>/run-relative/path/to/modelmodels:/<model_name>/<model_version>models:/<model_name>/<stage>
For more information about supported URI schemes, see Referencing Artifacts.
dfs_tmpdir – 分布式(Hadoop)文件系统(DFS)或本地文件系统(如果以本地模式运行)上的临时目录路径。模型将从该目标加载。默认为
/tmp/mlflow。dst_path – The local filesystem path to which to download the model artifact. This directory must already exist. If unspecified, a local output path will be created.
- 返回
pyspark.ml.pipeline.PipelineModel
import mlflow model = mlflow.spark.load_model("spark-model") # Prepare test documents, which are unlabeled (id, text) tuples. test = spark.createDataFrame( [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")], ["id", "text"], ) # Make predictions on test documents prediction = model.transform(test)
- mlflow.spark.log_model(spark_model, artifact_path, conda_env=None, code_paths=None, dfs_tmpdir=None, registered_model_name=None, signature: mlflow.models.signature.ModelSignature = None, input_example: Union[pandas.core.frame.DataFrame, numpy.ndarray, dict, list, csr_matrix, csc_matrix, str, bytes, tuple] = None, await_registration_for=300, pip_requirements=None, extra_pip_requirements=None, metadata=None)[源]
将 Spark MLlib 模型作为 MLflow artifact 记录到当前运行中。这使用 MLlib 持久化格式,并生成具有 Spark 风格的 MLflow 模型。
注意:如果没有活动的运行,它将实例化一个运行以获取 run_id。
- 参数
spark_model –
要保存的 Spark 模型 - MLflow 只能保存 pyspark.ml.Model 或 pyspark.ml.Transformer 的后代,这些后代实现了 MLReadable 和 MLWritable。
注意
提供的 Spark 模型的 transform 方法必须生成一个名为“prediction”的列,该列用作 MLflow pyfunc 模型的输出。大多数 Spark 模型默认生成名称为“prediction”的输出列,其中包含预测标签。要将概率列设置为概率分类模型的输出列,您需要将“probabilityCol”参数设置为“prediction”,并将“predictionCol”参数设置为“”。(例如 model.setProbabilityCol(“prediction”).setPredictionCol(“”))
artifact_path – 相对运行的 artifact 路径。
conda_env –
Conda 环境的字典表示形式或本地文件系统上 conda 环境 yaml 文件的路径。如果提供,这描述了模型应运行的环境。至少,它应指定 get_default_conda_env() 中包含的依赖项。如果为
None,则会向模型添加通过mlflow.models.infer_pip_requirements()从当前系统中推断出的 pip 要求。如果要求推断失败,它将回退到使用 get_default_pip_requirements。来自conda_env的 pip 要求将被写入 piprequirements.txt文件,完整的 conda 环境将被写入conda.yaml。以下是 conda 环境的示例字典表示{ "name": "mlflow-env", "channels": ["conda-forge"], "dependencies": [ "python=3.8.15", { "pip": [ "pyspark==x.y.z" ], }, ], }
code_paths –
A list of local filesystem paths to Python file dependencies (or directories containing file dependencies). These files are prepended to the system path when the model is loaded. Files declared as dependencies for a given model should have relative imports declared from a common root path if multiple files are defined with import dependencies between them to avoid import errors when loading the model.
For a detailed explanation of
code_pathsfunctionality, recommended usage patterns and limitations, see the code_paths usage guide.dfs_tmpdir – 分布式(Hadoop)文件系统(DFS)或本地文件系统(如果以本地模式运行)上的临时目录路径。模型将在此目标中写入,然后复制到请求的模型 artifact 目录中。这是必需的,因为 Spark ML 模型在集群上运行时会从 DFS 读取和写入。如果此操作成功完成,则删除在 DFS 上创建的所有临时文件。默认为
/tmp/mlflow。对于在 pyspark.ml.connect 模块中定义的模型,此参数将被忽略。registered_model_name – 如果提供,则在
registered_model_name下创建一个模型版本,如果给定名称的注册模型不存在,也会创建该注册模型。signature –
一个 Model Signature 对象,描述模型的输入和输出 Schema。可以使用 mlflow.models.signature 的 infer_signature 函数推断模型签名。请注意,如果您的 Spark 模型包含 Spark ML 向量类型输入或输出列,您应该为该列创建
SparkMLVector向量类型,infer_signature 函数还可以从 Spark Dataframe 输入/输出正确推断SparkMLVector向量类型。当将具有SparkMLVector向量类型输入的 Spark ML 模型加载为 MLflow pyfunc 模型时,它接受Array[double]类型输入。MLflow 在内部将数组转换为 Spark ML 向量,然后调用 Spark 模型进行推理。类似地,如果模型具有向量类型输出,MLflow 会将 Spark ML 向量输出数据内部转换为Array[double]类型的推理结果。from mlflow.models import infer_signature from pyspark.sql.functions import col from pyspark.ml.classification import LogisticRegression from pyspark.ml.functions import array_to_vector import pandas as pd import mlflow train_df = spark.createDataFrame( [([3.0, 4.0], 0), ([5.0, 6.0], 1)], schema="features array<double>, label long" ).select(array_to_vector("features").alias("features"), col("label")) lor = LogisticRegression(maxIter=2) lor.setPredictionCol("").setProbabilityCol("prediction") lor_model = lor.fit(train_df) test_df = train_df.select("features") prediction_df = lor_model.transform(train_df).select("prediction") signature = infer_signature(test_df, prediction_df) with mlflow.start_run() as run: model_info = mlflow.spark.log_model( lor_model, "model", signature=signature, ) # The following signature is outputted: # inputs: # ['features': SparkML vector (required)] # outputs: # ['prediction': SparkML vector (required)] print(model_info.signature) loaded = mlflow.pyfunc.load_model(model_info.model_uri) test_dataset = pd.DataFrame({"features": [[1.0, 2.0]]}) # `loaded.predict` accepts `Array[double]` type input column, # and generates `Array[double]` type output column. print(loaded.predict(test_dataset))
input_example – 一个或多个有效的模型输入实例。输入示例用作要馈送给模型的数据的提示。它将被转换为 Pandas DataFrame,然后使用 Pandas 的面向拆分(split-oriented)格式序列化为 json,或者转换为 numpy 数组,其中示例将通过转换为列表来序列化为 json。字节将进行 base64 编码。当
signature参数为None时,输入示例用于推断模型签名。await_registration_for – 等待模型版本完成创建并处于
READY状态的秒数。默认情况下,函数等待五分钟。指定 0 或 None 可跳过等待。pip_requirements – 要么是 pip 要求字符串的可迭代对象(例如
["pyspark", "-r requirements.txt", "-c constraints.txt"]),要么是本地文件系统上的 pip 要求文件的字符串路径(例如"requirements.txt")。如果提供,这描述了模型应运行的环境。如果为None,则通过mlflow.models.infer_pip_requirements()从当前软件环境中推断出默认要求列表。如果要求推断失败,它将回退到使用 get_default_pip_requirements。要求和约束都会被自动解析并写入requirements.txt和constraints.txt文件,分别存储为模型的一部分。要求也会被写入模型 conda 环境 (conda.yaml) 文件的pip部分。extra_pip_requirements –
要么是 pip 要求字符串的可迭代对象(例如
["pandas", "-r requirements.txt", "-c constraints.txt"]),要么是本地文件系统上的 pip 要求文件的字符串路径(例如"requirements.txt")。如果提供,这描述了附加到基于用户当前软件环境自动生成的默认 pip 要求集上的额外 pip 要求。要求和约束都会被自动解析并写入requirements.txt和constraints.txt文件,分别存储为模型的一部分。要求也会被写入模型 conda 环境 (conda.yaml) 文件的pip部分。警告
以下参数不能同时指定
conda_envpip_requirementsextra_pip_requirements
此示例演示了如何使用
pip_requirements和extra_pip_requirements指定 pip requirements。metadata – 传递给模型并存储在 MLmodel 文件中的自定义元数据字典。
- 返回
一个
ModelInfo实例,其中包含已记录模型的元数据。
from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer training = spark.createDataFrame( [ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0), ], ["id", "text", "label"], ) tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression(maxIter=10, regParam=0.001) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) model = pipeline.fit(training) mlflow.spark.log_model(model, "spark-model")
- mlflow.spark.save_model(spark_model, path, mlflow_model=None, conda_env=None, code_paths=None, dfs_tmpdir=None, signature: mlflow.models.signature.ModelSignature = None, input_example: Union[pandas.core.frame.DataFrame, numpy.ndarray, dict, list, csr_matrix, csc_matrix, str, bytes, tuple] = None, pip_requirements=None, extra_pip_requirements=None, metadata=None)[源]
将 Spark MLlib 模型保存到本地路径。
默认情况下,此函数使用 Spark MLlib 持久化机制保存模型。
- 参数
spark_model – 要保存的 Spark 模型 - MLflow 只能保存 pyspark.ml.Model 或 pyspark.ml.Transformer 的后代,这些后代实现了 MLReadable 和 MLWritable。
path – 要保存模型的本地路径。
mlflow_model – 要将此风格添加到的 MLflow 模型配置。
conda_env –
Conda 环境的字典表示形式或本地文件系统上 conda 环境 yaml 文件的路径。如果提供,这描述了模型应运行的环境。至少,它应指定 get_default_conda_env() 中包含的依赖项。如果为
None,则会向模型添加通过mlflow.models.infer_pip_requirements()从当前系统中推断出的 pip 要求。如果要求推断失败,它将回退到使用 get_default_pip_requirements。来自conda_env的 pip 要求将被写入 piprequirements.txt文件,完整的 conda 环境将被写入conda.yaml。以下是 conda 环境的示例字典表示{ "name": "mlflow-env", "channels": ["conda-forge"], "dependencies": [ "python=3.8.15", { "pip": [ "pyspark==x.y.z" ], }, ], }
code_paths –
A list of local filesystem paths to Python file dependencies (or directories containing file dependencies). These files are prepended to the system path when the model is loaded. Files declared as dependencies for a given model should have relative imports declared from a common root path if multiple files are defined with import dependencies between them to avoid import errors when loading the model.
For a detailed explanation of
code_pathsfunctionality, recommended usage patterns and limitations, see the code_paths usage guide.dfs_tmpdir – 分布式(Hadoop)文件系统(DFS)或本地文件系统(如果以本地模式运行)上的临时目录路径。模型将在此目标中写入,然后复制到请求的本地路径。这是必需的,因为 Spark ML 模型在集群上运行时会从 DFS 读取和写入。如果此操作成功完成,则删除在 DFS 上创建的所有临时文件。默认为
/tmp/mlflow。signature – 请参阅
mlflow.spark.log_model()中参数signature的文档。input_example – 一个或多个有效的模型输入实例。输入示例用作要馈送给模型的数据的提示。它将被转换为 Pandas DataFrame,然后使用 Pandas 的面向拆分(split-oriented)格式序列化为 json,或者转换为 numpy 数组,其中示例将通过转换为列表来序列化为 json。字节将进行 base64 编码。当
signature参数为None时,输入示例用于推断模型签名。pip_requirements – 要么是 pip 要求字符串的可迭代对象(例如
["pyspark", "-r requirements.txt", "-c constraints.txt"]),要么是本地文件系统上的 pip 要求文件的字符串路径(例如"requirements.txt")。如果提供,这描述了模型应运行的环境。如果为None,则通过mlflow.models.infer_pip_requirements()从当前软件环境中推断出默认要求列表。如果要求推断失败,它将回退到使用 get_default_pip_requirements。要求和约束都会被自动解析并写入requirements.txt和constraints.txt文件,分别存储为模型的一部分。要求也会被写入模型 conda 环境 (conda.yaml) 文件的pip部分。extra_pip_requirements –
要么是 pip 要求字符串的可迭代对象(例如
["pandas", "-r requirements.txt", "-c constraints.txt"]),要么是本地文件系统上的 pip 要求文件的字符串路径(例如"requirements.txt")。如果提供,这描述了附加到基于用户当前软件环境自动生成的默认 pip 要求集上的额外 pip 要求。要求和约束都会被自动解析并写入requirements.txt和constraints.txt文件,分别存储为模型的一部分。要求也会被写入模型 conda 环境 (conda.yaml) 文件的pip部分。警告
以下参数不能同时指定
conda_envpip_requirementsextra_pip_requirements
此示例演示了如何使用
pip_requirements和extra_pip_requirements指定 pip requirements。metadata – 传递给模型并存储在 MLmodel 文件中的自定义元数据字典。