手动跟踪
除了自动跟踪集成之外,您还可以使用 MLflow 跟踪 SDK 来检测您的 Python 代码。这在您需要检测自定义 Python 代码时特别有用。
装饰器
@mlflow.trace
装饰器允许您为任何函数创建一个 span。这种方法提供了一种简单有效的方式,以最小的工作量向您的代码添加跟踪。
- MLflow 检测函数之间的父子关系,使其与自动跟踪集成兼容。
- 捕获函数执行期间的异常,并将其记录为 span 事件。
- 自动记录函数的名称、输入、输出和执行时间。
- 可以与自动跟踪功能(例如
mlflow.openai.autolog
)一起使用。
@mlflow.trace 装饰器目前支持以下函数类型
函数类型 | 支持 |
---|---|
同步 | ✅ |
异步 | ✅ (>= 2.16.0) |
生成器 | ✅ (>= 2.20.2) |
异步生成器 | ✅ (>= 2.20.2) |
示例
以下代码是使用装饰器跟踪 Python 函数的最小示例。
import mlflow
@mlflow.trace(span_type="func", attributes={"key": "value"})
def add_1(x):
return x + 1
@mlflow.trace(span_type="func", attributes={"key1": "value1"})
def minus_1(x):
return x - 1
@mlflow.trace(name="Trace Test")
def trace_test(x):
step1 = add_1(x)
return minus_1(step1)
trace_test(4)
当一个 trace 包含多个同名 span 时,MLflow 会为其附加一个自动递增的后缀,例如 _1
、_2
。
自定义 Spans
@mlflow.trace
装饰器接受以下参数来自定义要创建的 span
name
参数,用于覆盖默认的 span 名称(即被装饰函数的名称)span_type
参数,用于设置 span 的类型。可以设置为内置的Span Types 中的一个,或一个字符串。attributes
参数,用于向 span 添加自定义属性。
@mlflow.trace(
name="call-local-llm", span_type=SpanType.LLM, attributes={"model": "gpt-4o-mini"}
)
def invoke(prompt: str):
return client.invoke(
messages=[{"role": "user", "content": prompt}], model="gpt-4o-mini"
)
或者,您可以使用 mlflow.get_current_active_span
API 在函数内部动态更新 span。
@mlflow.trace(span_type=SpanType.LLM)
def invoke(prompt: str):
model_id = "gpt-4o-mini"
# Get the current span (created by the @mlflow.trace decorator)
span = mlflow.get_current_active_span()
# Set the attribute to the span
span.set_attributes({"model": model_id})
return client.invoke(messages=[{"role": "user", "content": prompt}], model=model_id)
添加 Trace Tags
可以将 Tags 添加到 traces 中,以在 trace 级别提供额外的元数据。有几种不同的方法可以在 trace 上设置 tags。请参阅操作指南以了解其他方法。
@mlflow.trace
def my_func(x):
mlflow.update_current_trace(tags={"fruit": "apple"})
return x + 1
自动异常处理
如果在跟踪检测的操作处理期间引发 Exception
,UI 中将显示调用未成功的指示,并且将提供部分数据捕获以帮助调试。此外,有关引发的 Exception 的详细信息将包含在部分完成的 span 的 Events
中,进一步帮助识别代码中出现问题的位置。
与自动跟踪结合
@mlflow.trace
装饰器可以与自动跟踪结合使用。例如,以下代码将 OpenAI 自动跟踪与手动定义的 span 结合在一个统一集成的 trace 中。
import mlflow
import openai
mlflow.openai.autolog()
@mlflow.trace(span_type=SpanType.CHAIN)
def run(question):
messages = build_messages()
# MLflow automatically generates a span for OpenAI invocation
response = openai.OpenAI().chat.completions.create(
model="gpt-4o-mini",
max_tokens=100,
messages=messages,
)
return parse_response(response)
@mlflow.trace
def build_messages(question):
return [
{"role": "system", "content": "You are a helpful chatbot."},
{"role": "user", "content": question},
]
@mlflow.trace
def parse_response(response):
return response.choices[0].message.content
run("What is MLflow?")
运行此代码将生成以下单个 trace
流式处理
自 MLflow 2.20.2 起,@mlflow.trace
装饰器可用于跟踪返回生成器或迭代器的函数。
@mlflow.trace
def stream_data():
for i in range(5):
yield i
上面的示例将为 stream_data
函数生成一个带有单个 span 的 trace。默认情况下,MLflow 会将生成器 yield 的所有元素作为列表捕获到 span 的输出中。在上面的示例中,span 的输出将是 [0, 1, 2, 3, 4]
。
流函数的 span 将在返回的迭代器开始被消费时开始,并在迭代器耗尽或迭代过程中发生异常时结束。
如果您想将元素聚合为单个 span 输出,可以使用 output_reducer
参数指定一个自定义函数来聚合元素。自定义函数应接受一个 yield 元素列表作为输入。
@mlflow.trace(output_reducer=lambda x: ",".join(x))
def stream_data():
for c in "hello":
yield c
在上面的示例中,span 的输出将是 "h,e,l,l,o"
。原始块仍然可以在 span 的 Events
选项卡中找到。
以下是一个更高级的示例,它使用 output_reducer
将 OpenAI LLM 的 ChatCompletionChunk 输出合并到一个消息对象中。
当然,对于此类示例,我们建议使用 OpenAI 的自动跟踪,它能以一行代码完成同样的工作。下面的示例仅用于演示目的。
import mlflow
import openai
from openai.types.chat import *
from typing import Optional
def aggregate_chunks(outputs: list[ChatCompletionChunk]) -> Optional[ChatCompletion]:
"""Consolidate ChatCompletionChunks to a single ChatCompletion"""
if not outputs:
return None
first_chunk = outputs[0]
delta = first_chunk.choices[0].delta
message = ChatCompletionMessage(
role=delta.role, content=delta.content, tool_calls=delta.tool_calls or []
)
finish_reason = first_chunk.choices[0].finish_reason
for chunk in outputs[1:]:
delta = chunk.choices[0].delta
message.content += delta.content or ""
message.tool_calls += delta.tool_calls or []
finish_reason = finish_reason or chunk.choices[0].finish_reason
base = ChatCompletion(
id=first_chunk.id,
choices=[Choice(index=0, message=message, finish_reason=finish_reason)],
created=first_chunk.created,
model=first_chunk.model,
object="chat.completion",
)
return base
@mlflow.trace(output_reducer=aggregate_chunks)
def predict(messages: list[dict]):
stream = openai.OpenAI().chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True,
)
for chunk in stream:
yield chunk
for chunk in predict([{"role": "user", "content": "Hello"}]):
print(chunk)
在上面的示例中,生成的 predict
span 将具有单个聊天完成消息作为输出,该输出由自定义 reducer 函数聚合。
函数包装
函数包装提供了一种灵活的方式,可以在不修改函数定义的情况下向现有函数添加跟踪。当您想要向第三方函数或在您控制之外定义的函数添加跟踪时,这特别有用。通过使用 @mlflow.trace
包装外部函数,您可以捕获其输入、输出和执行上下文。
import math
import mlflow
def invocation(x, y, exp=2):
# Wrap an external function from the math library
traced_pow = mlflow.trace(math.pow)
raised = traced_pow(x, exp)
traced_factorial = mlflow.trace(math.factorial)
factorial = traced_factorial(int(raised))
return response
invocation(4)
上下文管理器
除了装饰器之外,MLflow 还允许使用 mlflow.start_span()
上下文管理器创建 span,然后在任何封装的任意代码块中访问该 span。这对于以比捕获单个函数边界更精细的粒度捕获代码中的复杂交互非常有用。
与装饰器类似,上下文管理器自动捕获父子关系、异常、执行时间,并与自动跟踪配合使用。然而,span 的名称、输入和输出必须手动提供。您可以通过上下文管理器返回的 Span 对象来设置它们。
with mlflow.start_span(name="my_span") as span:
span.set_inputs({"x": 1, "y": 2})
z = x + y
span.set_outputs(z)
下面是一个稍微复杂一些的示例,它结合使用了 mlflow.start_span()
上下文管理器以及装饰器和 OpenAI 的自动跟踪功能。
import mlflow
from mlflow.entities import SpanType
@mlflow.trace(span_type=SpanType.CHAIN)
def start_session():
messages = [{"role": "system", "content": "You are a friendly chat bot"}]
while True:
with mlflow.start_span(name="User") as span:
span.set_inputs(messages)
user_input = input(">> ")
span.set_outputs(user_input)
if user_input == "BYE":
break
messages.append({"role": "user", "content": user_input})
response = openai.OpenAI().chat.completions.create(
model="gpt-4o-mini",
max_tokens=100,
messages=messages,
)
answer = response.choices[0].message.content
print(f"🤖: {answer}")
messages.append({"role": "assistant", "content": answer})
mlflow.openai.autolog()
start_session()
多线程
MLflow Tracing 是线程安全的,traces 默认按线程隔离。但您也可以通过一些额外的步骤创建一个跨多个线程的 trace。
MLflow 使用 Python 内置的 ContextVar 机制来确保线程安全,该机制默认不会跨线程传播。因此,您需要手动将上下文从主线程复制到工作线程,如下面的示例所示。
import contextvars
from concurrent.futures import ThreadPoolExecutor, as_completed
import mlflow
from mlflow.entities import SpanType
import openai
client = openai.OpenAI()
# Enable MLflow Tracing for OpenAI
mlflow.openai.autolog()
@mlflow.trace
def worker(question: str) -> str:
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": question},
]
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
temperature=0.1,
max_tokens=100,
)
return response.choices[0].message.content
@mlflow.trace
def main(questions: list[str]) -> list[str]:
results = []
# Almost same as how you would use ThreadPoolExecutor, but two additional steps
# 1. Copy the context in the main thread using copy_context()
# 2. Use ctx.run() to run the worker in the copied context
with ThreadPoolExecutor(max_workers=2) as executor:
futures = []
for question in questions:
ctx = contextvars.copy_context()
futures.append(executor.submit(ctx.run, worker, question))
for future in as_completed(futures):
results.append(future.result())
return results
questions = [
"What is the capital of France?",
"What is the capital of Germany?",
]
main(questions)
相反,ContextVar
默认会复制到异步任务中。因此,使用 asyncio
时无需手动复制上下文,这可能是使用 MLflow Tracing 处理 Python 中并发 I/O 密集型任务的更简便方法。
(高级) 低层客户端 API
当装饰器或上下文管理器不满足您的要求时,您可以使用低层客户端 API。例如,您可能需要从不同的函数开始和结束一个 span。客户端 API 被设计为 MLflow REST API 的一个薄包装,使您对 trace 生命周期有更多控制。有关更多详细信息,请参阅 MLflow Tracing Client APIs 指南。
使用客户端 API 时,请注意以下限制
- 父子关系不会自动捕获。您需要手动传递父 span 的 ID。
- 使用客户端 API 创建的 spans 不会与自动跟踪 spans 结合。
- 标记为实验性的低层 API 可能会根据后端实现更新而更改。