跳到主要内容

手动追踪

除了自动追踪集成外,您还可以使用 MLflow 的手动追踪 API 来检测您的 Python 代码。当您需要检测自定义 Python 代码时,这尤其有用。

提示

正在寻找 Typescript 指南?请查看Typescript SDK文档。

装饰器

mlflow.trace() 装饰器允许您为任何函数创建 span。这种方法提供了一种简单而有效的方式,以最少的精力向您的代码添加追踪

  • 🔗 MLflow 检测函数之间的父子关系,使其与自动追踪集成兼容。
  • 🛡️ 捕获函数执行期间的异常并将其记录为 span 事件。
  • 📊 自动记录函数的名称、输入、输出和执行时间
  • 🤝 可与自动追踪功能配合使用,例如 mlflow.openai.autolog

@mlflow.trace 装饰器目前支持以下类型的函数

函数类型支持
同步
异步是(>= 2.16.0)
生成器是(>= 2.20.2)
异步生成器是(>= 2.20.2)

示例

以下代码是使用装饰器追踪 Python 函数的最小示例。

装饰器顺序

为确保完全可观察性,如果使用多个装饰器,@mlflow.trace 装饰器通常应该是最外层的。有关详细说明和示例,请参阅将 @mlflow.trace 与其他装饰器配合使用

import mlflow


@mlflow.trace(span_type="func", attributes={"key": "value"})
def add_1(x):
return x + 1


@mlflow.trace(span_type="func", attributes={"key1": "value1"})
def minus_1(x):
return x - 1


@mlflow.trace(name="Trace Test")
def trace_test(x):
step1 = add_1(x)
return minus_1(step1)


trace_test(4)

Tracing Decorator

注意

当一个追踪包含多个同名 span 时,MLflow 会为其附加自动递增的后缀,例如_1_2

自定义 Span

mlflow.trace() 装饰器接受以下参数来自定义要创建的 span

  • 🏷️ name 参数用于覆盖默认的 span 名称(即被装饰函数的名称)
  • 🎯 span_type 参数用于设置 span 的类型。设置为内置的Span 类型之一或字符串。
  • 🏗️ attributes 参数用于向 span 添加自定义属性。
装饰器顺序

当将 @mlflow.trace 与其他装饰器(例如,来自 Web 框架的装饰器)结合使用时,它必须是最外层的。有关正确与不正确顺序的清晰示例,请参阅将 @mlflow.trace 与其他装饰器配合使用

from mlflow.entities import SpanType


@mlflow.trace(
name="call-local-llm", span_type=SpanType.LLM, attributes={"model": "gpt-4o-mini"}
)
def invoke(prompt: str):
return client.invoke(
messages=[{"role": "user", "content": prompt}], model="gpt-4o-mini"
)

或者,您可以使用 mlflow.get_current_active_span() API 在函数内部动态更新 span。

from mlflow.entities import SpanType


@mlflow.trace(span_type=SpanType.LLM)
def invoke(prompt: str):
model_id = "gpt-4o-mini"
# Get the current span (created by the @mlflow.trace decorator)
span = mlflow.get_current_active_span()
# Set the attribute to the span
span.set_attributes({"model": model_id})
return client.invoke(messages=[{"role": "user", "content": prompt}], model=model_id)

@mlflow.trace 与其他装饰器配合使用

当对单个函数应用多个装饰器时,将 @mlflow.trace 作为最外层装饰器(位于最顶层)至关重要。这确保 MLflow 可以捕获函数的整个执行过程,包括任何内部装饰器的行为。

如果 @mlflow.trace 不是最外层装饰器,则其对函数执行的可见性可能会受到限制或不正确,可能导致追踪不完整或错误地表示函数的输入、输出和执行时间。

考虑以下概念性示例

import mlflow
import functools
import time


# A hypothetical additional decorator
def simple_timing_decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(
f"{func.__name__} executed in {end_time - start_time:.4f} seconds by simple_timing_decorator."
)
return result

return wrapper


# Correct order: @mlflow.trace is outermost
@mlflow.trace(name="my_decorated_function_correct_order")
@simple_timing_decorator
# @another_framework_decorator # e.g., @app.route("/mypath") from Flask
def my_complex_function(x, y):
# Function logic here
time.sleep(0.1) # Simulate work
return x + y


# Incorrect order: @mlflow.trace is NOT outermost
@simple_timing_decorator
@mlflow.trace(name="my_decorated_function_incorrect_order")
# @another_framework_decorator
def my_other_complex_function(x, y):
time.sleep(0.1)
return x * y


# Example calls
if __name__ == "__main__":
print("Calling function with correct decorator order:")
my_complex_function(5, 3)

print("\nCalling function with incorrect decorator order:")
my_other_complex_function(5, 3)

my_complex_function 示例(正确顺序)中,@mlflow.trace 将捕获完整的执行,包括 simple_timing_decorator 添加的时间。在 my_other_complex_function 示例(不正确顺序)中,MLflow 捕获的追踪可能无法准确反映总执行时间,或者可能错过 simple_timing_decorator@mlflow.trace 看到它们之前对输入/输出进行的修改。

