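MLflow Plugins
MLflow's plugin architecture lets you integrate with third-party tools and custom infrastructure. As a framework-agnostic platform, MLflow provides developer APIs for extending storage, authentication, execution backends, and model evaluation.
Quickstart
Installing and using a plugin
Try the built-in test plugin to see how plugins work: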
# Clone MLflow and install example plugin
git clone https://github.com/mlflow/mlflow
cd mlflow
pip install -e tests/resources/mlflow-test-plugin
# Use the plugin with custom tracking URI scheme
MLFLOW_TRACKING_URI=file-plugin:$(pwd)/mlruns python examples/quickstart/mlflow_tracking.py
# Launch MLflow UI to view results
mlflow server --backend-store-uri ./mlruns
Open http://localhost:5000 to view your tracked experiments.
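Plugin benefits
Plugins let you integrate MLflow with your existing infrastructure without modifying MLflow core code, which keeps upgrades and maintenance smooth.
Plugin types & use cases
MLflow supports several types of plugins, each addressing a different integration need. They are grouped below by purpose.
Storage and persistence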
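Plugin type | Purpose | Example use cases |
---|---|---|
Tracking store | Custom experiment data storage | Enterprise databases, cloud data warehouses |
Artifact repository | Custom artifact storage | Internal blob storage, specialized file systems |
Model registry store | Custom model registry backend | Enterprise model catalogs, version control systems |
Authentication and headers
Plugin type | Purpose | Example use cases |
---|---|---|
Request auth provider | Custom authentication | OAuth, API keys, certificate-based authentication |
Request header provider | Custom HTTP headers | Environment identification, compliance headers |
Run context provider | Automatic run metadata | Git information, environment details, custom tags |
Execution and evaluation
Plugin type | Purpose | Example use cases |
---|---|---|
Project backend | Custom execution environments | Internal clusters, job schedulers, cloud platforms |
Model evaluator | Custom evaluation metrics | Domain-specific validation, custom test suites |
Deployment | Custom serving platforms | Internal serving infrastructure, edge deployment |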
Developing custom plugins
Plugin structure
Create your plugin as a standalone Python package:
# setup.py
from setuptools import setup

setup(
    name="my-mlflow-plugin",
    version="0.1.0",
    install_requires=["mlflow>=2.0.0"],
    entry_points={
        # Each key is an MLflow entry-point group; each value is "name=module:object".
        # For store and repository groups, the name is the URI scheme the plugin handles.
        "mlflow.tracking_store": "my-scheme=my_plugin.store:MyTrackingStore",
        "mlflow.artifact_repository": "my-scheme=my_plugin.artifacts:MyArtifactRepo",
        "mlflow.model_registry_store": "my-scheme=my_plugin.registry:MyModelRegistryStore",
        "mlflow.run_context_provider": "unused=my_plugin.context:MyContextProvider",
        "mlflow.request_auth_provider": "unused=my_plugin.auth:MyAuthProvider",
        "mlflow.request_header_provider": "unused=my_plugin.headers:MyHeaderProvider",
        "mlflow.model_evaluator": "my-evaluator=my_plugin.evaluator:MyEvaluator",
        "mlflow.project_backend": "my-backend=my_plugin.backend:MyBackend",
        "mlflow.deployments": "my-target=my_plugin.deployment",
        "mlflow.app": "my-app=my_plugin.app:create_app",
    },
)
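MLflow finds plugins through the entry points declared in setup.py, so after installing a plugin it can help to confirm that its entry points are visible. The following is a minimal sketch using only the standard library; the helper name `list_plugins` is hypothetical and is not part of MLflow.

```python
from importlib.metadata import entry_points
from typing import Dict


def list_plugins(group: str) -> Dict[str, str]:
    """Map each entry-point name in `group` to its "module:object" target."""
    eps = entry_points()
    if hasattr(eps, "select"):  # Python 3.10 and newer
        selected = eps.select(group=group)
    else:  # Python 3.8/3.9 return a plain dict keyed by group name
        selected = eps.get(group, [])
    return {ep.name: ep.value for ep in selected}


# Print whatever is registered in two of the groups used above.
for group in ("mlflow.tracking_store", "mlflow.artifact_repository"):
    print(group, list_plugins(group))
```

If a plugin is installed but missing from this output, the usual cause is a typo in the group name or a stale editable install.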
Storage plugins
- Tracking store
- Artifact repository
- Model registry store
# my_plugin/store.py
from mlflow.store.tracking.abstract_store import AbstractStore


class MyTrackingStore(AbstractStore):
    """Custom tracking store for scheme 'my-scheme://'"""

    def __init__(self, store_uri):
        super().__init__()
        self.store_uri = store_uri
        # Initialize your custom storage backend

    def create_experiment(self, name, artifact_location=None, tags=None):
        # Implement experiment creation logic
        pass

    def log_metric(self, run_id, metric):
        # Implement metric logging logic
        pass

    def log_param(self, run_id, param):
        # Implement parameter logging logic
        pass

    # Implement other required AbstractStore methods...
# my_plugin/artifacts.py
from mlflow.store.artifact.artifact_repo import ArtifactRepository


class MyArtifactRepo(ArtifactRepository):
    """Custom artifact repository for scheme 'my-scheme://'"""

    def __init__(self, artifact_uri):
        super().__init__(artifact_uri)
        # Initialize your artifact storage backend

    def log_artifact(self, local_file, artifact_path=None):
        # Upload file to your storage system
        pass

    def log_artifacts(self, local_dir, artifact_path=None):
        # Upload directory to your storage system
        pass

    def list_artifacts(self, path=None):
        # List artifacts in your storage system
        pass

    def _download_file(self, remote_file_path, local_path):
        # Download a single file from your storage system.
        # The inherited download_artifacts() calls this hook for each file.
        pass
# my_plugin/registry.py
from mlflow.store.model_registry.abstract_store import AbstractStore


class MyModelRegistryStore(AbstractStore):
    """Custom model registry store for scheme 'my-scheme://'"""

    def __init__(self, store_uri):
        super().__init__()
        self.store_uri = store_uri
        # Initialize your model registry backend

    def create_registered_model(self, name, tags=None, description=None):
        # Implement model registration logic
        pass

    def create_model_version(
        self, name, source, run_id=None, tags=None, run_link=None, description=None
    ):
        # Implement model version creation logic
        pass

    def get_registered_model(self, name):
        # Implement model retrieval logic
        pass

    # Implement other required AbstractStore methods...
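Each store class above is tied to a URI scheme such as 'my-scheme://'. The sketch below illustrates the general idea of scheme-based dispatch with a small standalone registry. It is an illustration only and not MLflow's actual implementation; `SchemeRegistry` and `DemoStore` are hypothetical names.

```python
from urllib.parse import urlparse


class SchemeRegistry:
    """Hypothetical registry mapping URI schemes to store factories."""

    def __init__(self):
        self._builders = {}

    def register(self, scheme, builder):
        self._builders[scheme] = builder

    def get_store(self, store_uri):
        # The scheme is everything before the first ":" in the URI.
        scheme = urlparse(store_uri).scheme
        try:
            builder = self._builders[scheme]
        except KeyError:
            raise ValueError(
                f"No store registered for scheme {scheme!r}; "
                f"known schemes: {sorted(self._builders)}"
            ) from None
        return builder(store_uri)


class DemoStore:
    """Stand-in for a store class such as MyTrackingStore."""

    def __init__(self, store_uri):
        self.store_uri = store_uri


registry = SchemeRegistry()
registry.register("my-scheme", DemoStore)
store = registry.get_store("my-scheme://host/path")
print(type(store).__name__, store.store_uri)
```

This is why the entry-point name and the scheme in your tracking or artifact URI need to match exactly.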
Authentication plugins
- Request auth provider
- Run context provider
- Request header provider
# my_plugin/auth.py
from requests.auth import AuthBase

from mlflow.tracking.request_auth.abstract_request_auth_provider import (
    RequestAuthProvider,
)


class MyAuthProvider(RequestAuthProvider):
    """Custom authentication provider"""

    def get_name(self):
        # Must match the value of MLFLOW_TRACKING_AUTH
        return "my_auth_provider"

    def get_auth(self):
        # Return authentication object for HTTP requests
        # Can be anything that requests.auth accepts
        return MyCustomAuth()


class MyCustomAuth(AuthBase):
    """Custom authentication class"""

    def __call__(self, request):
        # Add authentication headers to request
        token = self._get_token()
        request.headers["Authorization"] = f"Bearer {token}"
        return request

    def _get_token(self):
        # Implement token retrieval logic
        # E.g., read from file, environment, or API call
        pass
Usage
export MLFLOW_TRACKING_AUTH=my_auth_provider
python your_mlflow_script.py
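The `_get_token` placeholder in the auth provider is left unimplemented. One possible shape for it, sketched under assumptions, is a lookup that prefers an environment variable and falls back to a file. The variable name `MY_PLUGIN_TOKEN` and the function `get_token` are hypothetical.

```python
import os
from pathlib import Path


def get_token(env_var="MY_PLUGIN_TOKEN", token_file=None):
    """Return a bearer token from the environment, else from a file."""
    token = os.environ.get(env_var, "").strip()
    if token:
        return token
    if token_file is not None and Path(token_file).is_file():
        token = Path(token_file).read_text(encoding="utf-8").strip()
        if token:
            return token
    # Fail loudly instead of sending "Bearer None" to the server.
    raise RuntimeError(
        f"No token found: set {env_var} or provide a readable token file"
    )
```

Raising when no token is available surfaces a configuration problem at the first request rather than as an opaque authentication failure.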
# my_plugin/context.py
from mlflow.tracking.context.abstract_context import RunContextProvider


class MyContextProvider(RunContextProvider):
    """Automatically add custom tags to runs"""

    def in_context(self):
        # Return True if this context applies
        return True

    def tags(self):
        # Return dictionary of tags to add to runs
        return {
            "environment": self._get_environment(),
            "team": self._get_team(),
            "cost_center": self._get_cost_center(),
        }

    def _get_environment(self):
        # Detect environment (dev/staging/prod)
        pass

    def _get_team(self):
        # Get team from environment or config
        pass

    def _get_cost_center(self):
        # Get cost center for billing
        pass
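As written, the placeholder helpers in the context provider return None, which would produce tags with empty values. A minimal sketch of one way to fill them in reads each tag from an environment variable and drops the ones that are unset. The function `build_run_tags` and the variable names `TEAM_NAME` and `COST_CENTER` are assumptions for illustration.

```python
import os


def build_run_tags(environ=None):
    """Build run tags from environment variables, skipping unset ones."""
    environ = os.environ if environ is None else environ
    # Tag name -> environment variable (hypothetical names; adapt as needed).
    sources = {
        "environment": "DEPLOYMENT_ENV",
        "team": "TEAM_NAME",
        "cost_center": "COST_CENTER",
    }
    tags = {}
    for tag, var in sources.items():
        value = environ.get(var)
        if value:  # omit missing or empty values instead of tagging "None"
            tags[tag] = str(value)
    return tags


print(build_run_tags({"DEPLOYMENT_ENV": "prod", "TEAM_NAME": ""}))
```

Accepting the environment as a parameter keeps the function easy to unit test without touching the real process environment.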
# my_plugin/headers.py
import os

from mlflow.tracking.request_header.abstract_request_header_provider import (
    RequestHeaderProvider,
)


class MyHeaderProvider(RequestHeaderProvider):
    """Add custom headers to MLflow requests"""

    def in_context(self):
        return True

    def request_headers(self):
        return {
            "X-Client-Version": self._get_client_version(),
            "X-Environment": self._get_environment(),
            "X-User-Agent": "MyOrganization-MLflow-Client",
        }

    def _get_client_version(self):
        # Return your client version
        return "1.0.0"

    def _get_environment(self):
        # Detect environment context
        return os.getenv("DEPLOYMENT_ENV", "development")
Execution plugins
Project backend plugin
# my_plugin/backend.py
from mlflow.projects.backend.abstract_backend import AbstractBackend
from mlflow.projects.submitted_run import SubmittedRun


class MyBackend(AbstractBackend):
    """Custom execution backend for MLflow Projects"""

    def run(
        self,
        project_uri,
        entry_point,
        parameters,
        version,
        backend_config,
        tracking_uri,
        experiment_id,
    ):
        """Execute project on custom infrastructure"""
        # Parse backend configuration
        cluster_config = backend_config.get("cluster_config", {})
        # Submit job to your execution system
        job_id = self._submit_job(
            project_uri=project_uri,
            entry_point=entry_point,
            parameters=parameters,
            cluster_config=cluster_config,
        )
        # Return SubmittedRun for monitoring
        return MySubmittedRun(job_id, tracking_uri)

    def _submit_job(self, project_uri, entry_point, parameters, cluster_config):
        # Implement job submission to your infrastructure
        # Return job ID for monitoring
        pass


class MySubmittedRun(SubmittedRun):
    """Handle for submitted run"""

    def __init__(self, job_id, tracking_uri):
        self.job_id = job_id
        self.tracking_uri = tracking_uri
        super().__init__()

    # _poll_job_status, _cancel_job, and _get_job_status are placeholders
    # for calls into your own job system; implement them for your platform.
    def wait(self):
        # Wait for job completion and return success status
        return self._poll_job_status()

    def cancel(self):
        # Cancel the running job
        self._cancel_job()

    def get_status(self):
        # Get current job status
        return self._get_job_status()
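The `wait` method above delegates to a polling helper that is not shown. A minimal sketch of such a loop follows, assuming the job system exposes a status callable that returns strings modeled on MLflow run status names (RUNNING, FINISHED, FAILED, KILLED). The function `wait_for_job` and its parameters are hypothetical.

```python
import time


def wait_for_job(get_status, poll_interval=1.0, timeout=60.0, sleep=time.sleep):
    """Poll get_status() until a terminal state; return True on success."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == "FINISHED":
            return True
        if status in ("FAILED", "KILLED"):
            return False
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still {status!r} after {timeout} seconds")
        sleep(poll_interval)


# Simulated job that reports RUNNING twice and then finishes.
statuses = iter(["RUNNING", "RUNNING", "FINISHED"])
print(wait_for_job(lambda: next(statuses), sleep=lambda seconds: None))
```

Injecting `sleep` makes the loop testable without real delays; a production version might also add backoff between polls.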
Model evaluation plugin
# my_plugin/evaluator.py
from mlflow.models.evaluation import ModelEvaluator
from mlflow.models import EvaluationResult


class MyEvaluator(ModelEvaluator):
    """Custom model evaluator"""

    def can_evaluate(self, *, model_type, evaluator_config, **kwargs):
        """Check if this evaluator can handle the model type"""
        supported_types = ["classifier", "regressor"]
        return model_type in supported_types

    def evaluate(
        self, *, model, model_type, dataset, run_id, evaluator_config, **kwargs
    ):
        """Perform custom evaluation"""
        # Get predictions
        predictions = model.predict(dataset.features_data)
        # Compute custom metrics
        metrics = self._compute_custom_metrics(
            predictions, dataset.labels_data, evaluator_config
        )
        # Generate custom artifacts
        artifacts = self._generate_artifacts(predictions, dataset, evaluator_config)
        return EvaluationResult(metrics=metrics, artifacts=artifacts)

    def _compute_custom_metrics(self, predictions, labels, config):
        # Implement domain-specific metrics
        # (_calculate_custom_score and _calculate_business_metric are yours to define)
        return {
            "custom_score": self._calculate_custom_score(predictions, labels),
            "business_metric": self._calculate_business_metric(predictions, labels),
        }

    def _generate_artifacts(self, predictions, dataset, config):
        # Generate custom plots, reports, etc.
        return {}
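The evaluator leaves its metric helpers undefined. As one possible illustration, the sketch below computes plain accuracy and a cost-weighted error rate for binary labels. Both functions and the default cost values are assumptions chosen for the example, not metrics defined by MLflow.

```python
def accuracy(predictions, labels):
    """Fraction of predictions that equal their label."""
    if len(predictions) != len(labels):
        raise ValueError("predictions and labels must have the same length")
    if len(labels) == 0:
        raise ValueError("cannot score an empty dataset")
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)


def expected_cost(predictions, labels, false_positive_cost=1.0, false_negative_cost=5.0):
    """Average misclassification cost for binary labels (1 = positive)."""
    if len(predictions) != len(labels):
        raise ValueError("predictions and labels must have the same length")
    if len(labels) == 0:
        raise ValueError("cannot score an empty dataset")
    total = 0.0
    for p, y in zip(predictions, labels):
        if p == 1 and y == 0:
            total += false_positive_cost
        elif p == 0 and y == 1:
            total += false_negative_cost
    return total / len(labels)


preds, truth = [1, 0, 1, 1], [1, 0, 0, 1]
print({"custom_score": accuracy(preds, truth), "business_metric": expected_cost(preds, truth)})
```

A cost-weighted metric like this is useful when false negatives and false positives have different business impact, which plain accuracy hides.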
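Popular community plugins
- Storage solutions
- Model deployment
- Model evaluation
- Execution backends
- Enterprise solutions
SQL Server plugin
Store artifacts directly in a SQL Server database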
pip install mlflow[sqlserver]
import mlflow

# Use SQL Server as artifact store
db_uri = "mssql+pyodbc://user:pass@host:port/db?driver=ODBC+Driver+17+for+SQL+Server"
experiment_id = mlflow.create_experiment("sql_experiment", artifact_location=db_uri)

# `model` is assumed to be an ONNX model you have already loaded or exported
with mlflow.start_run(experiment_id=experiment_id):
    mlflow.onnx.log_model(model, name="model")  # Stored as BLOB in SQL Server
Aliyun OSS plugin
Integrate with Alibaba Cloud Object Storage Service (OSS)
pip install mlflow[aliyun-oss]
import os
import mlflow
# Configure OSS credentials
os.environ["MLFLOW_OSS_ENDPOINT_URL"] = "https://oss-region.aliyuncs.com"
os.environ["MLFLOW_OSS_KEY_ID"] = "your_access_key"
os.environ["MLFLOW_OSS_KEY_SECRET"] = "your_secret_key"
# Use OSS as artifact store
mlflow.create_experiment("oss_experiment", artifact_location="oss://bucket/path")
XetHub plugin
Use XetHub for versioned artifact storage
pip install mlflow[xethub]
import mlflow

# Authenticate with XetHub (via CLI or environment variables)
mlflow.create_experiment(
    "xet_experiment", artifact_location="xet://username/repo/branch"
)
Elasticsearch plugin
Use Elasticsearch for experiment tracking
pip install mlflow-elasticsearchstore
Model deployment plugins
Plugin | Target platform | Installation |
---|---|---|
mlflow-redisai | RedisAI | pip install mlflow-redisai |
mlflow-torchserve | TorchServe | pip install mlflow-torchserve |
mlflow-ray-serve | Ray Serve | pip install mlflow-ray-serve |
mlflow-azureml | Azure ML | Built into Azure ML |
oci-mlflow | Oracle Cloud | pip install oci-mlflow |
Example deployment usage
import mlflow.deployments

# Deploy to custom target
client = mlflow.deployments.get_deploy_client("redisai")
client.create_deployment(
    name="my_model", model_uri="models:/MyModel/1", config={"device": "GPU"}
)
Giskard plugin
Comprehensive model validation and bias detection
pip install mlflow-giskard
import mlflow

# Evaluate with Giskard (`model` and `eval_data` are defined elsewhere)
result = mlflow.evaluate(
    model,
    eval_data,
    evaluators=["giskard"],
    evaluator_config={
        "giskard": {
            "test_suite": "full_suite",
            "bias_tests": True,
            "performance_tests": True,
        }
    },
)
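Detected vulnerabilities
- Performance bias
- Ethical bias
- Data leakage
- Overconfidence and underconfidence
- Spurious correlations
Trubrics plugin
Advanced model validation framework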
pip install trubrics-sdk
Execution backend plugins
Plugin | Platform | Use case |
---|---|---|
mlflow-yarn | Hadoop/YARN | Big data processing clusters |
oci-mlflow | Oracle Cloud | Oracle Cloud Infrastructure |
Usage example
# Run MLflow project on YARN
mlflow run . --backend yarn --backend-config yarn-config.json
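JFrog Artifactory plugin
Enterprise artifact governance
pip install mlflow[jfrog]
Key features
- Artifacts stored in JFrog Artifactory
- Full lifecycle management
- Enterprise security and compliance
- Integration with JFrog platform tools
Setup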
export ARTIFACTORY_AUTH_TOKEN="your_token"
mlflow server \
--host 0.0.0.0 \
--port 5000 \
--artifacts-destination "artifactory://artifactory.company.com/artifactory/ml-models"
Usage
import mlflow
from transformers import pipeline

mlflow.set_tracking_uri("http://your-mlflow-server:5000")
classifier = pipeline("sentiment-analysis", model="bert-base-uncased")
with mlflow.start_run():
    mlflow.transformers.log_model(
        transformers_model=classifier, name="model"
    )  # Automatically stored in JFrog Artifactory
Configuration
# Optional: Use HTTP instead of HTTPS
export ARTIFACTORY_NO_SSL=true
# Optional: Enable debug logging
export ARTIFACTORY_DEBUG=true
# Optional: Skip artifact deletion during garbage collection
export ARTIFACTORY_ARTIFACTS_DELETE_SKIP=true
Testing your plugin
- Unit tests
- Integration tests
- Performance tests
# tests/test_my_plugin.py
import pytest
from mlflow.entities import Metric
from my_plugin.store import MyTrackingStore


class TestMyTrackingStore:
    def setup_method(self):
        self.store = MyTrackingStore("my-scheme://test")

    def test_create_experiment(self):
        experiment_id = self.store.create_experiment("test_exp")
        assert experiment_id is not None

    def test_log_metric(self):
        experiment_id = self.store.create_experiment("test_exp")
        # AbstractStore.create_run takes a start_time and tags before the run name
        run = self.store.create_run(
            experiment_id, user_id="user", start_time=12345, tags=[], run_name="test_run"
        )
        metric = Metric("accuracy", 0.95, 12345, 0)  # key, value, timestamp, step
        self.store.log_metric(run.info.run_id, metric)
        # Verify metric was logged correctly
        stored_run = self.store.get_run(run.info.run_id)
        assert "accuracy" in stored_run.data.metrics
        assert stored_run.data.metrics["accuracy"] == 0.95

    def test_log_artifact(self):
        # Test artifact logging functionality
        pass
# tests/test_integration.py
import tempfile

import mlflow
from my_plugin import setup_test_environment  # test helper you provide in your package


def test_end_to_end_workflow():
    with setup_test_environment():
        # Set tracking URI to use your plugin
        mlflow.set_tracking_uri("my-scheme://test")
        # Test full MLflow workflow
        with mlflow.start_run():
            mlflow.log_param("alpha", 0.5)
            mlflow.log_metric("rmse", 0.8)
            # Create and log a simple model
            with tempfile.NamedTemporaryFile() as f:
                f.write(b"model data")
                f.flush()
                mlflow.log_artifact(f.name, "model")
        # Verify everything was stored correctly
        runs = mlflow.search_runs()
        assert len(runs) == 1
        assert runs.iloc[0]["params.alpha"] == "0.5"
        assert runs.iloc[0]["metrics.rmse"] == 0.8
# tests/test_performance.py
import threading
import time

from mlflow.entities import Metric
from my_plugin.store import MyTrackingStore


class TestPerformance:
    def test_bulk_logging_performance(self):
        store = MyTrackingStore("my-scheme://perf-test")
        experiment_id = store.create_experiment("perf_test")
        # AbstractStore.create_run takes a start_time and tags before the run name
        run = store.create_run(
            experiment_id, user_id="user", start_time=12345, tags=[], run_name="perf_run"
        )
        # Test bulk metric logging
        start_time = time.time()
        for i in range(1000):
            metric = Metric(f"metric_{i}", i * 0.1, 12345, i)
            store.log_metric(run.info.run_id, metric)
        elapsed = time.time() - start_time
        assert elapsed < 10.0  # Should complete within 10 seconds
        # Verify all metrics were logged
        stored_run = store.get_run(run.info.run_id)
        assert len(stored_run.data.metrics) == 1000

    def test_concurrent_access(self):
        store = MyTrackingStore("my-scheme://concurrent-test")
        results = []

        def create_experiment(name):
            exp_id = store.create_experiment(f"concurrent_{name}")
            results.append(exp_id)

        threads = [
            threading.Thread(target=create_experiment, args=(i,)) for i in range(10)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert len(set(results)) == 10  # All unique experiment IDs
Distribution and publishing
Package structure
my-mlflow-plugin/
├── setup.py # Package configuration
├── README.md # Plugin documentation
├── my_plugin/
│ ├── __init__.py
│ ├── store.py # Tracking store implementation
│ ├── artifacts.py # Artifact repository implementation
│ ├── auth.py # Authentication provider
│ └── evaluator.py # Model evaluator
├── tests/
│ ├── test_store.py
│ ├── test_artifacts.py
│ └── test_integration.py
└── examples/
└── example_usage.py
Publishing to PyPI
# Build distribution packages (sdist and wheel)
pip install build twine
python -m build
# Upload to PyPI
twine upload dist/*
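Documentation template
# My MLflow Plugin

## Installation

```bash
pip install my-mlflow-plugin
```

## Configuration

```bash
export MY_PLUGIN_CONFIG="value"
```

## Usage

```python
import mlflow

mlflow.set_tracking_uri("my-scheme://config")
```

## Features

- Feature 1
- Feature 2

## Examples

See the examples/ directory for complete usage examples.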
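Best practices
Plugin development
- Follow MLflow interfaces - implement all required abstract methods
- Handle errors gracefully - provide clear error messages for configuration problems
- Support authentication - integrate with existing credential systems
- Add comprehensive logging - help users debug configuration issues
- Version compatibility - test against multiple MLflow versions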
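To make the point about clear configuration errors concrete, here is a minimal sketch of store URI validation that fails early with an actionable message. The exception class `PluginConfigError`, the function `parse_store_uri`, and the expected URI layout are assumptions for illustration.

```python
from urllib.parse import urlparse


class PluginConfigError(ValueError):
    """Raised when a store URI cannot be used by the plugin."""


def parse_store_uri(store_uri, expected_scheme="my-scheme"):
    """Return (host, path) from a URI like my-scheme://host/path."""
    parsed = urlparse(store_uri)
    if parsed.scheme != expected_scheme:
        raise PluginConfigError(
            f"Unsupported URI scheme {parsed.scheme!r}; "
            f"expected a URI of the form {expected_scheme}://<host>/<path>"
        )
    if not parsed.hostname:
        raise PluginConfigError(
            f"Missing host in store URI; expected {expected_scheme}://<host>/<path>"
        )
    # Use hostname rather than netloc so embedded credentials are not returned.
    return parsed.hostname, parsed.path.lstrip("/")


print(parse_store_uri("my-scheme://db.internal/experiments"))
```

The error messages name the expected form but avoid echoing the full URI, since store URIs sometimes carry credentials.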
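Performance optimization
- Batch operations - implement efficient bulk logging where possible
- Connection pooling - reuse connections to external systems
- Asynchronous operations - use async I/O for storage operations where it helps
- Caching - cache frequently accessed metadata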
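The batching advice can be sketched as a small buffer that collects metrics and hands them to the backend in groups rather than one call per metric. `MetricBuffer` and its `send_batch` callback are hypothetical; in a real tracking store the callback would wrap whatever bulk write your backend supports.

```python
class MetricBuffer:
    """Collect metrics and flush them to a backend in batches."""

    def __init__(self, send_batch, batch_size=100):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self._send_batch = send_batch
        self._batch_size = batch_size
        self._pending = []

    def add(self, key, value, step):
        self._pending.append((key, value, step))
        if len(self._pending) >= self._batch_size:
            self.flush()

    def flush(self):
        # Swap the list first so a failing send does not resend old items twice.
        if self._pending:
            batch, self._pending = self._pending, []
            self._send_batch(batch)


sent = []
buffer = MetricBuffer(sent.append, batch_size=3)
for step in range(7):
    buffer.add("loss", 1.0 / (step + 1), step)
buffer.flush()  # send the final partial batch
print([len(batch) for batch in sent])
```

Callers need to flush at the end of a run, otherwise the last partial batch stays in memory.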
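Security considerations
- Credential management - never log or expose sensitive credentials
- Input validation - validate all user input and URIs
- Access control - respect existing authentication and authorization
- Secure communication - use TLS/SSL for network communication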
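One way to reduce the risk of leaking credentials through logs is a logging filter that masks secrets before a record is emitted. The sketch below is a minimal, assumption-laden example: the patterns cover only `key=value` style secrets and `user:password@` URL credentials, and the names `redact` and `RedactingFilter` are hypothetical.

```python
import logging
import re

# key=value secrets such as token=..., password=..., api_key=...
_KEY_VALUE = re.compile(r"(?i)\b(token|password|secret|api[_-]?key)=([^\s&]+)")
# user:password@ credentials embedded in a URL
_URL_CREDENTIALS = re.compile(r"://([^/\s:@]+):([^@\s/]+)@")


def redact(text):
    """Mask secret values in text while keeping the surrounding context."""
    text = _KEY_VALUE.sub(lambda m: f"{m.group(1)}=***", text)
    return _URL_CREDENTIALS.sub(r"://\1:***@", text)


class RedactingFilter(logging.Filter):
    """Logging filter that redacts the fully formatted message."""

    def filter(self, record):
        record.msg = redact(record.getMessage())
        record.args = None  # message is already formatted
        return True


logger = logging.getLogger("my_plugin")
logger.addFilter(RedactingFilter())
print(redact("mssql+pyodbc://user:pass@host:1433/db"))
```

Pattern-based redaction is a safety net rather than a guarantee, so it works best alongside the habit of not passing secrets to log calls in the first place.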
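Testing strategy
- Unit tests - test individual plugin components
- Integration tests - test complete workflows with MLflow
- Performance tests - verify acceptable performance characteristics
- Compatibility tests - test against different MLflow versions
Ready to extend MLflow? Start with the example test plugin to see how each plugin type is used, then build your own custom integration!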