低级客户端API(高级)
MLflow 客户端 API 提供对跟踪生命周期的直接、细粒度控制。虽然高级 API 优雅地处理了大多数用例,但客户端 API 对于需要显式控制跟踪创建、自定义跟踪 ID 或与现有可观察性系统集成的更高级场景至关重要。
谨慎使用
开始之前:我们建议仅当高级 API 无法满足您的要求时才使用客户端 API
- ❌ 无自动父子关系检测
- 🛠️ 需要手动异常处理
- 🚫 与自动跟踪集成不兼容
- 🎛️ 完全控制跟踪生命周期
- 🆔 自定义跟踪 ID 管理
核心概念
跟踪生命周期
每个跟踪都遵循必须显式管理的严格生命周期
- 🚀 启动跟踪 - 创建根 span
- 📊 启动 Span(s) - 根据需要添加子 span
- 🔚 结束 Span(s) - 以相反的顺序关闭 span(后进先出 - LIFO)
- ✅ 结束跟踪 - 完成根 span
重要
黄金法则:每个 `start_trace` 或 `start_span` 调用都必须有一个对应的 `end_trace` 或 `end_span` 调用。未能关闭 span 将导致不完整的跟踪。
关键标识符
了解这些标识符对于客户端 API 的使用至关重要
| 标识符 | 描述 | 用法 |
|---|---|---|
| request_id | 唯一的跟踪标识符 | 链接跟踪中的所有 span |
| span_id | 唯一的 span 标识符 | 标识要结束的特定 span |
| parent_id | 父 span 的 ID | 创建 span 层次结构 |
开始使用
初始化客户端
from mlflow import MlflowClient
# Initialize client with the default tracking URI
client = MlflowClient()
# Or point the client at a specific tracking server (scheme, host and port are
# all required; a URI without a host cannot be resolved)
client = MlflowClient(tracking_uri="http://localhost:5000")
启动跟踪
与高级 API 不同,您必须在添加 span 之前显式启动跟踪
# Open the trace explicitly; start_trace() creates the root span
trace_inputs = {"user_id": "123", "action": "generate_report"}
trace_attributes = {"environment": "production", "version": "1.0.0"}
root_span = client.start_trace(
    name="my_application_flow",
    inputs=trace_inputs,
    attributes=trace_attributes,
)
# Keep the request_id around: the span calls that follow need it
request_id = root_span.request_id
print(f"Started trace with ID: {request_id}")
添加子 Span
创建 span 的层次结构来表示您的应用程序的工作流程
# Both spans below attach to the same trace and the same parent (the root
# span), which makes them siblings.
root_link = {"request_id": request_id, "parent_id": root_span.span_id}

# Child span for data retrieval
data_span = client.start_span(
    name="fetch_user_data",
    inputs={"user_id": "123"},
    attributes={"database": "users_db", "query_type": "select"},
    **root_link,
)

# Sibling span for processing
process_span = client.start_span(
    name="process_data",
    inputs={"data_size": "1024KB"},
    attributes={"processor": "gpu", "batch_size": 32},
    **root_link,
)
结束 Span
按照创建的相反顺序结束 span(后进先出 - LIFO)
# Spans are closed in reverse order of creation (LIFO): process_span was
# started last, so it is ended first.
client.end_span(
    request_id=process_span.request_id,
    span_id=process_span.span_id,
    outputs={"processed_records": 42, "errors": 0},
    status="OK",
)
# Then end the data retrieval span, which was started before it
client.end_span(
    request_id=data_span.request_id,
    span_id=data_span.span_id,
    outputs={"record_count": 42, "cache_hit": True},
    attributes={"duration_ms": 150},
)
结束跟踪
通过结束根 span 来完成跟踪
# Ending the root span completes the trace
final_outputs = {"report_url": "https://example.com/report/123"}
final_attributes = {"total_duration_ms": 1250, "status": "success"}
client.end_trace(
    request_id=request_id,
    outputs=final_outputs,
    attributes=final_attributes,
)
实用示例
- 错误处理
- 自定义跟踪管理
- 批量处理
适当的错误处理可确保即使发生异常也能完成跟踪
def traced_operation():
    """Run a database query inside a trace that is always closed.

    A root span and one child span are opened. If the query raises, the child
    span and then the trace are ended with status ``"ERROR"`` and the
    exception is re-raised; otherwise both are ended with status ``"OK"``.

    Raises:
        Exception: Whatever the traced work (or a tracing call) raised.
    """
    client = MlflowClient()
    root_span = None
    try:
        # Start trace
        root_span = client.start_trace("risky_operation")
        # Start child span
        child_span = client.start_span(
            name="database_query",
            request_id=root_span.request_id,
            parent_id=root_span.span_id,
        )
        try:
            # Risky operation
            result = perform_database_query()
        except Exception as e:
            # End child span on error
            client.end_span(
                request_id=child_span.request_id,
                span_id=child_span.span_id,
                status="ERROR",
                attributes={"error": str(e)},
            )
            raise
        else:
            # End child span on success. Kept out of the inner ``try`` so a
            # failure here cannot trigger a second end_span() for this span.
            client.end_span(
                request_id=child_span.request_id,
                span_id=child_span.span_id,
                outputs={"result": result},
                status="OK",
            )
    except Exception as e:
        # root_span is still None if start_trace() itself failed; in that case
        # there is no trace to close.
        if root_span is not None:
            client.end_trace(
                request_id=root_span.request_id,
                status="ERROR",
                attributes={"error_type": type(e).__name__, "error_message": str(e)},
            )
        raise
    else:
        # End trace on success
        client.end_trace(
            request_id=root_span.request_id,
            outputs={"status": "completed"},
            status="OK",
        )
