
Webhooks

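Warning
  • This feature is still experimental and may change in future releases.
  • The file backend does not support webhooks. Only the SQL backend supports webhooks.
  • Only OSS MLflow supports webhooks. Databricks and other managed MLflow services may not support this feature.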

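Overview

MLflow webhooks provide real-time notifications when specific events occur in the Model Registry and Prompt Registry. When you register a model or prompt, create a new version, or modify tags and aliases, MLflow can automatically send an HTTP POST request to an endpoint you specify. This enables seamless integration with CI/CD pipelines, notification systems, and other external services.

Key Features

  • Real-time notifications for Model Registry and Prompt Registry events
  • HMAC signature verification for secure webhook delivery
  • Multiple event types, including model/prompt creation, versioning, and tagging
  • Built-in testing to verify webhook connectivity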

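Supported Events

MLflow webhooks support the following Model Registry and Prompt Registry events:

Event | Description | Payload Schema
registered_model.created | Triggered when a new registered model is created | RegisteredModelCreatedPayload
model_version.created | Triggered when a new model version is created | ModelVersionCreatedPayload
model_version_tag.set | Triggered when a tag is set on a model version | ModelVersionTagSetPayload
model_version_tag.deleted | Triggered when a tag is deleted from a model version | ModelVersionTagDeletedPayload
model_version_alias.created | Triggered when an alias is created for a model version | ModelVersionAliasCreatedPayload
model_version_alias.deleted | Triggered when an alias is deleted from a model version | ModelVersionAliasDeletedPayload
prompt.created | Triggered when a new prompt is created | PromptCreatedPayload
prompt_version.created | Triggered when a new prompt version is created | PromptVersionCreatedPayload
prompt_tag.set | Triggered when a tag is set on a prompt | PromptTagSetPayload
prompt_tag.deleted | Triggered when a tag is deleted from a prompt | PromptTagDeletedPayload
prompt_version_tag.set | Triggered when a tag is set on a prompt version | PromptVersionTagSetPayload
prompt_version_tag.deleted | Triggered when a tag is deleted from a prompt version | PromptVersionTagDeletedPayload
prompt_alias.created | Triggered when an alias is created for a prompt version | PromptAliasCreatedPayload
prompt_alias.deleted | Triggered when an alias is deleted from a prompt | PromptAliasDeletedPayload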
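Each event name in the table pairs an entity with an action as <entity>.<action>, matching the entity and action fields of the webhook payload. As a minimal sketch, you can check an event list locally before passing it to create_webhook(). The SUPPORTED_EVENTS set and the split_event and validate_events helpers are hypothetical, not part of MLflow; the set is simply copied from the table above.

```python
from typing import Iterable, List, Tuple

# Hypothetical client-side allowlist, copied from the supported-events table.
SUPPORTED_EVENTS = frozenset({
    "registered_model.created",
    "model_version.created",
    "model_version_tag.set",
    "model_version_tag.deleted",
    "model_version_alias.created",
    "model_version_alias.deleted",
    "prompt.created",
    "prompt_version.created",
    "prompt_tag.set",
    "prompt_tag.deleted",
    "prompt_version_tag.set",
    "prompt_version_tag.deleted",
    "prompt_alias.created",
    "prompt_alias.deleted",
})


def split_event(event: str) -> Tuple[str, str]:
    """Split '<entity>.<action>' into (entity, action)."""
    entity, sep, action = event.rpartition(".")
    if not sep or not entity or not action:
        raise ValueError(f"Malformed event name: {event!r}")
    return entity, action


def validate_events(events: Iterable[str]) -> List[str]:
    """Return the events as a list, or raise if any is not in the table."""
    events = list(events)
    unknown = [e for e in events if e not in SUPPORTED_EVENTS]
    if unknown:
        raise ValueError(f"Unsupported events: {unknown}")
    return events


print(split_event("model_version_tag.set"))  # ('model_version_tag', 'set')
```

Catching a typo such as "model.created" locally gives a clearer error than discovering later that a webhook never fires.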

Quick Start

Create a Webhook

from mlflow import MlflowClient

client = MlflowClient()

# Create a webhook for model version creation events
webhook = client.create_webhook(
    name="model-version-notifier",
    url="https://your-app.com/webhook",
    events=["model_version.created"],
    description="Notifies when new model versions are created",
    secret="your-secret-key",  # Optional: for HMAC signature verification
)

print(f"Created webhook: {webhook.webhook_id}")

Test a Webhook

Before putting a webhook into production, test it with an example payload using MlflowClient.test_webhook().

# Test the webhook with an example payload
result = client.test_webhook(webhook.webhook_id)

if result.success:
    print(f"Webhook test successful! Status code: {result.response_status}")
else:
    print(f"Webhook test failed. Status: {result.response_status}")
    if result.error_message:
        print(f"Error: {result.error_message}")

You can also test a specific event type:

# Test with a specific event type
result = client.test_webhook(webhook.webhook_id, event="model_version.created")

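When you call test_webhook(), MLflow sends an example payload to your webhook URL. These test payloads have the same structure as real event payloads. See the payload schema listed for each event type in the table above for its exact structure and examples.

Testing Multi-Event Webhooks

If your webhook subscribes to multiple events, the behavior of test_webhook() depends on whether you specify an event:

  • Without an event: MLflow uses the first event in the webhook's event list
  • With a specific event: MLflow uses the specified event (which must be in the webhook's event list)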
# Create webhook with multiple events
webhook = client.create_webhook(
    name="multi-event-webhook",
    url="https://your-domain.com/webhook",
    events=[
        "registered_model.created",
        "model_version.created",
        "model_version_tag.set",
    ],
    secret="your-secret-key",
)

# Test with first event (registered_model.created)
result = client.test_webhook(webhook.webhook_id)

# Test with specific event
result = client.test_webhook(
    webhook.webhook_id,
    event="model_version_tag.set",
)
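The event-selection rule for multi-event webhooks can be expressed as a small helper. This is an illustrative sketch of the documented behavior, not MLflow's own code; select_test_event is a hypothetical name.

```python
from typing import Optional, Sequence


def select_test_event(subscribed: Sequence[str], event: Optional[str] = None) -> str:
    """Mirror the documented test_webhook() rule for choosing a test event."""
    if not subscribed:
        raise ValueError("The webhook has no subscribed events")
    if event is None:
        # No event given: fall back to the first subscribed event.
        return subscribed[0]
    if event not in subscribed:
        # A specific event must be one the webhook subscribes to.
        raise ValueError(f"{event!r} is not in the webhook's event list")
    return event


events = ["registered_model.created", "model_version.created", "model_version_tag.set"]
print(select_test_event(events))  # registered_model.created
print(select_test_event(events, "model_version_tag.set"))  # model_version_tag.set
```

Because the default is the first list entry, the order of the events list determines which payload a bare test_webhook() call sends.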

Webhook Management

List Webhooks

Use MlflowClient.list_webhooks() to retrieve webhooks. This method returns paginated results.

# List webhooks with pagination
webhooks = client.list_webhooks(max_results=10)
for webhook in webhooks:
    print(f"{webhook.name}: {webhook.url} (Status: {webhook.status})")
    print(f" Events: {', '.join(webhook.events)}")

# Continue to next page if available
if webhooks.next_page_token:
    next_page = client.list_webhooks(
        max_results=10, page_token=webhooks.next_page_token
    )

Retrieve all webhooks across multiple pages:

# Retrieve all webhooks across pages
all_webhooks = []
page_token = None

while True:
    page = client.list_webhooks(max_results=100, page_token=page_token)
    all_webhooks.extend(page)

    if not page.next_page_token:
        break
    page_token = page.next_page_token

print(f"Total webhooks: {len(all_webhooks)}")
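The same loop can be wrapped in a reusable generator. The sketch assumes only what the examples above use: a list function that accepts max_results and page_token and returns an iterable page with a next_page_token attribute. iter_all_pages, FakePage, and fake_list are hypothetical; the fakes exist only so the sketch runs without an MLflow server.

```python
from typing import Any, Callable, Iterator, List, Optional


def iter_all_pages(list_fn: Callable[..., Any], max_results: int = 100) -> Iterator[Any]:
    """Yield every item across pages by following next_page_token."""
    page_token: Optional[str] = None
    while True:
        page = list_fn(max_results=max_results, page_token=page_token)
        yield from page
        page_token = page.next_page_token
        if not page_token:
            break


# Hypothetical stand-ins for a paged result and a list function.
class FakePage(list):
    def __init__(self, items: List[str], next_page_token: Optional[str]):
        super().__init__(items)
        self.next_page_token = next_page_token


def fake_list(max_results: int, page_token: Optional[str] = None) -> FakePage:
    data = [f"webhook-{i}" for i in range(5)]
    start = int(page_token or 0)
    end = start + max_results
    token = str(end) if end < len(data) else None
    return FakePage(data[start:end], token)


print(list(iter_all_pages(fake_list, max_results=2)))
# ['webhook-0', 'webhook-1', 'webhook-2', 'webhook-3', 'webhook-4']
```

With a real client you would pass the bound method instead, for example list(iter_all_pages(client.list_webhooks)). A generator also lets you stop early without fetching the remaining pages.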

Get a Specific Webhook

Use MlflowClient.get_webhook() to retrieve the details of a specific webhook.

# Get a specific webhook by ID
webhook = client.get_webhook(webhook_id)
print(f"Name: {webhook.name}")
print(f"URL: {webhook.url}")
print(f"Status: {webhook.status}")
print(f"Events: {webhook.events}")

Update a Webhook

Use MlflowClient.update_webhook() to modify a webhook's configuration.

# Update webhook configuration
client.update_webhook(
    # Unspecified fields will remain unchanged
    webhook_id=webhook.webhook_id,
    status="DISABLED",  # Temporarily disable the webhook
    events=[
        "model_version.created",
        "model_version_tag.set",
    ],
)

Delete a Webhook

Use MlflowClient.delete_webhook() to delete a webhook.

# Delete a webhook
client.delete_webhook(webhook.webhook_id)

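Security

HMAC Signature Verification

When you create a webhook with a secret, MLflow signs each request with an HMAC-SHA256 signature. This lets your endpoint verify that the request actually came from MLflow. The signature is sent in the X-MLflow-Signature header in the format v1,<base64_encoded_signature>. The signed content is the delivery ID, the timestamp, and the raw request body joined with periods ({delivery_id}.{timestamp}.{payload}), where the delivery ID and timestamp come from the X-MLflow-Delivery-Id and X-MLflow-Timestamp headers. For a complete signature verification implementation, see the FastAPI example below.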
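To exercise a receiver without a running MLflow server, it helps to produce a matching signature yourself. The following sketch assumes the signing scheme used by the FastAPI example below: HMAC-SHA256 over {delivery_id}.{timestamp}.{payload}, base64-encoded and prefixed with v1,. The sign_payload and is_valid_signature helpers and the sample values are hypothetical.

```python
import base64
import hashlib
import hmac


def sign_payload(secret: str, delivery_id: str, timestamp: str, payload: str) -> str:
    """Build an X-MLflow-Signature value: 'v1,' + base64(HMAC-SHA256)."""
    signed_content = f"{delivery_id}.{timestamp}.{payload}"
    digest = hmac.new(
        secret.encode("utf-8"), signed_content.encode("utf-8"), hashlib.sha256
    ).digest()
    return "v1," + base64.b64encode(digest).decode("utf-8")


def is_valid_signature(
    header: str, secret: str, delivery_id: str, timestamp: str, payload: str
) -> bool:
    """Recompute the signature and compare in constant time."""
    if not header.startswith("v1,"):
        return False
    expected = sign_payload(secret, delivery_id, timestamp, payload)
    return hmac.compare_digest(header, expected)


payload = '{"entity": "model_version", "action": "created"}'
sig = sign_payload("your-secret-key", "delivery-123", "1753950452", payload)
print(is_valid_signature(sig, "your-secret-key", "delivery-123", "1753950452", payload))  # True
print(is_valid_signature(sig, "wrong-secret", "delivery-123", "1753950452", payload))  # False
```

hmac.compare_digest is used for the comparison so that verification time does not leak how many leading characters matched.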

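Timestamp Freshness Check

To prevent replay attacks, we recommend verifying that the webhook timestamp is recent. The X-MLflow-Timestamp header contains a Unix timestamp indicating when the webhook was sent. Reject webhooks whose timestamp is too old (for example, more than 5 minutes).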
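A timestamp window on its own still accepts a replay that arrives inside the window. One common complement, sketched here under the assumption that each delivery carries a unique X-MLflow-Delivery-Id (the header read by the FastAPI example below), is to remember recently processed delivery IDs for as long as the window lasts. ReplayGuard is a hypothetical in-memory helper; a receiver running several processes would need shared storage instead.

```python
import time
from typing import Dict, Optional


class ReplayGuard:
    """Remember processed delivery IDs for max_age seconds (single process)."""

    def __init__(self, max_age: int = 300):
        self.max_age = max_age
        self._seen: Dict[str, float] = {}

    def _purge(self, now: float) -> None:
        # IDs older than the window are dropped; the timestamp check rejects those anyway.
        cutoff = now - self.max_age
        self._seen = {k: t for k, t in self._seen.items() if t >= cutoff}

    def is_replay(self, delivery_id: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        self._purge(now)
        return delivery_id in self._seen

    def mark_processed(self, delivery_id: str, now: Optional[float] = None) -> None:
        # Call only after successful processing, in case a retry reuses the ID.
        self._seen[delivery_id] = time.time() if now is None else now


guard = ReplayGuard(max_age=300)
print(guard.is_replay("delivery-123", now=1000.0))  # False
guard.mark_processed("delivery-123", now=1000.0)
print(guard.is_replay("delivery-123", now=1100.0))  # True
print(guard.is_replay("delivery-123", now=1400.0))  # False (outside the window)
```

Recording an ID only after a successful response avoids rejecting a legitimate retry of a delivery that previously failed with a retryable status.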

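Environment Variables

  • MLFLOW_WEBHOOK_SECRET_ENCRYPTION_KEY: encryption key used to securely store webhook secrets
  • MLFLOW_WEBHOOK_REQUEST_TIMEOUT: timeout in seconds for webhook HTTP requests (default: 30)
  • MLFLOW_WEBHOOK_REQUEST_MAX_RETRIES: maximum number of retries for failed webhook requests (default: 3)
  • MLFLOW_WEBHOOK_DELIVERY_MAX_WORKERS: maximum number of worker threads for webhook delivery (default: 10)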
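As an illustration of the defaults listed above, the sketch below reads the three numeric settings the way a deployment or health-check script might, falling back to the documented defaults. WebhookSettings and load_webhook_settings are hypothetical; MLflow reads these variables itself, and this is not its internal code. The encryption key is deliberately left out, since a script should not echo it.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class WebhookSettings:
    request_timeout: float  # seconds per webhook HTTP request
    max_retries: int  # retries for retryable failures
    max_workers: int  # delivery worker threads


def load_webhook_settings(env: Optional[Mapping[str, str]] = None) -> WebhookSettings:
    """Read the webhook variables, falling back to the documented defaults."""
    env = os.environ if env is None else env
    return WebhookSettings(
        request_timeout=float(env.get("MLFLOW_WEBHOOK_REQUEST_TIMEOUT", "30")),
        max_retries=int(env.get("MLFLOW_WEBHOOK_REQUEST_MAX_RETRIES", "3")),
        max_workers=int(env.get("MLFLOW_WEBHOOK_DELIVERY_MAX_WORKERS", "10")),
    )


print(load_webhook_settings({}))
# WebhookSettings(request_timeout=30.0, max_retries=3, max_workers=10)
print(load_webhook_settings({"MLFLOW_WEBHOOK_REQUEST_TIMEOUT": "10"}).request_timeout)  # 10.0
```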

Webhook Payload Structure

MLflow webhooks send structured JSON payloads in the following format:

{
  "entity": "model_version",
  "action": "created",
  "timestamp": "2025-07-31T08:27:32.080217+00:00",
  "data": {
    "name": "example_model",
    "version": "1",
    "source": "models:/123",
    "run_id": "abcd1234abcd5678",
    "tags": {"example_key": "example_value"},
    "description": "An example model version"
  }
}

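Payload Fields

  • entity: the type of MLflow entity that triggered the webhook (for example, "registered_model", "model_version", "prompt", "prompt_version")
  • action: the action performed (for example, "created", "updated", "deleted", "set")
  • timestamp: an ISO 8601 timestamp indicating when the webhook was sent
  • data: the actual payload data, containing entity-specific information (see the payload schemas in the event table above)

This structured format makes it easy to:

  • Filter webhooks by entity type or action
  • Handle different event types with dedicated handlers
  • Extract metadata without parsing the entire payload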
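As a sketch of the dedicated-handler approach, the code below parses a payload, routes it on its (entity, action) pair, and parses the ISO 8601 timestamp field. The handler decorator, HANDLERS registry, and dispatch function are hypothetical names; the sample body reuses the example payload above.

```python
import json
from datetime import datetime
from typing import Any, Callable, Dict, Tuple

Handler = Callable[[Dict[str, Any]], str]
HANDLERS: Dict[Tuple[str, str], Handler] = {}


def handler(entity: str, action: str) -> Callable[[Handler], Handler]:
    """Register a function for one (entity, action) pair."""
    def register(fn: Handler) -> Handler:
        HANDLERS[(entity, action)] = fn
        return fn
    return register


@handler("model_version", "created")
def on_model_version_created(data: Dict[str, Any]) -> str:
    return f"New model version: {data['name']} v{data['version']}"


def dispatch(raw_body: str) -> str:
    event = json.loads(raw_body)
    sent_at = datetime.fromisoformat(event["timestamp"])  # timezone-aware datetime
    fn = HANDLERS.get((event["entity"], event["action"]))
    if fn is None:
        return f"Ignored {event['entity']}.{event['action']} sent at {sent_at.isoformat()}"
    return fn(event.get("data", {}))


body = json.dumps({
    "entity": "model_version",
    "action": "created",
    "timestamp": "2025-07-31T08:27:32.080217+00:00",
    "data": {"name": "example_model", "version": "1"},
})
print(dispatch(body))  # New model version: example_model v1
```

Adding support for another event is then a matter of registering one more decorated function rather than growing an if/elif chain.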

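Webhook Delivery Reliability

Automatic Retry Logic

MLflow implements automatic retry logic to ensure reliable webhook delivery. When a webhook request fails, MLflow automatically retries requests that return the following status codes. All other status codes are not retried:

Status Code | Category | Description
429 | Rate limiting | Too Many Requests: rate limit error
500 | Server error | Internal Server Error: possibly a transient server error
502 | Server error | Bad Gateway: gateway error
503 | Server error | Service Unavailable: service temporarily unavailable
504 | Server error | Gateway Timeout: gateway timeout error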

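Retry Behavior

When a retryable error occurs, MLflow:

  1. Uses exponential backoff: exponential backoff with jitter prevents a thundering-herd effect.

    • Base delay: 1s, 2s, 4s, 8s, and so on.
    • Maximum backoff: capped at 60 seconds.
    • Jitter: up to 1 second of random jitter is added to each delay (requires urllib3 >= 2.0).
  2. Respects rate limits: for 429 responses, MLflow checks the Retry-After header and uses the larger of:

    • The value specified in the Retry-After header
    • The computed exponential backoff time
  3. Allows configurable retries: set the maximum number of retries with the MLFLOW_WEBHOOK_REQUEST_MAX_RETRIES environment variable.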
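The retry policy described above can be approximated as follows. This is a sketch of the documented behavior, not MLflow's implementation (the urllib3 >= 2.0 requirement suggests it builds on urllib3's retry support). RETRYABLE_STATUS_CODES and retry_delay are hypothetical names, and the sketch assumes Retry-After carries a number of seconds rather than an HTTP date.

```python
import random
from typing import Optional

RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504})
MAX_BACKOFF = 60.0  # seconds


def retry_delay(
    attempt: int,
    status: int,
    retry_after: Optional[str] = None,
    jitter: float = 1.0,
) -> Optional[float]:
    """Seconds to wait before retry number `attempt` (0-based), or None if not retryable."""
    if status not in RETRYABLE_STATUS_CODES:
        return None
    # Exponential backoff 1s, 2s, 4s, 8s, ... plus up to `jitter` seconds, capped at 60s.
    backoff = min(MAX_BACKOFF, 2.0 ** attempt + random.uniform(0, jitter))
    if status == 429 and retry_after is not None:
        try:
            # Honor Retry-After by taking the larger of the two waits.
            return max(float(retry_after), backoff)
        except ValueError:
            pass  # Non-numeric values (e.g. an HTTP date) are ignored in this sketch.
    return backoff


print(retry_delay(3, 503, jitter=0.0))  # 8.0
print(retry_delay(10, 502, jitter=0.0))  # 60.0
print(retry_delay(0, 429, retry_after="30", jitter=0.0))  # 30.0
print(retry_delay(0, 404))  # None
```

Passing jitter=0.0 makes the delays deterministic, which is convenient when reasoning about worst-case delivery latency: with the default of 3 retries, the base waits add up to 1 + 2 + 4 = 7 seconds before jitter.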

Example: FastAPI Webhook Receiver

Here is a complete FastAPI application that receives and processes MLflow webhooks.

from fastapi import FastAPI, Request, HTTPException, Header
from typing import Optional
import hmac
import hashlib
import base64
import logging
import time

app = FastAPI()
logger = logging.getLogger(__name__)

# Your webhook secret (keep this secure!)
WEBHOOK_SECRET = "your-secret-key"

# Maximum allowed age for webhook timestamps (in seconds)
MAX_TIMESTAMP_AGE = 300 # 5 minutes


def verify_timestamp_freshness(
    timestamp_str: str, max_age: int = MAX_TIMESTAMP_AGE
) -> bool:
    """Verify that the webhook timestamp is recent enough to prevent replay attacks"""
    try:
        webhook_timestamp = int(timestamp_str)
        current_timestamp = int(time.time())
        age = current_timestamp - webhook_timestamp
        return 0 <= age <= max_age
    except (ValueError, TypeError):
        return False


def verify_mlflow_signature(
    payload: str, signature: str, secret: str, delivery_id: str, timestamp: str
) -> bool:
    """Verify the HMAC signature from MLflow webhook"""
    # Extract the base64 signature part (remove 'v1,' prefix)
    if not signature.startswith("v1,"):
        return False

    signature_b64 = signature.removeprefix("v1,")
    # Reconstruct the signed content: delivery_id.timestamp.payload
    signed_content = f"{delivery_id}.{timestamp}.{payload}"
    # Generate expected signature
    expected_signature = hmac.new(
        secret.encode("utf-8"), signed_content.encode("utf-8"), hashlib.sha256
    ).digest()
    expected_signature_b64 = base64.b64encode(expected_signature).decode("utf-8")
    return hmac.compare_digest(signature_b64, expected_signature_b64)


@app.post("/webhook")
async def handle_webhook(
    request: Request,
    x_mlflow_signature: Optional[str] = Header(None),
    x_mlflow_delivery_id: Optional[str] = Header(None),
    x_mlflow_timestamp: Optional[str] = Header(None),
):
    """Handle webhook with HMAC signature verification"""

    # Get raw payload for signature verification
    payload_bytes = await request.body()
    payload = payload_bytes.decode("utf-8")

    # Verify required headers are present
    if not x_mlflow_signature:
        raise HTTPException(status_code=400, detail="Missing signature header")
    if not x_mlflow_delivery_id:
        raise HTTPException(status_code=400, detail="Missing delivery ID header")
    if not x_mlflow_timestamp:
        raise HTTPException(status_code=400, detail="Missing timestamp header")

    # Verify timestamp freshness to prevent replay attacks
    if not verify_timestamp_freshness(x_mlflow_timestamp):
        raise HTTPException(
            status_code=400,
            detail="Timestamp is too old or invalid (possible replay attack)",
        )

    # Verify signature
    if not verify_mlflow_signature(
        payload,
        x_mlflow_signature,
        WEBHOOK_SECRET,
        x_mlflow_delivery_id,
        x_mlflow_timestamp,
    ):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Parse payload
    webhook_data = await request.json()

    # Extract webhook metadata
    entity = webhook_data.get("entity")
    action = webhook_data.get("action")
    timestamp = webhook_data.get("timestamp")
    payload_data = webhook_data.get("data", {})

    # Print the payload for debugging
    print(f"Received webhook: {entity}.{action}")
    print(f"Timestamp: {timestamp}")
    print(f"Delivery ID: {x_mlflow_delivery_id}")
    print(f"Payload: {payload_data}")

    # Add your webhook processing logic here
    # For example, handle different event types
    if entity == "model_version" and action == "created":
        model_name = payload_data.get("name")
        version = payload_data.get("version")
        print(f"New model version: {model_name} v{version}")
        # Add your model version processing logic here
    elif entity == "registered_model" and action == "created":
        model_name = payload_data.get("name")
        print(f"New registered model: {model_name}")
        # Add your registered model processing logic here
    elif entity == "model_version_tag" and action == "set":
        model_name = payload_data.get("name")
        version = payload_data.get("version")
        tag_key = payload_data.get("key")
        tag_value = payload_data.get("value")
        print(f"Tag set on {model_name} v{version}: {tag_key}={tag_value}")
        # Add your tag processing logic here

    return {"status": "success"}


@app.get("/health")
async def health():
    """Health check endpoint"""
    return {"status": "healthy"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

Running the Example

  1. Install dependencies

    pip install fastapi uvicorn
  2. Set up an MLflow server with webhook encryption

    # Generate a secure encryption key for webhook secrets
    export MLFLOW_WEBHOOK_SECRET_ENCRYPTION_KEY=$(python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")

    # Start MLflow server with webhook support
    mlflow server --backend-store-uri sqlite:///mlflow.db
  3. Start the webhook receiver (save the example above as webhook_receiver.py)

    python webhook_receiver.py
  4. Configure the MLflow webhook

    from mlflow import MlflowClient

    client = MlflowClient("http://localhost:5000")

    # Create webhook with HMAC verification
    webhook = client.create_webhook(
        name="fastapi-receiver",
        url="https://your-domain.com/webhook",
        events=["model_version.created"],
        secret="your-secret-key",
    )
  5. Test the webhook

    # Test webhook connectivity
    result = client.test_webhook(webhook.webhook_id)
    print(f"Test result: {result.success}")

    # Create a model version to trigger the webhook
    client.create_registered_model("test-model")
    client.create_model_version(
        name="test-model", source="s3://bucket/model", run_id="abc123"
    )

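Troubleshooting

Common Issues

  1. Webhook not triggering

    • Verify that the webhook status is "ACTIVE"
    • Check that the event type matches the action you performed
    • Make sure your MLflow server has network access to the webhook URL
  2. Signature verification failing

    • Make sure you verify against the raw request body
    • Check that the secret matches exactly (no extra whitespace)
  3. Connection timeouts

    • MLflow uses a default timeout of 30 seconds for webhook requests (configurable via MLFLOW_WEBHOOK_REQUEST_TIMEOUT)
    • Make sure your endpoint responds quickly, or increase the timeout as needed.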
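Both signature pitfalls can be reproduced in a few lines. The sketch reuses the signing scheme from the FastAPI example ({delivery_id}.{timestamp}.{payload}, HMAC-SHA256, base64); the sign helper and sample values are hypothetical. Re-serializing parsed JSON changes its bytes, and a stray trailing newline in the secret changes the key, so either one yields a different signature.

```python
import base64
import hashlib
import hmac
import json


def sign(secret: str, delivery_id: str, timestamp: str, payload: str) -> str:
    """Compute the base64 HMAC-SHA256 part of the signature."""
    message = f"{delivery_id}.{timestamp}.{payload}".encode("utf-8")
    digest = hmac.new(secret.encode("utf-8"), message, hashlib.sha256).digest()
    return base64.b64encode(digest).decode("utf-8")


raw_body = '{"entity":"model_version","action":"created"}'  # bytes as sent
sent = sign("your-secret-key", "delivery-123", "1753950452", raw_body)

# Correct: verify against the raw body with the exact secret.
print(sign("your-secret-key", "delivery-123", "1753950452", raw_body) == sent)  # True

# Wrong: json.dumps adds spaces after ':' and ',', so the bytes differ.
reserialized = json.dumps(json.loads(raw_body))
print(reserialized == raw_body)  # False
print(sign("your-secret-key", "delivery-123", "1753950452", reserialized) == sent)  # False

# Wrong: a trailing newline copied into the secret changes the key.
print(sign("your-secret-key\n", "delivery-123", "1753950452", raw_body) == sent)  # False
```

This is why the FastAPI receiver reads await request.body() for verification and only parses the JSON afterwards.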

API Reference

For complete API documentation, see: