通过 SDK 查询追踪
为何以编程方式查询跟踪?
MLflow UI 非常适合交互式探索,但编程访问可实现自动化并与您现有工作流集成。您可以分析数千个跟踪中的错误模式,监控随时间变化的性能趋势,并构建自定义分析工作流。
search_traces
API 默认返回一个 pandas DataFrame(如果安装了 pandas),但当指定 return_type="list"
时,也可以返回一个 Trace 对象列表。
语法差异: 当使用 DataFrame 返回类型时,filter_string
中使用的列名与返回的 DataFrame 中的列名不同
- 过滤条件:
status
→ DataFrame:state
- 过滤条件:
timestamp_ms
→ DataFrame:request_time
- 过滤条件:
execution_time_ms
→ DataFrame:execution_duration
基本查询方法
mlflow.search_traces()
函数返回两种格式的跟踪数据
- pandas DataFrame(如果安装了 pandas,则为默认):非常适合数据分析和快速洞察
- Trace 对象列表:当您需要处理完整的 Trace 对象结构时很有用
您可以使用 return_type
参数控制返回类型
import mlflow
# Default: Get traces as a pandas DataFrame (if pandas is installed)
error_traces_df = mlflow.search_traces(
experiment_ids=["1"], filter_string="status = 'ERROR'", max_results=100
)
# Easy analysis with pandas
print(f"Found {len(error_traces_df)} errors")
print(f"Average execution time: {error_traces_df['execution_duration'].mean():.1f}ms")
# Alternative: Get traces as a list of Trace objects
error_traces_list = mlflow.search_traces(
experiment_ids=["1"],
filter_string="status = 'ERROR'",
max_results=100,
return_type="list",
)
# Work with Trace objects directly
for trace in error_traces_list[:3]:
print(f"Trace {trace.info.trace_id}: {trace.info.execution_duration}ms")
选择正确的返回类型
在以下情况使用 DataFrame(默认)::
- 您需要对跟踪数据执行统计分析
- 您希望聚合多个跟踪的指标
- 您正在创建可视化或报告
- 您熟悉 pandas 并希望利用其强大的数据操作功能
在以下情况使用 Trace 对象列表::
- 您需要访问完整的 Trace 对象方法和属性
- 您正在与其他期望 Trace 对象的 MLflow API 集成
- 您希望详细处理单个跟踪
- 您的环境中未安装 pandas
常见用例
查找和分析错误
当您需要了解应用程序出现问题的原因时,请从简单的查询开始,以识别模式
import mlflow
from datetime import datetime, timedelta
# Get errors from the last 24 hours
yesterday = datetime.now() - timedelta(days=1)
timestamp_ms = int(yesterday.timestamp() * 1000)
error_traces = mlflow.search_traces(
experiment_ids=["1"],
filter_string=f"status = 'ERROR' AND timestamp_ms > {timestamp_ms}",
order_by=["timestamp_ms DESC"],
)
print(f"Found {len(error_traces)} errors in the last 24 hours")
# Look at error distribution by tags
if not error_traces.empty:
# Count errors by user if user tags exist
if "tags" in error_traces.columns:
print("\nError patterns:")
tag_analysis = {}
for tags in error_traces["tags"].dropna():
if isinstance(tags, dict):
user = tags.get("user_id", "unknown")
tag_analysis[user] = tag_analysis.get(user, 0) + 1
for user, count in sorted(
tag_analysis.items(), key=lambda x: x[1], reverse=True
)[:5]:
print(f" {user}: {count} errors")
性能监控
跟踪您的应用程序随时间变化的性能并识别瓶颈
# Get successful traces to analyze performance
recent_traces = mlflow.search_traces(
experiment_ids=["1"],
filter_string="status = 'OK'",
order_by=["timestamp_ms DESC"],
max_results=1000,
)
if not recent_traces.empty:
# Basic performance metrics
avg_time = recent_traces["execution_duration"].mean()
p95_time = recent_traces["execution_duration"].quantile(0.95)
print(f"Average execution time: {avg_time:.1f}ms")
print(f"95th percentile: {p95_time:.1f}ms")
# Find slowest traces
slow_traces = recent_traces[recent_traces["execution_duration"] > p95_time]
print(f"Found {len(slow_traces)} slow traces (>{p95_time:.1f}ms)")
用户会话分析
通过分析用户会话了解他们如何与您的应用程序交互
# Analyze traces for a specific user
user_traces = mlflow.search_traces(
experiment_ids=["1"],
filter_string="tags.user_id = 'user123'",
order_by=["timestamp_ms ASC"],
)
if not user_traces.empty:
print(f"User has {len(user_traces)} interactions")
# Calculate session metrics
total_time = user_traces["execution_duration"].sum()
error_count = len(user_traces[user_traces["state"] == "ERROR"])
print(f"Total execution time: {total_time:.1f}ms")
print(
f"Error rate: {error_count}/{len(user_traces)} ({error_count/len(user_traces)*100:.1f}%)"
)
# Show recent activity
print("\nRecent traces:")
for _, trace in user_traces.tail(3).iterrows():
status = "✅" if trace["state"] == "OK" else "❌"
print(f" {status} {trace['execution_duration']:.1f}ms")
最佳实践
从简单开始: 从基本查询开始,并根据需要逐步增加复杂性。大多数监控可以通过简单的过滤器和 pandas 操作完成。
使用时间窗口: 在分析近期数据时,始终按 timestamp_ms 过滤,以保持查询快速和相关。
处理缺失数据: 生产跟踪可能存在缺失字段,因此在使用数据之前务必检查数据是否存在。
保持查询集中: 使用特定过滤器仅获取所需数据,而不是检索所有内容并在内存中进行过滤。
错误处理
添加基本错误处理以使您的脚本更健壮
import pandas as pd
def safe_trace_query(experiment_ids, filter_string=None):
"""Query traces with basic error handling"""
try:
return mlflow.search_traces(
experiment_ids=experiment_ids, filter_string=filter_string
)
except Exception as e:
print(f"Error querying traces: {e}")
return pd.DataFrame() # Return empty DataFrame on error
# Usage
traces = safe_trace_query(["1"], "status = 'ERROR'")
if not traces.empty:
print(f"Found {len(traces)} traces")
else:
print("No traces found or query failed")
总结
使用 MLflow SDK 以编程方式查询跟踪可实现强大的自动化,用于调试、监控和分析。从简单查询开始以理解您的数据,然后逐步构建更复杂的分析工作流。
关键在于关注可操作的洞察,这有助于您理解和改进应用程序在生产环境中的行为。
后续步骤
MLflow 跟踪 UI: 了解用于交互式跟踪探索的 Web 界面
搜索跟踪: 掌握高级搜索和过滤技术
删除和管理跟踪: 理解跟踪生命周期管理