跳至主要内容

教程:使用 ChatModel 构建自定义生成式 AI 模型

生成式人工智能 (GenAI) 的快速发展带来了令人兴奋的机遇和集成挑战。为了有效利用最新的 GenAI 进展,开发者需要一个既灵活又标准化的框架。MLflow 在 mlflow.pyfunc.ChatModel 类中解决了这一需求,该类在 2.11.0 版本中引入,为 GenAI 应用提供了一致的接口,同时简化了部署和测试。

选择 ChatModel 还是 PythonModel

在 MLflow 中构建 GenAI 应用时,选择合适的模型抽象至关重要,这需要在易用性和所需的自定义程度之间取得平衡。MLflow 为此提供了两个主要类:mlflow.pyfunc.ChatModelmlflow.pyfunc.PythonModel。它们各有优势和权衡,因此了解哪一个最适合您的用例至关重要。

ChatModelPythonModel
何时使用在您想开发和部署与 OpenAI 规范兼容的标准聊天模式的对话模型时使用。在您想完全控制模型接口或自定义模型行为的每个方面时使用。
接口固定为 OpenAI 的聊天模式。完全控制模型的输入和输出模式。
设置快速。对于对话式应用来说,开箱即用,具有预定义的模型签名和输入示例。自定义。您需要自己定义模型签名或输入示例。
复杂性。标准化接口简化了模型的部署和集成。。部署和集成自定义 PythonModel 可能并不简单。例如,模型需要处理 Pandas DataFrame,因为 MLflow 在将输入数据传递给 PythonModel 之前会将其转换为 DataFrame。

本教程的目的

本教程将引导您使用 MLflow 的 mlflow.pyfunc.ChatModel 类创建自定义聊天代理。

在本教程结束时,您将

前提条件

  • 熟悉 MLflow 日志记录 API 和 GenAI 概念。
  • 已安装 MLflow 2.11.0 或更高版本才能使用 mlflow.pyfunc.ChatModel
  • 已安装 MLflow 2.14.0 或更高版本才能使用 MLflow Tracing

本教程纯粹将 Databricks Foundation Model API 作为与外部服务交互的示例。您可以轻松地将提供者示例替换为任何托管的 LLM 托管服务(Amazon BedrockAzure AI StudioOpenAIAnthropic 等等)。

核心概念

GenAI 追踪自定义

MLflow Tracing 允许您监控和记录模型方法的执行过程,在调试和性能优化期间提供有价值的见解。

在我们的示例 BasicAgent 实现中,我们使用了两种不同的 API 来启动追踪跨度(span):装饰器 API 和流式 API。

装饰器 API

@mlflow.trace
def _get_system_message(self, role: str) -> Dict:
if role not in self.models:
raise ValueError(f"Unknown role: {role}")

instruction = self.models[role]["instruction"]
return ChatMessage(role="system", content=instruction).to_dict()

使用 @mlflow.trace 追踪装饰器是将追踪功能添加到函数和方法的最简单方法。默认情况下,由该装饰器应用生成的跨度将使用函数名称作为跨度名称。可以如下所示覆盖此命名以及与跨度相关的其他参数:

@mlflow.trace(name="custom_span_name", attributes={"key": "value"}, span_type="func")
def _get_system_message(self, role: str) -> Dict:
if role not in self.models:
raise ValueError(f"Unknown role: {role}")

instruction = self.models[role]["instruction"]
return ChatMessage(role="system", content=instruction).to_dict()
提示

建议始终为您生成的任何跨度设置一个人类可读的名称,尤其是在您对私有或通用命名函数或方法进行插桩时。MLflow Trace UI 默认会显示函数或方法的名称,如果您的函数和方法命名模糊,则可能难以理解。

流式 API

fluent APIs 上下文处理器实现用于启动跨度,当您需要完全控制跨度数据的各个方面进行日志记录时非常有用。

下文展示了我们应用程序中用于确保捕获通过 load_context 方法加载模型时设置的参数的示例。我们从实例属性 self.models_configself.models 中提取信息来设置跨度的属性。

with mlflow.start_span("Audit Agent") as root_span:
root_span.set_inputs(messages)
attributes = {**params.to_dict(), **self.models_config, **self.models}
root_span.set_attributes(attributes)
# More span manipulation...

MLflow UI 中的追踪

运行包含这些用于追踪跨度生成和插桩的组合使用模式的示例后,

Traces in the MLflow UI for the Agent example

