超越 Autolog:为新 LLM 提供商添加 MLflow 追踪
在本文中,我们将演示如何通过为 Ollama Python SDK 的 chat
方法添加跟踪支持,从而为新的大型语言模型提供商添加 MLflow 链路追踪。
MLflow 链路追踪是 MLflow 中的一个可观测性工具,用于捕获生成式AI(GenAI)应用和工作流的详细执行轨迹。除了捕获单个调用的输入、输出和元数据外,MLflow 链路追踪还可以捕获中间步骤,例如工具调用、推理步骤、检索步骤或其他自定义步骤。
MLflow 为许多流行的大型语言模型提供商和编排框架提供了内置的链路追踪支持。如果您正在使用这些提供商之一,您可以通过一行代码启用跟踪:mlflow.<provider>.autolog()
。尽管 MLflow 的自动日志记录功能涵盖了许多最广泛使用的大型语言模型提供商和编排框架,但有时您可能需要为不受支持的提供商添加跟踪,或自定义超出自动日志记录范围的跟踪。本文通过以下方式展示了 MLflow 链路追踪的灵活性和可扩展性:
- 为不受支持的提供商(Ollama Python SDK)添加基本链路追踪支持
- 展示如何捕获简单的补全和更复杂的工具调用工作流
- 阐明如何通过对现有代码进行最小的更改来添加跟踪
我们将使用Ollama Python SDK(一个用于Ollama 大型语言模型平台的开源 Python SDK)作为示例。我们将逐步完成整个过程,展示如何在保持与提供商 SDK 良好集成的同时,使用 MLflow 链路追踪捕获关键信息。请注意,MLflow 确实对 Ollama 具有自动日志记录支持,但目前仅限通过 OpenAI 客户端使用,而非直接与 Ollama Python SDK 配合使用。
为新提供商添加 MLflow 链路追踪:通用原则
MLflow 文档中有一份关于贡献 MLflow 链路追踪的优秀指南。尽管在此示例中我们不会直接贡献于 MLflow 本身,但我们将遵循相同的通用原则。
本文假设您对 MLflow 链路追踪是什么以及它是如何工作的有基本了解。如果您是初学者,或者需要复习,请查阅链路追踪概念指南。
为新提供商添加跟踪涉及几个关键考量因素
-
理解提供商的关键功能: 我们首先需要了解哪些 API 方法需要被跟踪,以获取我们想要的跟踪信息。对于大型语言模型推理提供商,这通常涉及聊天补全、工具调用或嵌入生成等操作。在编排框架中,这可能涉及检索、推理、路由或各种自定义步骤等操作。在我们的 Ollama 示例中,我们将重点关注聊天补全 API。这一步骤将根据提供商的不同而有显著差异。
-
将操作映射到 Span: MLflow 链路追踪使用不同的Span 类型来表示不同类型的操作。您可以在此处找到内置 Span 类型的描述。不同的 Span 类型在 MLflow UI 中显示方式不同,并且可以启用特定的功能。在 Span 内部,我们还希望将提供商的输入和输出映射到 MLflow 期望的格式。MLflow 提供了用于记录聊天和工具输入及输出的实用程序,这些内容随后会作为格式化消息显示在 MLflow UI 中。
为新提供商添加跟踪时,我们的主要任务是将提供商的 API 方法映射到具有适当 Span 类型的 MLflow 链路追踪 Span。
-
结构化并保留关键数据: 对于我们想要跟踪的每个操作,我们需要识别要保留的关键信息,并确保它以有用的方式被捕获和显示。例如,我们可能希望捕获控制操作行为的输入和配置数据、解释结果的输出和元数据、过早终止操作的错误等。查看类似提供商的跟踪和跟踪实现可以为如何结构化和保留这些数据提供一个良好的起点。
为 Ollama Python SDK 添加跟踪
既然我们对为新提供商添加跟踪的关键步骤有了高层次的理解,那么让我们逐步完成这个过程,并为 Ollama Python SDK 添加跟踪。
步骤 1:安装并测试 Ollama Python SDK
首先,我们需要安装 Ollama Python SDK,并弄清楚在添加跟踪支持时需要关注哪些方法。您可以使用 pip install ollama-python
安装 Ollama Python SDK。
如果您使用过 OpenAI Python SDK,那么 Ollama Python SDK 会让您感觉非常熟悉。以下是我们使用它进行聊天补全调用的方式:
from ollama import chat
from rich import print
response = chat(model="llama3.2",
messages = [
{"role": "user", "content": "Briefly describe the components of an MLflow model"}
]
)
print(response)
它将返回
ChatResponse(
model='llama3.2',
created_at='2025-01-30T15:57:39.097119Z',
done=True,
done_reason='stop',
total_duration=7687553708,
load_duration=823704250,
prompt_eval_count=35,
prompt_eval_duration=3414000000,
eval_count=215,
eval_duration=3447000000,
message=Message(
role='assistant',
content="In MLflow, a model consists of several key components:\n\n1. **Model Registry**: A centralized
storage for models, containing metadata such as the model's name, version, and description.\n2. **Model Version**:
A specific iteration of a model, represented by a unique version number. This can be thought of as a snapshot of
the model at a particular point in time.\n3. **Model Artifacts**: The actual model code, parameters, and data used
to train the model. These artifacts are stored in the Model Registry and can be easily deployed or reused.\n4.
**Experiment**: A collection of runs that use the same hyperparameters and model version to train and evaluate a
model. Experiments help track progress, provide reproducibility, and facilitate collaboration.\n5. **Run**: An
individual instance of training or testing a model using a specific experiment. Runs capture the output of each
run, including metrics such as accuracy, loss, and more.\n\nThese components work together to enable efficient
model management, version control, and reproducibility in machine learning workflows.",
images=None,
tool_calls=None
)
)
我们已经验证了 Ollama Python SDK 已设置并正常工作。我们也知道在添加跟踪支持时需要关注的方法是:ollama.chat
。
步骤 2:编写一个跟踪装饰器
有几种方法可以为 Ollama 的 SDK 添加跟踪——我们可以直接修改 SDK 代码,创建一个包装类,或者使用 Python 的方法打补丁能力。对于本示例,我们将使用装饰器来为 SDK 的 chat
方法打补丁。这种方法允许我们在不修改 SDK 代码或创建额外包装类的情况下添加跟踪,尽管它确实需要理解 Python 的装饰器模式以及 MLflow 链路追踪的工作原理。
import mlflow
from mlflow.entities import SpanType
from mlflow.tracing.utils import set_span_chat_messages
from functools import wraps
from ollama import chat as ollama_chat
def _get_span_type(task_name: str) -> str:
span_type_mapping = {
"chat": SpanType.CHAT_MODEL,
}
return span_type_mapping.get(task_name, SpanType.UNKNOWN)
def trace_ollama_chat(func):
@wraps(func)
def wrapper(*args, **kwargs):
with mlflow.start_span(
name="ollama.chat",
span_type=_get_span_type("chat"),
) as span:
# Set model name as a span attribute
model_name = kwargs.get("model", "")
span.set_attribute("model_name", model_name)
# Log the inputs
input_messages = kwargs.get("messages", [])
span.set_inputs({
"messages": input_messages,
"model": model_name,
})
# Set input messages
set_span_chat_messages(span, input_messages)
# Make the API call
response = func(*args, **kwargs)
# Log the outputs
if hasattr(response, 'to_dict'):
output = response.to_dict()
else:
output = response
span.set_outputs(output)
output_message = response.message
# Append the output message
set_span_chat_messages(span, [{"role": output_message.role, "content": output_message.content}], append=True)
return response
return wrapper
让我们分解代码,看看它是如何工作的。
-
我们首先定义一个辅助函数
_get_span_type
,它将 Ollama 方法映射到 MLflow Span 类型。这并非严格必要,因为我们目前只跟踪chat
函数,但它展示了一种可以应用于其他方法的模式。这遵循了链路追踪贡献指南中推荐的Anthropic 提供商的参考实现。 -
我们使用
functools.wraps
定义了一个装饰器trace_ollama_chat
,它为chat
函数打补丁。这里有几个关键步骤:-
我们使用
mlflow.start_span
启动一个新的 Span。Span 名称设置为 "ollama.chat",Span 类型设置为_get_span_type
返回的值。 -
我们使用
span.set_attribute
将model_name
设置为 Span 的一个属性。这并非严格必要,因为模型名称将在输入中捕获,但它说明了如何在一个 Span 上设置任意属性。 -
我们使用
span.set_inputs
将消息记录为 Span 的输入。我们通过访问kwargs
字典从messages
参数中获取这些消息。这些消息将记录到 MLflow UI 中 Span 的“输入”部分。我们还将模型名称记录为输入,再次说明如何记录任意输入。 -
我们使用 MLflow 的
set_span_chat_messages
实用函数来格式化输入消息,使其在 MLflow UI 的聊天面板中良好显示。这个辅助函数确保消息被正确格式化,并以适当的样式显示每个消息角色。 -
我们使用
func(*args, **kwargs)
调用原始函数。这是 Ollama 的chat
函数。 -
我们使用
span.set_outputs
将函数的输出记录为 Span 属性。这会获取 Ollama API 的响应并将其设置为 Span 上的一个属性。这些输出将记录到 MLflow UI 中 Span 的“输出”部分。 -
我们从响应中提取输出消息,并再次使用
set_span_chat_messages
将其添加到聊天历史记录中,确保它显示在 MLflow UI 的聊天面板中。 -
最后,我们返回 API 调用的响应,不进行任何更改。现在,当我们使用
trace_ollama_chat
为聊天函数打补丁时,该函数将被跟踪,但其行为仍将正常。
-
几点注意事项:
- 此实现使用简单的装饰器模式来添加跟踪,而无需修改底层的 Ollama SDK 代码。这使其成为一种轻量级且可维护的方法。
set_span_chat_messages
的使用确保了输入和输出消息以用户友好的方式显示在 MLflow UI 的聊天面板中,从而便于跟踪对话流程。- 我们可以通过其他几种方式实现这种跟踪行为。我们可以编写一个包装类,或者使用一个简单的包装函数,用
@mlflow.trace
装饰chat
函数。一些编排框架可能需要更复杂的方法,例如回调或 API 钩子。更多详细信息请参阅MLflow 链路追踪贡献指南。
步骤 3:为 chat
方法打补丁并试用
既然我们有了跟踪装饰器,我们就可以为 Ollama 的 chat
方法打补丁并试用它。
original_chat = ollama_chat
chat = trace_ollama_chat(ollama_chat)
这段代码有效地为当前作用域中的 ollama.chat
函数打补丁。我们首先将原始函数存储在 original_chat
中以备用,然后将 chat
重新分配给装饰后的版本。这意味着我们代码中后续对 chat()
的任何调用都将使用跟踪版本,同时仍保留原始功能。
现在,当我们调用 chat()
时,该方法将被跟踪,并且结果将记录到 MLflow UI 中
mlflow.set_experiment("ollama-tracing")
response = chat(model="llama3.2",
messages = [
{"role": "user", "content": "Briefly describe the components of an MLflow model"}
]
)
跟踪工具和工具调用
Ollama Python SDK 支持工具调用。我们希望记录两件主要的事情:
- 可供大型语言模型使用的工具
- 实际的工具调用,包括特定工具及其传递的参数。
请注意,“工具调用”是指大型语言模型指定使用哪个工具以及传递什么参数——而不是该工具的实际执行。当大型语言模型进行工具调用时,它本质上是在说“这个工具应该用这些参数运行”,而不是自己运行工具。工具的实际执行是单独进行的,通常在应用程序代码中。
这是跟踪代码的更新版本,它为 Ollama 聊天方法打补丁,并记录可用工具和捕获工具调用:
from mlflow.entities import SpanType
from mlflow.tracing.utils import set_span_chat_messages, set_span_chat_tools
from functools import wraps
from ollama import chat as ollama_chat
import json
from uuid import uuid4
def _get_span_type(task_name: str) -> str:
span_type_mapping = {
"chat": SpanType.CHAT_MODEL,
}
return span_type_mapping.get(task_name, SpanType.UNKNOWN)
def trace_ollama_chat(func):
@wraps(func)
def wrapper(*args, **kwargs):
with mlflow.start_span(
name="ollama.chat",
span_type=_get_span_type("chat"),
) as span:
# Set model name as a span attribute
model_name = kwargs.get("model", "")
span.set_attribute("model_name", model_name)
# Log the inputs
input_messages = kwargs.get("messages", [])
tools = kwargs.get("tools", [])
span.set_inputs({
"messages": input_messages,
"model": model_name,
"tools": tools,
})
# Set input messages and tools
set_span_chat_messages(span, input_messages)
if tools:
set_span_chat_tools(span, tools)
# Make the API call
response = func(*args, **kwargs)
# Log the outputs
if hasattr(response, "to_dict"):
output = response.to_dict()
else:
output = response
span.set_outputs(output)
output_message = response.message
# Prepare the output message for span
output_span_message = {
"role": output_message.role,
"content": output_message.content,
}
# Handle tool calls if present
if output_message.tool_calls:
tool_calls = []
for tool_call in output_message.tool_calls:
tool_calls.append({
"id": str(uuid4()),
"type": "function",
"function": {
"name": tool_call.function.name,
"arguments": json.dumps(tool_call.function.arguments),
}
})
output_span_message["tool_calls"] = tool_calls
# Append the output message
set_span_chat_messages(span, [output_span_message], append=True)
return response
return wrapper
这里的关键变化是:
- 我们使用
tools = kwargs.get("tools", [])
从tools
参数中提取可用工具列表,将它们记录为输入,并使用set_span_chat_tools
捕获它们以包含在聊天面板中。 - 我们在输出消息中添加了工具调用的特定处理,确保按照 ToolCall 规范对其进行格式化。
现在,让我们用一个简单的消费计算工具来测试这个。工具是根据OpenAI 的工具调用规范定义的。
chat = trace_ollama_chat(ollama_chat)
tools = [
{
"type": "function",
"function": {
"name": "calculate_tip",
"description": "Calculate the tip amount based on the bill amount and tip percentage",
"parameters": {
"type": "object",
"properties": {
"bill_amount": {
"type": "number",
"description": "The total bill amount"
},
"tip_percentage": {
"type": "number",
"description": "The percentage of the bill to be given as a tip, given as a whole number."
}
},
"required": ["bill_amount", "tip_percentage"]
}
}
}
]
response = chat(
model="llama3.2",
messages=[
{"role": "user", "content": "What is the tip for a $187.32 bill with a 22% tip?"}
],
tools=tools,
)
我们可以在 MLflow UI 中检查跟踪,现在其中显示了可用工具和工具调用结果
编排:构建工具调用循环
到目前为止,Ollama 示例在每次聊天补全时只生成一个 Span。但是,许多生成式 AI(GenAI)应用程序包含多个大型语言模型调用、检索步骤、工具执行和其他自定义步骤。尽管我们在此不会详细介绍如何为编排框架添加跟踪,但我们将通过定义一个基于我们之前定义的工具的工具调用循环来阐述一些关键概念。
工具调用循环将遵循以下模式:
- 将用户提示作为输入
- 使用一个或多个工具调用进行响应
- 对于每个工具调用,执行该工具并存储结果
- 将工具调用结果以
tool
角色附加到消息历史记录中 - 再次使用工具调用结果调用大型语言模型,促使其对用户提示给出最终答案
这是一个只包含一个工具调用的实现。
class ToolExecutor:
def __init__(self):
self.tools = [
{
"type": "function",
"function": {
"name": "calculate_tip",
"description": "Calculate the tip amount based on the bill amount and tip percentage",
"parameters": {
"type": "object",
"properties": {
"bill_amount": {
"type": "number",
"description": "The total bill amount"
},
"tip_percentage": {
"type": "number",
"description": "The percentage of the bill to be given as a tip, represented as a whole number."
}
},
"required": ["bill_amount", "tip_percentage"]
}
}
}
]
# Map tool names to their Python implementations
self.tool_implementations = {
"calculate_tip": self._calculate_tip
}
def _calculate_tip(self, bill_amount: float, tip_percentage: float) -> float:
"""Calculate the tip amount based on the bill amount and tip percentage."""
bill_amount = float(bill_amount)
tip_percentage = float(tip_percentage)
return round(bill_amount * (tip_percentage / 100), 2)
def execute_tool_calling_loop(self, messages):
"""Execute a complete tool calling loop with tracing."""
with mlflow.start_span(
name="ToolCallingLoop",
span_type="CHAIN",
) as parent_span:
# Set initial inputs
parent_span.set_inputs({
"initial_messages": messages,
"available_tools": self.tools
})
# Set input messages
set_span_chat_messages(parent_span, messages)
# First LLM call (already traced by our chat method patch)
response = chat(
messages=messages,
model="llama3.2",
tools=self.tools,
)
messages.append(response.message)
tool_calls = response.message.tool_calls
tool_results = []
# Execute tool calls
for tool_call in tool_calls:
with mlflow.start_span(
name=f"ToolExecution_{tool_call.function.name}",
span_type="TOOL",
) as tool_span:
# Parse tool inputs
tool_inputs = tool_call.function.arguments
tool_span.set_inputs(tool_inputs)
# Execute tool
func = self.tool_implementations.get(tool_call.function.name)
if func is None:
raise ValueError(f"No implementation for tool: {tool_call.function.name}")
result = func(**tool_inputs)
tool_span.set_outputs({"result": result})
tool_results.append({
"tool_call_id": str(uuid4()),
"output": str(result)
})
messages.append({
"role": "tool",
"tool_call_id": str(uuid4()),
"content": str(result)
})
# Prepare messages for final response
messages.append({
"role": "user",
"content": "Answer the initial question based on the tool call results. Do not refer to the tool call results in your response. Just give a direct answer."
})
# Final LLM call (already traced by our chat method patch)
final_response = chat(
messages=messages,
model="llama3.2"
)
# Set the final output for the parent span
parent_span.set_outputs({
"final_response": final_response.message.content,
"tool_results": tool_results
})
print(final_response)
# set output messages
set_span_chat_messages(parent_span, [final_response.message.model_dump()], append=True)
return final_response
我们在这个工具调用循环中处理跟踪的方式如下:
- 我们首先使用
mlflow.start_span
为工具调用循环设置一个父 Span。我们将 Span 名称设置为 "ToolCallingLoop",并将 Span 类型设置为 "CHAIN",代表一系列操作。 - 我们将初始消息和可用工具记录为 Span 的输入。这有助于未来的调试,使我们能够验证工具是否可用并配置正确。
- 我们使用打补丁后的
chat
函数进行第一次大型语言模型调用。这个调用已经由我们的装饰器跟踪,所以我们不需要做任何特殊的事情来跟踪它。 - 我们遍历工具调用,执行每个工具并存储结果。每次工具执行都用一个新的 Span 跟踪,该 Span 以工具函数名称命名。输入和输出作为 Span 上的属性进行记录。
- 我们将工具调用结果以
tool
角色附加到消息历史记录中。这使得大型语言模型可以在后续请求中看到工具调用的结果。它还允许我们在 MLflow UI 中查看工具调用结果。 - 我们准备最终响应的消息,包括一个提示,要求根据工具调用结果回答最初的问题。
- 我们使用打补丁后的
chat
函数进行最终的大型语言模型调用。同样,由于我们使用的是打补丁后的函数,此调用也已被跟踪。 - 我们设置了父 Span 的最终输出,包括来自大型语言模型的最终响应和工具结果。
- 最后,我们使用
set_span_chat_messages
将最终响应附加到 MLflow UI 的聊天历史记录中。请注意,为了保持整洁和简单,我们只使用set_span_chat_messages
将用户的初始查询和最终响应记录到父 Span 中。我们可以点击嵌套的 Span 来查看工具调用结果和其他详细信息。
这个过程创建了整个工具调用循环的全面跟踪,从初始请求到工具执行和最终响应。
我们可以按如下方式执行此操作。但是,请注意,在不完全理解它将在您的系统上做什么的情况下,您不应运行由大型语言模型生成或调用的任意代码。
executor = ToolExecutor()
response = executor.execute_tool_calling_loop(
messages=[
{"role": "user", "content": "What is the tip for a $235.32 bill with a 22% tip?"}
]
)
产生以下跟踪:
结论
本文展示了如何将 MLflow 链路追踪扩展到其内置提供商支持之外。我们从一个简单的例子开始——为 Ollama Python SDK 的 chat
方法添加跟踪——并看到如何通过一个轻量级的补丁,捕获每个聊天补全的详细信息。然后我们在此基础上,跟踪了一个更复杂的工具执行循环。
主要的收获是:
- MLflow 链路追踪高度可定制,可以适应没有自动日志记录功能的提供商
- 添加基本的跟踪支持通常只需要最少的代码更改。在本例中,我们为 Ollama Python SDK 的
chat
方法打补丁,并编写了几行代码来添加跟踪支持。 - 用于简单 API 调用的相同原则可以扩展到包含多个步骤的复杂工作流。在本例中,我们跟踪了一个包含多个步骤和工具调用的工具调用循环。