实施自定义跟踪 ID 生成和管理,以便与现有系统集成
import uuid
from datetime import datetime
class CustomTraceManager:
    """Start traces whose names follow a business-specific ID scheme.

    The generated identifier is used as the trace name and is also stored in
    the ``custom_trace_id`` attribute. Root spans that were started are kept
    in ``active_traces``, keyed by that identifier.
    """

    def __init__(self):
        self.client = MlflowClient()
        # Generated trace name -> root span returned by start_trace()
        self.active_traces = {}

    def generate_trace_id(self, user_id: str, operation: str) -> str:
        """Return ``<user_id>_<operation>_<YYYYmmddHHMMSS>_<8 hex chars>``."""
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        # The random suffix keeps two IDs generated in the same second apart
        return f"{user_id}_{operation}_{timestamp}_{uuid.uuid4().hex[:8]}"

    def start_custom_trace(self, user_id: str, operation: str, **kwargs):
        """Start a trace named with a generated ID and remember its root span.

        Args:
            user_id: Stored as the ``user_id`` attribute and used in the ID.
            operation: Stored as the ``operation`` attribute and used in the ID.
            **kwargs: Extra trace attributes. They are merged last, so a key
                that matches a built-in attribute name replaces it.

        Returns:
            The object returned by ``client.start_trace``.
        """
        trace_name = self.generate_trace_id(user_id, operation)
        root_span = self.client.start_trace(
            name=trace_name,
            attributes={
                "user_id": user_id,
                "operation": operation,
                "custom_trace_id": trace_name,
                **kwargs,
            },
        )
        self.active_traces[trace_name] = root_span
        return root_span

    def get_active_trace(self, trace_name: str):
        """Return the root span stored under ``trace_name``, or ``None``."""
        return self.active_traces.get(trace_name)
# Usage: keyword arguments beyond user_id/operation (report_type here) are
# passed along as additional trace attributes
manager = CustomTraceManager()
trace = manager.start_custom_trace(
    "user123",
    "report_generation",
    report_type="quarterly",
)
跟踪具有多层嵌套的复杂工作流程
def batch_processor(items):
    """Process ``items`` under one trace, with a span per item and per step.

    Every item gets its own span under the root span, containing a nested
    ``validate`` span and, for valid items, a nested ``transform`` span. An
    exception raised while handling one item is recorded on that item's open
    spans and does not stop the rest of the batch.

    Args:
        items: Sized collection of mappings, each with an ``"id"`` key.

    Returns:
        The transformed results of the items that validated and transformed
        without error.
    """
    client = MlflowClient()
    # Start main trace
    root = client.start_trace(
        name="batch_processing", inputs={"batch_size": len(items)}
    )
    results = []
    # Process each item
    for i, item in enumerate(items):
        # Create span for each item
        item_span = client.start_span(
            name=f"process_item_{i}",
            request_id=root.request_id,
            parent_id=root.span_id,
            inputs={"item_id": item["id"]},
        )
        # Nested span that is currently open, if any, so the error path can
        # close it instead of leaving it dangling.
        open_step = None
        try:
            # Validation span
            open_step = client.start_span(
                name="validate", request_id=root.request_id, parent_id=item_span.span_id
            )
            is_valid = validate_item(item)
            client.end_span(
                request_id=open_step.request_id,
                span_id=open_step.span_id,
                outputs={"is_valid": is_valid},
            )
            open_step = None
            if is_valid:
                # Processing span
                open_step = client.start_span(
                    name="transform",
                    request_id=root.request_id,
                    parent_id=item_span.span_id,
                )
                result = transform_item(item)
                results.append(result)
                client.end_span(
                    request_id=open_step.request_id,
                    span_id=open_step.span_id,
                    outputs={"transformed": result},
                )
                open_step = None
            # End item span
            client.end_span(
                request_id=item_span.request_id, span_id=item_span.span_id, status="OK"
            )
        except Exception as e:
            # Close the inner span first (LIFO), then the item span
            if open_step is not None:
                client.end_span(
                    request_id=open_step.request_id,
                    span_id=open_step.span_id,
                    status="ERROR",
                    attributes={"error": str(e)},
                )
            client.end_span(
                request_id=item_span.request_id,
                span_id=item_span.span_id,
                status="ERROR",
                attributes={"error": str(e)},
            )
    # End main trace. An empty batch has no meaningful rate; report 0.0 rather
    # than dividing by zero and leaving the trace open.
    client.end_trace(
        request_id=root.request_id,
        outputs={
            "processed_count": len(results),
            "success_rate": len(results) / len(items) if items else 0.0,
        },
    )
    return results
