ResponsesAgent for Model Serving
The ResponsesAgent class in MLflow provides a specialized interface for serving generative AI models that handle structured responses and tool-calling capabilities. The agent is designed to work seamlessly with MLflow's serving infrastructure while providing compatibility with OpenAI-style APIs.
Overview
ResponsesAgent extends MLflow's PyFunc model interface to support conversational AI applications that need advanced capabilities such as multi-turn conversations, tool calling, multi-agent orchestration, and compatibility with the OpenAI Responses API and MLflow model tracing.
- 📦 Structured request/response handling
- 🛠️ Tool calling and function execution
- 💬 Chat history management
- 📊 Token usage tracking
- 🤖 OpenAI API compatibility
- 🔁 Support for returning multiple output messages, including intermediate outputs from tool calls
- 👥 Support for multi-agent scenarios
- 📚 Compatibility with MLflow logging, tracing, and model serving
- 🔗 Ensures compatibility with the OpenAI Responses API for seamless integration with downstream clients and UIs
This makes it ideal for building and deploying chatbots, virtual assistants, and other conversational AI applications.
Key Features
Structured Response Handling
ResponsesAgent handles structured inputs and outputs that follow the chat completion standard:
- Messages: processes conversation history with role-based messages (system, user, assistant)
- Tool calls: supports function calling with structured arguments
- Usage tracking: monitors token consumption and model performance
- Metadata: captures additional context and configuration
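To make this structure concrete, here is a minimal request dict sketch with a role-based, multi-turn history and optional context (the field values are illustrative; the full schemas are shown under Schemas and Types below):

request = {
    "input": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is MLflow?"},
        {"role": "assistant", "content": "MLflow is an open source MLOps platform."},
        {"role": "user", "content": "How do I log a model?"},
    ],
    # optional context accompanying the conversation
    "context": {"conversation_id": "123", "user_id": "456"},
}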
OpenAI API Compatibility
ResponsesAgent is designed with full OpenAI API compatibility in mind, allowing seamless integration with existing applications and tools built for the OpenAI chat completion API. This compatibility covers request formats, response structures, and API endpoints.
MLflow Integration
Fully integrated with the MLflow ecosystem, including:
- 📈 Model tracking and versioning
- 🧪 Experiment management
- 🗃️ Model Registry
- 🚀 Deployment options
Basic Usage
Implementing a ResponsesAgent
Getting Started
To create your own agent, subclass mlflow.pyfunc.ResponsesAgent and implement your agent logic in the predict method. The implementation is framework-agnostic, so you can use any agent authoring framework. Note that pydantic>=2 is required to use ResponsesAgent. For implementation examples, see the simple chat agent and tool calling agent below.
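As a minimal sketch of that shape (the EchoAgent name and its echo logic are placeholders for illustration, not part of MLflow), a subclass only needs a predict method that accepts a ResponsesAgentRequest and returns a ResponsesAgentResponse:

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse


class EchoAgent(ResponsesAgent):
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Echo the last input message back; assumes plain string content
        last_text = request.input[-1].content
        return ResponsesAgentResponse(
            output=[
                self.create_text_output_item(text=f"You said: {last_text}", id="msg_1")
            ]
        )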
Creating Agent Output
When implementing your agent, you will work with two main output types: ResponsesAgentResponse and ResponsesAgentStreamEvent. These are the only pydantic objects you should create directly; the remaining classes in mlflow.types.responses_helpers are only for validating dictionaries.
If you want to return outputs that don't conform to the standard interface, you can use the custom_outputs field.
Here are some helper methods you can use within the ResponsesAgent interface to create common outputs:
- mlflow.pyfunc.ResponsesAgent.create_text_output_item()
- mlflow.pyfunc.ResponsesAgent.create_function_call_item()
- mlflow.pyfunc.ResponsesAgent.create_function_call_output_item()
- mlflow.pyfunc.ResponsesAgent.create_text_delta() (streaming only)
这是一个使用带有自定义输出的 ResponsesAgentResponse
的完整工具调用序列示例
import mlflow
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse


class SimpleResponsesAgent(ResponsesAgent):
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        return ResponsesAgentResponse(
            output=[
                self.create_function_call_item(
                    id="fc_1",
                    call_id="call_1",
                    name="python_exec",
                    arguments='{"code":"result = 4 * 3\\nprint(result)"}',
                ),
                self.create_function_call_output_item(
                    call_id="call_1",
                    output="12\n",
                ),
                self.create_text_output_item(
                    text="The result of 4 * 3 in Python is 12.",
                    id="msg_1",
                ),
            ],
            custom_outputs={"key1": "custom-value1"},
        )
Streaming Agent Output
For real-time processing, you can use streaming events instead of returning a complete response. Streaming lets you send partial results as they become available, which is useful for long-running operations or when you want to show progress to the user.
Basic Text Streaming
To stream text within the ResponsesAgent interface, you should:
- Yield response.output_text.delta events as chunks become available
  - Each must have an item_id that ties related events to a single output item
- Yield a response.output_item.done event that aggregates all of the chunks
from typing import Generator

from mlflow.types.responses import ResponsesAgentStreamEvent


class SimpleResponsesAgent(ResponsesAgent):
    # ... continuing from above
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        # stream text, all with the same item_id
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta="Hello", item_id="msg_1"),
        )
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta="world", item_id="msg_1"),
        )
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta="!", item_id="msg_1"),
        )
        # the text output item id should be the same
        # item_id as the streamed text deltas
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(
                text="Hello world!",
                id="msg_1",
            ),
        )
Tool Calling with Streaming
You can also stream tool calls and their results. Each tool call and its output is sent as a separate response.output_item.done event. This enables MLflow tracing and makes it easier for clients to reconstruct the streamed message history.
from mlflow.types.responses import ResponsesAgentStreamEvent


class SimpleResponsesAgent(ResponsesAgent):
    # ... continuing from above
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_function_call_item(
                id="fc_1",
                call_id="call_1",
                name="python_exec",
                arguments='{"code":"result = 4 * 3\\nprint(result)"}',
            ),
        )
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_function_call_output_item(
                call_id="call_1",
                output="12\n",
            ),
        )
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(
                text="The result of 4 * 3 in Python is 12.",
                id="msg_1",
            ),
        )
Logging and Serving
Log your agent using the models-from-code approach. This approach is framework-agnostic and supports all authoring frameworks:
with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        python_model="agent.py",  # replace with your relative path to agent code
        name="agent",
    )
For convenience, MLflow has the following built in:
- Automatic model signature inference
  - An input and output signature will be set that follows the ResponsesAgentRequest and ResponsesAgentResponse schemas
- Metadata
  - {"task": "agent/v1/responses"} will automatically be appended to any metadata you pass in when logging the model
- Input example (see the sketch after this list)
  - Providing an input example is optional; mlflow.types.responses.RESPONSES_AGENT_INPUT_EXAMPLE is used by default
  - If you do provide an input example, make sure it is a dict that follows the ResponsesAgentRequest schema
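For instance, a logging call that passes an explicit input example might look like this sketch (the example dict is illustrative):

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        python_model="agent.py",  # relative path to your agent code
        name="agent",
        # must follow the ResponsesAgentRequest schema
        input_example={"input": [{"role": "user", "content": "what is 4*3 in python"}]},
    )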
To test a ResponsesAgent, you can pass a single input dict that follows the ResponsesAgentRequest schema, both before and after logging it:
import mlflow

# load it back from mlflow
loaded_model = mlflow.pyfunc.load_model(logged_agent_info.model_uri)
loaded_model.predict(
    {
        "input": [{"role": "user", "content": "what is 4*3 in python"}],
        "context": {"conversation_id": "123", "user_id": "456"},
    }
)
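If your agent also implements predict_stream, the loaded PyFunc model should expose a matching predict_stream method, so a quick streaming smoke test might look like this sketch:

for event in loaded_model.predict_stream(
    {"input": [{"role": "user", "content": "what is 4*3 in python"}]}
):
    print(event)  # each event follows the ResponsesAgentStreamEvent schema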
To serve the model, see Serving Options for detailed instructions.
Examples
Simple Chat Example
Here is an example of an agent that calls OpenAI's gpt-4o model:
# uncomment below if running inside a jupyter notebook
# %%writefile agent.py
import os
from typing import Generator

import mlflow
from mlflow.entities.span import SpanType
from mlflow.models import set_model
from mlflow.pyfunc.model import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI


class SimpleResponsesAgent(ResponsesAgent):
    def __init__(self, model: str):
        self.client = OpenAI()
        self.model = model

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        response = self.client.responses.create(input=request.input, model=self.model)
        return ResponsesAgentResponse(**response.to_dict())

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        for event in self.client.responses.create(
            input=request.input, stream=True, model=self.model
        ):
            yield ResponsesAgentStreamEvent(**event.to_dict())


mlflow.openai.autolog()
agent = SimpleResponsesAgent(model="gpt-4o")
set_model(agent)
Tool Calling Example
Here is an example of an agent that calls OpenAI's gpt-4o model with a simple tool:
# uncomment below if running inside a jupyter notebook
# %%writefile agent.py
import json
import os
from typing import Any, Callable, Generator
from uuid import uuid4

import backoff
import mlflow
import openai
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI
from pydantic import BaseModel


class ToolInfo(BaseModel):
    """
    Class representing a tool for the agent.
    - "name" (str): The name of the tool.
    - "spec" (dict): JSON description of the tool (matches OpenAI Responses format)
    - "exec_fn" (Callable): Function that implements the tool logic
    """

    name: str
    spec: dict
    exec_fn: Callable


class ToolCallingAgent(ResponsesAgent):
    """
    Class representing a tool-calling Agent
    """

    def __init__(self, model: str, tools: list[ToolInfo]):
        """Initializes the ToolCallingAgent with tools."""
        self.model = model
        self.client: OpenAI = OpenAI()
        self._tools_dict = {tool.name: tool for tool in tools}

    def get_tool_specs(self) -> list[dict]:
        """Returns tool specifications in the format OpenAI expects."""
        return [tool_info.spec for tool_info in self._tools_dict.values()]

    @mlflow.trace(span_type=SpanType.TOOL)
    def execute_tool(self, tool_name: str, args: dict) -> Any:
        """Executes the specified tool with the given arguments."""
        return self._tools_dict[tool_name].exec_fn(**args)

    @backoff.on_exception(backoff.expo, openai.RateLimitError)
    @mlflow.trace(span_type=SpanType.LLM)
    def call_llm(self, input_messages) -> ResponsesAgentStreamEvent:
        return (
            self.client.responses.create(
                model=self.model,
                input=input_messages,
                tools=self.get_tool_specs(),
            )
            .output[0]
            .model_dump(exclude_none=True)
        )

    def handle_tool_call(self, tool_call: dict[str, Any]) -> ResponsesAgentStreamEvent:
        """
        Execute tool calls and return a ResponsesAgentStreamEvent w/ tool output
        """
        args = json.loads(tool_call["arguments"])
        result = str(self.execute_tool(tool_name=tool_call["name"], args=args))
        tool_call_output = {
            "type": "function_call_output",
            "call_id": tool_call["call_id"],
            "output": result,
        }
        return ResponsesAgentStreamEvent(
            type="response.output_item.done", item=tool_call_output
        )

    def call_and_run_tools(
        self,
        input_messages,
        max_iter: int = 10,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        for _ in range(max_iter):
            last_msg = input_messages[-1]
            if (
                last_msg.get("type", None) == "message"
                and last_msg.get("role", None) == "assistant"
            ):
                return
            if last_msg.get("type", None) == "function_call":
                tool_call_res = self.handle_tool_call(last_msg)
                input_messages.append(tool_call_res.item)
                yield tool_call_res
            else:
                llm_output = self.call_llm(input_messages=input_messages)
                input_messages.append(llm_output)
                yield ResponsesAgentStreamEvent(
                    type="response.output_item.done",
                    item=llm_output,
                )
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item={
                "id": str(uuid4()),
                "content": [
                    {
                        "type": "output_text",
                        "text": "Max iterations reached. Stopping.",
                    }
                ],
                "role": "assistant",
                "type": "message",
            },
        )

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(
            output=outputs, custom_outputs=request.custom_inputs
        )

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        input_messages = [{"role": "system", "content": SYSTEM_PROMPT}] + [
            i.model_dump() for i in request.input
        ]
        yield from self.call_and_run_tools(input_messages=input_messages)


tools = [
    ToolInfo(
        name="get_weather",
        spec={
            "type": "function",
            "name": "get_weather",
            "description": "Get current temperature for provided coordinates in celsius.",
            "parameters": {
                "type": "object",
                "properties": {
                    "latitude": {"type": "number"},
                    "longitude": {"type": "number"},
                },
                "required": ["latitude", "longitude"],
                "additionalProperties": False,
            },
            "strict": True,
        },
        exec_fn=lambda latitude, longitude: 70,  # dummy tool implementation
    )
]

os.environ["OPENAI_API_KEY"] = "your OpenAI API key"
SYSTEM_PROMPT = "You are a helpful assistant that can call tools to get information."

mlflow.openai.autolog()
AGENT = ToolCallingAgent(model="gpt-4o", tools=tools)
mlflow.models.set_model(AGENT)
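Before logging this agent, you can exercise it locally. One sketch, constructing the request model explicitly so it satisfies the typed predict signature:

from mlflow.types.responses import ResponsesAgentRequest

response = AGENT.predict(
    ResponsesAgentRequest(
        input=[{"role": "user", "content": "What is the weather in Boston?"}]
    )
)
for item in response.output:
    print(item)  # function call, tool output, and final text items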
Serving Options
- Local serving
- Docker deployment
- Databricks Model Serving
Start a local server for development and testing:
mlflow models serve -m models:/<model_id> -p 5000
Test the served model:
import requests

response = requests.post(
    "http://localhost:5000/invocations",
    json={"input": [{"role": "user", "content": "What's the weather like?"}]},
)
print(response.json())
Build a Docker image for the agent and run it:
mlflow models build-docker -m runs:/<run_id>/responses_agent -n my-responses-agent
docker run -p 5000:8080 my-responses-agent
Endpoint creation and management features are available in Databricks's managed MLflow service, but not in open source MLflow.
Prerequisite: register the logged model in Databricks Unity Catalog:
import mlflow

mlflow.register_model("models:/<model_id>", "<catalog>.<schema>.<model_name>")
With managed MLflow on Databricks, ResponsesAgent models integrate seamlessly with Databricks Model Serving:
from mlflow.deployments import get_deploy_client

client = get_deploy_client("databricks")
endpoint = client.create_endpoint(
    name="unity-catalog-model-endpoint",
    config={
        "served_entities": [
            {
                "name": "ads-entity",
                "entity_name": "catalog.schema.my-ads-model",
                "entity_version": "3",
                "workload_size": "Small",
                "scale_to_zero_enabled": True,
            }
        ],
        "traffic_config": {
            "routes": [
                {"served_model_name": "my-ads-model-3", "traffic_percentage": 100}
            ]
        },
    },
)
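Once the endpoint is ready, one way to query it is through the same deployments client; a rough sketch, reusing the endpoint name above with an illustrative payload:

response = client.predict(
    endpoint="unity-catalog-model-endpoint",
    inputs={"input": [{"role": "user", "content": "What's the weather like?"}]},
)
print(response)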
Schemas and Types
ResponsesAgent uses structured schemas for requests and responses; see ResponsesAgentRequest and ResponsesAgentResponse for the detailed structures.
# Example Request schema
{
    "input": [
        {
            "role": "user",
            "content": "What is the weather like in Boston today?",
        }
    ],
    "tools": [
        {
            "type": "function",
            "name": "get_current_weather",
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
                "required": ["location"],
            },
        }
    ],
}
# Example Response schema
{
    "output": [
        {
            "type": "message",
            "id": "some-id",
            "status": "completed",
            "role": "assistant",
            "content": [
                {
                    "type": "output_text",
                    "text": "rainy",
                }
            ],
        }
    ],
}
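Since ResponsesAgentRequest and ResponsesAgentResponse are pydantic models, one way to check that a dict matches the schema is simply to construct the model from it, as in this sketch:

from mlflow.types.responses import ResponsesAgentRequest

# Raises a pydantic ValidationError if the dict does not match the schema
request = ResponsesAgentRequest(
    input=[{"role": "user", "content": "What is the weather like in Boston today?"}]
)
print(request.input[0].role)  # "user"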
Troubleshooting
Common Issues
- Import errors: make sure all dependencies are included in the conda environment
- Schema validation: verify that input/output formats match the expected schemas
- Model logging issues: use the models from code feature to log the model
- Missing traces: enable tracing via mlflow.<flavor>.autolog() or with manual tracing using mlflow.start_span
Debugging
Enable tracing to help troubleshoot your model:
import mlflow

# enable autologging if your agent internally uses MLflow tracing-supported
# libraries, such as LangChain, OpenAI, etc.
# mlflow.<flavor>.autolog()

# Test your agent locally before serving
agent = MyResponsesAgent()
agent.predict(
    {
        "input": [{"role": "user", "content": "what is 4*3 in python"}],
        "context": {"conversation_id": "123", "user_id": "456"},
    }
)