我们示例中的关键类和方法

  • BasicAgent:我们扩展了 ChatModel 的自定义聊天代理类。
  • _get_system_message:检索特定角色的系统消息配置。
  • get_agent_response:向端点发送消息并检索响应。
  • _call_agent:管理代理角色之间的对话流程。
  • prepare_message_list:准备要发送的消息列表。
  • load_context:初始化模型上下文和配置。
  • predict:处理聊天模型的预测逻辑。

在上面列出的这些方法中,load_contextpredict 方法覆盖了 ChatModel 的基础抽象实现。为了定义 ChatModel 的子类,您必须(至少)实现 predict 方法。load_context 方法仅在您实现自定义加载逻辑(如我们在下面将要做的)时使用,其中需要加载静态配置以使模型对象工作,或者需要执行额外的依赖逻辑才能使对象实例化正常运行。

自定义 ChatModel 示例

在下面的完整示例中,我们通过子类化 mlflow.pyfunc.ChatModel 来创建一个自定义聊天代理。这个名为 BasicAgent 的代理利用了几个重要特性,有助于简化 GenAI 应用程序的开发、部署和追踪。通过子类化 ChatModel,我们确保了用于处理对话代理的一致接口,同时避免了与更通用模型相关的常见陷阱。

下面的实现强调了以下关键方面:

  • 追踪:我们利用 MLflow 的追踪功能,使用装饰器和流式 API 上下文处理器方法来追踪和记录关键操作。
    • 装饰器 API:这用于轻松追踪诸如 _get_agent_response_call_agent 之类的方法,以自动创建跨度。
    • 流式 API:提供了对跨度创建的精细控制,如 predict 方法所示,用于审计代理交互期间的关键输入和输出。
    • 提示:我们确保跨度的名称具有人类可读性,以便在 MLflow Trace UI 中更容易调试,以及通过客户端 API 获取记录的追踪时。
  • 自定义配置:
    • 模型配置:通过在模型记录期间传递自定义配置(使用 model_config 参数),我们将模型行为与硬编码值解耦。这使得无需修改源代码即可快速测试不同的代理配置。
    • load_context 方法:确保在运行时加载配置,用必要的设置初始化代理,并防止由于缺少配置而导致的运行时故障。
    • 提示:我们避免在 load_context 中直接设置未定义的实例属性。相反,所有属性都在类构造函数中用默认值进行初始化,以确保正确加载模型。
  • 对话管理:
    • 我们使用诸如 _get_system_message_get_agent_response_call_agent 之类的方法实现多步代理交互模式。这些方法管理多个代理之间的通信流程,例如“oracle”(预言者)和“judge”(评审)角色,每个角色都配置有特定的指令和参数。
    • 静态输入/输出结构:通过遵循 ChatModel 所需的输入 (List[ChatMessage]) 和输出 (ChatCompletionResponse) 格式,我们消除了与转换 JSON 或表格数据相关的复杂性,这在更通用的模型(如 PythonModel)中很常见。
  • 避免常见陷阱:
    • 通过输入示例进行模型验证:我们在模型记录期间提供了一个输入示例,允许 MLflow 验证输入接口并尽早捕获结构性问题,减少部署期间的调试时间。
  • 代码模型:
    • MLflow 建议在编写 GenAI 代理或应用程序时使用代码模型,以便对包含任意 Python 代码的代理进行健壮的记录和直接部署。
import mlflow
from mlflow.types.llm import ChatCompletionResponse, ChatMessage, ChatParams, ChatChoice
from mlflow.pyfunc import ChatModel
from mlflow import deployments
from typing import List, Optional, Dict


class BasicAgent(ChatModel):
def __init__(self):
"""Initialize the BasicAgent with placeholder values."""
self.deploy_client = None
self.models = {}
self.models_config = {}
self.conversation_history = []

def load_context(self, context):
"""Initialize the connectors and model configurations."""
self.deploy_client = deployments.get_deploy_client("databricks")
self.models = context.model_config.get("models", {})
self.models_config = context.model_config.get("configuration", {})

def _get_system_message(self, role: str) -> Dict:
"""
Get the system message configuration for the specified role.

Args:
role (str): The role of the agent (e.g., "oracle" or "judge").

Returns:
dict: The system message for the given role.
"""
if role not in self.models:
raise ValueError(f"Unknown role: {role}")

instruction = self.models[role]["instruction"]
return ChatMessage(role="system", content=instruction).to_dict()