最佳实践
- 上下文管理器
- 状态管理
- 有意义的属性
创建自定义上下文管理器以确保始终关闭 span
from contextlib import contextmanager
@contextmanager
def traced_span(client, name, request_id, parent_id=None, **kwargs):
    """Context manager that starts a span and always ends it.

    Args:
        client: Object exposing ``start_span`` and ``end_span``.
        name: Name of the span to start.
        request_id: ID of the trace the span belongs to.
        parent_id: ID of the parent span, if any.
        **kwargs: Forwarded unchanged to ``client.start_span``.

    Yields:
        The span returned by ``client.start_span``.

    Raises:
        Exception: Re-raises whatever the ``with`` body raised, after the span
            has been ended with status ``"ERROR"``.
    """
    span = client.start_span(
        name=name, request_id=request_id, parent_id=parent_id, **kwargs
    )
    try:
        yield span
    except Exception as e:
        # Record the failure on the span, then let the exception propagate
        client.end_span(
            request_id=span.request_id,
            span_id=span.span_id,
            status="ERROR",
            attributes={"error": str(e)},
        )
        raise
    else:
        client.end_span(request_id=span.request_id, span_id=span.span_id, status="OK")
# Usage: the span is ended automatically when the block exits, with status
# "OK" on success or "ERROR" if the body raises
with traced_span(client, "my_operation", request_id, parent_id) as span:
    # Your code here
    result = perform_operation()
管理复杂应用程序的跟踪状态
class TraceStateManager:
    """Track the currently open trace and spans as a stack.

    The first ``push_trace`` starts a new trace; each further push starts a
    child span of whatever is on top of the stack. ``pop_trace`` ends the top
    entry, using ``end_trace`` when it was the last one left.
    """

    def __init__(self):
        self.client = MlflowClient()
        # Open spans, outermost first; index 0 is the root span of the trace
        self._trace_stack = []

    @property
    def current_trace(self):
        """Return the innermost open span, or ``None`` when nothing is open."""
        return self._trace_stack[-1] if self._trace_stack else None

    def push_trace(self, name: str, **kwargs):
        """Start a trace, or a child span if one is already open, and push it.

        Args:
            name: Name for the new trace or span.
            **kwargs: Forwarded to ``start_trace`` / ``start_span``.

        Returns:
            The newly started span.
        """
        parent = self.current_trace
        if parent:
            # Create child span if trace exists
            span = self.client.start_span(
                name=name,
                request_id=parent.request_id,
                parent_id=parent.span_id,
                **kwargs,
            )
        else:
            # Create new trace
            span = self.client.start_trace(name=name, **kwargs)
        self._trace_stack.append(span)
        return span

    def pop_trace(self, **kwargs):
        """End the innermost open span and remove it from the stack.

        Does nothing when the stack is empty.

        Args:
            **kwargs: Forwarded to ``end_span`` / ``end_trace``.
        """
        if not self._trace_stack:
            return
        span = self._trace_stack.pop()
        if self._trace_stack:
            # Something is still open above the root, so this was a child span
            self.client.end_span(
                request_id=span.request_id, span_id=span.span_id, **kwargs
            )
        else:
            # The stack is now empty, so this was the root: end the trace
            self.client.end_trace(request_id=span.request_id, **kwargs)
使用有助于调试的上下文来丰富您的跟踪
好的例子
# Good: each attribute names a concrete setting you can act on when debugging
llm_call_attributes = {
    "model": "gpt-4",
    "temperature": 0.7,
    "max_tokens": 1000,
    "prompt_template": "rag_v2",
    "user_tier": "premium",
}
client.start_span(
    name="llm_call",
    request_id=request_id,
    parent_id=parent_id,
    attributes=llm_call_attributes,
)
坏的例子
# Bad: neither the span name nor the attributes say what actually happened
vague_attributes = {"step": 1, "data": "some data"}
client.start_span(
    name="process",
    request_id=request_id,
    parent_id=parent_id,
    attributes=vague_attributes,
)
常见陷阱
重要注意事项
避免这些常见的错误
- 🚫 忘记结束 span - 始终使用 try/finally 或上下文管理器
- 🔗 不正确的父子关系 - 仔细检查 span ID
- 🔀 混合使用高级和低级 API - 它们不互操作
- 🔐 硬编码跟踪 ID - 始终生成唯一的 ID
- 🧵 忽略线程安全 - 默认情况下,客户端 API 不是线程安全的
性能考量
- 📦 批量操作:当创建许多 span 时,请考虑批量操作以减少开销。
- 🧠 内存管理:注意保持对 span 对象的引用 - 完成后清理它们。
- 🌐 网络调用:每个启动/结束操作都可能导致对跟踪服务器的网络调用。
- 🧵 线程安全:在多线程环境中使用客户端 API 时,请使用锁或线程本地存储。
后续步骤
高级 API - 适用于大多数用例的更简单替代方案
自动跟踪 - 支持的框架的一行式跟踪
跟踪概念 - 了解跟踪结构和组件
查询跟踪 - 以编程方式搜索和分析您的跟踪