
MLflow Agent Server

Agent Server Features

Note

The MLflow Agent Server was released with MLflow 3.6.0. It is under active development and marked as experimental. Public APIs may change, and new features are being added to expand its functionality.

  • A simple FastAPI server that hosts your agent at the /invocations endpoint
  • Decorator-based function registration (@invoke, @stream) for easy agent development
  • Automatic request and response validation for Responses API schema agents
  • Automatic MLflow tracing integration and aggregation
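For reference, the request body validated at the /invocations endpoint follows the Responses API shape. A minimal example, mirroring the curl requests later on this page ("stream" is an optional flag):

```json
{
  "input": [
    {"role": "user", "content": "What is the 14th Fibonacci number?"}
  ],
  "stream": false
}
```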

Full Example

In this example, we will use the openai-agents-sdk to define a Responses API-compatible agent. For more information, see the openai-agents-sdk quickstart.

  1. Install openai-agents and mlflow, and set your OpenAI API key

    bash
    pip install -U openai-agents "mlflow>=3.6.0"
    export OPENAI_API_KEY=sk-...
  2. Define your agent in agent.py and create a method annotated with @invoke

    python
    from agents import Agent, Runner
    from mlflow.genai.agent_server import invoke, stream
    from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse

    agent = Agent(
        name="Math Tutor",
        instructions="You provide help with math problems. Explain your reasoning and include examples",
    )


    @invoke()
    async def non_streaming(request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        msgs = [i.model_dump() for i in request.input]
        result = await Runner.run(agent, msgs)
        return ResponsesAgentResponse(
            output=[item.to_input_item() for item in result.new_items]
        )


    # You can also optionally register a @stream function to support streaming responses
  3. Define a start_server.py file to start the AgentServer

    python
    # Need to import the agent to register the functions with the server
    import agent  # noqa: F401
    from mlflow.genai.agent_server import (
        AgentServer,
        setup_mlflow_git_based_version_tracking,
    )

    agent_server = AgentServer("ResponsesAgent")
    app = agent_server.app

    # Optionally, set up MLflow git-based version tracking
    # to associate your agent's traces with a specific git commit
    setup_mlflow_git_based_version_tracking()


    def main():
        # To support multiple workers, pass the app as an import string
        agent_server.run(app_import_string="start_server:app")


    if __name__ == "__main__":
        main()

Deploy and Test Your Agent

Run your agent server with the --reload flag to automatically reload the server on code changes

bash
python3 start_server.py --reload
# Pass in a number of workers to support multiple concurrent requests
# python3 start_server.py --workers 4
# Pass in a port to run the server on
# python3 start_server.py --reload --port 8000

Send a request to the server to test your agent

bash
curl -X POST http://localhost:8000/invocations \
  -H "Content-Type: application/json" \
  -d '{"input": [{"role": "user", "content": "What is the 14th Fibonacci number?"}]}'

After you have finished testing your agent, you can view its traces in the MLflow UI by clicking the "Traces" tab.

If you registered a @stream function, you can send a streaming request to the server by passing "stream": true

bash
curl -X POST http://localhost:8000/invocations \
  -H "Content-Type: application/json" \
  -d '{
    "input": [{"role": "user", "content": "What is the 14th Fibonacci number?"}],
    "stream": true
  }'

Evaluate Your Agent

You can evaluate your agent with mlflow.genai.evaluate(). For more information, see the Evaluate Agents guide and the Scorer documentation.

  1. Define a file like eval_agent.py to evaluate your agent

    python
    import asyncio

    import mlflow

    # need to import agent for our @invoke-registered function to be found
    from agent import agent  # noqa: F401
    from mlflow.genai.agent_server import get_invoke_function
    from mlflow.genai.scorers import RelevanceToQuery, Safety
    from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse

    eval_dataset = [
        {
            "inputs": {
                "request": {
                    "input": [
                        {"role": "user", "content": "What's the 15th Fibonacci number"}
                    ]
                }
            },
            "expected_response": "The 15th Fibonacci number is 610.",
        }
    ]


    def sync_invoke_fn(request: dict) -> ResponsesAgentResponse:
        # Get the invoke function that was registered via @invoke decorator in your agent
        invoke_fn = get_invoke_function()
        return asyncio.run(invoke_fn(ResponsesAgentRequest(**request)))


    mlflow.genai.evaluate(
        data=eval_dataset,
        predict_fn=sync_invoke_fn,
        scorers=[RelevanceToQuery(), Safety()],
    )
  2. Run the evaluation

    bash
    python3 eval_agent.py

    You should see the evaluation results and MLflow run information in the console output. In the MLflow UI, you can find the run produced by the evaluation on the experiment page. Click the run name to view the aggregated metrics and metadata in the overview pane.