@mlflow.trace(name="Raw Agent Response")
def _get_agent_response(
self, message_list: List[Dict], endpoint: str, params: Optional[dict] = None
) -> Dict:
"""
Call the agent endpoint to get a response.

Args:
message_list (List[Dict]): List of messages for the agent.
endpoint (str): The agent's endpoint.
params (Optional[dict]): Additional parameters for the call.

Returns:
dict: The response from the agent.
"""
response = self.deploy_client.predict(
endpoint=endpoint, inputs={"messages": message_list, **(params or {})}
)
return response["choices"][0]["message"]

@mlflow.trace(name="Agent Call")
def _call_agent(
self, message: ChatMessage, role: str, params: Optional[dict] = None
) -> Dict:
"""
Prepares and sends the request to a specific agent based on the role.

Args:
message (ChatMessage): The message to be processed.
role (str): The role of the agent (e.g., "oracle" or "judge").
params (Optional[dict]): Additional parameters for the call.

Returns:
dict: The response from the agent.
"""
system_message = self._get_system_message(role)
message_list = self._prepare_message_list(system_message, message)

# Fetch agent response
agent_config = self.models[role]
response = self._get_agent_response(
message_list, agent_config["endpoint"], params
)

# Update conversation history
self.conversation_history.extend([message.to_dict(), response])
return response

@mlflow.trace(name="Assemble Conversation")
def _prepare_message_list(
self, system_message: Dict, user_message: ChatMessage
) -> List[Dict]:
"""
Prepare the list of messages to send to the agent.

Args:
system_message (dict): The system message dictionary.
user_message (ChatMessage): The user message.

Returns:
List[dict]: The complete list of messages to send.
"""
user_prompt = {
"role": "user",
"content": self.models_config.get(
"user_response_instruction", "Can you make the answer better?"
),
}
if self.conversation_history:
return [system_message, *self.conversation_history, user_prompt]
else:
return [system_message, user_message.to_dict()]

def predict(
self, context, messages: List[ChatMessage], params: Optional[ChatParams] = None
) -> ChatCompletionResponse:
"""
Predict method to handle agent conversation.

Args:
context: The MLflow context.
messages (List[ChatMessage]): List of messages to process.
params (Optional[ChatParams]): Additional parameters for the conversation.

Returns:
ChatCompletionResponse: The structured response object.
"""
# Use the fluent API context handler to have added control over what is included in the span
with mlflow.start_span(name="Audit Agent") as root_span:
# Add the user input to the root span
root_span.set_inputs(messages)

# Add attributes to the root span
attributes = {**params.to_dict(), **self.models_config, **self.models}
root_span.set_attributes(attributes)

# Initiate the conversation with the oracle
oracle_params = self._get_model_params("oracle")
oracle_response = self._call_agent(messages[0], "oracle", oracle_params)

# Process the response with the judge
judge_params = self._get_model_params("judge")
judge_response = self._call_agent(
ChatMessage(**oracle_response), "judge", judge_params
)

# Reset the conversation history and return the final response
self.conversation_history = []

output = ChatCompletionResponse(
choices=[ChatChoice(index=0, message=ChatMessage(**judge_response))],
usage={},
model=judge_params.get("endpoint", "unknown"),
)

root_span.set_outputs(output)

return output

def _get_model_params(self, role: str) -> dict:
"""
Retrieves model parameters for a given role.

Args:
role (str): The role of the agent (e.g., "oracle" or "judge").

Returns:
dict: A dictionary of parameters for the agent.
"""
role_config = self.models.get(role, {})

return {
"temperature": role_config.get("temperature", 0.5),
"max_tokens": role_config.get("max_tokens", 500),
}


# IMPORTANT: specifies the Python ChatModel instance to use for inference requests when
# the model is loaded back
agent = BasicAgent()
mlflow.models.set_model(agent)

上面的代码片段将我们的代理定义为 ChatModel 的子类。使用代码模型方法,我们调用 mlflow.models.set_model,并传入我们的 BasicAgent 实例,以指示在重新加载代理时要使用哪个模型对象进行推理。

将代理代码保存在一个 Python 文件中,例如 basic_agent.py这是代码模型的一个关键部分 - 它允许记录一个包含模型代码的文件,而不是序列化的模型对象,从而绕过了序列化问题。

模型已在代码文件中定义,在记录之前只剩下一个步骤:我们需要定义用于初始化模型的配置。这通过定义 model_config 配置来完成。

