
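Introduction to ResponsesAgent

What is a ResponsesAgent?

ResponsesAgent is a subclass of PythonModel that provides a framework-agnostic way to create an agent model. Authoring an agent with ResponsesAgent has the following benefits:

  • Supports returning multiple output messages, including intermediate outputs from tool calls
  • Supports multi-agent scenarios
  • Ensures compatibility with MLflow logging, tracing, and model serving
  • Ensures your model is compatible with the OpenAI Responses API, and therefore with OpenAI's responses client and other downstream UIs and applications

We recommend ResponsesAgent over ChatModel and ChatAgent because it has all the benefits of ChatAgent and supports additional features such as annotations.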

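Authoring a ResponsesAgent

Getting started

To create your own agent, subclass mlflow.pyfunc.ResponsesAgent and implement your agent logic in the predict method. The implementation is framework-agnostic, so you can use any agent authoring framework. Note that ResponsesAgent requires pydantic>=2. For example implementations that call a Chat Completions LLM, call a Responses API LLM, wrap a LangGraph agent, and call tools, see the code snippets below.

Creating agent output

When implementing your agent, you will work with two main output types: ResponsesAgentResponse and ResponsesAgentStreamEvent. These are the only pydantic objects you should create directly. The remaining classes in mlflow.types.responses_helpers are only used to validate dictionaries.

If you want to return outputs that do not fit the standard interface, you can use the custom_outputs field.

The ResponsesAgent interface provides helper methods for creating common outputs. The examples in this document use create_text_output_item, create_function_call_item, create_function_call_output_item, and create_text_delta (for streaming).

Here is an example of a complete tool calling sequence using ResponsesAgentResponse with custom outputs: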

python
import mlflow
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse


class SimpleResponsesAgent(ResponsesAgent):
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Return the tool call, its output, and the final text in one response
        return ResponsesAgentResponse(
            output=[
                self.create_function_call_item(
                    id="fc_1",
                    call_id="call_1",
                    name="python_exec",
                    arguments='{"code":"result = 4 * 3\\nprint(result)"}',
                ),
                self.create_function_call_output_item(
                    call_id="call_1",
                    output="12\n",
                ),
                self.create_text_output_item(
                    text="The result of 4 * 3 in Python is 12.",
                    id="msg_1",
                ),
            ],
            custom_outputs={"key1": "custom-value1"},
        )
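
In the sequence above, the function call output refers back to its function call through call_id. The following is a minimal sketch of that pairing, using plain dictionaries shaped like the items in this document rather than MLflow objects. The helper name unmatched_call_ids is hypothetical and is not part of MLflow.

```python
def unmatched_call_ids(output_items: list[dict]) -> set[str]:
    """Return call_ids of function calls that have no matching output item."""
    called = {
        item["call_id"]
        for item in output_items
        if item.get("type") == "function_call"
    }
    answered = {
        item["call_id"]
        for item in output_items
        if item.get("type") == "function_call_output"
    }
    return called - answered


# Plain-dict stand-ins for the first two items of the response above
items = [
    {
        "type": "function_call",
        "id": "fc_1",
        "call_id": "call_1",
        "name": "python_exec",
        "arguments": '{"code":"result = 4 * 3\\nprint(result)"}',
    },
    {"type": "function_call_output", "call_id": "call_1", "output": "12\n"},
]

print(unmatched_call_ids(items))  # every call has an output, so the set is empty
```

A check like this can be useful in unit tests for an agent, since a function call without a matching output is an easy mistake to make when assembling the output list by hand.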

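Streaming agent output

For real-time processing, you can stream events instead of returning a complete response. Streaming lets you send partial results as they become available, which is useful for long-running operations or when you want to show progress to users.

Basic text streaming

To stream text within the ResponsesAgent interface, you should:

  • Yield response.output_text.delta events for the chunks
    • Each delta event must carry an item_id that ties related events to a single output item
  • Yield a response.output_item.done event that aggregates all chunks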
python
from typing import Generator

from mlflow.types.responses import ResponsesAgentStreamEvent


class SimpleResponsesAgent(ResponsesAgent):
    # ... continuing from above
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        # stream text, all with the same item_id
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta="Hello", item_id="msg_1"),
        )
        # the leading space keeps the joined deltas equal to "Hello world!"
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta=" world", item_id="msg_1"),
        )
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta="!", item_id="msg_1"),
        )

        # the text output item id should be the same
        # item_id as the streamed text deltas
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(
                text="Hello world!",
                id="msg_1",
            ),
        )
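
To illustrate why the shared item_id matters, here is a minimal sketch of how a client might reassemble streamed text. It assumes each delta event is a dictionary with type, item_id, and delta keys; the function name aggregate_text_deltas is hypothetical and not part of MLflow.

```python
from collections import defaultdict


def aggregate_text_deltas(events: list[dict]) -> dict[str, str]:
    """Concatenate delta text per item_id, preserving arrival order."""
    chunks: dict[str, list[str]] = defaultdict(list)
    for event in events:
        if event.get("type") == "response.output_text.delta":
            chunks[event["item_id"]].append(event["delta"])
    return {item_id: "".join(parts) for item_id, parts in chunks.items()}


# Assumed event shape: three deltas that all belong to output item "msg_1"
events = [
    {"type": "response.output_text.delta", "item_id": "msg_1", "delta": "Hello"},
    {"type": "response.output_text.delta", "item_id": "msg_1", "delta": " world"},
    {"type": "response.output_text.delta", "item_id": "msg_1", "delta": "!"},
]

print(aggregate_text_deltas(events))
```

Because the deltas are grouped by item_id, the joined text for an item can be compared against the text in that item's response.output_item.done event.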

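Tool calling streaming

You can also stream tool calls and their results. Each tool call and each tool output is sent as a separate response.output_item.done event. This enables MLflow tracing and makes it easier for clients to reconstruct the streamed message history.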

python
from typing import Generator

from mlflow.types.responses import ResponsesAgentStreamEvent


class SimpleResponsesAgent(ResponsesAgent):
    # ... continuing from above
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        # 1. the tool call requested by the model
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_function_call_item(
                id="fc_1",
                call_id="call_1",
                name="python_exec",
                arguments='{"code":"result = 4 * 3\\nprint(result)"}',
            ),
        )
        # 2. the tool output, linked to the call through call_id
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_function_call_output_item(
                call_id="call_1",
                output="12\n",
            ),
        )
        # 3. the final assistant text
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(
                text="The result of 4 * 3 in Python is 12.",
                id="msg_1",
            ),
        )
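
As a rough sketch of the client side, the completed items can be collected from the stream and appended to the conversation for the next request. This assumes events arrive as plain dictionaries with type and item keys; the names history_from_stream and next_input are hypothetical.

```python
def history_from_stream(events: list[dict]) -> list[dict]:
    """Keep only completed output items, skipping partial text deltas."""
    return [
        event["item"]
        for event in events
        if event.get("type") == "response.output_item.done"
    ]


# Assumed event shapes for one streamed turn
streamed = [
    {"type": "response.output_text.delta", "item_id": "msg_1", "delta": "The"},
    {
        "type": "response.output_item.done",
        "item": {"type": "function_call", "call_id": "call_1", "name": "python_exec"},
    },
    {
        "type": "response.output_item.done",
        "item": {"type": "function_call_output", "call_id": "call_1", "output": "12\n"},
    },
]

previous_input = [{"role": "user", "content": "what is 4*3 in python"}]
# Input for a follow-up request: earlier input, completed items, new user turn
next_input = (
    previous_input
    + history_from_stream(streamed)
    + [{"role": "user", "content": "and 5*3?"}]
)
print(len(next_input))
```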

Logging your agent

Log your agent using the Models-from-code approach. This approach is framework-agnostic and supports all authoring frameworks.

python
import mlflow

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        python_model="agent.py",  # replace with your relative path to agent code
        name="agent",
    )

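For ease of use, MLflow has the following features built in:

  • Automatic model signature inference
    • An input and output signature that follows the ResponsesAgentRequest and ResponsesAgentResponse schemas is set
  • Metadata
    • {"task": "agent/v1/responses"} is automatically appended to any metadata you pass in when logging the model
  • Input example
    • Providing an input example is optional; mlflow.types.responses.RESPONSES_AGENT_INPUT_EXAMPLE is used by default
    • If you do provide an input example, make sure it is a dictionary that follows the ResponsesAgentRequest schema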
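
Before logging, it can help to sanity-check a hand-written input example. The sketch below is not MLflow's validation, which relies on the ResponsesAgentRequest schema. It only checks the two properties visible in this document's examples: a top-level dictionary and an "input" list of dictionaries. The function name looks_like_responses_request is hypothetical.

```python
def looks_like_responses_request(example: object) -> bool:
    """Shallow structural check only; it does not validate individual items."""
    if not isinstance(example, dict):
        return False
    items = example.get("input")
    if not isinstance(items, list):
        return False
    return all(isinstance(item, dict) for item in items)


input_example = {
    "input": [{"role": "user", "content": "what is 4*3 in python"}],
    "context": {"conversation_id": "123", "user_id": "456"},
}

print(looks_like_responses_request(input_example))
print(looks_like_responses_request({"messages": []}))  # wrong top-level key
```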

Testing your agent

To test a ResponsesAgent, pass a single input dictionary that follows the ResponsesAgentRequest schema, either before or after logging it.

python
import mlflow
from mlflow.pyfunc import ResponsesAgent


class MyResponsesAgent(ResponsesAgent):
    ...


responses_agent = MyResponsesAgent()
responses_agent.predict(
    {
        "input": [{"role": "user", "content": "what is 4*3 in python"}],
        "context": {"conversation_id": "123", "user_id": "456"},
    }
)
# ... log responses_agent using code from above
# load it back from mlflow
# `path` is the location of the logged model, for example
# logged_agent_info.model_uri from the logging step
loaded_model = mlflow.pyfunc.load_model(path)
loaded_model.predict(
    {
        "input": [{"role": "user", "content": "what is 4*3 in python"}],
        "context": {"conversation_id": "123", "user_id": "456"},
    }
)
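
When asserting on test results, it is often convenient to reduce the returned output items to plain text. The following is a minimal sketch that assumes the output has already been converted to a list of dictionaries shaped like the message items shown in this document; the helper name assistant_text is hypothetical.

```python
def assistant_text(output_items: list[dict]) -> str:
    """Join the output_text parts of all assistant message items."""
    texts = []
    for item in output_items:
        if item.get("type") != "message" or item.get("role") != "assistant":
            continue  # skip function calls and function call outputs
        content = item.get("content")
        if not isinstance(content, list):
            continue
        for part in content:
            if part.get("type") == "output_text":
                texts.append(part["text"])
    return "".join(texts)


sample_output = [
    {"type": "function_call", "call_id": "call_1", "name": "python_exec"},
    {"type": "function_call_output", "call_id": "call_1", "output": "12\n"},
    {
        "type": "message",
        "id": "msg_1",
        "role": "assistant",
        "content": [
            {"type": "output_text", "text": "The result of 4 * 3 in Python is 12."}
        ],
    },
]

print(assistant_text(sample_output))
```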

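Migrating from ChatAgent

When migrating from ChatAgent to ResponsesAgent, the primary task is to adapt your message format from the ChatCompletion API to the Responses API schema. For details on these changes, see the OpenAI documentation.

The ResponsesAgent interface extends all functionality previously available in ChatAgent while introducing new features. Below, we outline the key differences in message representation between the two interfaces for common use cases: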

Standard text response

ResponsesAgent

json
{
  "type": "message",
  "id": "",
  "content": [
    {
      "annotations": [],
      "text": "",
      "type": "output_text"
    }
  ],
  "role": "assistant",
  "status": "completed"
}

ChatAgent

json
{
  "role": "assistant",
  "content": ""
}

Tool calls

ResponsesAgent

json
{
  "type": "function_call",
  "id": "fc_1",
  "arguments": "",
  "call_id": "call_1",
  "name": "",
  "status": "completed"
}

ChatAgent

json
{
  "role": "assistant",
  "content": "",
  "tool_calls": [
    {
      "id": "call_1",
      "type": "function",
      "function": {
        "name": "",
        "arguments": ""
      }
    }
  ]
}

Tool call results

ResponsesAgent

json
{
  "type": "function_call_output",
  "call_id": "call_1",
  "output": ""
}

ChatAgent

json
{
  "role": "tool",
  "content": "12",
  "tool_call_id": "call_1"
}
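
The three message mappings above can be sketched as a single conversion function. This is a minimal illustration built only from the JSON shapes shown in this document, not an MLflow utility. The function name chat_message_to_response_items is hypothetical, and generating ids with uuid4 is an assumption about what makes an acceptable id.

```python
from uuid import uuid4


def chat_message_to_response_items(msg: dict) -> list[dict]:
    """Map one ChatCompletion-style message to Responses-style items."""
    role = msg["role"]
    if role == "tool":
        # tool result: tool_call_id becomes call_id, content becomes output
        return [
            {
                "type": "function_call_output",
                "call_id": msg["tool_call_id"],
                "output": msg["content"],
            }
        ]
    if role != "assistant":
        # user and system messages keep the simple role/content form
        return [{"role": role, "content": msg["content"]}]

    items = []
    if msg.get("content"):
        items.append(
            {
                "type": "message",
                "id": str(uuid4()),  # assumption: any unique string works
                "role": "assistant",
                "status": "completed",
                "content": [
                    {"type": "output_text", "text": msg["content"], "annotations": []}
                ],
            }
        )
    for call in msg.get("tool_calls") or []:
        # each nested tool call becomes its own top-level item
        items.append(
            {
                "type": "function_call",
                "id": str(uuid4()),
                "call_id": call["id"],
                "name": call["function"]["name"],
                "arguments": call["function"]["arguments"],
                "status": "completed",
            }
        )
    return items


tool_result = {"role": "tool", "content": "12", "tool_call_id": "call_1"}
print(chat_message_to_response_items(tool_result))
```

Note that one assistant message carrying several tool_calls expands into several function_call items, which is the main structural difference between the two formats.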

Tool definitions

ResponsesAgent

json
{
  "name": "",
  "parameters": {},
  "strict": true,
  "type": "function",
  "description": ""
}

ChatAgent

json
{
  "type": "function",
  "function": {
    "name": "",
    "description": "",
    "parameters": {},
    "strict": true
  }
}
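
Comparing the two tool definitions, the Responses format is the ChatCompletion format with the nested "function" object flattened into the top level. A minimal sketch of that conversion, based only on the two JSON shapes above (the function name chat_tool_to_responses_tool is hypothetical):

```python
def chat_tool_to_responses_tool(tool: dict) -> dict:
    """Flatten a ChatCompletion-style tool definition into Responses style."""
    return {"type": tool["type"], **tool["function"]}


chat_tool = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get current temperature for provided coordinates in celsius.",
        "parameters": {"type": "object", "properties": {}},
        "strict": True,
    },
}

print(chat_tool_to_responses_tool(chat_tool))
```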

Examples

Simple chat example with a ChatCompletions LLM

Here is an example of an agent that calls OpenAI's gpt-5 model using the ChatCompletions API:

python
import mlflow
from mlflow.models import set_model
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    output_to_responses_items_stream,
    to_chat_completions_input,
)
from openai import OpenAI

client = OpenAI()


class SimpleResponsesAgent(ResponsesAgent):
    def call_llm(self, messages):
        # stream ChatCompletion chunks as plain dictionaries
        for chunk in client.chat.completions.create(
            model="gpt-5",
            messages=messages,
            stream=True,
        ):
            yield chunk.to_dict()

    def predict(self, request: ResponsesAgentRequest):
        # collect the completed items emitted by predict_stream
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(
            output=outputs, custom_outputs=request.custom_inputs
        )

    def predict_stream(self, request: ResponsesAgentRequest):
        # convert Responses-style input into ChatCompletion messages
        messages = to_chat_completions_input([i.model_dump() for i in request.input])

        yield from output_to_responses_items_stream(self.call_llm(messages))


mlflow.openai.autolog()
agent = SimpleResponsesAgent()
set_model(agent)
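
To give a feel for the kind of conversion involved when bridging ChatCompletion chunks to Responses items, here is a conceptual sketch that folds streamed text chunks into one message item. It is not the implementation of output_to_responses_items_stream, whose internals are not shown in this document, and it ignores tool calls. The function name chunks_to_text_item and the simplified chunk dictionaries are assumptions.

```python
def chunks_to_text_item(chunks: list[dict], item_id: str) -> dict:
    """Fold ChatCompletion-style text chunks into one Responses message item."""
    parts = []
    for chunk in chunks:
        for choice in chunk.get("choices", []):
            content = (choice.get("delta") or {}).get("content")
            if content:
                parts.append(content)
    return {
        "type": "message",
        "id": item_id,
        "role": "assistant",
        "status": "completed",
        "content": [
            {"type": "output_text", "text": "".join(parts), "annotations": []}
        ],
    }


# Simplified chunk dictionaries (real chunks carry more fields)
chunks = [
    {"choices": [{"delta": {"role": "assistant", "content": ""}}]},
    {"choices": [{"delta": {"content": "Hel"}}]},
    {"choices": [{"delta": {"content": "lo"}}]},
    {"choices": [{"delta": {}, "finish_reason": "stop"}]},
]

item = chunks_to_text_item(chunks, item_id="msg_1")
print(item["content"][0]["text"])
```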

Simple chat example with a Responses API LLM

Here is an example of an agent that calls OpenAI's gpt-4o model using the Responses API:

python
# uncomment below if running inside a jupyter notebook
# %%writefile agent.py
import os
from typing import Generator

import mlflow
from mlflow.entities.span import SpanType
from mlflow.models import set_model
from mlflow.pyfunc.model import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI


class SimpleResponsesAgent(ResponsesAgent):
    def __init__(self, model: str):
        self.client = OpenAI()
        self.model = model

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        response = self.client.responses.create(input=request.input, model=self.model)
        return ResponsesAgentResponse(**response.to_dict())

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        for event in self.client.responses.create(
            input=request.input, stream=True, model=self.model
        ):
            yield ResponsesAgentStreamEvent(**event.to_dict())


mlflow.openai.autolog()
agent = SimpleResponsesAgent(model="gpt-4o")
set_model(agent)

Wrapping a LangGraph agent

Here is an example of wrapping a LangGraph agent in a ResponsesAgent:

python
from typing import Generator

import mlflow
from langgraph.graph.state import CompiledStateGraph
from mlflow.models import set_model
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
    output_to_responses_items_stream,
    to_chat_completions_input,
)


class LangGraphResponsesAgent(ResponsesAgent):
    def __init__(self, agent: CompiledStateGraph):
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(
            output=outputs, custom_outputs=request.custom_inputs
        )

    def predict_stream(
        self,
        request: ResponsesAgentRequest,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        cc_msgs = to_chat_completions_input([i.model_dump() for i in request.input])

        for _, events in self.agent.stream(
            {"messages": cc_msgs}, stream_mode=["updates"]
        ):
            for node_data in events.values():
                yield from output_to_responses_items_stream(node_data["messages"])


mlflow.langchain.autolog()
graph = None  # TODO: replace with your compiled LangGraph agent
agent = LangGraphResponsesAgent(graph)
set_model(agent)

Tool calling example

Here is an example of an agent that calls OpenAI's gpt-4o model with a simple tool:

python
# uncomment below if running inside a jupyter notebook
# %%writefile agent.py
import json
import os
from typing import Any, Callable, Generator
from uuid import uuid4

import backoff
import mlflow
import openai
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI
from pydantic import BaseModel


class ToolInfo(BaseModel):
    """
    Class representing a tool for the agent.
    - "name" (str): The name of the tool.
    - "spec" (dict): JSON description of the tool (matches OpenAI Responses format)
    - "exec_fn" (Callable): Function that implements the tool logic
    """

    name: str
    spec: dict
    exec_fn: Callable

class ToolCallingAgent(ResponsesAgent):
    """
    Class representing a tool-calling Agent
    """

    def __init__(self, model: str, tools: list[ToolInfo]):
        """Initializes the ToolCallingAgent with tools."""
        self.model = model
        self.client: OpenAI = OpenAI()
        self._tools_dict = {tool.name: tool for tool in tools}

    def get_tool_specs(self) -> list[dict]:
        """Returns tool specifications in the format OpenAI expects."""
        return [tool_info.spec for tool_info in self._tools_dict.values()]

    @mlflow.trace(span_type=SpanType.TOOL)
    def execute_tool(self, tool_name: str, args: dict) -> Any:
        """Executes the specified tool with the given arguments."""
        return self._tools_dict[tool_name].exec_fn(**args)

    @backoff.on_exception(backoff.expo, openai.RateLimitError)
    @mlflow.trace(span_type=SpanType.LLM)
    def call_llm(self, input_messages) -> dict[str, Any]:
        """Calls the LLM and returns its first output item as a dictionary."""
        return (
            self.client.responses.create(
                model=self.model,
                input=input_messages,
                tools=self.get_tool_specs(),
            )
            .output[0]
            .model_dump(exclude_none=True)
        )

    def handle_tool_call(self, tool_call: dict[str, Any]) -> ResponsesAgentStreamEvent:
        """
        Execute tool calls and return a ResponsesAgentStreamEvent w/ tool output
        """
        args = json.loads(tool_call["arguments"])
        result = str(self.execute_tool(tool_name=tool_call["name"], args=args))

        tool_call_output = {
            "type": "function_call_output",
            "call_id": tool_call["call_id"],
            "output": result,
        }
        return ResponsesAgentStreamEvent(
            type="response.output_item.done", item=tool_call_output
        )

    def call_and_run_tools(
        self,
        input_messages,
        max_iter: int = 10,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        for _ in range(max_iter):
            last_msg = input_messages[-1]
            # an assistant message means the model has produced its final answer
            if (
                last_msg.get("type", None) == "message"
                and last_msg.get("role", None) == "assistant"
            ):
                return
            if last_msg.get("type", None) == "function_call":
                # run the requested tool and feed its output back to the model
                tool_call_res = self.handle_tool_call(last_msg)
                input_messages.append(tool_call_res.item)
                yield tool_call_res
            else:
                llm_output = self.call_llm(input_messages=input_messages)
                input_messages.append(llm_output)
                yield ResponsesAgentStreamEvent(
                    type="response.output_item.done",
                    item=llm_output,
                )

        # only reached when the loop runs out of iterations
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item={
                "id": str(uuid4()),
                "content": [
                    {
                        "type": "output_text",
                        "text": "Max iterations reached. Stopping.",
                    }
                ],
                "role": "assistant",
                "type": "message",
            },
        )

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(
            output=outputs, custom_outputs=request.custom_inputs
        )

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        # SYSTEM_PROMPT is defined at module level below
        input_messages = [{"role": "system", "content": SYSTEM_PROMPT}] + [
            i.model_dump() for i in request.input
        ]
        yield from self.call_and_run_tools(input_messages=input_messages)


tools = [
    ToolInfo(
        name="get_weather",
        spec={
            "type": "function",
            "name": "get_weather",
            "description": "Get current temperature for provided coordinates in celsius.",
            "parameters": {
                "type": "object",
                "properties": {
                    "latitude": {"type": "number"},
                    "longitude": {"type": "number"},
                },
                "required": ["latitude", "longitude"],
                "additionalProperties": False,
            },
            "strict": True,
        },
        exec_fn=lambda latitude, longitude: 70,  # dummy tool implementation
    )
]

os.environ["OPENAI_API_KEY"] = "your OpenAI API key"

SYSTEM_PROMPT = "You are a helpful assistant that can call tools to get information."
mlflow.openai.autolog()
AGENT = ToolCallingAgent(model="gpt-4o", tools=tools)
mlflow.models.set_model(AGENT)
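
The control flow of call_and_run_tools can be exercised without an API key by swapping the LLM for a stub. The sketch below mirrors the loop above using plain dictionaries. The names run_tool_loop and fake_llm are hypothetical, the stub's replies are invented for illustration, and the final "max iterations" message is omitted for brevity.

```python
import json
from typing import Callable, Iterator


def run_tool_loop(
    messages: list[dict],
    call_llm: Callable[[list[dict]], dict],
    tools: dict[str, Callable],
    max_iter: int = 10,
) -> Iterator[dict]:
    """Alternate between LLM calls and tool runs until the assistant answers."""
    for _ in range(max_iter):
        last = messages[-1]
        if last.get("type") == "message" and last.get("role") == "assistant":
            return  # final answer reached
        if last.get("type") == "function_call":
            result = tools[last["name"]](**json.loads(last["arguments"]))
            item = {
                "type": "function_call_output",
                "call_id": last["call_id"],
                "output": str(result),
            }
        else:
            item = call_llm(messages)
        messages.append(item)
        yield item


def fake_llm(messages: list[dict]) -> dict:
    """Stub LLM: request the weather once, then answer with text."""
    if any(m.get("type") == "function_call_output" for m in messages):
        return {
            "type": "message",
            "id": "msg_1",
            "role": "assistant",
            "content": [{"type": "output_text", "text": "It is 70 degrees."}],
        }
    return {
        "type": "function_call",
        "id": "fc_1",
        "call_id": "call_1",
        "name": "get_weather",
        "arguments": '{"latitude": 0, "longitude": 0}',
    }


history = [{"role": "user", "content": "What is the weather?"}]
emitted = list(
    run_tool_loop(history, fake_llm, {"get_weather": lambda latitude, longitude: 70})
)
print([item["type"] for item in emitted])
# ['function_call', 'function_call_output', 'message']
```

The stub makes the three phases visible: the model requests a tool, the tool output is appended to the history, and the next model call sees that output and produces the final message, at which point the loop returns.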