添加追踪标签

可以将标签添加到追踪中,以在追踪级别提供额外的元数据。有几种不同的方法可以在追踪上设置标签。有关其他方法,请参阅操作指南

@mlflow.trace
def my_func(x):
mlflow.update_current_trace(tags={"fruit": "apple"})
return x + 1

在 UI 中自定义请求和响应预览

MLflow UI 中的“追踪”选项卡显示追踪列表,“请求”和“响应”列显示每个追踪的端到端输入和输出预览。这使您可以快速了解每个追踪代表什么。

默认情况下,这些预览会被截断为固定数量的字符。但是,您可以使用 mlflow.update_current_trace() 函数中的 request_previewresponse_preview 参数来自定义这些列中显示的内容。这对于默认截断可能无法显示最相关信息的复杂输入或输出特别有用。

下面是一个为处理长文档和用户指令的追踪设置自定义请求预览的示例,旨在在 UI 的“请求”列中呈现最相关的信息

import mlflow


@mlflow.trace(name="Summarization Pipeline")
def summarize_document(document_content: str, user_instructions: str):
# Construct a custom preview for the request column
# For example, show beginning of document and user instructions
request_p = f"Doc: {document_content[:30]}... Instr: {user_instructions[:30]}..."
mlflow.update_current_trace(request_preview=request_p)

# Simulate LLM call
# messages = [
# {"role": "system", "content": "Summarize the following document based on user instructions."},
# {"role": "user", "content": f"Document: {document_content}\nInstructions: {user_instructions}"}
# ]
# completion = client.chat.completions.create(model="gpt-4o-mini", messages=messages)
# summary = completion.choices[0].message.content
summary = f"Summary of document starting with '{document_content[:20]}...' based on '{user_instructions}'"

# Customize the response preview
response_p = f"Summary: {summary[:50]}..."
mlflow.update_current_trace(response_preview=response_p)

return summary


# Example Call
long_document = (
"This is a very long document that contains many details about various topics..."
* 10
)
instructions = "Focus on the key takeaways regarding topic X."
summary_result = summarize_document(long_document, instructions)

通过在追踪(通常是根 span)上设置 request_previewresponse_preview,您可以控制整体交互在主追踪列表视图中如何汇总,从而更容易一目了然地识别和理解追踪。

自动异常处理

如果在追踪检测操作的处理过程中引发 Exception,UI 中将显示调用不成功的指示,并提供部分数据捕获以帮助调试。此外,有关引发的异常的详细信息将包含在部分完成的 span 的 Events 中,进一步帮助识别代码中出现问题的位置。

Trace Error

与自动追踪结合

@mlflow.trace 装饰器可以与自动追踪结合使用,将自动追踪与手动定义的 span 组合成一个单一的、连贯的集成追踪。在此处了解更多信息:此处

流式传输

自 MLflow 2.20.2 起,@mlflow.trace 装饰器可用于追踪返回生成器或迭代器的函数。

@mlflow.trace
def stream_data():
for i in range(5):
yield i

上面的示例将为 stream_data 函数生成一个包含单个 span 的追踪。默认情况下,MLflow 将捕获生成器生成的所有元素作为列表存储在 span 的输出中。在上面的示例中,span 的输出将是 [0, 1, 2, 3, 4]

注意

流函数的 span 将在返回的迭代器开始被消耗时开始,并在迭代器耗尽或迭代期间引发异常时结束。

如果您想将元素聚合为单个 span 输出,可以使用 output_reducer 参数指定自定义函数来聚合元素。自定义函数应将生成的元素列表作为输入。

@mlflow.trace(output_reducer=lambda x: ",".join(x))
def stream_data():
for c in "hello":
yield c

在上面的示例中,span 的输出将是 "h,e,l,l,o"。原始块仍可在 span 的 Events 选项卡中找到。

以下是一个高级示例,它使用 output_reducer 将 OpenAI LLM 的 ChatCompletionChunk 输出合并为一个消息对象。

提示

当然,对于此类示例,我们建议使用OpenAI 的自动追踪,它以一行代码完成相同的工作。以下示例仅用于演示目的。

import mlflow
import openai
from openai.types.chat import *
from typing import Optional


def aggregate_chunks(outputs: list[ChatCompletionChunk]) -> Optional[ChatCompletion]:
"""Consolidate ChatCompletionChunks to a single ChatCompletion"""
if not outputs:
return None

first_chunk = outputs[0]
delta = first_chunk.choices[0].delta
message = ChatCompletionMessage(
role=delta.role, content=delta.content, tool_calls=delta.tool_calls or []
)
finish_reason = first_chunk.choices[0].finish_reason
for chunk in outputs[1:]:
delta = chunk.choices[0].delta
message.content += delta.content or ""
message.tool_calls += delta.tool_calls or []
finish_reason = finish_reason or chunk.choices[0].finish_reason

base = ChatCompletion(
id=first_chunk.id,
choices=[Choice(index=0, message=message, finish_reason=finish_reason)],
created=first_chunk.created,
model=first_chunk.model,
object="chat.completion",
)
return base