设置我们的 model_config

在记录模型之前,我们需要定义控制模型代理行为的配置。将配置与模型的核心逻辑解耦,使我们能够轻松测试和比较不同的代理行为,而无需修改模型实现。通过使用灵活的配置系统,我们可以有效地尝试不同的设置,从而更容易地迭代和微调模型。

为什么要解耦配置?

在生成式 AI (GenAI) 的背景下,代理行为会因提供给每个代理的指令集和参数(例如 temperaturemax_tokens)而差异很大。如果我们直接将这些配置硬编码到模型的逻辑中,每次新测试都需要更改模型的源代码,这会导致:

  • 效率低下:每次测试都更改源代码会减慢实验过程。
  • 增加出错风险:不断修改源代码会增加引入 bug 或意外副作用的机会。
  • 缺乏可重现性:如果代码和配置之间没有清晰的分离,跟踪和重现特定结果所使用的确切配置将变得困难。

通过通过 model_config 参数在外部设置这些值,我们使模型变得灵活,能够适应不同的测试场景。这种方法还与 MLflow 的评估工具无缝集成,例如 mlflow.evaluate(),它允许您系统地比较不同配置下的模型输出。

定义模型配置

配置包含两个主要部分:

  1. 模型:此部分定义特定于代理的配置,例如此示例中的 judgeoracle 角色。每个代理都有:

    • 一个 endpoint:指定此代理使用的模型类型或服务。
    • 一个 instruction:定义代理的角色和职责(例如,回答问题、评估响应)。
    • 温度和最大 Tokens:控制生成的可变性 (temperature) 和响应的 token 限制。
  2. 通用配置:模型的整体行为的其他设置,例如用户响应如何为后续代理构建。

注意

设置模型配置有两种可用选项:直接在日志记录代码中(如下所示)或将配置写入 yaml 格式的文件到本地位置,其路径可以在日志记录期间定义 model_config 参数时指定。要了解有关如何使用 model_config 参数的更多信息,请参阅model_config 使用指南

以下是我们如何为代理设置配置:

model_config = {
"models": {
"judge": {
"endpoint": "databricks-meta-llama-3-1-405b-instruct",
"instruction": (
"You are an evaluator of answers provided by others. Based on the context of both the question and the answer, "
"provide a corrected answer if it is incorrect; otherwise, enhance the answer with additional context and explanation."
),
"temperature": 0.5,
"max_tokens": 2000,
},
"oracle": {
"endpoint": "databricks-mixtral-8x7b-instruct",
"instruction": (
"You are a knowledgeable source of information that excels at providing detailed, but brief answers to questions. "
"Provide an answer to the question based on the information provided."
),
"temperature": 0.9,
"max_tokens": 5000,
},
},
"configuration": {
"user_response_instruction": "Can you evaluate and enhance this answer with the provided contextual history?"
},
}

外部配置的好处

  • 灵活性:解耦的配置允许我们轻松切换或调整模型行为,而无需修改核心逻辑。例如,我们可以更改模型的指令或调整 temperature 来测试不同水平的响应创造力。
  • 可伸缩性:随着更多代理被添加到系统中或引入新角色,我们可以扩展此配置而不会使模型代码变得混乱。这种分离使代码库更清晰且更易于维护。
  • 可重现性和比较:通过将配置保持在外部,我们可以使用 MLflow 记录每次运行中使用的特定设置。这使得重现结果和比较不同实验变得更容易,确保了健壮的评估和裁定过程,以选择性能最佳的配置。

配置完成后,我们现在可以记录模型并使用这些设置运行实验。通过利用 MLflow 强大的追踪和日志记录功能,我们将能够有效地管理实验并从代理的响应中提取有价值的见解。

定义输入示例

在记录模型之前,提供一个演示如何与模型交互的 input_example 非常重要。此示例具有几个关键目的:

  • 记录时的验证:包含 input_example 允许 MLflow 在记录过程中使用此示例执行 predict 方法。这有助于验证模型能够处理预期的输入格式并尽早捕获任何问题。
  • UI 展示input_example 在 MLflow UI 中模型的 artifact 下方显示。这为用户提供了一个方便的参考,以了解与部署模型交互时预期的输入结构。

通过提供输入示例,您可以确保您的模型已使用真实数据进行测试,从而增加部署后其行为符合预期的信心。

提示

