MLflow LangChain 风格
langchain
风格正在积极开发中,并标记为实验性。公共 API 可能会发生变化,并且随着该风格的发展可能会添加新功能。
欢迎来到 LangChain 与 MLflow 集成的开发者指南。本指南是理解和利用 LangChain 与 MLflow 结合能力开发高级语言模型应用的综合资源。
LangChain 是一个多功能框架,专为构建由语言模型驱动的应用而设计。它擅长创建利用语言模型进行推理和生成响应的上下文感知应用,从而能够开发复杂的 NLP 应用。
LangGraph 是 Langchain 创建者推出的一个补充性的基于代理的框架,支持创建有状态代理和多代理 GenAI 应用。LangGraph 利用 LangChain 来与 GenAI 代理组件进行交互。
为什么将 MLflow 与 LangChain 结合使用?
除了使用 MLflow 管理和部署机器学习模型的优势外,LangChain 与 MLflow 的集成还提供了在更广泛的 MLflow 生态系统中使用 LangChain 的一系列优势。
实验追踪
LangChain 在实验各种代理、工具和检索器方面的灵活性与 MLflow Tracking 结合后变得更加强大。这种结合可以快速进行实验和迭代。您可以轻松比较运行,从而更容易地优化模型并加速从开发到生产部署的过程。
依赖管理
利用 MLflow 自动管理和记录代码及环境依赖项的能力,自信地部署 LangChain 应用。您还可以明确声明外部资源依赖项,例如 LangChain 应用查询的 LLM 服务端点或向量搜索索引。MLflow 将这些依赖项作为模型元数据进行跟踪,以便下游服务系统可以确保您部署的 LangChain 应用到这些依赖资源的身份验证顺利进行。
这些功能确保了开发和生产环境之间的一致性,减少了部署风险,并且减少了人工干预。
MLflow Evaluate
MLflow Evaluate 在 MLflow 中提供了本地功能来评估语言模型。使用此功能,您可以轻松地利用自动化评估算法对 LangChain 应用的推理结果进行评估。此功能有助于高效评估 LangChain 应用的推理结果,确保稳健的性能分析。
可观察性
MLflow Tracing 是 MLflow 的一个新功能,允许您追踪数据如何在 LangChain 链/代理等中流动。此功能提供了数据流的可视化表示,使您更容易理解 LangChain 应用的行为并识别潜在的瓶颈或问题。凭借其强大的自动追踪功能,您无需更改任何代码,只需运行一次 mlflow.langchain.autolog()
命令即可检测 LangChain 应用。
自动日志记录
自动日志记录是一个强大的一站式解决方案,只需一行代码 mlflow.langchain.autolog()
即可实现上述所有好处。通过启用自动日志记录,您可以轻松地自动记录 LangChain 应用的所有组件,包括链、代理和检索器。此功能简化了跟踪和管理 LangChain 应用的过程,让您专注于开发和改进模型。有关如何使用此功能的更多信息,请参阅MLflow LangChain 自动日志记录文档。
MLflow LangChain 集成中支持的元素
- 代理(Agents)
- 检索器(Retrievers)
- 可运行对象(Runnables)
- LangGraph Complied Graph (仅通过从代码日志记录模型支持)
- LLMChain (已弃用,仅支持
langchain<0.3.0
) - RetrievalQA (已弃用,仅支持
langchain<0.3.0
)
记录包含 ChatOpenAI 和 AzureChatOpenAI 的链/代理需要 MLflow>=2.12.0
和 LangChain>=0.0.307
。
链、代理和检索器概览
- 链(Chain)
- 代理(Agents)
- 检索器(Retrievers)
代码中硬编码的一系列动作或步骤。LangChain 中的链结合了提示、模型和输出解析器等各种组件,以创建处理步骤的流程。
下图显示了一个示例,其中顶部部分直接通过 API 调用与 SaaS LLM 交互,不考虑对话历史。底部部分显示了向一个包含对话历史状态的 LangChain 链提交相同的查询,以便每次后续输入都包含整个对话历史。以这种方式保留对话上下文是创建“聊天机器人”的关键。
代理(Agent)
动态构造,利用语言模型选择一系列动作。与链不同,代理根据输入、可用工具和中间结果决定动作的顺序。
检索器(Retriever)
RetrievalQA 链中负责查找相关文档或数据的组件。在需要 LLM 参考特定外部信息以获得准确响应的应用中,检索器是关键。
MLflow LangChain 风格入门 - 教程和指南
LangChain 快速入门
通过探索一个由提示和模型组成的可能最简单的链配置,来快速入门 MLflow 和 LangChain,以创建一个单一用途的实用应用。
使用 LangChain 的 RAG 教程
学习如何使用 MLflow 集成构建一个 LangChain RAG,以回答关于商业企业合法性的高度具体的问题。
从代码日志记录模型
-
自 MLflow 2.12.2 起,MLflow 引入了直接从代码定义记录 LangChain 模型的能力。
-
此功能为管理 LangChain 模型提供了多项好处
-
避免序列化复杂性:文件句柄、套接字、外部连接、动态引用、lambda 函数和系统资源是不可序列化的。一些 LangChain 组件不支持原生序列化,例如
RunnableLambda
。
无需 Pickle:在与序列化对象时使用的 Python 版本不同的版本中加载 pickle 或 cloudpickle 文件不保证兼容性。
可读性:序列化对象通常人类难以阅读。从代码日志记录模型允许您通过代码审查模型定义。
有关此功能的更多信息,请参阅从代码日志记录模型功能文档。
-
要使用此功能,您将利用
mlflow.models.set_model()
API 定义要记录为 MLflow 模型的链。在定义链的代码中设置此项后,记录模型时,您将指定定义链的文件的路径。以下示例演示了如何使用此方法记录一个简单的链在单独的 Python 文件中定义链
# %%writefile chain.py
import os
from operator import itemgetter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import OpenAI
import mlflow
mlflow.set_experiment("Homework Helper")
mlflow.langchain.autolog()
prompt = PromptTemplate(
template="You are a helpful tutor that evaluates my homework assignments and provides suggestions on areas for me to study further."
" Here is the question: {question} and my answer which I got wrong: {answer}",
input_variables=["question", "answer"],
)
def get_question(input):
default = "What is your name?"
if isinstance(input_data[0], dict):
return input_data[0].get("content").get("question", default)
return default
def get_answer(input):
default = "My name is Bobo"
if isinstance(input_data[0], dict):
return input_data[0].get("content").get("answer", default)
return default
model = OpenAI(temperature=0.95)
chain = (
{
"question": itemgetter("messages") | RunnableLambda(get_question),
"answer": itemgetter("messages") | RunnableLambda(get_answer),
}
| prompt
| model
| StrOutputParser()
)
mlflow.models.set_model(chain) -
提示
from pprint import pprint
import mlflow
chain_path = "chain.py"
with mlflow.start_run():
info = mlflow.langchain.log_model(lc_model=chain_path, artifact_path="chain") -
如果您使用 Jupyter Notebook,可以使用
%%writefile
魔术命令将代码单元格直接写入文件,而无需手动创建。# Load the model and run inference
homework_chain = mlflow.langchain.load_model(model_uri=info.model_uri)
exam_question = {
"messages": [
{
"role": "user",
"content": {
"question": "What is the primary function of control rods in a nuclear reactor?",
"answer": "To stir the primary coolant so that the neutrons are mixed well.",
},
},
]
}
response = homework_chain.invoke(exam_question)
pprint(response)然后从主 notebook 中,通过提供定义链的文件的路径来记录模型
在 chain.py
中定义的模型现在已记录到 MLflow 中。您可以重新加载模型并运行推理
您可以在 MLflow UI 上看到模型已作为代码记录
从代码记录模型时,请确保您的代码不包含任何敏感信息,例如 API 密钥、密码或其他机密数据。代码将以纯文本形式存储在 MLflow 模型 Artifact 中,任何有权访问 Artifact 的人都可以查看代码。
要了解有关 MLflow LangChain 风格的更多详细信息,请阅读下面的详细指南。
查看综合指南
-
常见问题解答
ValueError: This code relies on the pickle module. You will need to set allow_dangerous_deserialization=True if you want to opt-in to
allow deserialization of data using pickle. Data can be compromised by a malicious actor if not handled properly to include a malicious
payload that when deserialized with pickle can execute arbitrary code on your machine.我无法加载我的链!
允许危险的反序列化:LangChain 中的 Pickle opt-in 逻辑将阻止通过 MLflow 加载组件。您可能会看到如下错误
LangChain 中的一项更改,强制用户选择加入 pickle 反序列化,可能会导致加载使用 MLflow 记录的链、向量存储、检索器和代理时出现一些问题。由于每个组件都没有公开选项来设置加载器函数上的此参数,因此您需要在定义加载器函数时直接设置此选项来记录模型。未设置此值的 LangChain 组件将无问题地保存,但如果未设置,加载时将引发 ValueError
。
要解决此问题,只需重新记录您的模型,并在定义的加载器函数中指定选项 allow_dangerous_deserialization=True
。有关在记录 FAISS
向量存储实例时如何指定此选项的示例,请参阅LangChain 检索器教程中的 loader_fn
声明。
-
我无法使用 MLflow 保存我的链、代理或检索器。
如果您在使用 MLflow 记录或保存 LangChain 组件时遇到问题,请参阅从代码日志记录模型功能文档,以确定从脚本文件记录模型是否提供更简单、更稳健的日志记录解决方案!
-
Cloudpickle 序列化挑战:使用 cloudpickle 进行序列化可能会遇到限制,具体取决于对象的复杂性。
某些对象,尤其是那些具有复杂内部状态或依赖于外部系统资源的对象,本质上不可序列化。此限制出现的原因是序列化本质上需要将对象转换为字节流,这对于与系统状态紧密耦合或具有外部 I/O 操作的对象来说可能很复杂。尝试将 PyDantic 升级到 2.x 版本以解决此问题。
-
验证原生序列化支持:如果使用 MLflow 保存或记录不起作用,请确保 langchain 对象(链、代理或检索器)使用 langchain API 原生可序列化。
由于其复杂的结构,并非所有 langchain 组件都易于序列化。如果不支持原生序列化且 MLflow 不支持保存模型,您可以在 LangChain 仓库中提交问题,或在 LangChain 讨论区寻求指导。
跟上 MLflow 的新功能:MLflow 可能无法立即支持最新的 LangChain 功能。
-
如果 MLflow 不支持新功能,请考虑在 MLflow GitHub issues 页面提交功能请求。由于处于高度活跃开发中的库(例如 LangChain 的发布速度)变化迅速,破坏性更改、API 重构以及甚至现有功能的基本功能支持都可能导致集成问题。如果您希望看到 LangChain 中支持的链、代理、检索器或任何未来结构,请告知我们!
保存模型时出现 AttributeError
-
处理 LangChain 和 MLflow 中的依赖项安装:LangChain 和 MLflow 不会自动安装所有依赖项。
特定代理、检索器或工具可能需要的其他包可能需要在保存或记录模型时明确定义。如果您的模型依赖于标准 LangChain 包中未包含的这些外部组件库(尤其是工具),则这些依赖项并非始终会自动记录为模型的一部分(请参阅下文有关如何包含它们的指南)。
声明额外依赖项:保存和记录时使用 extra_pip_requirements
参数。
-
保存或记录包含不属于核心 langchain 安装的外部依赖项的模型时,您将需要这些额外的依赖项。模型风格提供了两个选项来声明这些依赖项:
extra_pip_requirements
和pip_requirements
。虽然指定pip_requirements
是完全有效的,但我们建议使用extra_pip_requirements
,因为它不依赖于定义使用 langchain 模型进行推理所需的所有核心依赖包(其他核心依赖项将自动推断)。如何使用带有 LangChain 的流式 API?
使用 LangChain 模型进行流式处理:确保 LangChain 模型支持流式响应,并使用 MLflow 版本 >= 2.12.2。
自 MLflow 2.12.2 发布以来,支持流式响应并使用 MLflow 2.12.2(或更高版本)保存的 LangChain 模型可以使用 predict_stream
API 加载并用于流式推理。请确保您正确使用返回类型,因为这些模型返回的是一个 Generator
对象。要了解更多信息,请参阅predict_stream 指南。
如何将使用 LangGraph 构建的代理记录到 MLflow?
from typing import Literal
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
import mlflow
@tool
def get_weather(city: Literal["seattle", "sf"]):
"""Use this to get weather information."""
if city == "seattle":
return "It's probably raining. Again."
elif city == "sf":
return "It's always sunny in sf"
llm = ChatOpenAI()
tools = [get_weather]
graph = create_react_agent(llm, tools)
# specify the Agent as the model interface to be loaded when executing the script
mlflow.models.set_model(graph)
LangGraph 与 MLflow 的集成旨在利用 MLflow 中的从代码日志记录模型功能来拓宽和简化代理序列化的支持。
import mlflow
input_example = {
"messages": [{"role": "user", "content": "what is the weather in seattle today?"}]
}
with mlflow.start_run():
model_info = mlflow.langchain.log_model(
lc_model="./langgraph.py", # specify the path to the LangGraph agent script definition
artifact_path="langgraph",
input_example=input_example,
)
要记录 LangGraph 代理,您可以将代理代码定义在脚本中,如下所示,保存到文件 langgraph.py
中
准备好将此代理脚本定义记录到 MLflow 时,可以在定义模型时直接引用此已保存的脚本
agent = mlflow.langchain.load_model(model_info.model_uri)
query = {
"messages": [
{
"role": "user",
"content": "Should I bring an umbrella today when I go to work in San Francisco?",
}
]
}
agent.invoke(query)
当代理从 MLflow 加载时,脚本将被执行,并且定义的代理将可用于调用。
代理可以加载并用于推理,如下所示
如何评估 LangGraph 代理?
mlflow.evaluate 函数提供了评估模型性能的稳健方法。
import mlflow
import pandas as pd
from typing import List
# Note that we assume the `model_uri` variable is present
# Also note that registering and loading the model is optional and you
# can simply leverage your langgraph object in the custom function.
loaded_model = mlflow.langchain.load_model(model_uri)
eval_data = pd.DataFrame(
{
"inputs": [
"What is MLflow?",
"What is Spark?",
],
"ground_truth": [
"MLflow is an open-source platform for managing the end-to-end machine learning (ML) lifecycle. It was developed by Databricks, a company that specializes in big data and machine learning solutions. MLflow is designed to address the challenges that data scientists and machine learning engineers face when developing, training, and deploying machine learning models.",
"Apache Spark is an open-source, distributed computing system designed for big data processing and analytics. It was developed in response to limitations of the Hadoop MapReduce computing model, offering improvements in speed and ease of use. Spark provides libraries for various tasks such as data ingestion, processing, and analysis through its components like Spark SQL for structured data, Spark Streaming for real-time data processing, and MLlib for machine learning tasks",
],
}
)
def custom_langgraph_wrapper(inputs: pd.DataFrame) -> List[str]:
"""Extract the predictions from a chat message sequence."""
answers = []
for content in inputs["inputs"]:
prediction = loaded_model.invoke(
{"messages": [{"role": "user", "content": content}]}
)
last_message_content = prediction["messages"][-1].content
answers.append(last_message_content)
return answers
with mlflow.start_run() as run:
results = mlflow.evaluate(
custom_langgraph_wrapper, # Pass our function defined above
data=eval_data,
targets="ground_truth",
model_type="question-answering",
extra_metrics=[
mlflow.metrics.latency(),
mlflow.metrics.genai.answer_correctness("openai:/gpt-4o"),
],
)
print(results.metrics)
LangGraph 代理,尤其是那些具有聊天功能的代理,可以在一次推理调用中返回多条消息。考虑到 mlflow.evaluate
在原始预测和指定的基本真值之间进行简单的比较,用户有责任调和预测输出和基本真值之间的潜在差异。
{
"latency/mean": 1.8976624011993408,
"latency/variance": 0.10328687906900313,
"latency/p90": 2.1547686100006103,
"flesch_kincaid_grade_level/v1/mean": 12.1,
"flesch_kincaid_grade_level/v1/variance": 0.25,
"flesch_kincaid_grade_level/v1/p90": 12.5,
"ari_grade_level/v1/mean": 15.850000000000001,
"ari_grade_level/v1/variance": 0.06250000000000044,
"ari_grade_level/v1/p90": 16.05,
"exact_match/v1": 0.0,
"answer_correctness/v1/mean": 5.0,
"answer_correctness/v1/variance": 0.0,
"answer_correctness/v1/p90": 5.0,
}
通常,最佳方法是使用自定义函数来处理响应。下面提供了一个自定义函数示例,该函数从 LangGraph 模型中提取最后一条聊天消息。然后将此函数用于 mlflow.evaluate,以返回单个字符串响应,可以与 "ground_truth"
列进行比较。
输出
有关适用于此评估示例的 LangGraph 模型的完整示例,请参阅MLflow LangGraph 博客。
import json
import mlflow
import os
from operator import itemgetter
from langchain.schema.runnable import RunnablePassthrough
model = RunnablePassthrough.assign(
problem=lambda x: x["messages"][-1]["content"]
) | itemgetter("problem")
input_example = {
"messages": [
{
"role": "user",
"content": "Hello",
}
]
}
# this model accepts the input_example
assert model.invoke(input_example) == "Hello"
# set this environment variable to avoid input conversion
os.environ["MLFLOW_CONVERT_MESSAGES_DICT_FOR_LANGCHAIN"] = "false"
with mlflow.start_run():
model_info = mlflow.langchain.log_model(model, "model", input_example=input_example)
pyfunc_model = mlflow.pyfunc.load_model(model_info.model_uri)
assert pyfunc_model.predict(input_example) == ["Hello"]