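MLflow Projects
MLflow Projects provide a standard format for packaging and sharing reproducible data science code. Built on simple conventions, Projects enable seamless collaboration and automated execution across different environments and platforms.
Quickstart
Run Your First Project
Execute any Git repository or local directory as an MLflow Project: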
# Run a project from GitHub
mlflow run https://github.com/mlflow/mlflow-example.git -P alpha=0.5
# Run a local project
mlflow run . -P data_file=data.csv -P regularization=0.1
# Run with specific entry point
mlflow run . -e validate -P data_file=data.csv
# Run projects programmatically
import mlflow
# Execute remote project
result = mlflow.run(
    "https://github.com/mlflow/mlflow-example.git",
    parameters={"alpha": 0.5, "l1_ratio": 0.01},
    experiment_name="elasticnet_experiment",
)
# Execute local project
result = mlflow.run(
    ".", entry_point="train", parameters={"epochs": 100}, synchronous=True
)
Project Structure
Any directory that contains an MLproject file, or that contains .py or .sh files, can be run as an MLflow Project. No complex setup is required.
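Core Concepts
Project Components
Every MLflow Project consists of three key elements:
Project Name
A human-readable identifier for the project, typically defined in the MLproject file.
Entry Points
Commands that can be executed within the project. An entry point defines:
- Parameters - inputs with types and default values
- Command - what is executed when the entry point runs
- Environment - the execution context and dependencies
Environment
The software environment containing all dependencies needed to run the project. MLflow supports several environment types: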
Environment | Use Case | Dependencies |
---|---|---|
Virtualenv (recommended) | Python packages from PyPI | python_env.yaml |
Conda | Python + native libraries | conda.yaml |
Docker | Complex dependencies, non-Python | Dockerfile |
System | Use the current environment | None |
Project Structure & Configuration
Convention-Based Projects
Projects without an MLproject file use these conventions:
my-project/
├── train.py # Executable entry point
├── validate.sh # Shell script entry point
├── conda.yaml # Optional: Conda environment
├── python_env.yaml # Optional: Python environment
└── data/ # Project data and assets
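Default behavior:
- Name: the directory name
- Entry points: any .py or .sh file
- Environment: the Conda environment from conda.yaml, or a Python-only environment
- Parameters: passed on the command line as --key value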
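As a rough illustration of these conventions, the sketch below models how a script entry point and its `--key value` parameters could be combined into a single command line. The helper `build_convention_command` and the extension-to-interpreter mapping are hypothetical and exist only for this example; this is not MLflow's actual implementation.

```python
import shlex
from pathlib import Path

# Assumed mapping from script extension to interpreter (illustrative only).
INTERPRETERS = {".py": "python", ".sh": "bash"}


def build_convention_command(script, params):
    """Build a command line for a script entry point in a project without an MLproject file."""
    ext = Path(script).suffix
    if ext not in INTERPRETERS:
        raise ValueError(f"Unsupported entry point: {script!r}")
    parts = [INTERPRETERS[ext], script]
    # Each parameter is appended as "--key value".
    for key, value in params.items():
        parts += [f"--{key}", str(value)]
    # Quote every token so values with spaces survive the shell.
    return " ".join(shlex.quote(part) for part in parts)


print(build_convention_command("train.py", {"alpha": 0.5, "data_file": "data.csv"}))
```

Under these assumptions, a parameter value containing spaces is quoted rather than split into several arguments.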
MLproject File Configuration
For advanced control, create an MLproject file:
name: My ML Project
# Environment specification (choose one)
python_env: python_env.yaml
# conda_env: conda.yaml
# docker_env:
#   image: python:3.9
entry_points:
  main:
    parameters:
      data_file: path
      regularization: {type: float, default: 0.1}
      max_epochs: {type: int, default: 100}
    command: "python train.py --reg {regularization} --epochs {max_epochs} {data_file}"
  validate:
    parameters:
      model_path: path
      test_data: path
    command: "python validate.py {model_path} {test_data}"
  hyperparameter_search:
    parameters:
      search_space: uri
      n_trials: {type: int, default: 50}
    command: "python hyperparam_search.py --trials {n_trials} --config {search_space}"
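To make the relationship between declared parameters and the command template concrete, here is a minimal sketch of how the `main` entry point could be rendered. The `ENTRY_POINT` dictionary, the `CASTS` table, and `render_command` are illustrative stand-ins, not MLflow internals; the real implementation also handles quoting and path resolution.

```python
# Simplified stand-in for the `main` entry point of the MLproject file.
ENTRY_POINT = {
    "parameters": {
        "data_file": {"type": "path"},
        "regularization": {"type": "float", "default": 0.1},
        "max_epochs": {"type": "int", "default": 100},
    },
    "command": "python train.py --reg {regularization} --epochs {max_epochs} {data_file}",
}

# Assumed conversions for numeric types; everything else is kept as text.
CASTS = {"float": float, "int": int}


def render_command(entry_point, user_params):
    """Fill the command template with user values, falling back to declared defaults."""
    values = {}
    for name, spec in entry_point["parameters"].items():
        if name in user_params:
            raw = user_params[name]
        elif "default" in spec:
            raw = spec["default"]
        else:
            raise ValueError(f"Missing required parameter: {name}")
        # Conversion doubles as validation: float("abc") raises ValueError.
        values[name] = CASTS.get(spec["type"], str)(raw)
    return entry_point["command"].format(**values)


print(render_command(ENTRY_POINT, {"data_file": "data.csv", "regularization": "0.5"}))
```

In this sketch, `data_file` has no default, so omitting it raises an error, while `max_epochs` silently falls back to 100.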
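Parameter Types
MLflow supports the following parameter types, with automatic validation and transformation:
Type | Description | Example | Special Handling |
---|---|---|---|
string | Text data | "hello world" | None |
float | Decimal numbers | 0.1, 3.14 | Validation |
int | Integers | 42, 100 | Validation |
path | Local file paths | data.csv, s3://bucket/file | Downloads remote URIs to local files |
uri | Any URI | s3://bucket/, ./local/path | Converts relative paths to absolute paths |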
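Parameter Resolution
path parameters automatically download remote files (S3, GCS, etc.) to local storage before execution. Use uri for applications that can read directly from remote storage.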
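The difference between the two types can be sketched as follows. The helpers `has_scheme`, `resolve_uri_param`, and `needs_download` are hypothetical names for this example, and treating any value with a URL scheme as remote is a simplifying assumption (it also assumes POSIX-style local paths).

```python
import os
from urllib.parse import urlparse


def has_scheme(value):
    """Return True for values such as s3://bucket/file that carry a URL scheme."""
    return urlparse(value).scheme != ""


def resolve_uri_param(value):
    """uri: leave URIs with a scheme untouched, make relative local paths absolute."""
    return value if has_scheme(value) else os.path.abspath(value)


def needs_download(value):
    """path: values with a scheme would be downloaded to local storage before the run."""
    return has_scheme(value)


print(resolve_uri_param("s3://bucket/"))
print(os.path.isabs(resolve_uri_param("./local/path")))
print(needs_download("s3://bucket/file"), needs_download("data.csv"))
```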
Environment Management
Python Virtual Environments (Recommended)
Create a python_env.yaml file for pure-Python dependencies:
# python_env.yaml
python: "3.9.16"
# Optional: build dependencies
build_dependencies:
  - pip
  - setuptools
  - wheel==0.37.1
# Runtime dependencies
dependencies:
  - mlflow>=2.0.0
  - scikit-learn==1.2.0
  - pandas>=1.5.0
  - numpy>=1.21.0
# MLproject
name: Python Project
python_env: python_env.yaml
entry_points:
  main:
    command: "python train.py"
Conda Environments
For projects that need native libraries or complex dependencies:
# conda.yaml
name: ml-project
channels:
  - conda-forge
  - defaults
dependencies:
  - python=3.9
  - cudnn=8.2.1  # CUDA libraries
  - scikit-learn
  - pip
  - pip:
      - mlflow>=2.0.0
      - tensorflow==2.10.0
# MLproject
name: Deep Learning Project
conda_env: conda.yaml
entry_points:
  train:
    parameters:
      gpu_count: {type: int, default: 1}
    command: "python train_model.py --gpus {gpu_count}"
Conda Terms
By using Conda, you agree to Anaconda's Terms of Service.
Docker Environments
For maximum reproducibility and complex system dependencies:
# Dockerfile
FROM python:3.9-slim
RUN apt-get update && apt-get install -y \
    build-essential \
    git \
    && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install -r requirements.txt
WORKDIR /mlflow/projects/code
# MLproject
name: Containerized Project
docker_env:
  image: my-ml-image:latest
  volumes: ["/host/data:/container/data"]
  environment:
    - ["CUDA_VISIBLE_DEVICES", "0,1"]
    - "AWS_PROFILE"  # Copy from host
entry_points:
  train:
    command: "python distributed_training.py"
Advanced Docker options:
docker_env:
  image: 012345678910.dkr.ecr.us-west-2.amazonaws.com/ml-training:v1.0
  volumes:
    - "/local/data:/data"
    - "/tmp:/tmp"
  environment:
    - ["MODEL_REGISTRY", "s3://my-bucket/models"]
    - ["EXPERIMENT_NAME", "production-training"]
    - "MLFLOW_TRACKING_URI"  # Copy from host
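The two forms of `environment` entry, a `[name, value]` pair and a bare name copied from the host, can be modeled with a short sketch. `resolve_docker_env` is a hypothetical helper written for this example; it only illustrates the semantics described by the comments in the configuration and is not how MLflow builds its `docker run` command.

```python
import os


def resolve_docker_env(entries, host_env=None):
    """Turn docker_env `environment` entries into a name -> value mapping."""
    host_env = os.environ if host_env is None else host_env
    resolved = {}
    for entry in entries:
        if isinstance(entry, str):
            # Bare name: copy the value from the host, if it is set there.
            if entry in host_env:
                resolved[entry] = host_env[entry]
        else:
            # [name, value] pair: use the value as written.
            name, value = entry
            resolved[name] = value
    return resolved


entries = [
    ["MODEL_REGISTRY", "s3://my-bucket/models"],
    ["EXPERIMENT_NAME", "production-training"],
    "MLFLOW_TRACKING_URI",
]
# The host value below is an assumed example.
print(resolve_docker_env(entries, {"MLFLOW_TRACKING_URI": "http://localhost:5000"}))
```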
Environment Manager Selection
Control which environment manager is used:
# Force virtualenv (ignores conda.yaml)
mlflow run . --env-manager virtualenv
# Use local environment (no isolation)
mlflow run . --env-manager local
# Use conda (default if conda.yaml present)
mlflow run . --env-manager conda
Execution & Deployment
Local Execution
# Basic execution
mlflow run .
# With parameters
mlflow run . -P lr=0.01 -P batch_size=32
# Specific entry point
mlflow run . -e hyperparameter_search -P n_trials=100
# Custom environment
mlflow run . --env-manager virtualenv
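When these commands are launched from a script, it can help to assemble the argument list in one place. The sketch below builds the same `mlflow run` invocations using only the flags shown in this section (`-e`, `-P`, `--env-manager`); the helper name `build_mlflow_run_args` is hypothetical.

```python
import shlex


def build_mlflow_run_args(uri, entry_point=None, params=None, env_manager=None):
    """Assemble an argument list equivalent to the `mlflow run` commands shown here."""
    args = ["mlflow", "run", uri]
    if entry_point:
        args += ["-e", entry_point]
    # Each parameter becomes its own "-P key=value" pair.
    for key, value in (params or {}).items():
        args += ["-P", f"{key}={value}"]
    if env_manager:
        args += ["--env-manager", env_manager]
    return args


args = build_mlflow_run_args(
    ".", entry_point="hyperparameter_search", params={"n_trials": 100}
)
print(shlex.join(args))
```

The resulting list can be passed to `subprocess.run(args, check=True)`, which avoids shell quoting problems with parameter values.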
Remote Execution
Databricks Platform
# Run on Databricks cluster
mlflow run . --backend databricks --backend-config cluster-config.json
// cluster-config.json
{
  "cluster_spec": {
    "new_cluster": {
      "node_type_id": "i3.xlarge",
      "num_workers": 2,
      "spark_version": "11.3.x-scala2.12"
    }
  },
  "run_name": "distributed-training"
}
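Standard JSON has no comment syntax, so the `// cluster-config.json` line is only a filename label and must not appear in the file itself. One way to avoid hand-editing mistakes is to generate the file from a Python dictionary, as in this sketch; `write_backend_config` is a hypothetical helper and the temporary directory is used only to keep the example self-contained.

```python
import json
import tempfile
from pathlib import Path

cluster_config = {
    "cluster_spec": {
        "new_cluster": {
            "node_type_id": "i3.xlarge",
            "num_workers": 2,
            "spark_version": "11.3.x-scala2.12",
        }
    },
    "run_name": "distributed-training",
}


def write_backend_config(config, directory):
    """Serialize a backend configuration to cluster-config.json and return its path."""
    path = Path(directory) / "cluster-config.json"
    path.write_text(json.dumps(config, indent=2), encoding="utf-8")
    return path


with tempfile.TemporaryDirectory() as tmp:
    config_path = write_backend_config(cluster_config, tmp)
    # Reading the file back confirms that it is valid JSON.
    loaded = json.loads(config_path.read_text(encoding="utf-8"))

print(loaded["cluster_spec"]["new_cluster"]["num_workers"])
```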
Kubernetes Clusters
# Run on Kubernetes
mlflow run . --backend kubernetes --backend-config k8s-config.json
// k8s-config.json
{
  "kube-context": "my-cluster",
  "repository-uri": "gcr.io/my-project/ml-training",
  "kube-job-template-path": "k8s-job-template.yaml"
}
# k8s-job-template.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: "{replaced-with-project-name}"
  namespace: mlflow
spec:
  ttlSecondsAfterFinished: 3600
  backoffLimit: 2
  template:
    spec:
      containers:
        - name: "{replaced-with-project-name}"
          image: "{replaced-with-image-uri}"
          command: ["{replaced-with-entry-point-command}"]
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
          env:
            - name: MLFLOW_TRACKING_URI
              value: "https://my-mlflow-server.com"
      restartPolicy: Never
Python API
import mlflow
from mlflow.projects import run
# Synchronous execution
result = run(
    uri="https://github.com/mlflow/mlflow-example.git",
    entry_point="main",
    parameters={"alpha": 0.5},
    backend="local",
    synchronous=True,
)
# Asynchronous execution
submitted_run = run(
    uri=".",
    entry_point="train",
    parameters={"epochs": 100},
    backend="databricks",
    backend_config="cluster-config.json",
    synchronous=False,
)
# Monitor progress
if submitted_run.wait():
    print("Training completed successfully!")
    run_data = mlflow.get_run(submitted_run.run_id)
    print(f"Final accuracy: {run_data.data.metrics['accuracy']}")
Building Workflows
Multi-Step Pipelines
Combine multiple projects into complex ML workflows:
import mlflow
from mlflow.tracking import MlflowClient
def ml_pipeline():
    client = MlflowClient()
    # Step 1: Data preprocessing
    prep_run = mlflow.run(
        "./preprocessing", parameters={"input_path": "s3://bucket/raw-data"}
    )
    # Wait for completion and get output
    if prep_run.wait():
        prep_run_data = client.get_run(prep_run.run_id)
        processed_data_path = prep_run_data.data.params["output_path"]
        # Step 2: Feature engineering
        feature_run = mlflow.run(
            "./feature_engineering", parameters={"data_path": processed_data_path}
        )
        if feature_run.wait():
            feature_data = client.get_run(feature_run.run_id)
            features_path = feature_data.data.params["features_output"]
            # Step 3: Parallel model training
            model_runs = []
            algorithms = ["random_forest", "xgboost", "neural_network"]
            for algo in algorithms:
                run = mlflow.run(
                    "./training",
                    entry_point=algo,
                    parameters={"features_path": features_path, "algorithm": algo},
                    synchronous=False,  # Run in parallel
                )
                model_runs.append(run)
            # Wait for all models and select best
            best_model = None
            best_metric = 0
            for run in model_runs:
                if run.wait():
                    run_data = client.get_run(run.run_id)
                    accuracy = run_data.data.metrics.get("accuracy", 0)
                    if accuracy > best_metric:
                        best_metric = accuracy
                        best_model = run.run_id
            # Step 4: Deploy best model
            if best_model:
                mlflow.run(
                    "./deployment",
                    parameters={"model_run_id": best_model, "stage": "production"},
                )
# Execute pipeline
ml_pipeline()
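The model-selection step of the pipeline can be isolated into a small pure function, which makes it easy to test without launching any runs. This is a sketch only: `select_best_run` and the sample metrics are made up for illustration, and runs that never logged the metric are skipped.

```python
def select_best_run(metrics_by_run, metric="accuracy"):
    """Return the run ID with the highest value of `metric`, or None if no run reported it."""
    candidates = {
        run_id: metrics[metric]
        for run_id, metrics in metrics_by_run.items()
        if metric in metrics
    }
    if not candidates:
        return None
    return max(candidates, key=candidates.get)


# Hypothetical metrics, keyed by run ID.
sample = {
    "run-rf": {"accuracy": 0.91},
    "run-xgb": {"accuracy": 0.94},
    "run-nn": {"loss": 0.3},  # never logged accuracy, so it is ignored
}
print(select_best_run(sample))
```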
Hyperparameter Optimization
import mlflow
import itertools
from concurrent.futures import ThreadPoolExecutor
def hyperparameter_search():
    # Define parameter grid
    param_grid = {
        "learning_rate": [0.01, 0.1, 0.2],
        "n_estimators": [100, 200, 500],
        "max_depth": [3, 6, 10],
    }
    # Generate all combinations
    param_combinations = [
        dict(zip(param_grid.keys(), values))
        for values in itertools.product(*param_grid.values())
    ]
    def train_model(params):
        return mlflow.run("./training", parameters=params, synchronous=False)
    # Launch parallel training jobs
    with ThreadPoolExecutor(max_workers=5) as executor:
        submitted_runs = list(executor.map(train_model, param_combinations))
    # Collect results
    results = []
    for run in submitted_runs:
        if run.wait():
            run_data = mlflow.get_run(run.run_id)
            results.append(
                {
                    "run_id": run.run_id,
                    "params": run_data.data.params,
                    "metrics": run_data.data.metrics,
                }
            )
    # Find best model (fail clearly if every run failed)
    if not results:
        raise RuntimeError("No training run completed successfully")
    best_run = max(results, key=lambda x: x["metrics"].get("f1_score", 0))
    print(f"Best model: {best_run['run_id']}")
    print(f"Best F1 score: {best_run['metrics'].get('f1_score')}")
    return best_run
# Execute hyperparameter search
best_model = hyperparameter_search()
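Before launching a grid search it is worth checking how many runs the grid will produce. The sketch below expands the same parameter grid with `itertools.product`; `expand_grid` is a hypothetical helper name, and no MLflow runs are started.

```python
import itertools

param_grid = {
    "learning_rate": [0.01, 0.1, 0.2],
    "n_estimators": [100, 200, 500],
    "max_depth": [3, 6, 10],
}


def expand_grid(grid):
    """Return one dict per combination of values in the grid."""
    keys = list(grid)
    return [dict(zip(keys, values)) for values in itertools.product(*grid.values())]


combinations = expand_grid(param_grid)
print(len(combinations))  # 3 * 3 * 3 = 27 runs
print(combinations[0])
```

With three values for each of three parameters, the search launches 27 runs, so the `max_workers=5` limit only bounds how many submissions happen concurrently.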
Advanced Features
Docker Image Building
Build custom images during execution:
# Build new image based on project's base image
mlflow run . --backend kubernetes --build-image
# Use pre-built image
mlflow run . --backend kubernetes
# Programmatic image building (Python)
import mlflow
mlflow.run(
    ".",
    backend="kubernetes",
    backend_config="k8s-config.json",
    build_image=True,  # Creates new image with project code
    docker_auth={  # Registry authentication
        "username": "myuser",
        "password": "mytoken",
    },
)
Git Integration
MLflow automatically tracks Git information:
# Run specific commit
mlflow run https://github.com/mlflow/mlflow-example.git --version <commit hash>
# Run branch
mlflow run https://github.com/mlflow/mlflow-example.git --version feature-branch
# Run from subdirectory
mlflow run https://github.com/my-repo.git#subdirectory/my-project
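The `#` in the last command separates the repository URL from the subdirectory that holds the project. A minimal sketch of that split is shown below; `split_project_uri` is a hypothetical helper, not an MLflow function.

```python
def split_project_uri(uri):
    """Split 'repo#subdirectory' into (repo, subdirectory); subdirectory is '' when absent."""
    repo, _, subdirectory = uri.partition("#")
    return repo, subdirectory


print(split_project_uri("https://github.com/my-repo.git#subdirectory/my-project"))
print(split_project_uri("https://github.com/mlflow/mlflow-example.git"))
```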
Environment Variable Propagation
Key environment variables are automatically passed to the execution environment:
export MLFLOW_TRACKING_URI="https://my-tracking-server.com"
export AWS_PROFILE="ml-experiments"
export CUDA_VISIBLE_DEVICES="0,1"
# These variables are available in the project execution environment
mlflow run .
Custom Backend Development
Create a custom execution backend:
# custom_backend.py
from mlflow.projects.backend import AbstractBackend
class MyCustomBackend(AbstractBackend):
    def run(
        self,
        project_uri,
        entry_point,
        parameters,
        version,
        backend_config,
        tracking_uri,
        experiment_id,
    ):
        # Custom execution logic
        # Return SubmittedRun object
        pass
Register it as a plugin:
# setup.py
from setuptools import setup
setup(
    entry_points={
        "mlflow.project_backend": [
            "my-backend=my_package.custom_backend:MyCustomBackend"
        ]
    }
)
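After the package is installed, the registration can be checked by listing what is declared under the `mlflow.project_backend` entry point group. The sketch below uses only the standard library; `list_project_backends` is a hypothetical helper, and the result is simply empty when no backend plugin is installed.

```python
from importlib.metadata import entry_points

GROUP = "mlflow.project_backend"


def list_project_backends(group=GROUP):
    """Return {name: 'module:object'} for every installed entry point in the group."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+ returns an object with a select() method.
        selected = eps.select(group=group)
    else:
        # Python 3.8 and 3.9 return a plain dict keyed by group name.
        selected = eps.get(group, [])
    return {ep.name: ep.value for ep in selected}


print(list_project_backends())
```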
Best Practices
Project Organization
ml-project/
├── MLproject # Project configuration
├── python_env.yaml # Environment dependencies
├── src/ # Source code
│ ├── train.py
│ ├── evaluate.py
│ └── utils/
├── data/ # Sample/test data
├── configs/ # Configuration files
│ ├── model_config.yaml
│ └── hyperparams.json
├── tests/ # Unit tests
└── README.md # Project documentation
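Environment Management
Development tips:
- Use virtualenv for pure-Python projects
- Use conda when you need system libraries (CUDA, Intel MKL)
- Use Docker for complex dependencies or production deployments
- Pin exact versions in production environments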
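The last tip can be checked mechanically. The sketch below flags dependency strings that are not pinned with `==`; the `unpinned` helper and its regular expression are illustrative assumptions that cover only simple `name==version` specifiers, not the full requirement syntax.

```python
import re

# Matches simple "name==version" specifiers; anything else counts as unpinned.
PINNED = re.compile(r"^[A-Za-z0-9_.\-\[\]]+==[^=*]+$")


def unpinned(dependencies):
    """Return the dependency strings that do not pin an exact version."""
    return [dep for dep in dependencies if not PINNED.match(dep.strip())]


deps = ["mlflow>=2.0.0", "scikit-learn==1.2.0", "pandas>=1.5.0", "numpy>=1.21.0"]
print(unpinned(deps))
```

Applied to the dependency list from the python_env.yaml example, only `scikit-learn==1.2.0` passes the check.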
Performance Optimization
# Fast iteration during development
python_env: python_env.yaml
entry_points:
  develop:
    command: "python train.py"
  production:
    parameters:
      full_dataset: {type: path}
      epochs: {type: int, default: 100}
    command: "python train.py --data {full_dataset} --epochs {epochs}"
Parameter Management
# Good: Typed parameters with defaults
entry_points:
  train:
    parameters:
      learning_rate: {type: float, default: 0.01}
      batch_size: {type: int, default: 32}
      data_path: path
      output_dir: {type: string, default: "./outputs"}
    command: "python train.py --lr {learning_rate} --batch {batch_size} --data {data_path} --output {output_dir}"
Reproducibility
# Include environment info in tracking
import mlflow
import platform
import sys
with mlflow.start_run():
    # Log environment info
    mlflow.log_param("python_version", sys.version)
    mlflow.log_param("platform", platform.platform())
    # Log Git commit if available
    try:
        import git
        repo = git.Repo(".")
        mlflow.log_param("git_commit", repo.head.commit.hexsha)
    except Exception:
        # GitPython is not installed or this is not a Git repository
        pass
Troubleshooting
Common Issues
Docker permission denied
# Solution: Add user to docker group or use sudo
sudo usermod -aG docker $USER
# Then restart shell/session
Conda environment creation fails
# Solution: Clean conda cache and retry
conda clean --all
mlflow run . --env-manager conda
Git authentication for private repositories
# Solution: Use SSH with key authentication
mlflow run git@github.com:private/repo.git
# Or HTTPS with token
mlflow run https://token:x-oauth-basic@github.com/private/repo.git
Kubernetes job failures
# Debug: Check job status
kubectl get jobs -n mlflow
kubectl describe job <job-name> -n mlflow
kubectl logs -n mlflow job/<job-name>
Debugging Tips
Enable verbose logging
export MLFLOW_LOGGING_LEVEL=DEBUG
mlflow run .
Test locally first
# Test with local environment before remote deployment
mlflow run . --env-manager local
# Then test with environment isolation
mlflow run . --env-manager virtualenv
Validate project structure
from mlflow.projects import load_project
# Load and inspect project
project = load_project(".")
print(f"Project name: {project.name}")
print(f"Entry points: {list(project._entry_points.keys())}")
print(f"Environment type: {project.env_type}")
Ready to get started? Check out our MLflow Projects examples for hands-on tutorials and real-world use cases.