通过 SDK 查询追踪
为什么要以编程方式查询追踪?
MLflow UI 非常适合交互式探索,但编程访问可以实现自动化并与您现有的工作流集成。您可以分析数千个追踪中的错误模式,监控随时间变化的性能趋势,并构建自定义分析工作流。
search_traces
API 默认返回一个 pandas DataFrame(如果已安装 pandas),但也可以在指定 return_type="list"
时返回一个 Trace 对象列表。
语法差异:当使用 DataFrame 返回类型时,filter_string
中使用的列名与返回的 DataFrame 中的列名不同。
- 筛选器:
status
→ DataFrame:state
- 筛选器:
timestamp_ms
→ DataFrame:request_time
- 筛选器:
execution_time_ms
→ DataFrame:execution_duration
基本查询方法
mlflow.search_traces()
函数以两种格式返回追踪数据
- pandas DataFrame(如果已安装 pandas,则为默认值):非常适合数据分析和快速洞察
- Trace 对象列表:当您需要使用完整的 Trace 对象结构时非常有用
您可以使用 return_type
参数控制返回类型
import mlflow
# Default: Get traces as a pandas DataFrame (if pandas is installed)
error_traces_df = mlflow.search_traces(
experiment_ids=["1"], filter_string="status = 'ERROR'", max_results=100
)
# Easy analysis with pandas
print(f"Found {len(error_traces_df)} errors")
print(f"Average execution time: {error_traces_df['execution_duration'].mean():.1f}ms")
# Alternative: Get traces as a list of Trace objects
error_traces_list = mlflow.search_traces(
experiment_ids=["1"],
filter_string="status = 'ERROR'",
max_results=100,
return_type="list",
)
# Work with Trace objects directly
for trace in error_traces_list[:3]:
print(f"Trace {trace.info.trace_id}: {trace.info.execution_duration}ms")
选择正确的返回类型
当出现以下情况时,请使用 DataFrame(默认):
- 您需要对追踪数据进行统计分析
- 您想在多个追踪中聚合指标
- 您正在创建可视化或报告
- 您熟悉 pandas 并希望利用其强大的数据处理能力
当出现以下情况时,请使用 Trace 对象列表:
- 您需要访问完整的 Trace 对象方法和属性
- 您正在与需要 Trace 对象的其他 MLflow API 集成
- 您想详细处理单个追踪
- 您的环境中未安装 pandas
常见用例
查找和分析错误
当您需要了解应用程序中出了什么问题时,请从简单的查询开始,以识别模式
import mlflow
from datetime import datetime, timedelta
# Get errors from the last 24 hours
yesterday = datetime.now() - timedelta(days=1)
timestamp_ms = int(yesterday.timestamp() * 1000)
error_traces = mlflow.search_traces(
experiment_ids=["1"],
filter_string=f"status = 'ERROR' AND timestamp_ms > {timestamp_ms}",
order_by=["timestamp_ms DESC"],
)
print(f"Found {len(error_traces)} errors in the last 24 hours")
# Look at error distribution by tags
if not error_traces.empty:
# Count errors by user if user tags exist
if "tags" in error_traces.columns:
print("\nError patterns:")
tag_analysis = {}
for tags in error_traces["tags"].dropna():
if isinstance(tags, dict):
user = tags.get("user_id", "unknown")
tag_analysis[user] = tag_analysis.get(user, 0) + 1
for user, count in sorted(
tag_analysis.items(), key=lambda x: x[1], reverse=True
)[:5]:
print(f" {user}: {count} errors")
性能监控
跟踪您的应用程序随时间推移的性能并识别瓶颈
# Get successful traces to analyze performance
recent_traces = mlflow.search_traces(
experiment_ids=["1"],
filter_string="status = 'OK'",
order_by=["timestamp_ms DESC"],
max_results=1000,
)
if not recent_traces.empty:
# Basic performance metrics
avg_time = recent_traces["execution_duration"].mean()
p95_time = recent_traces["execution_duration"].quantile(0.95)
print(f"Average execution time: {avg_time:.1f}ms")
print(f"95th percentile: {p95_time:.1f}ms")
# Find slowest traces
slow_traces = recent_traces[recent_traces["execution_duration"] > p95_time]
print(f"Found {len(slow_traces)} slow traces (>{p95_time:.1f}ms)")
用户会话分析
通过分析用户的会话来了解他们如何与您的应用程序互动
# Analyze traces for a specific user
user_traces = mlflow.search_traces(
experiment_ids=["1"],
filter_string="tags.user_id = 'user123'",
order_by=["timestamp_ms ASC"],
)
if not user_traces.empty:
print(f"User has {len(user_traces)} interactions")
# Calculate session metrics
total_time = user_traces["execution_duration"].sum()
error_count = len(user_traces[user_traces["state"] == "ERROR"])
print(f"Total execution time: {total_time:.1f}ms")
print(
f"Error rate: {error_count}/{len(user_traces)} ({error_count/len(user_traces)*100:.1f}%)"
)
# Show recent activity
print("\nRecent traces:")
for _, trace in user_traces.tail(3).iterrows():
status = "✅" if trace["state"] == "OK" else "❌"
print(f" {status} {trace['execution_duration']:.1f}ms")
最佳实践
从简开始:从基本查询开始,并根据需要逐渐增加复杂性。大多数监控都可以通过简单的筛选器和 pandas 操作完成。
使用时间窗口:在分析近期数据时,始终按 timestamp_ms 进行筛选,以保持查询的快速和相关性。
处理缺失数据:生产环境中的追踪可能存在缺失字段,因此在使用数据前务必检查数据是否存在。
保持查询的专注性:使用特定的筛选器仅获取您需要的数据,而不是检索所有内容后再在内存中筛选。
错误处理
添加基本的错误处理以使您的脚本更健壮
import pandas as pd
def safe_trace_query(experiment_ids, filter_string=None):
"""Query traces with basic error handling"""
try:
return mlflow.search_traces(
experiment_ids=experiment_ids, filter_string=filter_string
)
except Exception as e:
print(f"Error querying traces: {e}")
return pd.DataFrame() # Return empty DataFrame on error
# Usage
traces = safe_trace_query(["1"], "status = 'ERROR'")
if not traces.empty:
print(f"Found {len(traces)} traces")
else:
print("No traces found or query failed")
总结
使用 MLflow SDK 进行编程化追踪查询,可为调试、监控和分析实现强大的自动化。从简单的查询开始以了解您的数据,然后逐步构建更复杂的分析工作流。
关键在于专注于可操作的见解,以帮助您了解和改进应用程序在生产环境中的行为。
后续步骤
MLflow 追踪 UI:学习用于交互式追踪探索的 Web 界面
搜索追踪:掌握高级搜索和筛选技术
删除和管理追踪:了解追踪的生命周期管理