MLflow 模型服务
MLflow 提供全面的模型服务功能,可将您的机器学习模型部署为 REST API,用于实时推理。无论您是使用 MLflow 开源版还是 Databricks 托管式 MLflow,都可以在本地、云环境或通过托管端点提供模型服务。
概述
MLflow 服务将您训练的模型转换为可用于生产的推理服务器,这些服务器可以处理 HTTP 请求并返回预测结果。该服务基础设施支持各种部署模式,从本地开发服务器到可伸缩的云部署。
主要特性
- 🔌 REST API 端点:自动生成用于模型推理的标准化 REST 端点
- 🧬 多种模型格式:通过 MLflow 的 Flavor 系统支持各种机器学习框架
- 🧠 自定义应用程序:构建具有自定义逻辑和预处理功能的复杂服务应用程序
- 📈 可伸缩部署:部署到各种目标,包括本地服务器、云平台和 Kubernetes
- 🗂️ 模型注册表集成:与 MLflow 模型注册表无缝集成,实现版本管理
服务选项
MLflow 开源版服务
MLflow 开源版提供了多种服务选项
- 本地服务:使用
mlflow models serve
进行开发和测试的快速部署 - 自定义 PyFunc 模型:高级服务,具有自定义预处理、后处理和业务逻辑
- Docker 部署:容器化服务,用于在不同环境间实现一致部署
- 云平台集成:部署到 AWS SageMaker、Azure ML 及其他云服务
Databricks 托管式 MLflow
Databricks 提供了额外的托管服务功能
- 模型服务端点:完全托管、自动伸缩并内置监控的端点
- 基础模型 API:通过按令牌付费的端点直接访问基础模型
- 高级安全性:企业级安全性,具有访问控制和审计日志功能
- 实时监控:内置指标、日志记录和性能监控
快速入门
基本模型服务
用于简单的模型服务设置
# Serve a logged model
mlflow models serve -m "models:/<model-id>" -p 5000
# Serve a registered model
mlflow models serve -m "models:/<model-name>/<model-version>" -p 5000
# Serve a model from local path
mlflow models serve -m ./path/to/model -p 5000
进行预测
模型部署后,您可以通过 HTTP 请求进行预测
curl -X POST https://:5000/invocations \
-H "Content-Type: application/json" \
-d '{"inputs": [[1, 2, 3, 4]]}'
架构
MLflow 服务使用标准化的架构
- 🧠 模型加载:模型使用各自的 MLflow Flavor 加载
- 🌐 HTTP 服务器:基于 FastAPI 的服务器处理传入请求
- 🔄 预测管道:请求通过模型的预测方法进行处理
- 📦 响应格式化:结果以标准化的 JSON 格式返回
最佳实践
性能优化
- 根据模型要求使用适当的硬件资源
- 实现请求批处理以提高吞吐量
- 考虑模型量化以实现更快的推理
- 监控内存使用情况并进行相应优化
安全注意事项
- 实施适当的身份验证和授权
- 在生产环境中使用 HTTPS
- 验证输入数据以防止安全漏洞
- 定期更新依赖项并监控安全问题
监控和可观测性
- 设置全面的日志记录以进行调试和审计
- 监控关键指标,如延迟、吞吐量和错误率
- 实施健康检查以确保服务可靠性
- 对复杂的服务管道使用分布式跟踪
常见用例
- 实时推理
- 批处理
- A/B 测试
- 多模型服务
在 Web 应用程序、移动应用程序或微服务架构中提供模型服务以进行实时预测。
import requests
import json
# Single prediction
data = {
"dataframe_split": {
"columns": ["feature1", "feature2", "feature3"],
"data": [[1.0, 2.0, 3.0]],
}
}
response = requests.post(
"https://:5000/invocations",
headers={"Content-Type": "application/json"},
data=json.dumps(data),
)
print(response.json())
使用服务端点对大型数据集进行批处理推理,并控制资源使用。
import mlflow
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct
# Parameters
model_name = "YOUR_MODEL_NAME"
model_version = "YOUR_MODEL_VERSION"
input_table = "YOUR_INPUT_TABLE_NAME"
output_table = "YOUR_OUTPUT_TABLE_NAME"
# Load data
df = spark.table(input_table)
# Apply model using Spark UDF
model_uri = f"models:/{model_name}/{model_version}"
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)
# Make predictions
predictions_df = df.withColumn(
"prediction", predict_udf(struct([col(c) for c in df.columns]))
)
# Save results
predictions_df.write.mode("overwrite").saveAsTable(output_table)
有关已部署服务端点上 AI 函数的内置批处理推理支持,请参阅 Databricks 批处理推理文档。
同时部署多个模型版本以比较性能并逐步推出改进。
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import requests
import random
import json
import logging
import uvicorn
app = FastAPI()
# Model endpoints
MODEL_A_URL = "https://:5000/invocations" # Current model
MODEL_B_URL = "https://:5001/invocations" # New model
# Traffic split configuration
TRAFFIC_SPLIT = {
"model_a": 0.8, # 80% to current model
"model_b": 0.2, # 20% to new model
}
@app.post("/predict")
async def predict(request: Request):
# Route traffic based on split
rand = random.random()
if rand < TRAFFIC_SPLIT["model_a"]:
endpoint = MODEL_A_URL
model_version = "A"
else:
endpoint = MODEL_B_URL
model_version = "B"
# Forward request
try:
req_json = await request.json()
response = requests.post(
endpoint,
headers={"Content-Type": "application/json"},
data=json.dumps(req_json),
timeout=30,
)
result = response.json()
# Log for analysis
logging.info(f"Model: {model_version}, Request: {req_json}, Response: {result}")
return JSONResponse(
content={"prediction": result, "model_version": model_version}
)
except Exception as e:
logging.error(f"Error with model {model_version}: {e}")
return JSONResponse(content={"error": str(e)}, status_code=500)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)
从单个端点提供多个模型服务,用于集成预测或基于输入特征的模型路由。
import mlflow
import pandas as pd
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn
app = FastAPI()
# Load specialized models
models = {
"fraud_detection": mlflow.pyfunc.load_model("models:/<fraud-model-id>"),
"recommendation": mlflow.pyfunc.load_model("models:/<recommendation-model-id>"),
"classification": mlflow.pyfunc.load_model("models:/<classification-model-id>"),
}
def route_request(input_data):
"""Route request to appropriate model based on input characteristics"""
# Example routing logic
if "transaction_amount" in input_data.columns:
return "fraud_detection"
elif "user_id" in input_data.columns and "item_id" in input_data.columns:
return "recommendation"
else:
return "classification"
@app.post("/predict")
async def smart_predict(request: Request):
data = await request.json()
input_df = pd.DataFrame(data["data"], columns=data["columns"])
# Route to appropriate model
model_name = route_request(input_df)
model = models[model_name]
# Make prediction
prediction = model.predict(input_df)
return JSONResponse(
content={"model_used": model_name, "prediction": prediction.tolist()}
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5000)
后续步骤
有关 MLflow 服务功能的更多详细信息,请参阅官方 MLflow 文档并尝试每个部分中提供的示例。