ResponsesAgent for Model Serving

The ResponsesAgent class in MLflow provides a specialized interface for serving generative AI models with tool-calling capabilities and structured response handling. It is designed to integrate seamlessly with MLflow's serving infrastructure while maintaining compatibility with OpenAI-style APIs.

Overview

ResponsesAgent extends MLflow's PyFunc model interface to support conversational AI applications that require advanced capabilities such as multi-turn dialogue, tool calling, multi-agent orchestration, and compatibility with OpenAI's Responses API and MLflow model tracing.

  • 📦 Structured request/response handling
  • 🛠️ Tool calling and function execution
  • 💬 Chat history management
  • 📊 Token usage tracking
  • 🤖 OpenAI API compatibility
  • 🔁 Support for returning multiple output messages, including intermediate outputs from tool calls
  • 👥 Support for multi-agent scenarios
  • 📚 Compatible with MLflow logging, tracing, and model serving
  • 🔗 Ensures compatibility with the OpenAI Responses API for seamless integration with downstream clients and UIs

This makes it an ideal choice for building and deploying chatbots, virtual assistants, and other conversational AI applications.

Key Features

Structured Response Handling

ResponsesAgent handles structured inputs and outputs that conform to chat completion standards (a request sketch follows this list):

  • Messages: Handles conversation history with role-based messages (system, user, assistant)
  • Tool calls: Supports function calling with structured arguments
  • Usage tracking: Monitors token consumption and model performance
  • Metadata: Captures additional context and configuration
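
For illustration, here is a sketch of the kind of structured request these pieces combine into. The values are hypothetical, but the shape follows the ResponsesAgentRequest schema shown later on this page.

python
# Hypothetical request payload: role-based conversation history, a tool
# definition with structured arguments, and optional context metadata.
request = {
    "input": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is the weather like in Boston today?"},
    ],
    "tools": [
        {
            "type": "function",
            "name": "get_current_weather",
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
                "required": ["location"],
            },
        }
    ],
    "context": {"conversation_id": "123", "user_id": "456"},
}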

OpenAI API Compatibility

ResponsesAgent is designed with full OpenAI API compatibility in mind, allowing seamless integration with existing applications and tools built for OpenAI's chat completion APIs. This compatibility covers request formats, response structures, and API endpoints.
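
As a minimal sketch of what this buys you, a downstream client can consume a served agent's output exactly as it would consume an OpenAI Responses API payload. Here, `response` stands for the parsed JSON returned by a serving endpoint (see Serving Options below).

python
# Sketch: iterate over OpenAI Responses-style output items returned by the agent.
# `response` is assumed to be the parsed JSON from a served ResponsesAgent.
for item in response["output"]:
    if item["type"] == "message":
        # assistant messages carry a list of content parts
        for part in item["content"]:
            if part["type"] == "output_text":
                print(part["text"])
    elif item["type"] == "function_call":
        print(f"tool call: {item['name']}({item['arguments']})")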

MLflow Integration

Fully integrated with the MLflow ecosystem, including the following (see the registry sketch after this list):

  • 📈 Model tracking and versioning
  • 🧪 Experiment management
  • 🗃️ Model Registry
  • 🚀 Deployment options
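
As a small illustration of the Model Registry integration (a sketch: the registry name is hypothetical, and `logged_agent_info` comes from the logging example later on this page):

python
import mlflow

# Register the logged agent as a new model version in the Model Registry.
registered = mlflow.register_model(
    model_uri=logged_agent_info.model_uri,  # from the logging example below
    name="my-responses-agent",  # hypothetical registry name
)
print(registered.version)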

Basic Usage

Implementing a ResponsesAgent

Getting Started

To create your own agent, subclass mlflow.pyfunc.ResponsesAgent and implement your agent logic in the predict method. The implementation is framework-agnostic, so you can use any agent authoring framework. Note that pydantic>=2 is required to use ResponsesAgent. For example implementations that call a Chat Completions LLM, call a Responses API LLM, wrap a LangGraph agent, and build a tool-calling agent, see the code snippets below.
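
As a minimal sketch (the class name and response text are placeholders), an agent only needs to subclass ResponsesAgent and return a ResponsesAgentResponse from predict:

python
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse


class MyAgent(ResponsesAgent):
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Call your framework of choice here, then map its output to
        # Responses output items via the helper methods described below.
        return ResponsesAgentResponse(
            output=[self.create_text_output_item(text="hello!", id="msg_1")]
        )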

Creating Agent Output

When implementing your agent, you will work with two main output types: ResponsesAgentResponse and ResponsesAgentStreamEvent. These are the only Pydantic objects you should create directly. The remaining classes in mlflow.types.responses_helpers are only used for validating dictionaries.

If you want to return outputs that don't fit the standard interface, you can use the custom_outputs field.

Here are some helper methods for creating common outputs within the ResponsesAgent interface (these are the ones used in the snippets on this page):

  • create_text_output_item
  • create_function_call_item
  • create_function_call_output_item
  • create_text_delta

Here is an example of a full tool-calling sequence using ResponsesAgentResponse with custom outputs:

python
import mlflow
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse


class SimpleResponsesAgent(ResponsesAgent):
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        return ResponsesAgentResponse(
            output=[
                self.create_function_call_item(
                    id="fc_1",
                    call_id="call_1",
                    name="python_exec",
                    arguments='{"code":"result = 4 * 3\\nprint(result)"}',
                ),
                self.create_function_call_output_item(
                    call_id="call_1",
                    output="12\n",
                ),
                self.create_text_output_item(
                    text="The result of 4 * 3 in Python is 12.",
                    id="msg_1",
                ),
            ],
            custom_outputs={"key1": "custom-value1"},
        )

Streaming Agent Output

For real-time processing, you can use streaming events instead of returning a complete response. Streaming lets you send partial results as they become available, which is useful for long-running operations or when you want to show progress to the user.

Basic Text Streaming

To stream text with the ResponsesAgent interface, you should:

  • yield response.output_text.delta events containing chunks as they become available
    • each must have an item_id that associates related events with a single output item
  • yield a response.output_item.done event that aggregates all of the chunks

python
from typing import Generator

from mlflow.types.responses import ResponsesAgentStreamEvent


class SimpleResponsesAgent(ResponsesAgent):
    # ... continuing from above
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        # stream text, all with the same item_id
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta="Hello", item_id="msg_1"),
        )
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta="world", item_id="msg_1"),
        )
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta="!", item_id="msg_1"),
        )

        # the text output item id should be the same
        # item_id as the streamed text deltas
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(
                text="Hello world!",
                id="msg_1",
            ),
        )

Streaming Tool Calls

You can also stream tool calls and their results. Each tool call and its output is sent as a separate response.output_item.done event. This makes MLflow Tracing more convenient and makes it easier for clients to reconstruct the streamed message history.

python
from mlflow.types.responses import ResponsesAgentStreamEvent


class SimpleResponsesAgent(ResponsesAgent):
    # ... continuing from above
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_function_call_item(
                id="fc_1",
                call_id="call_1",
                name="python_exec",
                arguments='{"code":"result = 4 * 3\\nprint(result)"}',
            ),
        )
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_function_call_output_item(
                call_id="call_1",
                output="12\n",
            ),
        )
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(
                text="The result of 4 * 3 in Python is 12.",
                id="msg_1",
            ),
        )

Logging and Model Serving

Log your agent using the models-from-code approach. This approach is framework-agnostic and supports all authoring frameworks:

python
import mlflow

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        python_model="agent.py",  # replace with your relative path to agent code
        name="agent",
    )

For convenience, MLflow has the following built in:

  • Automatic model signature inference
    • An input and output signature will be set following the ResponsesAgentRequest and ResponsesAgentResponse schemas
  • Metadata
    • {"task": "agent/v1/responses"} will automatically be appended to any metadata you may pass in when logging the model
  • Input example
    • Providing an input example is optional; mlflow.types.responses.RESPONSES_AGENT_INPUT_EXAMPLE will be used by default
    • If you do provide an input example, make sure it's a dictionary conforming to the ResponsesAgentRequest schema (see the sketch after this list)
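
For example, here is a sketch of passing a custom input example at logging time (the payload mirrors the test request used below):

python
import mlflow

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        python_model="agent.py",  # relative path to your agent code
        name="agent",
        # optional: a dict conforming to the ResponsesAgentRequest schema;
        # if omitted, RESPONSES_AGENT_INPUT_EXAMPLE is used by default
        input_example={
            "input": [{"role": "user", "content": "what is 4*3 in python"}]
        },
    )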

To test a ResponsesAgent, you can pass a single input dictionary that conforms to the ResponsesAgentRequest schema, both before and after logging the model:

python
import mlflow

# load it back from mlflow
loaded_model = mlflow.pyfunc.load_model(logged_agent_info.model_uri)
loaded_model.predict(
    {
        "input": [{"role": "user", "content": "what is 4*3 in python"}],
        "context": {"conversation_id": "123", "user_id": "456"},
    }
)

To serve the model, see Serving Options for detailed instructions.

Examples

Simple Chat Example with a ChatCompletions LLM

Here is an example of an agent that calls OpenAI's gpt-5 model using the Chat Completions API:

python
import mlflow
from mlflow.models import set_model
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    output_to_responses_items_stream,
    to_chat_completions_input,
)
from openai import OpenAI

client = OpenAI()


class SimpleResponsesAgent(ResponsesAgent):
    def call_llm(self, messages):
        for chunk in client.chat.completions.create(
            model="gpt-5",
            messages=messages,
            stream=True,
        ):
            yield chunk.to_dict()

    def predict(self, request: ResponsesAgentRequest):
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(
            output=outputs, custom_outputs=request.custom_inputs
        )

    def predict_stream(self, request: ResponsesAgentRequest):
        messages = to_chat_completions_input([i.model_dump() for i in request.input])

        yield from output_to_responses_items_stream(self.call_llm(messages))


mlflow.openai.autolog()
agent = SimpleResponsesAgent()
set_model(agent)

Simple Chat Example with a Responses API LLM

Here is an example of an agent that calls OpenAI's gpt-4o model using the Responses API:

python
# uncomment below if running inside a jupyter notebook
# %%writefile agent.py
import os
from typing import Generator

import mlflow
from mlflow.entities.span import SpanType
from mlflow.models import set_model
from mlflow.pyfunc.model import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI


class SimpleResponsesAgent(ResponsesAgent):
    def __init__(self, model: str):
        self.client = OpenAI()
        self.model = model

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        response = self.client.responses.create(input=request.input, model=self.model)
        return ResponsesAgentResponse(**response.to_dict())

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        for event in self.client.responses.create(
            input=request.input, stream=True, model=self.model
        ):
            yield ResponsesAgentStreamEvent(**event.to_dict())


mlflow.openai.autolog()
agent = SimpleResponsesAgent(model="gpt-4o")
set_model(agent)

Wrapping a LangGraph Agent

Here is an example of wrapping a LangGraph agent in a ResponsesAgent:

python
from typing import Generator

import mlflow
from langgraph.graph.state import CompiledStateGraph
from mlflow.models import set_model
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
    output_to_responses_items_stream,
    to_chat_completions_input,
)


class LangGraphResponsesAgent(ResponsesAgent):
    def __init__(self, agent: CompiledStateGraph):
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(
            output=outputs, custom_outputs=request.custom_inputs
        )

    def predict_stream(
        self,
        request: ResponsesAgentRequest,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        cc_msgs = to_chat_completions_input([i.model_dump() for i in request.input])

        for _, events in self.agent.stream(
            {"messages": cc_msgs}, stream_mode=["updates"]
        ):
            for node_data in events.values():
                yield from output_to_responses_items_stream(node_data["messages"])


mlflow.langchain.autolog()
graph = None  # TODO: replace with your compiled LangGraph agent
agent = LangGraphResponsesAgent(graph)
set_model(agent)

Tool Calling Example

Here is an example of an agent that calls OpenAI's gpt-4o model with a simple tool:

python
# uncomment below if running inside a jupyter notebook
# %%writefile agent.py
import json
from typing import Any, Callable, Generator
import os
from uuid import uuid4

import backoff
import mlflow
import openai
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI
from pydantic import BaseModel


class ToolInfo(BaseModel):
    """
    Class representing a tool for the agent.
    - "name" (str): The name of the tool.
    - "spec" (dict): JSON description of the tool (matches OpenAI Responses format)
    - "exec_fn" (Callable): Function that implements the tool logic
    """

    name: str
    spec: dict
    exec_fn: Callable


class ToolCallingAgent(ResponsesAgent):
    """
    Class representing a tool-calling Agent
    """

    def __init__(self, model: str, tools: list[ToolInfo]):
        """Initializes the ToolCallingAgent with tools."""
        self.model = model
        self.client: OpenAI = OpenAI()
        self._tools_dict = {tool.name: tool for tool in tools}

    def get_tool_specs(self) -> list[dict]:
        """Returns tool specifications in the format OpenAI expects."""
        return [tool_info.spec for tool_info in self._tools_dict.values()]

    @mlflow.trace(span_type=SpanType.TOOL)
    def execute_tool(self, tool_name: str, args: dict) -> Any:
        """Executes the specified tool with the given arguments."""
        return self._tools_dict[tool_name].exec_fn(**args)

    @backoff.on_exception(backoff.expo, openai.RateLimitError)
    @mlflow.trace(span_type=SpanType.LLM)
    def call_llm(self, input_messages) -> ResponsesAgentStreamEvent:
        return (
            self.client.responses.create(
                model=self.model,
                input=input_messages,
                tools=self.get_tool_specs(),
            )
            .output[0]
            .model_dump(exclude_none=True)
        )

    def handle_tool_call(self, tool_call: dict[str, Any]) -> ResponsesAgentStreamEvent:
        """
        Execute tool calls and return a ResponsesAgentStreamEvent w/ tool output
        """
        args = json.loads(tool_call["arguments"])
        result = str(self.execute_tool(tool_name=tool_call["name"], args=args))

        tool_call_output = {
            "type": "function_call_output",
            "call_id": tool_call["call_id"],
            "output": result,
        }
        return ResponsesAgentStreamEvent(
            type="response.output_item.done", item=tool_call_output
        )

    def call_and_run_tools(
        self,
        input_messages,
        max_iter: int = 10,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        for _ in range(max_iter):
            last_msg = input_messages[-1]
            if (
                last_msg.get("type", None) == "message"
                and last_msg.get("role", None) == "assistant"
            ):
                return
            if last_msg.get("type", None) == "function_call":
                tool_call_res = self.handle_tool_call(last_msg)
                input_messages.append(tool_call_res.item)
                yield tool_call_res
            else:
                llm_output = self.call_llm(input_messages=input_messages)
                input_messages.append(llm_output)
                yield ResponsesAgentStreamEvent(
                    type="response.output_item.done",
                    item=llm_output,
                )

        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item={
                "id": str(uuid4()),
                "content": [
                    {
                        "type": "output_text",
                        "text": "Max iterations reached. Stopping.",
                    }
                ],
                "role": "assistant",
                "type": "message",
            },
        )

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(
            output=outputs, custom_outputs=request.custom_inputs
        )

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        input_messages = [{"role": "system", "content": SYSTEM_PROMPT}] + [
            i.model_dump() for i in request.input
        ]
        yield from self.call_and_run_tools(input_messages=input_messages)


tools = [
    ToolInfo(
        name="get_weather",
        spec={
            "type": "function",
            "name": "get_weather",
            "description": "Get current temperature for provided coordinates in celsius.",
            "parameters": {
                "type": "object",
                "properties": {
                    "latitude": {"type": "number"},
                    "longitude": {"type": "number"},
                },
                "required": ["latitude", "longitude"],
                "additionalProperties": False,
            },
            "strict": True,
        },
        exec_fn=lambda latitude, longitude: 70,  # dummy tool implementation
    )
]

os.environ["OPENAI_API_KEY"] = "your OpenAI API key"

SYSTEM_PROMPT = "You are a helpful assistant that can call tools to get information."
mlflow.openai.autolog()
AGENT = ToolCallingAgent(model="gpt-4o", tools=tools)
mlflow.models.set_model(AGENT)

Serving Options

Launch a local server for development and testing:

bash
mlflow models serve -m models:/<model_id> -p 5000

Test the served model:

python
import requests

response = requests.post(
    "http://127.0.0.1:5000/invocations",
    json={"input": [{"role": "user", "content": "What's the weather like?"}]},
)

print(response.json())

Schemas and Types

ResponsesAgent uses structured schemas for requests and responses; see ResponsesAgentRequest and ResponsesAgentResponse for the detailed structure.

python
# Example Request schema
{
    "input": [
        {
            "role": "user",
            "content": "What is the weather like in Boston today?",
        }
    ],
    "tools": [
        {
            "type": "function",
            "name": "get_current_weather",
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
                "required": ["location", "unit"],
            },
        }
    ],
}

# Example Response schema
{
    "output": [
        {
            "type": "message",
            "id": "some-id",
            "status": "completed",
            "role": "assistant",
            "content": [
                {
                    "type": "output_text",
                    "text": "rainy",
                }
            ],
        }
    ],
}

Troubleshooting

Common Issues

  1. Import errors: Ensure all dependencies are included in the conda environment

  2. Schema validation: Verify that input/output formats match the expected schemas

  3. Model logging issues: Use the models-from-code feature to log the model

  4. Missing traces: Enable tracing via mlflow.<flavor>.autolog, or use manual tracing with mlflow.start_span

  5. Timeout errors for long-running agent predictions: If a prediction takes several minutes (e.g., an agent with complex tool calls), you need to configure two separate timeouts:

    • MLFLOW_DEPLOYMENT_PREDICT_TIMEOUT: Maximum time for a single HTTP request (default: 120 seconds)
    • MLFLOW_DEPLOYMENT_PREDICT_TOTAL_TIMEOUT: Total maximum time across all retry attempts (default: 600 seconds)

    Warning

    MLFLOW_DEPLOYMENT_PREDICT_TOTAL_TIMEOUT does not cancel in-flight HTTP requests. It only checks the elapsed time after each request completes. An in-flight request continues until it reaches MLFLOW_DEPLOYMENT_PREDICT_TIMEOUT, which can lead to wasted tokens and duplicate trace entries.

    For a long-running agent that takes 10 minutes per query:

    python
    import os

    # Allow individual requests up to 15 minutes
    os.environ["MLFLOW_DEPLOYMENT_PREDICT_TIMEOUT"] = "900"

    # Allow total retry time up to 20 minutes
    os.environ["MLFLOW_DEPLOYMENT_PREDICT_TOTAL_TIMEOUT"] = "1200"

    from mlflow.deployments import get_deploy_client

    client = get_deploy_client("databricks")
    response = client.predict(endpoint="my-agent", inputs=data)

    Tip
    • If your longest query takes N seconds, set MLFLOW_DEPLOYMENT_PREDICT_TIMEOUT to at least N seconds
    • Set MLFLOW_DEPLOYMENT_PREDICT_TOTAL_TIMEOUT >= MLFLOW_DEPLOYMENT_PREDICT_TIMEOUT

Debugging

Enable tracing for troubleshooting, so that your agent's execution is traced:

python
import mlflow

# enable autologging if your agent internally uses MLflow tracing-supported
# libraries, such as LangChain, OpenAI, etc.
# mlflow.<flavor>.autolog()

# Test your agent locally before serving
agent = MyResponsesAgent()
agent.predict(
    {
        "input": [{"role": "user", "content": "what is 4*3 in python"}],
        "context": {"conversation_id": "123", "user_id": "456"},
    }
)