mlflow.spark
mlflow.spark 模块提供了用于记录和加载 Spark MLlib 模型的 API。该模块导出了以下风格的 Spark MLlib 模型:
- Spark MLlib(原生)格式
允许模型被加载为 Spark Transformer,以便在 Spark 会话中进行评分。具有此风格的模型可以在 Python 中加载为 PySpark PipelineModel 对象。这是主要的风格,并且始终会生成。
mlflow.pyfunc通过实例化 SparkContext 并将输入数据读取为 Spark DataFrame 来支持在 Spark 外部进行部署以进行评分。还支持在 Spark 中作为 Spark UDF 进行部署。具有此风格的模型可以加载为 Python 函数以执行推理。此风格始终会生成。
- mlflow.spark.autolog(disable=False, silent=False)[source]
注意
自动日志记录已知与以下包版本兼容:
3.3.0<=pyspark<=4.0.1。当使用此范围外的包版本时,自动日志记录可能不会成功。启用(或禁用)并配置当 Spark 数据源路径被读取时,记录其路径、版本(如果适用)和格式。此方法不是线程安全的,并假定已存在一个带有 mlflow-spark JAR 附件的 SparkSession。它应该在 Spark Driver 上调用,而不是在 Executor 上调用(即,不要在 Spark 并行化的函数内调用此方法)。使用的 mlflow-spark JAR 必须与 Spark 的 Scala 版本匹配。请参阅 Maven Repository 获取可用版本。此 API 需要 Spark 3.0 或更高版本。
数据源信息被缓存在内存中,并被记录到所有后续的 MLflow 运行中,包括活动的 MLflow 运行(如果读取数据时存在活动运行)。请注意,通过此 API 目前不支持 Spark ML(MLlib)模型的自动日志记录。数据源自动日志记录是尽力而为的,这意味着如果 Spark 负载很重或 MLflow 日志记录因任何原因失败(例如,如果 MLflow 服务器不可用),则可能会丢失日志。
对于自动日志记录的任何意外问题,除了您的 MLflow 代码生成的 stderr 和 stdout 外,还请检查 Spark Driver 和 Executor 的日志 - 数据源信息是从 Spark 中提取的,因此用于调试的相关日志可能会出现在 Spark 日志中。
注意
Spark 数据源自动日志记录仅支持在单线程中记录到 MLflow 运行。
import mlflow.spark import os import shutil from pyspark.sql import SparkSession # Create and persist some dummy data # Note: the 2.12 in 'org.mlflow:mlflow-spark_2.12:2.16.2' below indicates the Scala # version, please match this with that of Spark. The 2.16.2 indicates the mlflow version. # Note: On environments like Databricks with pre-created SparkSessions, # ensure the org.mlflow:mlflow-spark_2.12:2.16.2 is attached as a library to # your cluster spark = ( SparkSession.builder.config( "spark.jars.packages", "org.mlflow:mlflow-spark_2.12:2.16.2", ) .master("local[*]") .getOrCreate() ) df = spark.createDataFrame( [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")], ["id", "text"], ) import tempfile tempdir = tempfile.mkdtemp() df.write.csv(os.path.join(tempdir, "my-data-path"), header=True) # Enable Spark datasource autologging. mlflow.spark.autolog() loaded_df = spark.read.csv( os.path.join(tempdir, "my-data-path"), header=True, inferSchema=True ) # Call toPandas() to trigger a read of the Spark datasource. Datasource info # (path and format) is logged to the current active run, or the # next-created MLflow run if no run is currently active with mlflow.start_run() as active_run: pandas_df = loaded_df.toPandas()
- 参数
disable – 如果为
True,则禁用 Spark 数据源自动日志记录集成。如果为False,则启用 Spark 数据源自动日志记录集成。silent – 如果为
True,则在 Spark 数据源自动日志记录期间抑制 MLflow 的所有事件日志和警告。如果为False,则在 Spark 数据源自动日志记录期间显示所有事件和警告。
- mlflow.spark.get_default_conda_env(is_spark_connect_model=False)[source]
- 返回
通过调用
save_model()和log_model()生成的 MLflow Models 的默认 Conda 环境。此 Conda 环境包含调用者系统上安装的当前 PySpark 版本。PySpark 的dev版本在生成的 Conda 环境中被替换为稳定版本(例如,如果您运行的 PySpark 版本是2.4.5.dev0,调用此方法会生成一个依赖于 PySpark 版本2.4.5的 Conda 环境)。
- mlflow.spark.get_default_pip_requirements(is_spark_connect_model=False)[source]
- 返回
此风格生成的 MLflow Models 的默认 pip 需求列表。调用
save_model()和log_model()会生成一个 pip 环境,其中至少包含这些需求。
- mlflow.spark.load_model(model_uri, dfs_tmpdir=None, dst_path=None)[source]
从路径加载 Spark MLlib 模型。
- 参数
model_uri –
The location, in URI format, of the MLflow model, for example
/Users/me/path/to/local/modelrelative/path/to/local/models3://my_bucket/path/to/modelruns:/<mlflow_run_id>/run-relative/path/to/modelmodels:/<model_name>/<model_version>models:/<model_name>/<stage>
For more information about supported URI schemes, see Referencing Artifacts.
dfs_tmpdir – 分布式(Hadoop)文件系统(DFS)或本地文件系统(在本地模式下运行)上的临时目录路径。模型将从该目标加载。默认为
/tmp/mlflow。dst_path – The local filesystem path to which to download the model artifact. This directory must already exist. If unspecified, a local output path will be created.
- 返回
pyspark.ml.pipeline.PipelineModel
import mlflow model = mlflow.spark.load_model("spark-model") # Prepare test documents, which are unlabeled (id, text) tuples. test = spark.createDataFrame( [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")], ["id", "text"], ) # Make predictions on test documents prediction = model.transform(test)
- mlflow.spark.log_model(spark_model, artifact_path, conda_env=None, code_paths=None, dfs_tmpdir=None, registered_model_name=None, signature: mlflow.models.signature.ModelSignature = None, input_example: Union[pandas.core.frame.DataFrame, numpy.ndarray, dict, list, csr_matrix, csc_matrix, str, bytes, tuple] = None, await_registration_for=300, pip_requirements=None, extra_pip_requirements=None, metadata=None)[source]
将 Spark MLlib 模型记录为当前运行的 MLflow 工件。这使用了 MLlib 持久化格式,并生成了一个带有 Spark 风格的 MLflow 模型。
注意:如果没有活动的运行,它将实例化一个运行以获取 run_id。
- 参数
spark_model –
要保存的 Spark 模型 - MLflow 只能保存实现 MLReadable 和 MLWritable 的 pyspark.ml.Model 或 pyspark.ml.Transformer 的后代。
注意
提供的 Spark 模型的 transform 方法必须生成一个名为“prediction”的列,该列用作 MLflow pyfunc 模型的输出。大多数 Spark 模型默认生成名为“prediction”的输出列,其中包含预测标签。要将概率分类模型的概率列设置为输出列,您需要将“probabilityCol”参数设置为“prediction”,并将“predictionCol”参数设置为空字符串(例如,model.setProbabilityCol(“prediction”).setPredictionCol(“”))。
artifact_path – 运行相对工件路径。
conda_env –
Conda 环境的字典表示形式或 conda 环境 yaml 文件的路径。如果提供,则描述了模型应运行的环境。至少,它应该指定 get_default_conda_env() 中包含的依赖项。如果为
None,则将具有通过mlflow.models.infer_pip_requirements()推断出的 pip 需求的 Conda 环境添加到模型中。如果推断失败,则回退使用 get_default_pip_requirements。来自conda_env的 pip 需求被写入 piprequirements.txt文件,完整的 Conda 环境被写入conda.yaml。以下是 Conda 环境的字典表示的 *示例*:{ "name": "mlflow-env", "channels": ["conda-forge"], "dependencies": [ "python=3.8.15", { "pip": [ "pyspark==x.y.z" ], }, ], }
code_paths –
A list of local filesystem paths to Python file dependencies (or directories containing file dependencies). These files are prepended to the system path when the model is loaded. Files declared as dependencies for a given model should have relative imports declared from a common root path if multiple files are defined with import dependencies between them to avoid import errors when loading the model.
For a detailed explanation of
code_pathsfunctionality, recommended usage patterns and limitations, see the code_paths usage guide.dfs_tmpdir – 分布式(Hadoop)文件系统(DFS)或本地文件系统(在本地模式下运行)上的临时目录路径。模型将被写入此目标,然后复制到模型的工件目录中。这是必需的,因为 Spark ML 模型在集群上运行时会读取和写入 DFS。如果此操作成功完成,则删除在 DFS 上创建的所有临时文件。默认为
/tmp/mlflow。对于在 pyspark.ml.connect 模块中定义的模型,此参数将被忽略。registered_model_name – 如果提供,则在
registered_model_name下创建一个模型版本,如果给定名称的注册模型不存在,也会创建该注册模型。signature –
一个 Model Signature 对象,描述模型的输入和输出 Schema。可以使用
mlflow.models.signature的 infer_signature 函数推断模型签名。注意,如果您的 Spark 模型包含 Spark ML 向量类型的输入或输出列,您应该为该列创建SparkMLVector向量类型,infer_signature 函数也可以正确地从 Spark DataFrame 输入/输出推断SparkMLVector向量类型。在将具有SparkMLVector向量类型输入的 Spark ML 模型加载为 MLflow pyfunc 模型时,它接受Array[double]类型输入。MLflow 内部将数组转换为 Spark ML 向量,然后调用 Spark 模型进行推理。同样,如果模型具有向量类型输出,MLflow 会将 Spark ML 向量输出数据内部转换为Array[double]类型推理结果。from mlflow.models import infer_signature from pyspark.sql.functions import col from pyspark.ml.classification import LogisticRegression from pyspark.ml.functions import array_to_vector import pandas as pd import mlflow train_df = spark.createDataFrame( [([3.0, 4.0], 0), ([5.0, 6.0], 1)], schema="features array<double>, label long" ).select(array_to_vector("features").alias("features"), col("label")) lor = LogisticRegression(maxIter=2) lor.setPredictionCol("").setProbabilityCol("prediction") lor_model = lor.fit(train_df) test_df = train_df.select("features") prediction_df = lor_model.transform(train_df).select("prediction") signature = infer_signature(test_df, prediction_df) with mlflow.start_run() as run: model_info = mlflow.spark.log_model( lor_model, "model", signature=signature, ) # The following signature is outputted: # inputs: # ['features': SparkML vector (required)] # outputs: # ['prediction': SparkML vector (required)] print(model_info.signature) loaded = mlflow.pyfunc.load_model(model_info.model_uri) test_dataset = pd.DataFrame({"features": [[1.0, 2.0]]}) # `loaded.predict` accepts `Array[double]` type input column, # and generates `Array[double]` type output column. print(loaded.predict(test_dataset))
input_example – 一个或多个有效的模型输入实例。输入示例用作要馈送给模型的数据的提示。它将被转换为 Pandas DataFrame,然后使用 Pandas 的面向拆分(split-oriented)格式序列化为 json,或者转换为 numpy 数组,其中示例将通过转换为列表来序列化为 json。字节将进行 base64 编码。当
signature参数为None时,输入示例用于推断模型签名。await_registration_for – 等待模型版本完成创建并处于
READY状态的秒数。默认情况下,函数等待五分钟。指定 0 或 None 可跳过等待。pip_requirements – pip 需求字符串的可迭代对象(例如
["pyspark", "-r requirements.txt", "-c constraints.txt"])或本地文件系统上的 pip 需求文件的字符串路径(例如"requirements.txt")。如果提供,则描述了模型应运行的环境。如果为None,则通过mlflow.models.infer_pip_requirements()从当前软件环境中推断出默认需求列表。如果需求推断失败,则回退使用 get_default_pip_requirements。需求和约束都会被自动解析并分别写入模型中的requirements.txt和constraints.txt文件。需求也会被写入模型 Conda 环境(conda.yaml)文件的pip部分。extra_pip_requirements –
pip 需求字符串的可迭代对象(例如
["pandas", "-r requirements.txt", "-c constraints.txt"])或本地文件系统上的 pip 需求文件的字符串路径(例如"requirements.txt")。如果提供,则描述了将追加到基于用户当前软件环境自动生成的默认 pip 需求集中的额外 pip 需求。需求和约束都会被自动解析并分别写入模型中的requirements.txt和constraints.txt文件。需求也会被写入模型 Conda 环境(conda.yaml)文件的pip部分。警告
以下参数不能同时指定
conda_envpip_requirementsextra_pip_requirements
此示例演示了如何使用
pip_requirements和extra_pip_requirements指定 pip requirements。metadata – 传递给模型并存储在 MLmodel 文件中的自定义元数据字典。
- 返回
一个
ModelInfo实例,其中包含已记录模型的元数据。
from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer training = spark.createDataFrame( [ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0), ], ["id", "text", "label"], ) tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression(maxIter=10, regParam=0.001) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) model = pipeline.fit(training) mlflow.spark.log_model(model, "spark-model")
- mlflow.spark.save_model(spark_model, path, mlflow_model=None, conda_env=None, code_paths=None, dfs_tmpdir=None, signature: mlflow.models.signature.ModelSignature = None, input_example: Union[pandas.core.frame.DataFrame, numpy.ndarray, dict, list, csr_matrix, csc_matrix, str, bytes, tuple] = None, pip_requirements=None, extra_pip_requirements=None, metadata=None)[source]
将 Spark MLlib 模型保存到本地路径。
默认情况下,此函数使用 Spark MLlib 持久化机制保存模型。
- 参数
spark_model – 要保存的 Spark 模型 - MLflow 只能保存实现 MLReadable 和 MLWritable 的 pyspark.ml.Model 或 pyspark.ml.Transformer 的后代。
path – 要保存模型的本地路径。
mlflow_model – 正在添加到此风格的 MLflow 模型配置。
conda_env –
Conda 环境的字典表示形式或 conda 环境 yaml 文件的路径。如果提供,则描述了模型应运行的环境。至少,它应该指定 get_default_conda_env() 中包含的依赖项。如果为
None,则将具有通过mlflow.models.infer_pip_requirements()推断出的 pip 需求的 Conda 环境添加到模型中。如果推断失败,则回退使用 get_default_pip_requirements。来自conda_env的 pip 需求被写入 piprequirements.txt文件,完整的 Conda 环境被写入conda.yaml。以下是 Conda 环境的字典表示的 *示例*:{ "name": "mlflow-env", "channels": ["conda-forge"], "dependencies": [ "python=3.8.15", { "pip": [ "pyspark==x.y.z" ], }, ], }
code_paths –
A list of local filesystem paths to Python file dependencies (or directories containing file dependencies). These files are prepended to the system path when the model is loaded. Files declared as dependencies for a given model should have relative imports declared from a common root path if multiple files are defined with import dependencies between them to avoid import errors when loading the model.
For a detailed explanation of
code_pathsfunctionality, recommended usage patterns and limitations, see the code_paths usage guide.dfs_tmpdir – 分布式(Hadoop)文件系统(DFS)或本地文件系统(在本地模式下运行)上的临时目录路径。模型将被写入此目标,然后复制到请求的本地路径。这是必需的,因为 Spark ML 模型在集群上运行时会读取和写入 DFS。如果此操作成功完成,则删除在 DFS 上创建的所有临时文件。默认为
/tmp/mlflow。signature – 请参阅
mlflow.spark.log_model()中signature参数的文档。input_example – 一个或多个有效的模型输入实例。输入示例用作要馈送给模型的数据的提示。它将被转换为 Pandas DataFrame,然后使用 Pandas 的面向拆分(split-oriented)格式序列化为 json,或者转换为 numpy 数组,其中示例将通过转换为列表来序列化为 json。字节将进行 base64 编码。当
signature参数为None时,输入示例用于推断模型签名。pip_requirements – pip 需求字符串的可迭代对象(例如
["pyspark", "-r requirements.txt", "-c constraints.txt"])或本地文件系统上的 pip 需求文件的字符串路径(例如"requirements.txt")。如果提供,则描述了模型应运行的环境。如果为None,则通过mlflow.models.infer_pip_requirements()从当前软件环境中推断出默认需求列表。如果需求推断失败,则回退使用 get_default_pip_requirements。需求和约束都会被自动解析并分别写入模型中的requirements.txt和constraints.txt文件。需求也会被写入模型 Conda 环境(conda.yaml)文件的pip部分。extra_pip_requirements –
pip 需求字符串的可迭代对象(例如
["pandas", "-r requirements.txt", "-c constraints.txt"])或本地文件系统上的 pip 需求文件的字符串路径(例如"requirements.txt")。如果提供,则描述了将追加到基于用户当前软件环境自动生成的默认 pip 需求集中的额外 pip 需求。需求和约束都会被自动解析并分别写入模型中的requirements.txt和constraints.txt文件。需求也会被写入模型 Conda 环境(conda.yaml)文件的pip部分。警告
以下参数不能同时指定
conda_envpip_requirementsextra_pip_requirements
此示例演示了如何使用
pip_requirements和extra_pip_requirements指定 pip requirements。metadata – 传递给模型并存储在 MLmodel 文件中的自定义元数据字典。