Webhooks
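- This feature is experimental and may change in future releases.
- The file backend does not support webhooks. Only the SQL backend supports webhooks.
- Only OSS MLflow supports webhooks. Databricks and other managed MLflow services may not support this feature.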
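Overview
MLflow webhooks provide real-time notifications when specific events occur in the Model Registry and Prompt Registry. When you register a model or prompt, create a new version, or modify tags and aliases, MLflow can automatically send an HTTP POST request to an endpoint you specify. This enables seamless integration with CI/CD pipelines, notification systems, and other external services.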
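Key Features
- Real-time notifications for Model Registry and Prompt Registry events
- HMAC signature verification for secure webhook delivery
- Multiple event types, including model/prompt creation, versioning, and tagging
- Built-in testing to verify webhook connectivity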
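Supported Events
MLflow webhooks support the following Model Registry and Prompt Registry events:
Event | Description | Payload Schema |
---|---|---|
registered_model.created | Triggered when a new registered model is created | RegisteredModelCreatedPayload |
model_version.created | Triggered when a new model version is created | ModelVersionCreatedPayload |
model_version_tag.set | Triggered when a tag is set on a model version | ModelVersionTagSetPayload |
model_version_tag.deleted | Triggered when a tag is deleted from a model version | ModelVersionTagDeletedPayload |
model_version_alias.created | Triggered when an alias is created for a model version | ModelVersionAliasCreatedPayload |
model_version_alias.deleted | Triggered when an alias is deleted from a model version | ModelVersionAliasDeletedPayload |
prompt.created | Triggered when a new prompt is created | PromptCreatedPayload |
prompt_version.created | Triggered when a new prompt version is created | PromptVersionCreatedPayload |
prompt_tag.set | Triggered when a tag is set on a prompt | PromptTagSetPayload |
prompt_tag.deleted | Triggered when a tag is deleted from a prompt | PromptTagDeletedPayload |
prompt_version_tag.set | Triggered when a tag is set on a prompt version | PromptVersionTagSetPayload |
prompt_version_tag.deleted | Triggered when a tag is deleted from a prompt version | PromptVersionTagDeletedPayload |
prompt_alias.created | Triggered when an alias is created for a prompt version | PromptAliasCreatedPayload |
prompt_alias.deleted | Triggered when an alias is deleted from a prompt | PromptAliasDeletedPayload |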
Quick Start
Create a Webhook
from mlflow import MlflowClient

client = MlflowClient()

# Create a webhook for model version creation events
webhook = client.create_webhook(
    name="model-version-notifier",
    url="https://your-app.com/webhook",
    events=["model_version.created"],
    description="Notifies when new model versions are created",
    secret="your-secret-key",  # Optional: for HMAC signature verification
)
print(f"Created webhook: {webhook.webhook_id}")
Test a Webhook
Before putting a webhook into production, use MlflowClient.test_webhook() to test it with an example payload.
# Test the webhook with an example payload
result = client.test_webhook(webhook.webhook_id)
if result.success:
    print(f"Webhook test successful! Status code: {result.response_status}")
else:
    print(f"Webhook test failed. Status: {result.response_status}")
    if result.error_message:
        print(f"Error: {result.error_message}")
You can also test a specific event type:
# Test with a specific event type
result = client.test_webhook(webhook.webhook_id, event="model_version.created")
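When you call test_webhook(), MLflow sends an example payload to your webhook URL. These test payloads have the same structure as real event payloads. For the exact structure and examples for each event type, refer to the payload schema listed for it in the table above.
Testing Multi-Event Webhooks
If your webhook subscribes to multiple events, the behavior of test_webhook() depends on whether you specify an event:
- Without an event: MLflow uses the first event in the webhook's event list
- With a specific event: MLflow uses the specified event (which must be in the webhook's event list)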
# Create webhook with multiple events
webhook = client.create_webhook(
    name="multi-event-webhook",
    url="https://your-domain.com/webhook",
    events=[
        "registered_model.created",
        "model_version.created",
        "model_version_tag.set",
    ],
    secret="your-secret-key",
)

# Test with first event (registered_model.created)
result = client.test_webhook(webhook.webhook_id)

# Test with specific event
result = client.test_webhook(
    webhook.webhook_id,
    event="model_version_tag.set",
)
Webhook Management
List Webhooks
Use MlflowClient.list_webhooks() to retrieve webhooks. This method returns paginated results.
# List webhooks with pagination
webhooks = client.list_webhooks(max_results=10)
for webhook in webhooks:
    print(f"{webhook.name}: {webhook.url} (Status: {webhook.status})")
    print(f"  Events: {', '.join(webhook.events)}")

# Continue to next page if available
if webhooks.next_page_token:
    next_page = client.list_webhooks(
        max_results=10, page_token=webhooks.next_page_token
    )
To retrieve all webhooks across multiple pages:
# Retrieve all webhooks across pages
all_webhooks = []
page_token = None
while True:
    page = client.list_webhooks(max_results=100, page_token=page_token)
    all_webhooks.extend(page)
    if not page.next_page_token:
        break
    page_token = page.next_page_token
print(f"Total webhooks: {len(all_webhooks)}")
Get a Specific Webhook
Use MlflowClient.get_webhook() to retrieve the details of a specific webhook.
# Get a specific webhook by ID
webhook = client.get_webhook(webhook_id)
print(f"Name: {webhook.name}")
print(f"URL: {webhook.url}")
print(f"Status: {webhook.status}")
print(f"Events: {webhook.events}")
Update a Webhook
Use MlflowClient.update_webhook() to modify a webhook's configuration.
# Update webhook configuration
client.update_webhook(
    # Unspecified fields will remain unchanged
    webhook_id=webhook.webhook_id,
    status="DISABLED",  # Temporarily disable the webhook
    events=[
        "model_version.created",
        "model_version_tag.set",
    ],
)
Delete a Webhook
Use MlflowClient.delete_webhook() to delete a webhook.
# Delete a webhook
client.delete_webhook(webhook.webhook_id)
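Security
HMAC Signature Verification
When you create a webhook with a secret, MLflow signs each request with an HMAC-SHA256 signature. This lets your endpoint verify that the request actually came from MLflow. The signature is sent in the X-MLflow-Signature header in the format v1,<base64_encoded_signature>. As the FastAPI example below shows, the signed content is formed by joining three values with periods: the delivery ID (X-MLflow-Delivery-Id header), the timestamp (X-MLflow-Timestamp header), and the raw request body. That is, {delivery_id}.{timestamp}.{payload}. See that example for a complete implementation of signature verification.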
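To test a receiver locally without going through MLflow, it can help to generate signatures yourself. The sketch below reproduces the scheme used by verify_mlflow_signature in the FastAPI example below: HMAC-SHA256 over {delivery_id}.{timestamp}.{payload}, base64-encoded and prefixed with v1,. The helper names sign_webhook_payload and verify_webhook_signature are hypothetical. The scheme is taken from that example, not from MLflow's source.

```python
import base64
import hashlib
import hmac


def sign_webhook_payload(secret: str, delivery_id: str, timestamp: str, payload: str) -> str:
    """Hypothetical helper: build a 'v1,<base64>' signature over delivery_id.timestamp.payload."""
    signed_content = f"{delivery_id}.{timestamp}.{payload}"
    digest = hmac.new(
        secret.encode("utf-8"), signed_content.encode("utf-8"), hashlib.sha256
    ).digest()
    return "v1," + base64.b64encode(digest).decode("utf-8")


def verify_webhook_signature(
    secret: str, delivery_id: str, timestamp: str, payload: str, signature: str
) -> bool:
    """Hypothetical helper: recompute the expected signature and compare it in constant time."""
    if not signature.startswith("v1,"):
        return False
    expected = sign_webhook_payload(secret, delivery_id, timestamp, payload)
    # compare_digest does not short-circuit on the first mismatching character
    return hmac.compare_digest(signature, expected)


if __name__ == "__main__":
    body = '{"entity": "model_version", "action": "created", "data": {}}'
    sig = sign_webhook_payload("your-secret-key", "delivery-123", "1722414452", body)
    print(verify_webhook_signature("your-secret-key", "delivery-123", "1722414452", body, sig))  # True
    print(verify_webhook_signature("wrong-secret", "delivery-123", "1722414452", body, sig))  # False
```

You can then send the signature, along with the delivery ID and timestamp, in the corresponding headers from any HTTP client to exercise your endpoint.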
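Timestamp Freshness Check
To prevent replay attacks, verify that the webhook timestamp is recent. The X-MLflow-Timestamp header contains a Unix timestamp indicating when the webhook was sent. Reject webhooks whose timestamps are too old (for example, older than 5 minutes).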
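A minimal sketch of such a check follows. The FastAPI example below rejects any timestamp from the future (0 <= age). This variant tolerates a small forward clock skew between sender and receiver, which is a design choice, not documented MLflow behavior. The 30-second allowed_skew_seconds default and the function name is_timestamp_fresh are assumptions. The now parameter exists only so the check can be tested deterministically.

```python
import time
from typing import Optional


def is_timestamp_fresh(
    timestamp_header: str,
    max_age_seconds: int = 300,
    allowed_skew_seconds: int = 30,
    now: Optional[float] = None,
) -> bool:
    """Return True if the Unix timestamp header falls within the accepted window.

    max_age_seconds: how old a webhook may be (300 s matches the 5-minute example).
    allowed_skew_seconds: assumed tolerance for a sender clock running slightly ahead.
    now: injectable current time; defaults to time.time().
    """
    try:
        sent_at = int(timestamp_header)
    except (TypeError, ValueError):
        # Missing or non-numeric header: treat as invalid
        return False
    current = time.time() if now is None else now
    age = current - sent_at
    return -allowed_skew_seconds <= age <= max_age_seconds


if __name__ == "__main__":
    now = 1_722_414_452.0
    print(is_timestamp_fresh("1722414452", now=now))  # True: just sent
    print(is_timestamp_fresh(str(1722414452 - 600), now=now))  # False: 10 minutes old
    print(is_timestamp_fresh("1722414462", now=now))  # True: 10 s ahead, within skew
    print(is_timestamp_fresh("not-a-number", now=now))  # False
```

A freshness window only limits how long a captured request stays usable. Within the window, the signature check is what prevents tampering.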
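Environment Variables
- MLFLOW_WEBHOOK_SECRET_ENCRYPTION_KEY: encryption key used to securely store webhook secrets
- MLFLOW_WEBHOOK_REQUEST_TIMEOUT: timeout in seconds for webhook HTTP requests (default: 30)
- MLFLOW_WEBHOOK_REQUEST_MAX_RETRIES: maximum number of retries for failed webhook requests (default: 3)
- MLFLOW_WEBHOOK_DELIVERY_MAX_WORKERS: maximum number of worker threads for webhook delivery (default: 10)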
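As an illustration, these variables could be exported in the shell that later launches mlflow server, so the server process inherits them. The numeric values below are arbitrary examples, not recommendations. The key-generation command mirrors the one in "Running the Example" below and assumes the cryptography package is installed.

```shell
# Encryption key for storing webhook secrets (assumes the cryptography package is installed)
export MLFLOW_WEBHOOK_SECRET_ENCRYPTION_KEY="$(python -c 'from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())')"

# Example values only: allow slower receivers, retry more often, deliver with more threads
export MLFLOW_WEBHOOK_REQUEST_TIMEOUT=60
export MLFLOW_WEBHOOK_REQUEST_MAX_RETRIES=5
export MLFLOW_WEBHOOK_DELIVERY_MAX_WORKERS=20
```

The key encrypts stored secrets, so generating a fresh one on every server start would likely leave previously stored secrets unreadable. In practice, generate it once and keep it in a secret store.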
Webhook Payload Structure
MLflow webhooks send structured JSON payloads in the following format:
{
  "entity": "model_version",
  "action": "created",
  "timestamp": "2025-07-31T08:27:32.080217+00:00",
  "data": {
    "name": "example_model",
    "version": "1",
    "source": "models:/123",
    "run_id": "abcd1234abcd5678",
    "tags": {"example_key": "example_value"},
    "description": "An example model version"
  }
}
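The example payload above can be read with only the standard library. This sketch parses the four top-level fields into a small dataclass and converts the ISO 8601 timestamp into a timezone-aware datetime. WebhookPayload and parse_webhook_payload are hypothetical names. The event property assumes, based on the event table and the FastAPI example below, that an event name is entity and action joined by a period.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict


@dataclass
class WebhookPayload:
    """Hypothetical container mirroring the top-level payload fields."""
    entity: str
    action: str
    timestamp: datetime
    data: Dict[str, Any] = field(default_factory=dict)

    @property
    def event(self) -> str:
        # Assumed mapping back to the event names in the events table
        return f"{self.entity}.{self.action}"


def parse_webhook_payload(raw_body: str) -> WebhookPayload:
    """Parse a JSON webhook body; raises KeyError or ValueError on malformed input."""
    obj = json.loads(raw_body)
    return WebhookPayload(
        entity=obj["entity"],
        action=obj["action"],
        # e.g. "2025-07-31T08:27:32.080217+00:00" -> aware datetime in UTC
        timestamp=datetime.fromisoformat(obj["timestamp"]),
        data=obj.get("data", {}),
    )


if __name__ == "__main__":
    body = (
        '{"entity": "model_version", "action": "created", '
        '"timestamp": "2025-07-31T08:27:32.080217+00:00", '
        '"data": {"name": "example_model", "version": "1"}}'
    )
    payload = parse_webhook_payload(body)
    print(payload.event)  # model_version.created
    print(payload.data["name"])  # example_model
```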
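Payload Fields
- entity: the type of MLflow entity that triggered the webhook (for example, "registered_model", "model_version", "prompt", "prompt_version")
- action: the action performed (for example, "created", "updated", "deleted", "set")
- timestamp: an ISO 8601 timestamp indicating when the webhook was sent
- data: the actual payload data, containing entity-specific information (see the payload schema for each event in the event table above)
This structured format makes it easy to:
- Filter webhooks by entity type or action
- Handle different event types with dedicated handlers
- Extract metadata without parsing the entire payload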
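As an illustration of filtering and dedicated handlers, this sketch registers handlers per (entity, action) pair and ignores events that have no handler. The registry, the on decorator, and the handler names are hypothetical. The data keys it reads (name, version, key, value) are the same ones the FastAPI example below reads.

```python
from typing import Any, Callable, Dict, Tuple

Handler = Callable[[Dict[str, Any]], str]

# Hypothetical registry keyed by (entity, action)
HANDLERS: Dict[Tuple[str, str], Handler] = {}


def on(entity: str, action: str) -> Callable[[Handler], Handler]:
    """Decorator that registers a handler for one entity/action pair."""
    def register(func: Handler) -> Handler:
        HANDLERS[(entity, action)] = func
        return func
    return register


@on("model_version", "created")
def handle_model_version_created(data: Dict[str, Any]) -> str:
    return f"New model version: {data.get('name')} v{data.get('version')}"


@on("model_version_tag", "set")
def handle_model_version_tag_set(data: Dict[str, Any]) -> str:
    return (
        f"Tag set on {data.get('name')} v{data.get('version')}: "
        f"{data.get('key')}={data.get('value')}"
    )


def dispatch(payload: Dict[str, Any]) -> str:
    """Route a parsed payload to its handler; events without a handler are ignored."""
    handler = HANDLERS.get((payload.get("entity"), payload.get("action")))
    if handler is None:
        return "ignored"
    return handler(payload.get("data", {}))


if __name__ == "__main__":
    event = {"entity": "model_version", "action": "created", "data": {"name": "m", "version": "1"}}
    print(dispatch(event))  # New model version: m v1
```

A lookup table keeps each handler small and makes adding a new event type a one-decorator change, compared with growing an if/elif chain.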
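Webhook Delivery Reliability
Automatic Retry Logic
MLflow implements automatic retry logic to make webhook delivery reliable. When a webhook request fails with one of the following status codes, MLflow retries it automatically. Requests that fail with any other status code are not retried:
Status Code | Category | Description |
---|---|---|
429 | Rate limiting | Too Many Requests - rate limit error |
500 | Server error | Internal Server Error - possibly a transient server error |
502 | Server error | Bad Gateway - gateway error |
503 | Server error | Service Unavailable - service temporarily unavailable |
504 | Server error | Gateway Timeout - gateway timed out |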
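Retry Behavior
When a retryable error occurs, MLflow applies the following behavior:
- Exponential backoff: uses exponential backoff with jitter to avoid a thundering-herd effect.
  - Base delays: 1s, 2s, 4s, 8s, and so on.
  - Maximum backoff: capped at 60 seconds.
  - Jitter: adds up to 1 second of random jitter to each delay (requires urllib3 >= 2.0).
- Rate-limit compliance: for 429 responses, MLflow checks the Retry-After header and waits for the larger of:
  - The value specified in the Retry-After header
  - The computed exponential backoff time
- Configurable retries: set the maximum number of retries with the MLFLOW_WEBHOOK_REQUEST_MAX_RETRIES environment variable.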
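To make these rules concrete, here is a small function that computes the wait before each retry under one reading of them. It is an illustrative model, not MLflow's implementation. backoff_delay and its parameters are hypothetical. Adding jitter after the 60-second cap is an assumption, and the real ordering may differ. The default of 3 retries matches the documented default for MLFLOW_WEBHOOK_REQUEST_MAX_RETRIES.

```python
import random
from typing import Callable, Optional

# Status codes listed in the retry table as retryable
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}


def backoff_delay(
    retry_number: int,
    status_code: int,
    retry_after: Optional[float] = None,
    *,
    base: float = 1.0,
    max_backoff: float = 60.0,
    max_jitter: float = 1.0,
    max_retries: int = 3,
    rand: Callable[[], float] = random.random,
) -> Optional[float]:
    """Seconds to wait before retry `retry_number` (0-based), or None if no retry happens.

    Models the documented rules: delays 1, 2, 4, 8, ... capped at 60 s, plus
    up to 1 s of jitter; for 429, the larger of Retry-After and the backoff.
    """
    if status_code not in RETRYABLE_STATUS_CODES or retry_number >= max_retries:
        return None
    delay = min(base * (2 ** retry_number), max_backoff)
    # Assumption: jitter is added after the cap
    delay += rand() * max_jitter
    if status_code == 429 and retry_after is not None:
        delay = max(retry_after, delay)
    return delay


if __name__ == "__main__":
    no_jitter = lambda: 0.0
    print([backoff_delay(n, 503, rand=no_jitter) for n in range(3)])  # [1.0, 2.0, 4.0]
    print(backoff_delay(0, 429, retry_after=10, rand=no_jitter))  # 10
    print(backoff_delay(0, 404))  # None: 404 is not retryable
```

Jitter spreads retries from many failed deliveries over time, so a recovering endpoint is not hit by all of them at the same moment.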
Example: FastAPI Webhook Receiver
Here is a complete FastAPI application that receives and processes MLflow webhooks.
from fastapi import FastAPI, Request, HTTPException, Header
from typing import Optional
import hmac
import hashlib
import base64
import logging
import time

app = FastAPI()
logger = logging.getLogger(__name__)

# Your webhook secret (keep this secure!)
WEBHOOK_SECRET = "your-secret-key"
# Maximum allowed age for webhook timestamps (in seconds)
MAX_TIMESTAMP_AGE = 300  # 5 minutes


def verify_timestamp_freshness(
    timestamp_str: str, max_age: int = MAX_TIMESTAMP_AGE
) -> bool:
    """Verify that the webhook timestamp is recent enough to prevent replay attacks"""
    try:
        webhook_timestamp = int(timestamp_str)
        current_timestamp = int(time.time())
        age = current_timestamp - webhook_timestamp
        return 0 <= age <= max_age
    except (ValueError, TypeError):
        return False


def verify_mlflow_signature(
    payload: str, signature: str, secret: str, delivery_id: str, timestamp: str
) -> bool:
    """Verify the HMAC signature from MLflow webhook"""
    # Extract the base64 signature part (remove 'v1,' prefix)
    if not signature.startswith("v1,"):
        return False
    signature_b64 = signature.removeprefix("v1,")
    # Reconstruct the signed content: delivery_id.timestamp.payload
    signed_content = f"{delivery_id}.{timestamp}.{payload}"
    # Generate expected signature
    expected_signature = hmac.new(
        secret.encode("utf-8"), signed_content.encode("utf-8"), hashlib.sha256
    ).digest()
    expected_signature_b64 = base64.b64encode(expected_signature).decode("utf-8")
    return hmac.compare_digest(signature_b64, expected_signature_b64)


@app.post("/webhook")
async def handle_webhook(
    request: Request,
    x_mlflow_signature: Optional[str] = Header(None),
    x_mlflow_delivery_id: Optional[str] = Header(None),
    x_mlflow_timestamp: Optional[str] = Header(None),
):
    """Handle webhook with HMAC signature verification"""
    # Get raw payload for signature verification
    payload_bytes = await request.body()
    payload = payload_bytes.decode("utf-8")

    # Verify required headers are present
    if not x_mlflow_signature:
        raise HTTPException(status_code=400, detail="Missing signature header")
    if not x_mlflow_delivery_id:
        raise HTTPException(status_code=400, detail="Missing delivery ID header")
    if not x_mlflow_timestamp:
        raise HTTPException(status_code=400, detail="Missing timestamp header")

    # Verify timestamp freshness to prevent replay attacks
    if not verify_timestamp_freshness(x_mlflow_timestamp):
        raise HTTPException(
            status_code=400,
            detail="Timestamp is too old or invalid (possible replay attack)",
        )

    # Verify signature
    if not verify_mlflow_signature(
        payload,
        x_mlflow_signature,
        WEBHOOK_SECRET,
        x_mlflow_delivery_id,
        x_mlflow_timestamp,
    ):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Parse payload
    webhook_data = await request.json()

    # Extract webhook metadata
    entity = webhook_data.get("entity")
    action = webhook_data.get("action")
    timestamp = webhook_data.get("timestamp")
    payload_data = webhook_data.get("data", {})

    # Print the payload for debugging
    print(f"Received webhook: {entity}.{action}")
    print(f"Timestamp: {timestamp}")
    print(f"Delivery ID: {x_mlflow_delivery_id}")
    print(f"Payload: {payload_data}")

    # Add your webhook processing logic here
    # For example, handle different event types
    if entity == "model_version" and action == "created":
        model_name = payload_data.get("name")
        version = payload_data.get("version")
        print(f"New model version: {model_name} v{version}")
        # Add your model version processing logic here
    elif entity == "registered_model" and action == "created":
        model_name = payload_data.get("name")
        print(f"New registered model: {model_name}")
        # Add your registered model processing logic here
    elif entity == "model_version_tag" and action == "set":
        model_name = payload_data.get("name")
        version = payload_data.get("version")
        tag_key = payload_data.get("key")
        tag_value = payload_data.get("value")
        print(f"Tag set on {model_name} v{version}: {tag_key}={tag_value}")
        # Add your tag processing logic here

    return {"status": "success"}


@app.get("/health")
async def health():
    """Health check endpoint"""
    return {"status": "healthy"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
Running the Example
- Install the dependencies:
pip install fastapi uvicorn
- Set up an MLflow server with webhook secret encryption:
# Generate a secure encryption key for webhook secrets
export MLFLOW_WEBHOOK_SECRET_ENCRYPTION_KEY=$(python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
# Start MLflow server with webhook support
mlflow server --backend-store-uri sqlite:///mlflow.db
- Start the webhook receiver (the FastAPI example above, saved as webhook_receiver.py):
python webhook_receiver.py
- Configure the MLflow webhook, using the same secret as WEBHOOK_SECRET in the receiver:
from mlflow import MlflowClient

client = MlflowClient("http://localhost:5000")

# Create webhook with HMAC verification
webhook = client.create_webhook(
    name="fastapi-receiver",
    url="https://your-domain.com/webhook",
    events=["model_version.created"],
    secret="your-secret-key",
)
- Test the webhook:
# Test webhook connectivity
result = client.test_webhook(webhook.webhook_id)
print(f"Test result: {result.success}")

# Create a model version to trigger the webhook
client.create_registered_model("test-model")
client.create_model_version(
    name="test-model", source="s3://bucket/model", run_id="abc123"
)
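Troubleshooting
Common Issues
- Webhook not firing
  - Verify that the webhook status is "ACTIVE"
  - Check that the subscribed event types match the operations you are performing
  - Ensure your MLflow server has network access to the webhook URL
- Signature verification failures
  - Make sure you verify against the raw request body
  - Check that the secret matches exactly (no extra whitespace)
- Connection timeouts
  - MLflow uses a default timeout of 30 seconds for webhook requests (configurable via MLFLOW_WEBHOOK_REQUEST_TIMEOUT)
  - Make sure your endpoint responds quickly, or increase the timeout if needed.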
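The two signature checks listed under signature verification failures are easy to get wrong. The sketch below uses the same signing scheme as the FastAPI example to show why. Parsing and re-serializing the JSON body changes its bytes even though the data is identical, so the recomputed signature no longer matches. A trailing newline on the secret has the same effect. The body, delivery ID, timestamp, and the sign helper are hypothetical stand-ins.

```python
import base64
import hashlib
import hmac
import json


def sign(secret: str, delivery_id: str, timestamp: str, body: str) -> str:
    """Same scheme as the FastAPI example: HMAC-SHA256 over delivery_id.timestamp.body."""
    content = f"{delivery_id}.{timestamp}.{body}".encode("utf-8")
    digest = hmac.new(secret.encode("utf-8"), content, hashlib.sha256).digest()
    return "v1," + base64.b64encode(digest).decode("utf-8")


# Hypothetical raw body as it might arrive on the wire (compact separators)
raw_body = '{"entity":"model_version","action":"created","data":{"name":"example_model"}}'
# Re-serializing the parsed JSON changes whitespace even though the data is identical
reserialized = json.dumps(json.loads(raw_body))

secret, delivery_id, timestamp = "your-secret-key", "delivery-123", "1722414452"
received_signature = sign(secret, delivery_id, timestamp, raw_body)

print(reserialized == raw_body)  # False: json.dumps adds spaces after ':' and ','
print(sign(secret, delivery_id, timestamp, raw_body) == received_signature)  # True
print(sign(secret, delivery_id, timestamp, reserialized) == received_signature)  # False
# A secret copied with a trailing newline also fails to match
print(sign(secret + "\n", delivery_id, timestamp, raw_body) == received_signature)  # False
```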
API Reference
For complete API documentation, see: