# Track Versions & Environments

Tracking environments, application versions, and custom context in your GenAI application enables comprehensive observability across deployment stages, releases, and business-specific dimensions. MLflow provides a flexible tagging mechanism for attaching rich metadata to traces.
## Why Track Environment & Context?

Attaching this metadata to traces provides critical insight for:

- **Environment-specific analysis**: compare behavior across `development`, `staging`, and `production`
- **Version management**: track performance and regressions across application versions (e.g., `v1.0.1`, `v1.2.0`)
- **Custom segmentation**: add business-specific context (e.g., `customer_tier: "premium"`, `feature_flag: "new_algorithm"`)
- **Deployment validation**: ensure consistent behavior across deployment targets
- **Root cause analysis**: quickly narrow issues down to a specific environment, version, or configuration
## Standard & Custom Tags for Context

MLflow uses tags (key-value string pairs) to store contextual information on traces.

### Automatically Populated Tags

MLflow automatically captures these standard tags based on your execution environment:

- `mlflow.source.name`: the entry point or script that generated the trace (auto-populated with the filename for Python scripts, or the notebook name for Jupyter notebooks)
- `mlflow.source.git.commit`: the commit hash, auto-detected when running from a Git repository
- `mlflow.source.type`: `NOTEBOOK` when running in a Jupyter notebook, `LOCAL` when running a local Python script, otherwise `UNKNOWN` (auto-detected)

If needed, you can manually override these auto-populated tags with `mlflow.update_current_trace()` or `mlflow.set_trace_tag()` for finer-grained control.
### Reserved Standard Tags

Some standard tags carry special meaning but must be set manually:

- `mlflow.trace.session`: groups traces from a multi-turn conversation or user session
- `mlflow.trace.user`: associates traces with a specific user for user-centric analysis
### Custom Tags

You can define custom tags to capture any business- or application-specific context. Common examples include:

- `environment`: e.g., `"production"`, `"staging"` (from a `DEPLOY_ENV` environment variable)
- `app_version`: e.g., `"1.0.0"` (from an `APP_VERSION` environment variable)
- `deployment_id`: e.g., `"deploy-abc-123"` (from a `DEPLOYMENT_ID` environment variable)
- `region`: e.g., `"us-east-1"` (from a `REGION` environment variable)
- Feature flags and A/B test variants
## Basic Implementation

Here is how to add various kinds of context to your traces as tags. The examples below cover three scenarios: a basic example, a context manager, and a web application.
```python
import mlflow
import os
import platform


@mlflow.trace
def process_data_with_context(data: dict, app_config: dict):
    """Process data and add environment, version, and custom context."""
    current_env = os.getenv("APP_ENVIRONMENT", "development")
    current_app_version = app_config.get("version", "unknown")
    current_model_version = app_config.get("model_in_use", "gpt-3.5-turbo")

    # Define custom context tags
    context_tags = {
        "environment": current_env,
        "app_version": current_app_version,
        "model_version": current_model_version,
        "python_version": platform.python_version(),
        "operating_system": platform.system(),
        "data_source": data.get("source", "batch"),
        "processing_mode": "online" if current_env == "production" else "offline",
    }

    # Add tags to the current trace
    mlflow.update_current_trace(tags=context_tags)

    # Your application logic here...
    result = (
        f"Processed '{data['input']}' in {current_env} with app {current_app_version}"
    )
    return result


# Example usage
config = {"version": "1.1.0", "model_in_use": "claude-3-sonnet-20240229"}
input_data = {"input": "Summarize this document...", "source": "realtime_api"}

processed_result = process_data_with_context(input_data, config)
print(processed_result)
```
**Key points:**

- Use `os.getenv()` to read environment variables (e.g., `APP_ENVIRONMENT`, `APP_VERSION`)
- Pass application and model configuration into your traced functions
- Use the `platform` module to capture system information
- `mlflow.update_current_trace()` attaches all key-value pairs to the currently active trace

For more complex scenarios, a context manager can ensure tags are applied consistently:
```python
import mlflow
import os
from contextlib import contextmanager


@contextmanager
def trace_with_environment(operation_name: str):
    """Context manager that automatically adds environment context to traces"""
    # Environment context
    env_tags = {
        "environment": os.getenv("ENVIRONMENT", "development"),
        "app_version": os.getenv("APP_VERSION", "unknown"),
        "deployment_id": os.getenv("DEPLOYMENT_ID", "local"),
        "region": os.getenv("AWS_REGION", "local"),
        "kubernetes_namespace": os.getenv("KUBERNETES_NAMESPACE"),
        "container_image": os.getenv("CONTAINER_IMAGE"),
    }
    # Filter out None values
    env_tags = {k: v for k, v in env_tags.items() if v is not None}

    with mlflow.start_span(name=operation_name, attributes=env_tags) as span:
        # Add tags at the trace level as well
        mlflow.update_current_trace(tags=env_tags)
        yield span


# Usage
def my_genai_pipeline(user_input: str):
    with trace_with_environment("genai_pipeline"):
        # Your pipeline logic here
        return f"Processed: {user_input}"


result = my_genai_pipeline("What is the weather like?")
```
In a production web application, context can come from environment variables, request headers, or application configuration:
```python
import mlflow
import os
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn

app = FastAPI()


# @mlflow.trace must sit below @app.post so FastAPI registers the traced function
@app.post("/chat")
@mlflow.trace
async def handle_chat(request: Request):
    # Get request data
    data = await request.json()
    message = data.get("message", "")

    # Retrieve context from request headers
    client_request_id = request.headers.get("X-Request-ID")
    session_id = request.headers.get("X-Session-ID")
    user_id = request.headers.get("X-User-ID")
    user_agent = request.headers.get("User-Agent")

    # Update the current trace with all context and environment metadata
    mlflow.update_current_trace(
        client_request_id=client_request_id,
        tags={
            # Session context - groups traces from multi-turn conversations
            "mlflow.trace.session": session_id,
            # User context - associates traces with specific users
            "mlflow.trace.user": user_id,
            # Environment metadata - tracks deployment context
            "environment": os.getenv("ENVIRONMENT", "development"),
            "app_version": os.getenv("APP_VERSION", "1.0.0"),
            "deployment_id": os.getenv("DEPLOYMENT_ID", "unknown"),
            "region": os.getenv("REGION", "us-east-1"),
            # Request context
            "user_agent": user_agent,
            "request_method": request.method,
            "endpoint": request.url.path,
        },
    )

    # Your application logic for processing the chat message
    response_text = f"Processed message: '{message}'"
    return JSONResponse(content={"response": response_text})


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=5000)
```
Example request with context headers:
```bash
curl -X POST "http://127.0.0.1:5000/chat" \
  -H "Content-Type: application/json" \
  -H "X-Request-ID: req-abc-123-xyz-789" \
  -H "X-Session-ID: session-def-456-uvw-012" \
  -H "X-User-ID: user-jane-doe-12345" \
  -d '{"message": "What is my account balance?"}'
```
## Querying and Analyzing Context Data

### Using the MLflow UI

In the MLflow UI (Traces tab), use the search box to filter traces by context tags:

```
tags.environment = 'production'
tags.app_version = '2.1.0'
tags.model_used = 'advanced_model' AND tags.client_variant = 'treatment'
tags.feature_flag_new_ui = 'true'
```

You can also group traces by tag to compare performance or error rates across contexts.
## Programmatic Analysis

Use the MLflow SDK for more sophisticated analysis or to integrate with other tools. The examples below cover version comparison, environment analysis, and feature flag analysis.

### Version Comparison

Compare error rates and performance across application versions:
```python
import mlflow


def compare_version_metrics(experiment_id: str, versions: list):
    """Compare error rates and performance across app versions"""
    version_metrics = {}

    for version in versions:
        traces = mlflow.search_traces(
            experiment_ids=[experiment_id],
            filter_string=f"tags.environment = 'production' AND tags.app_version = '{version}'",
        )

        if traces.empty:
            version_metrics[version] = {
                "error_rate": None,
                "avg_latency": None,
                "total_traces": 0,
            }
            continue

        # Calculate metrics
        error_count = len(traces[traces["status"] == "ERROR"])
        total_traces = len(traces)
        error_rate = (error_count / total_traces) * 100

        successful_traces = traces[traces["status"] == "OK"]
        avg_latency = (
            successful_traces["execution_time_ms"].mean()
            if not successful_traces.empty
            else 0
        )

        version_metrics[version] = {
            "error_rate": error_rate,
            "avg_latency": avg_latency,
            "total_traces": total_traces,
        }

    return version_metrics


# Usage
metrics = compare_version_metrics("1", ["1.0.0", "1.1.0", "1.2.0"])
for version, data in metrics.items():
    if data["total_traces"] == 0:
        # Guard against the None metrics recorded for versions with no traces
        print(f"Version {version}: no traces found")
        continue
    print(
        f"Version {version}: {data['error_rate']:.1f}% errors, "
        f"{data['avg_latency']:.1f}ms avg latency"
    )
```
### Environment Analysis

Analyze performance differences across environments:
```python
import mlflow


def analyze_environment_performance(experiment_id: str):
    """Compare performance across different environments"""
    environments = ["development", "staging", "production"]
    env_metrics = {}

    for env in environments:
        traces = mlflow.search_traces(
            experiment_ids=[experiment_id],
            filter_string=f"tags.environment = '{env}' AND status = 'OK'",
        )

        if not traces.empty:
            env_metrics[env] = {
                "count": len(traces),
                "avg_latency": traces["execution_time_ms"].mean(),
                "p95_latency": traces["execution_time_ms"].quantile(0.95),
                "p99_latency": traces["execution_time_ms"].quantile(0.99),
            }

    return env_metrics


# Usage
env_performance = analyze_environment_performance("1")
for env, metrics in env_performance.items():
    print(
        f"{env}: {metrics['count']} traces, "
        f"avg: {metrics['avg_latency']:.1f}ms, "
        f"p95: {metrics['p95_latency']:.1f}ms"
    )
```
### Feature Flag Analysis

Analyze the performance impact of a feature flag:
```python
import mlflow


def analyze_feature_flag_impact(experiment_id: str, flag_name: str):
    """Analyze performance impact of a feature flag"""
    # Get traces with feature flag enabled
    flag_on_traces = mlflow.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"tags.feature_flag_{flag_name} = 'true' AND status = 'OK'",
    )

    # Get traces with feature flag disabled
    flag_off_traces = mlflow.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"tags.feature_flag_{flag_name} = 'false' AND status = 'OK'",
    )

    results = {}

    if not flag_on_traces.empty:
        results["flag_on"] = {
            "count": len(flag_on_traces),
            "avg_latency": flag_on_traces["execution_time_ms"].mean(),
            "error_rate": 0,  # Only looking at successful traces
        }

    if not flag_off_traces.empty:
        results["flag_off"] = {
            "count": len(flag_off_traces),
            "avg_latency": flag_off_traces["execution_time_ms"].mean(),
            "error_rate": 0,  # Only looking at successful traces
        }

    # Calculate performance impact
    if "flag_on" in results and "flag_off" in results:
        latency_change = (
            results["flag_on"]["avg_latency"] - results["flag_off"]["avg_latency"]
        )
        latency_change_pct = (latency_change / results["flag_off"]["avg_latency"]) * 100

        results["impact"] = {
            "latency_change_ms": latency_change,
            "latency_change_percent": latency_change_pct,
        }

    return results


# Usage
flag_analysis = analyze_feature_flag_impact("1", "new_retriever")
if "impact" in flag_analysis:
    impact = flag_analysis["impact"]
    print(
        f"Feature flag impact: {impact['latency_change_ms']:.1f}ms "
        f"({impact['latency_change_percent']:.1f}% change)"
    )
```
## Best Practices

### Tagging Strategy

- **Standardize tag keys**: use a consistent naming convention (e.g., `snake_case`) for your custom tags
- **Environment variables for deployment context**: set version and environment information as environment variables in your CI/CD or deployment process
- **Automate context attachment**: ensure context tags are applied automatically by your application or deployment scripts
- **Balance granularity and simplicity**: capture enough context for useful analysis, but avoid over-tagging to the point that traces become unmanageable
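One way to keep tag keys consistent across services is a small shared helper (a hypothetical sketch, not an MLflow API; the function name and environment variables are illustrative):

```python
import os
import re
from typing import Optional


def standard_context_tags(extra: Optional[dict] = None) -> dict:
    """Build a consistent tag dictionary from deployment environment variables."""
    tags = {
        "environment": os.getenv("ENVIRONMENT", "development"),
        "app_version": os.getenv("APP_VERSION", "unknown"),
        "deployment_id": os.getenv("DEPLOYMENT_ID", "local"),
    }
    if extra:
        for key, value in extra.items():
            # Normalize keys to snake_case so search filters stay predictable
            normalized = re.sub(r"[^a-z0-9]+", "_", key.lower()).strip("_")
            tags[normalized] = str(value)
    return tags


tags = standard_context_tags({"Feature Flag": "new_ui"})
```

Every service then passes the same dictionary to `mlflow.update_current_trace(tags=...)`, so filters like `tags.environment` behave identically everywhere.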
### Performance Considerations

- **Minimize tag count**: the overhead of adding tags is small, but avoid attaching excessive tags in high-throughput systems
- **Keep tag values short**: concise values reduce storage overhead
- **Tag consistently**: apply the same tagging strategy across all services and deployment environments
### Security & Privacy

- **Avoid sensitive data**: never store personally identifiable information (PII) or secrets directly in tags
- **Use anonymized identifiers**: when tracking users, use anonymized identifiers rather than personal information
- **Review tag contents**: audit your tags regularly to ensure they contain no sensitive information
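For instance, user identifiers can be anonymized before they are used as tag values (a sketch; `TRACE_ID_SALT` and its default are illustrative, and real salts should be managed securely):

```python
import hashlib
import os


def anonymize_user_id(raw_user_id: str) -> str:
    """Hash a raw user identifier so traces never carry PII directly."""
    # TRACE_ID_SALT and its default are illustrative placeholders
    salt = os.getenv("TRACE_ID_SALT", "example-salt")
    digest = hashlib.sha256(f"{salt}:{raw_user_id}".encode()).hexdigest()
    return f"user-{digest[:16]}"


# Stable per user, so user-centric analysis still works without exposing PII
tag_value = anonymize_user_id("jane.doe@example.com")
```

The resulting value can then be passed as the `mlflow.trace.user` tag in place of the raw identifier.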
## Next Steps

- **MLflow Tracing UI**: learn how to filter and analyze traces by environment and version in the UI
- **Searching traces**: master the advanced search syntax for complex context-based queries
- **Querying traces via the SDK**: build custom analysis and monitoring workflows
- **Manual tracing**: add detailed instrumentation with context-aware spans

By implementing comprehensive environment and version tracking, you gain powerful observability into your GenAI application, supporting the full journey from development to production deployment.