使用 mlflow.pyfunc.ChatModel 定义 GenAI 应用程序时,如果未提供输入示例,将使用默认的占位符输入示例。如果您在 MLflow UI 的 artifact 查看器中看到一个不熟悉或通用的输入示例,那很可能是系统分配的默认占位符。为避免这种情况,请在保存模型时确保指定自定义输入示例。

这是我们将使用的输入示例:

input_example = {
"messages": [
{
"role": "user",
"content": "What is a good recipe for baking scones that doesn't require a lot of skill?",
}
]
}

此示例表示用户请求一个简单的司康饼食谱。它与我们的 BasicAgent 模型期望的输入结构对齐,该模型处理一个消息列表,其中每条消息包含一个 rolecontent

提供输入示例的好处

  • 执行和验证:MLflow 将在记录期间将此 input_example 传递给模型的 predict 方法,以确保其能够无错误地处理输入。任何输入处理问题,例如不正确的数据类型或缺失字段,都将在这一阶段被捕获,从而节省您以后调试的时间。
  • 用户界面显示input_example 将在 MLflow UI 的模型 artifact 查看部分可见。这有助于用户了解模型期望的输入数据格式,从而更容易在部署后与模型交互。
  • 部署信心:通过预先使用示例输入验证模型,您可以进一步确保模型在生产环境中正常运行,从而降低部署后出现意外行为的风险。

包含 input_example 是一个简单但强大的步骤,可以验证您的模型已准备好部署,并在接收用户输入时按预期运行。

记录和加载我们的自定义代理

要使用 MLflow 记录和加载模型,请使用:

with mlflow.start_run():
model_info = mlflow.pyfunc.log_model(
"model",
# If needed, update `python_model` to the Python file containing your agent code
python_model="basic_agent.py",
model_config=model_config,
input_example=input_example,
)

loaded = mlflow.pyfunc.load_model(model_info.model_uri)

response = loaded.predict(
{
"messages": [
{
"role": "user",
"content": "What is the best material to make a baseball bat out of?",
}
]
}
)

结论

在本教程中,您探索了使用 MLflow 的 mlflow.pyfunc.ChatModel 类创建自定义 GenAI 聊天代理的过程。我们演示了如何实现一种灵活、可伸缩和标准化的方法来管理 GenAI 应用程序的部署,使您即使对于尚未在 MLflow 中原生支持的库和框架,也能利用 AI 的最新进展。

通过使用 ChatModel 而非更通用的 PythonModel,您可以利用不可变签名接口的优势,该接口在您所有部署的 GenAI 接口中保持一致,从而提供一致的体验,简化所有解决方案的使用,避免与部署 GenAI 相关的许多常见陷阱。

本教程的关键要点包括:

  • 追踪与监控:通过将追踪直接集成到模型中,您可以获得对应用程序内部工作原理的宝贵见解,从而使调试和优化更加简单。装饰器和流式 API 方法都提供了用于管理关键操作追踪的多功能方式。
  • 灵活的配置管理:将配置从模型代码中解耦,确保您可以快速测试和迭代,而无需修改源代码。这种方法不仅简化了实验过程,还提高了应用程序发展过程中的可重现性和可伸缩性。
  • 标准化的输入和输出结构:利用 ChatModel 的静态签名简化了部署和提供 GenAI 模型的复杂性。通过遵循既定标准,可以减少通常与集成和验证输入/输出格式相关的摩擦。
  • 避免常见陷阱:在整个实现过程中,我们强调了避免常见问题的最佳实践,例如正确处理秘密信息、验证输入示例以及理解加载上下文的细微差别。遵循这些实践可确保您的模型在生产环境中保持安全、健壮和可靠。
  • 验证和部署就绪性:模型在部署前验证的重要性再怎么强调也不为过。通过使用诸如 mlflow.models.validate_serving_input() 之类的工具,您可以及早发现和解决潜在的部署问题,从而在生产部署过程中节省时间和精力。

随着生成式 AI 领域的不断发展,构建适应性强且标准化的模型对于利用未来几个月和几年将解锁的令人兴奋且强大的能力至关重要。本教程中涵盖的方法为您提供了一个健壮的框架,用于在 MLflow 中集成和管理 GenAI 技术,使您能够轻松开发、追踪和部署复杂的 AI 解决方案。

我们鼓励您根据您的具体需求扩展和自定义此基础示例,并探索进一步的增强功能。通过利用 MLflow 不断增长的功能,您可以继续完善您的 GenAI 模型,确保它们在任何应用程序中都能提供有影响力和可靠的结果。