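MLflow Projects

MLflow Projects provide a standard format for packaging and sharing reproducible data science code. Built on simple conventions, projects enable seamless collaboration and automated execution across different environments and platforms.

Quick Start

Run Your First Project

Execute any Git repository or local directory as an MLflow project: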

# Run a project from GitHub
mlflow run https://github.com/mlflow/mlflow-example.git -P alpha=0.5

# Run a local project
mlflow run . -P data_file=data.csv -P regularization=0.1

# Run with specific entry point
mlflow run . -e validate -P data_file=data.csv

# Run projects programmatically
import mlflow

# Execute remote project
result = mlflow.run(
    "https://github.com/mlflow/mlflow-example.git",
    parameters={"alpha": 0.5, "l1_ratio": 0.01},
    experiment_name="elasticnet_experiment",
)

# Execute local project
result = mlflow.run(
    ".", entry_point="train", parameters={"epochs": 100}, synchronous=True
)

Project Structure

Any directory that contains an MLproject file, or .py/.sh files, can be run as an MLflow project. No complex setup is required.

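Core Concepts

Project Components

Every MLflow project contains three key elements:

Project Name

A human-readable identifier for the project, typically defined in the MLproject file.

Entry Points

Commands that can be executed within the project. Entry points define:

  • Parameters - inputs with types and default values
  • Command - what is executed when the entry point runs
  • Environment - the execution context and dependencies

Environment

The software environment that contains all dependencies needed to run the project. MLflow supports several environment types:

| Environment | Use case | Dependencies |
|---|---|---|
| Virtualenv (recommended) | Python packages from PyPI | python_env.yaml |
| Conda | Python + native libraries | conda.yaml |
| Docker | Complex dependencies, non-Python | Dockerfile |
| System | Use the current environment | None |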

Project Structure & Configuration

Convention-Based Projects

Projects without an MLproject file use these conventions:

my-project/
├── train.py # Executable entry point
├── validate.sh # Shell script entry point
├── conda.yaml # Optional: Conda environment
├── python_env.yaml # Optional: Python environment
└── data/ # Project data and assets

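Default Behavior

  • Name: the directory name
  • Entry points: any .py or .sh file
  • Environment: the Conda environment from conda.yaml, or a Python-only environment
  • Parameters: passed on the command line as --key value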
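
To make the `--key value` convention concrete, here is a minimal sketch of how parameters could be turned into a command line for a convention-based entry point. `build_command` is a hypothetical helper written for illustration, not MLflow's internal implementation, and running `.sh` files with `bash` is an assumption.

```python
import shlex


def build_command(entry_point: str, params: dict) -> str:
    """Build a shell command for a .py or .sh entry point (illustrative only)."""
    if entry_point.endswith(".py"):
        parts = ["python", entry_point]
    elif entry_point.endswith(".sh"):
        parts = ["bash", entry_point]  # assumption: bash runs shell entry points
    else:
        raise ValueError(f"unsupported entry point: {entry_point}")
    # Each parameter becomes a "--key value" pair, quoted for the shell.
    for key, value in params.items():
        parts += [f"--{key}", str(value)]
    return " ".join(shlex.quote(p) for p in parts)


print(build_command("train.py", {"data_file": "data.csv", "regularization": 0.1}))
```

The script on the receiving end then only needs an ordinary argument parser that accepts the same flag names.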

MLproject File Configuration

For advanced control, create an MLproject file:

name: My ML Project

# Environment specification (choose one)
python_env: python_env.yaml
# conda_env: conda.yaml
# docker_env:
#   image: python:3.9

entry_points:
  main:
    parameters:
      data_file: path
      regularization: {type: float, default: 0.1}
      max_epochs: {type: int, default: 100}
    command: "python train.py --reg {regularization} --epochs {max_epochs} {data_file}"

  validate:
    parameters:
      model_path: path
      test_data: path
    command: "python validate.py {model_path} {test_data}"

  hyperparameter_search:
    parameters:
      search_space: uri
      n_trials: {type: int, default: 50}
    command: "python hyperparam_search.py --trials {n_trials} --config {search_space}"
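
The `command` strings above are templates whose `{name}` placeholders are filled from the declared parameters. The following is a rough sketch of that substitution, assuming plain `str.format` semantics and a hypothetical `render_command` helper; it does not reproduce MLflow's own validation or shell quoting.

```python
def render_command(template: str, declared: dict, supplied: dict) -> str:
    """Fill a command template with supplied values, falling back to defaults."""
    values = {}
    for name, spec in declared.items():
        # A bare type such as "path" has no default; a dict may carry one.
        default = spec.get("default") if isinstance(spec, dict) else None
        if name in supplied:
            values[name] = supplied[name]
        elif default is not None:
            values[name] = default
        else:
            raise ValueError(f"missing required parameter: {name}")
    return template.format(**values)


declared = {
    "data_file": "path",
    "regularization": {"type": "float", "default": 0.1},
    "max_epochs": {"type": "int", "default": 100},
}
template = "python train.py --reg {regularization} --epochs {max_epochs} {data_file}"
print(render_command(template, declared, {"data_file": "data.csv", "max_epochs": 20}))
```

Here `regularization` falls back to its default, `max_epochs` is overridden, and omitting `data_file` would raise an error because it has no default.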

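Parameter Types

MLflow supports the parameter types listed below, with automatic validation and transformation:

| Type | Description | Example | Special handling |
|---|---|---|---|
| string | Text data | "hello world" | None |
| float | Decimal numbers | 0.1, 3.14 | Validation |
| int | Integers | 42, 100 | Validation |
| path | Local file paths | data.csv, s3://bucket/file | Downloads remote URIs to local files |
| uri | Any URI | s3://bucket/, ./local/path | Converts relative paths to absolute paths |

Parameter Resolution

path parameters automatically download remote files (S3, GCS, etc.) to local storage before execution. For applications that can read directly from remote storage, use uri.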
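
As an illustration of the `uri` behavior described above, the sketch below leaves remote URIs untouched and converts relative local paths to absolute ones. `resolve_uri_param` is a hypothetical helper, not MLflow's implementation, and the download step that `path` parameters perform is deliberately not shown.

```python
import os
from urllib.parse import urlparse


def resolve_uri_param(value: str) -> str:
    """Leave remote URIs untouched; make relative local paths absolute."""
    if urlparse(value).scheme:  # e.g. s3://, gs://, https://
        return value
    return os.path.abspath(value)


print(resolve_uri_param("s3://bucket/data/"))
print(resolve_uri_param("./local/path"))
```

Note that this simple scheme check would misread a Windows drive letter such as `C:\data` as a URI scheme, so a real implementation needs a stricter test.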

Environment Management

For pure Python dependencies, create a python_env.yaml file:

# python_env.yaml
python: "3.9.16"

# Optional: build dependencies
build_dependencies:
- pip
- setuptools
- wheel==0.37.1

# Runtime dependencies
dependencies:
- mlflow>=2.0.0
- scikit-learn==1.2.0
- pandas>=1.5.0
- numpy>=1.21.0

# MLproject
name: Python Project
python_env: python_env.yaml

entry_points:
  main:
    command: "python train.py"

Conda Environments

For projects that require native libraries or complex dependencies:

# conda.yaml
name: ml-project
channels:
  - conda-forge
  - defaults
dependencies:
  - python=3.9
  - cudnn=8.2.1 # CUDA libraries
  - scikit-learn
  - pip
  - pip:
      - mlflow>=2.0.0
      - tensorflow==2.10.0

# MLproject
name: Deep Learning Project
conda_env: conda.yaml

entry_points:
  train:
    parameters:
      gpu_count: {type: int, default: 1}
    command: "python train_model.py --gpus {gpu_count}"

Conda Terms

By using Conda, you agree to Anaconda's Terms of Service.

Docker Environments

For maximum reproducibility and complex system dependencies:

# Dockerfile
FROM python:3.9-slim

RUN apt-get update && apt-get install -y \
build-essential \
git \
&& rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install -r requirements.txt

WORKDIR /mlflow/projects/code

# MLproject
name: Containerized Project
docker_env:
  image: my-ml-image:latest
  volumes: ["/host/data:/container/data"]
  environment:
    - ["CUDA_VISIBLE_DEVICES", "0,1"]
    - "AWS_PROFILE" # Copy from host

entry_points:
  train:
    command: "python distributed_training.py"

Advanced Docker Options

docker_env:
  image: 012345678910.dkr.ecr.us-west-2.amazonaws.com/ml-training:v1.0
  volumes:
    - "/local/data:/data"
    - "/tmp:/tmp"
  environment:
    - ["MODEL_REGISTRY", "s3://my-bucket/models"]
    - ["EXPERIMENT_NAME", "production-training"]
    - "MLFLOW_TRACKING_URI" # Copy from host
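
The `environment` list mixes two entry forms: a `[name, value]` pair sets a variable explicitly, while a bare name is copied from the host. One way to picture the difference is the sketch below, which maps such a list to `docker run -e` arguments. `docker_env_args` is a hypothetical helper, and silently skipping host variables that are not set is an assumption, not documented MLflow behavior.

```python
import os


def docker_env_args(environment: list, host_env=None) -> list:
    """Translate MLproject-style environment entries into `docker run -e` arguments."""
    host_env = os.environ if host_env is None else host_env
    args = []
    for entry in environment:
        if isinstance(entry, str):
            # A bare name is copied from the host, if it is set there (assumption).
            if entry in host_env:
                args += ["-e", f"{entry}={host_env[entry]}"]
        else:
            name, value = entry  # an explicit [name, value] pair
            args += ["-e", f"{name}={value}"]
    return args


spec = [["MODEL_REGISTRY", "s3://my-bucket/models"], "MLFLOW_TRACKING_URI"]
print(docker_env_args(spec, host_env={"MLFLOW_TRACKING_URI": "http://localhost:5000"}))
```

The tracking URI used here is a placeholder; passing `host_env` explicitly keeps the example independent of the machine it runs on.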

Environment Manager Selection

Control which environment manager is used:

# Force virtualenv (ignores conda.yaml)
mlflow run . --env-manager virtualenv

# Use local environment (no isolation)
mlflow run . --env-manager local

# Use conda (default if conda.yaml present)
mlflow run . --env-manager conda

Execution & Deployment

Local Execution

# Basic execution
mlflow run .

# With parameters
mlflow run . -P lr=0.01 -P batch_size=32

# Specific entry point
mlflow run . -e hyperparameter_search -P n_trials=100

# Custom environment
mlflow run . --env-manager virtualenv

Remote Execution

Databricks Platform

# Run on Databricks cluster
mlflow run . --backend databricks --backend-config cluster-config.json

// cluster-config.json
{
  "cluster_spec": {
    "new_cluster": {
      "node_type_id": "i3.xlarge",
      "num_workers": 2,
      "spark_version": "11.3.x-scala2.12"
    }
  },
  "run_name": "distributed-training"
}

Kubernetes Clusters

# Run on Kubernetes
mlflow run . --backend kubernetes --backend-config k8s-config.json

// k8s-config.json
{
  "kube-context": "my-cluster",
  "repository-uri": "gcr.io/my-project/ml-training",
  "kube-job-template-path": "k8s-job-template.yaml"
}

# k8s-job-template.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: "{replaced-with-project-name}"
  namespace: mlflow
spec:
  ttlSecondsAfterFinished: 3600
  backoffLimit: 2
  template:
    spec:
      containers:
        - name: "{replaced-with-project-name}"
          image: "{replaced-with-image-uri}"
          command: ["{replaced-with-entry-point-command}"]
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
          env:
            - name: MLFLOW_TRACKING_URI
              value: "https://my-mlflow-server.com"
      restartPolicy: Never

Python API

import mlflow
from mlflow.projects import run

# Synchronous execution
result = run(
    uri="https://github.com/mlflow/mlflow-example.git",
    entry_point="main",
    parameters={"alpha": 0.5},
    backend="local",
    synchronous=True,
)

# Asynchronous execution
submitted_run = run(
    uri=".",
    entry_point="train",
    parameters={"epochs": 100},
    backend="databricks",
    backend_config="cluster-config.json",
    synchronous=False,
)

# Monitor progress
if submitted_run.wait():
    print("Training completed successfully!")
    run_data = mlflow.get_run(submitted_run.run_id)
    print(f"Final accuracy: {run_data.data.metrics['accuracy']}")

Building Workflows

Multi-Step Pipelines

Combine multiple projects into complex ML workflows:

import mlflow
from mlflow.tracking import MlflowClient


def ml_pipeline():
    client = MlflowClient()

    # Step 1: Data preprocessing
    prep_run = mlflow.run(
        "./preprocessing", parameters={"input_path": "s3://bucket/raw-data"}
    )

    # Wait for completion and get output
    if prep_run.wait():
        prep_run_data = client.get_run(prep_run.run_id)
        processed_data_path = prep_run_data.data.params["output_path"]

        # Step 2: Feature engineering
        feature_run = mlflow.run(
            "./feature_engineering", parameters={"data_path": processed_data_path}
        )

        if feature_run.wait():
            feature_data = client.get_run(feature_run.run_id)
            features_path = feature_data.data.params["features_output"]

            # Step 3: Parallel model training
            model_runs = []
            algorithms = ["random_forest", "xgboost", "neural_network"]

            for algo in algorithms:
                run = mlflow.run(
                    "./training",
                    entry_point=algo,
                    parameters={"features_path": features_path, "algorithm": algo},
                    synchronous=False,  # Run in parallel
                )
                model_runs.append(run)

            # Wait for all models and select best
            best_model = None
            best_metric = 0

            for run in model_runs:
                if run.wait():
                    run_data = client.get_run(run.run_id)
                    accuracy = run_data.data.metrics.get("accuracy", 0)
                    if accuracy > best_metric:
                        best_metric = accuracy
                        best_model = run.run_id

            # Step 4: Deploy best model
            if best_model:
                mlflow.run(
                    "./deployment",
                    parameters={"model_run_id": best_model, "stage": "production"},
                )


# Execute pipeline
ml_pipeline()

Hyperparameter Optimization

import mlflow
import itertools
from concurrent.futures import ThreadPoolExecutor


def hyperparameter_search():
    # Define parameter grid
    param_grid = {
        "learning_rate": [0.01, 0.1, 0.2],
        "n_estimators": [100, 200, 500],
        "max_depth": [3, 6, 10],
    }

    # Generate all combinations
    param_combinations = [
        dict(zip(param_grid.keys(), values))
        for values in itertools.product(*param_grid.values())
    ]

    def train_model(params):
        return mlflow.run("./training", parameters=params, synchronous=False)

    # Launch parallel training jobs
    with ThreadPoolExecutor(max_workers=5) as executor:
        submitted_runs = list(executor.map(train_model, param_combinations))

    # Collect results
    results = []
    for run in submitted_runs:
        if run.wait():
            run_data = mlflow.get_run(run.run_id)
            results.append(
                {
                    "run_id": run.run_id,
                    "params": run_data.data.params,
                    "metrics": run_data.data.metrics,
                }
            )

    # Find best model
    best_run = max(results, key=lambda x: x["metrics"].get("f1_score", 0))
    print(f"Best model: {best_run['run_id']}")
    print(f"Best F1 score: {best_run['metrics']['f1_score']}")

    return best_run


# Execute hyperparameter search
best_model = hyperparameter_search()
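
The selection step above uses `max(...)`, which raises an error when no run succeeded, and the final `print` assumes the winning run logged `f1_score`. A slightly more defensive selection might look like the sketch below. `select_best` is a hypothetical helper, and the run IDs and metric values are made-up placeholders.

```python
def select_best(results: list, metric: str):
    """Return the result with the highest value of `metric`, or None."""
    scored = [r for r in results if metric in r.get("metrics", {})]
    if not scored:
        return None
    return max(scored, key=lambda r: r["metrics"][metric])


# Placeholder results in the same shape as the `results` list built above.
results = [
    {"run_id": "a", "metrics": {"f1_score": 0.81}},
    {"run_id": "b", "metrics": {}},  # run that logged no f1_score
    {"run_id": "c", "metrics": {"f1_score": 0.87}},
]
best = select_best(results, "f1_score")
print(best["run_id"] if best else "no run reported f1_score")
```

Returning `None` lets the caller decide whether an empty search is an error or simply means there is nothing to deploy.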

Advanced Features

Docker Image Building

Build custom images during execution:

# Build new image based on project's base image
mlflow run . --backend kubernetes --build-image

# Use pre-built image
mlflow run . --backend kubernetes

# Programmatic image building
import mlflow

mlflow.run(
    ".",
    backend="kubernetes",
    backend_config="k8s-config.json",
    build_image=True,  # Creates new image with project code
    docker_auth={  # Registry authentication
        "username": "myuser",
        "password": "mytoken",
    },
)

Git Integration

MLflow automatically tracks Git information:

# Run specific commit
mlflow run https://github.com/mlflow/mlflow-example.git --version <commit hash>

# Run branch
mlflow run https://github.com/mlflow/mlflow-example.git --version feature-branch

# Run from subdirectory
mlflow run https://github.com/my-repo.git#subdirectory/my-project
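
The `#` in the last command separates the repository URI from a subdirectory inside it. A minimal sketch of that split is shown below; `split_project_uri` is a hypothetical helper for illustration, not an MLflow function.

```python
def split_project_uri(uri: str):
    """Split 'repo#subdirectory' into (repo, subdirectory); subdirectory may be ''."""
    repo, _, subdirectory = uri.partition("#")
    return repo, subdirectory


print(split_project_uri("https://github.com/my-repo.git#subdirectory/my-project"))
```

A URI without `#` yields an empty subdirectory, which corresponds to running the project at the repository root.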

Environment Variable Propagation

Key environment variables are automatically passed to the execution environment:

export MLFLOW_TRACKING_URI="https://my-tracking-server.com"
export AWS_PROFILE="ml-experiments"
export CUDA_VISIBLE_DEVICES="0,1"

# These variables are available in the project execution environment
mlflow run .

Custom Backend Development

Create a custom execution backend:

# custom_backend.py
from mlflow.projects.backend import AbstractBackend


class MyCustomBackend(AbstractBackend):
    def run(
        self,
        project_uri,
        entry_point,
        parameters,
        version,
        backend_config,
        tracking_uri,
        experiment_id,
    ):
        # Custom execution logic
        # Return SubmittedRun object
        pass

Register as a Plugin

# setup.py
from setuptools import setup

setup(
    entry_points={
        "mlflow.project_backend": [
            "my-backend=my_package.custom_backend:MyCustomBackend"
        ]
    }
)

Best Practices

Project Organization

ml-project/
├── MLproject # Project configuration
├── python_env.yaml # Environment dependencies
├── src/ # Source code
│ ├── train.py
│ ├── evaluate.py
│ └── utils/
├── data/ # Sample/test data
├── configs/ # Configuration files
│ ├── model_config.yaml
│ └── hyperparams.json
├── tests/ # Unit tests
└── README.md # Project documentation

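Environment Management

Development Tips

  • Use virtualenv for pure Python projects
  • Use conda when you need system libraries (CUDA, Intel MKL)
  • Use Docker for complex dependencies or production deployments
  • Pin exact versions in production environments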
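
To act on the last tip, a small check can flag dependencies that are not pinned to an exact version before a production run. This is a sketch under simple assumptions: `unpinned` is a hypothetical helper, and it only recognizes plain `name==version` requirement strings, not environment markers or URLs.

```python
import re


def unpinned(dependencies: list) -> list:
    """Return the requirement strings that are not pinned with '=='."""
    pinned = re.compile(r"^[A-Za-z0-9_.\-\[\]]+==[^=]+$")
    return [dep for dep in dependencies if not pinned.match(dep.strip())]


# Dependency list taken from the python_env.yaml example in this document.
deps = ["mlflow>=2.0.0", "scikit-learn==1.2.0", "pandas>=1.5.0", "numpy>=1.21.0"]
print(unpinned(deps))
```

For the example environment file, only `scikit-learn` passes the check; the three range-based requirements are reported.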

Performance Optimization

# Fast iteration during development
python_env: python_env.yaml

entry_points:
  develop:
    command: "python train.py"

  production:
    parameters:
      full_dataset: {type: path}
      epochs: {type: int, default: 100}
    command: "python train.py --data {full_dataset} --epochs {epochs}"

Parameter Management

# Good: Typed parameters with defaults
entry_points:
  train:
    parameters:
      learning_rate: {type: float, default: 0.01}
      batch_size: {type: int, default: 32}
      data_path: path
      output_dir: {type: str, default: "./outputs"}
    command: "python train.py --lr {learning_rate} --batch {batch_size} --data {data_path} --output {output_dir}"
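
On the script side, the flags in that `command` need a matching parser. The sketch below shows one plausible shape for the argument handling in `train.py`, using the same flag names and defaults as the entry point; the body of the training script itself is not part of the original and is left out.

```python
import argparse


def parse_args(argv=None):
    """Parse the flags used by the `train` entry point's command."""
    parser = argparse.ArgumentParser(description="Training script (sketch)")
    parser.add_argument("--lr", type=float, default=0.01)
    parser.add_argument("--batch", type=int, default=32)
    parser.add_argument("--data", required=True)
    parser.add_argument("--output", default="./outputs")
    return parser.parse_args(argv)


args = parse_args(["--lr", "0.05", "--batch", "64", "--data", "data.csv"])
print(args.lr, args.batch, args.data, args.output)
```

Keeping the defaults in both places means the script still behaves sensibly when it is run directly, outside `mlflow run`.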

Reproducibility

# Include environment info in tracking
import mlflow
import platform
import sys

with mlflow.start_run():
    # Log environment info
    mlflow.log_param("python_version", sys.version)
    mlflow.log_param("platform", platform.platform())

    # Log Git commit if available (requires the GitPython package)
    try:
        import git

        repo = git.Repo(".")
        mlflow.log_param("git_commit", repo.head.commit.hexsha)
    except Exception:
        pass
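
If GitPython is not available, the same information can be gathered with the standard library alone. The following sketch is an alternative to the snippet above, not part of it: it collects the values into a dictionary, shells out to the `git` command, and `environment_info` is a hypothetical helper name.

```python
import platform
import subprocess
import sys


def environment_info() -> dict:
    """Collect interpreter, platform, and (if available) Git commit information."""
    info = {
        "python_version": sys.version.split()[0],
        "platform": platform.platform(),
    }
    try:
        commit = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            check=True,
        ).stdout.strip()
        info["git_commit"] = commit
    except (OSError, subprocess.CalledProcessError):
        pass  # not a Git checkout, or git is not installed
    return info


print(environment_info())
```

Each key and value in the returned dictionary can then be logged with `mlflow.log_param` inside an active run.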

Troubleshooting

Common Issues

Docker Permission Denied

# Solution: Add user to docker group or use sudo
sudo usermod -aG docker $USER
# Then restart shell/session

Conda Environment Creation Fails

# Solution: Clean conda cache and retry
conda clean --all
mlflow run . --env-manager conda

Git Authentication for Private Repositories

# Solution: Use SSH with key authentication
mlflow run git@github.com:private/repo.git
# Or HTTPS with token
mlflow run https://token:x-oauth-basic@github.com/private/repo.git

Kubernetes Job Failures

# Debug: Check job status
kubectl get jobs -n mlflow
kubectl describe job <job-name> -n mlflow
kubectl logs -n mlflow job/<job-name>

Debugging Tips

Enable Verbose Logging

export MLFLOW_LOGGING_LEVEL=DEBUG
mlflow run .

Test Locally First

# Test with local environment before remote deployment
mlflow run . --env-manager local

# Then test with environment isolation
mlflow run . --env-manager virtualenv

Validate Project Structure

from mlflow.projects import load_project

# Load and inspect project
project = load_project(".")
print(f"Project name: {project.name}")
print(f"Entry points: {list(project._entry_points.keys())}")
print(f"Environment type: {project.env_type}")

Ready to get started? Check out our MLflow Projects examples for hands-on tutorials and real-world use cases.