@mlflow.trace(output_reducer=aggregate_chunks)
def predict(messages: list[dict]):
stream = openai.OpenAI().chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True,
)
for chunk in stream:
yield chunk


for chunk in predict([{"role": "user", "content": "Hello"}]):
print(chunk)

在上面的示例中,生成的 predict span 将只有一个聊天完成消息作为输出,该消息由自定义 reducer 函数聚合。

函数封装

函数封装提供了一种灵活的方式来向现有函数添加追踪,而无需修改其定义。当您想向第三方函数或在您的控制之外定义的函数添加追踪时,这特别有用。通过使用 mlflow.trace() 封装外部函数,您可以捕获其输入、输出和执行上下文。

注意

在动态封装函数时,“最外层”的概念仍然适用。追踪封装器应应用于您想要捕获对封装函数的整个调用的点。

import math
import mlflow


def invocation(x, y, exp=2):
# Wrap an external function from the math library
traced_pow = mlflow.trace(math.pow)
raised = traced_pow(x, exp)

traced_factorial = mlflow.trace(math.factorial)
factorial = traced_factorial(int(raised))
return factorial


invocation(4, 2)

上下文管理器

除了装饰器之外,MLflow 还允许使用 mlflow.start_span() 上下文管理器来创建 span,然后可以在任何封装的任意代码块中访问该 span。它对于比通过捕获单个函数的边界更细致地捕获代码中的复杂交互非常有用。

与装饰器类似,上下文管理器自动捕获父子关系、异常、执行时间,并与自动追踪协同工作。但是,必须手动提供 span 的名称、输入和输出。您可以通过上下文管理器返回的 mlflow.entities.Span() 对象设置它们。

import mlflow

with mlflow.start_span(name="my_span") as span:
span.set_inputs({"x": 1, "y": 2})
z = x + y
span.set_outputs(z)

下面是一个稍微复杂一点的示例,它将 mlflow.start_span() 上下文管理器与装饰器和 OpenAI 的自动追踪结合使用。

import mlflow
import openai
from mlflow.entities import SpanType

# Enable auto-tracing for OpenAI
mlflow.openai.autolog()


@mlflow.trace(span_type=SpanType.CHAIN)
def start_session():
messages = [{"role": "system", "content": "You are a friendly chat bot"}]
while True:
with mlflow.start_span(name="User") as span:
span.set_inputs(messages)
user_input = input(">> ")
span.set_outputs(user_input)

if user_input == "BYE":
break

messages.append({"role": "user", "content": user_input})

response = openai.OpenAI().chat.completions.create(
model="gpt-4o-mini",
max_tokens=100,
messages=messages,
)
answer = response.choices[0].message.content
print(f"🤖: {answer}")

messages.append({"role": "assistant", "content": answer})


start_session()

高级功能

多线程

MLflow 追踪是线程安全的,追踪默认按线程隔离。但您也可以通过几个额外的步骤创建跨多个线程的追踪。

MLflow 使用 Python 内置的 ContextVar 机制来确保线程安全,该机制默认不会跨线程传播。因此,您需要手动将上下文从主线程复制到工作线程,如下面的示例所示。

import contextvars
from concurrent.futures import ThreadPoolExecutor, as_completed
import mlflow
from mlflow.entities import SpanType
import openai

client = openai.OpenAI()

# Enable MLflow Tracing for OpenAI
mlflow.openai.autolog()


@mlflow.trace
def worker(question: str) -> str:
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": question},
]
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
temperature=0.1,
max_tokens=100,
)
return response.choices[0].message.content


@mlflow.trace
def main(questions: list[str]) -> list[str]:
results = []
# Almost same as how you would use ThreadPoolExecutor, but two additional steps
# 1. Copy the context in the main thread using copy_context()
# 2. Use ctx.run() to run the worker in the copied context
with ThreadPoolExecutor(max_workers=2) as executor:
futures = []
for question in questions:
ctx = contextvars.copy_context()
futures.append(executor.submit(ctx.run, worker, question))
for future in as_completed(futures):
results.append(future.result())
return results


questions = [
"What is the capital of France?",
"What is the capital of Germany?",
]

main(questions)

Multi threaded tracing

提示

相比之下,ContextVar 默认复制到异步任务。因此,在使用 asyncio 时,您无需手动复制上下文,这可能是使用 MLflow 追踪处理 Python 中并发 I/O 密集型任务的更简单方法。

异步支持

@mlflow.trace 装饰器与异步函数无缝协作

import asyncio
import mlflow


@mlflow.trace
async def async_operation(data: str) -> str:
# Simulate async work
await asyncio.sleep(0.1)
return f"Processed: {data}"


@mlflow.trace
async def async_pipeline(items: list[str]) -> list[str]:
results = []
for item in items:
result = await async_operation(item)
results.append(result)
return results


# Run the async pipeline
asyncio.run(async_pipeline(["item1", "item2", "item3"]))

后续步骤

与自动追踪结合:混合自动和手动追踪以实现最佳可观察性

追踪概念:了解 MLflow 追踪的结构和组件

查询追踪:以编程方式搜索和分析您的追踪