跳到主要内容

LangChain 在 MLflow 中 (实验性)

信息

langchain 数据风味目前处于积极开发阶段,并被标记为“实验性”。公共 API 正在不断演进,并且正在添加新功能以增强其功能。

概述

LangChain 是一个用于创建由语言模型驱动的应用程序的 Python 框架。它提供了用于开发利用语言模型进行推理和生成响应的上下文感知应用程序的独特功能。与 MLflow 的集成简化了复杂 NLP 应用程序的开发和部署。

LangChain 的技术精髓

  • 上下文感知应用:LangChain 专门用于将语言模型连接到各种上下文来源,使其能够生成更相关、更准确的输出。
  • 推理能力:它利用语言模型的力量来推理给定上下文并根据上下文采取适当的行动。
  • 灵活的链式组合:LangChain 表达式语言 (LCEL) 允许从基本组件轻松构建复杂的链,支持流式传输、并行处理和日志记录等功能。

使用 LangChain 构建链

  • 基本组件:LangChain 促进将提示模板、模型和输出解析器等组件链接起来,以创建复杂的 [工作流](https://python.langchain.ac.cn/docs/modules/chains/basic/llm_chain)。
  • 示例 - 笑话生成器:
    • 一个基本链可以接受一个主题,并使用提示模板、ChatOpenAI 模型和输出解析器的组合来生成一个笑话。
    • 组件使用 | 运算符链接,类似于 Unix 管道,允许一个组件的输出馈送到下一个组件。
  • 高级用例:
    • LangChain 还支持更复杂的设置,例如检索增强生成 (RAG) 链,这些链可以在回答问题时添加上下文。

与 MLflow 集成

  • 简化日志记录和加载:MLflow 的 langchain 数据风味提供了 log_model()load_model() 等函数,可在 MLflow 生态系统中轻松记录和检索 LangChain 模型。
  • 简化部署:在 MLflow 中记录的 LangChain 模型可以被解释为通用的 Python 函数,从而简化了它们在各种应用程序中的部署和使用。通过直接集成到您的记录模型中的依赖项管理,您可以部署您的应用程序,并确信用于训练模型的环境就是用于服务它的环境。
  • 多功能模型交互:该集成允许开发人员利用 LangChain 的独特功能,并结合 MLflow 强大的模型跟踪和管理功能。
  • 自动记录:MLflow 的 langchain 数据风味提供了 LangChain 模型的自动记录,该记录会自动记录用于推理的 [伪影](https://mlflow.org.cn/docs/latest/models.html#what-is-an-mlflow-model),[指标](https://mlflow.org.cn/docs/latest/models.html#what-is-an-mlflow-model) 和模型。

langchain 模型数据风味通过 mlflow.langchain.save_model()mlflow.langchain.log_model() 函数,允许使用 MLflow 格式记录 LangChain 模型。使用这些函数还会为它们生成的 MLflow 模型添加 python_function 数据风味,允许通过 mlflow.pyfunc.load_model() 将模型解释为通用的 Python 函数进行推理。

您还可以使用 mlflow.langchain.load_model() 函数将已保存或记录的 MLflow 模型(具有 langchain 数据风味)加载为模型属性的字典。

基本示例:在 MLflow 中记录 LangChain LLMChain

python
import os

from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate

import mlflow

# Ensure the OpenAI API key is set in the environment
assert (
"OPENAI_API_KEY" in os.environ
), "Please set the OPENAI_API_KEY environment variable."

# Initialize the OpenAI model and the prompt template
llm = OpenAI(temperature=0.9)
prompt = PromptTemplate(
input_variables=["product"],
template="What is a good name for a company that makes {product}?",
)

# Create the LLMChain with the specified model and prompt
chain = LLMChain(llm=llm, prompt=prompt)

# Log the LangChain LLMChain in an MLflow run
with mlflow.start_run():
logged_model = mlflow.langchain.log_model(chain, name="langchain_model")

# Load the logged model using MLflow's Python function flavor
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)

# Predict using the loaded model
print(loaded_model.predict([{"product": "colorful socks"}]))

下面显示了该示例的输出

python
["\n\nColorful Cozy Creations."]

简单 LLMChain 示例展示了什么

  • 集成灵活性:该示例突出了 LangChain 的 LLMChain(由 OpenAI 模型和自定义提示模板组成)如何轻松地在 MLflow 中记录。
  • 简化模型管理:通过 MLflow 的 langchain 数据风味,链被记录下来,支持版本控制、跟踪和轻松检索。
  • 易于部署:记录的 LangChain 模型使用 MLflow 的 pyfunc 模块加载,说明了 LangChain 模型在 MLflow 中部署过程的简单性。
  • 实际应用:最终的预测步骤演示了模型在真实场景中的功能,根据给定的产品生成公司名称。

使用 MLflow 记录 LangChain Agent

什么是 Agent?

LangChain 中的 Agent 利用语言模型动态确定和执行一系列动作,这与链中的硬编码序列形成对比。要了解更多关于 Agent 的信息并查看 LangChain 中的其他示例,您可以 阅读 LangChain 关于 Agent 的文档

Agent 的关键组件

Agent
  • 驱动决策制定的核心链,利用语言模型和提示。
  • 接收诸如工具描述、用户目标和先前执行的步骤等输入。
  • 输出下一个动作集 (AgentActions) 或最终响应 (AgentFinish)。
工具
  • 由 Agent 调用以完成任务的函数。
  • 为有效使用,提供适当的工具并准确描述它们至关重要。
工具包
  • 为特定任务量身定制的工具集合。
  • LangChain 提供一系列内置工具包,并支持自定义工具包创建。
AgentExecutor
  • 执行 Agent 决策的运行时环境。
  • 处理工具错误和 Agent 输出解析等复杂性。
  • 确保全面的日志记录和可观察性。
其他 Agent 运行时
  • 除了 AgentExecutor,LangChain 还支持 Plan-and-execute Agent、Baby AGI 和 Auto GPT 等实验性运行时。
  • 也支持自定义运行时逻辑的创建。

记录 LangChain Agent 的示例

此示例说明了在 MLflow 中记录 LangChain Agent 的过程,突出了 LangChain 复杂的 Agent 功能与 MLflow 强大的模型管理功能的集成。

python
import os

from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.llms import OpenAI

import mlflow

# Note: Ensure that the package 'google-search-results' is installed via pypi to run this example
# and that you have a accounts with SerpAPI and OpenAI to use their APIs.

# Ensuring necessary API keys are set
assert (
"OPENAI_API_KEY" in os.environ
), "Please set the OPENAI_API_KEY environment variable."
assert (
"SERPAPI_API_KEY" in os.environ
), "Please set the SERPAPI_API_KEY environment variable."

# Load the language model for agent control
llm = OpenAI(temperature=0)

# Next, let's load some tools to use. Note that the `llm-math` tool uses an LLM, so we need to pass that in.
tools = load_tools(["serpapi", "llm-math"], llm=llm)

# Finally, let's initialize an agent with the tools, the language model, and the type of agent we want to use.
agent = initialize_agent(
tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True
)

# Log the agent in an MLflow run
with mlflow.start_run():
logged_model = mlflow.langchain.log_model(agent, name="langchain_model")

# Load the logged agent model for prediction
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)

# Generate an inference result using the loaded model
question = "What was the high temperature in SF yesterday in Fahrenheit? What is that number raised to the .023 power?"

answer = loaded_model.predict([{"input": question}])

print(answer)

上面示例的输出如下所示

python
["1.1044000282035853"]

简单 Agent 示例展示了什么

  • 复杂的 Agent 记录:演示了 LangChain 的复杂 Agent(使用多个工具和语言模型)如何在 MLflow 中进行记录。
  • 高级工具集成:展示了如何将 'serpapi' 和 'llm-math' 等附加工具与 LangChain Agent 一起使用,强调了该框架集成复杂功能的能力。
  • Agent 初始化和使用:详细介绍了使用特定工具和模型设置初始化 LangChain Agent 的过程,以及如何使用它来执行复杂查询。
  • 高效的模型管理和部署:说明了使用 MLflow,从记录到预测,可以轻松管理和部署复杂的 LangChain Agent。

使用 LangChain 和 GenAI LLM 实现实时流式输出

注意

通过 predict_stream API 进行流式响应仅在 MLflow 版本 >= 2.12.2 中可用。先前的 MLflow 版本不支持流式响应。

流式输出功能概述

MLflow 中的 LangChain 集成支持从各种支持此类功能的 GenAI 语言模型 (LLM) 实现实时流式输出。此功能对于需要即时、增量响应的应用程序至关重要,可促进对话 Agent 或实时内容生成等动态交互。

支持的流式模型

LangChain 旨在与任何提供流式输出功能的 LLM 无缝协作。这包括来自 OpenAI(例如,特定版本的 ChatGPT)的某些模型,以及来自其他供应商的支持类似功能的 LLM。

使用 predict_stream 进行流式输出

MLflow pyfunc LangChain 数据风味中的 predict_stream 方法旨在处理同步输入并以流式方式提供输出。此方法对于通过在模型响应生成完成后等待整个响应,而是随时提供模型响应的各个部分,来保持引人入胜的用户体验特别有用。

示例用法

以下示例演示了如何设置和使用 predict_stream 函数与 MLflow 中管理的 LangChain 模型,并突出了实时响应生成

python
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_openai import OpenAI
import mlflow


template_instructions = "Provide brief answers to technical questions about {topic} and do not answer non-technical questions."
prompt = PromptTemplate(
input_variables=["topic"],
template=template_instructions,
)
chain = LLMChain(llm=OpenAI(temperature=0.05), prompt=prompt)

with mlflow.start_run():
model_info = mlflow.langchain.log_model(chain, name="tech_chain")

# Assuming the model is already logged in MLflow and loaded
loaded_model = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)

# Simulate a single synchronous input
input_data = "Hello, can you explain streaming outputs?"

# Generate responses in a streaming fashion
response_stream = loaded_model.predict_stream(input_data)
for response_part in response_stream:
print("Streaming Response Part:", response_part)
# Each part of the response is handled as soon as it is generated

与回调的高级集成

LangChain 的架构也支持在流式输出上下文中使用回调。这些回调可用于通过允许在流式传输过程中触发操作来增强功能,例如记录中间响应或在传递之前修改它们。

注意

大多数回调处理器的使用涉及记录在对 Chain 或 Retriever 中各种服务和工具的调用过程中涉及的跟踪。为简单起见,下面展示了一个简单的 stdout 回调处理器。真实世界的回调处理器必须是 LangChain 中 BaseCallbackHandler 类的子类。

python
from langchain_core.callbacks import StdOutCallbackHandler

handler = StdOutCallbackHandler()

# Attach callback to enhance the streaming process
response_stream = loaded_model.predict_stream(input_data, callback_handlers=[handler])
for enhanced_response in response_stream:
print("Enhanced Streaming Response:", enhanced_response)

这些示例和解释展示了开发人员如何利用 MLflow 中 LangChain 模型的实时流式输出功能,从而能够创建高度响应和交互式的应用程序。

增强 MLflow 对 RetrievalQA Chains 的管理

LangChain 与 MLflow 的集成引入了一种更有效的方式来管理和利用 RetrievalQA 链,这是 LangChain 功能的关键方面。这些链熟练地将数据检索与问答过程相结合,利用了语言模型的强大功能。

关于 RetrievalQA Chains 的关键见解

  • RetrievalQA Chain 功能:这些链代表了一种复杂的 LangChain 功能,其中信息检索与基于语言模型的问答无缝结合。它们在需要语言模型查阅特定数据或文档以获得准确响应的场景中表现出色。

  • 检索对象的作用:RetrievalQA 链的核心是检索器对象,该对象负责根据查询源相关的文档或数据。

RAG 过程的详细概述

  • 文档加载器:便于从各种来源加载文档,拥有超过 100 个加载器和集成。

  • 文档转换器:通过转换和分割文档为易于管理的片段,为检索准备文档。

  • 文本嵌入模型:生成文本的语义嵌入,提高数据检索的相关性和效率。

  • 向量存储:专门的数据库,用于存储和促进文本嵌入的搜索。

  • 检索器:采用各种检索技术,从简单的语义搜索到更复杂的检索方法,如父文档检索器和集成检索器。

阐明 MLflow 的向量数据库管理

  • 传统的 LangChain 序列化:LangChain 通常需要手动管理检索器对象的序列化,包括处理向量数据库。

  • MLflow 的简化:MLflow 中的 langchain 数据风味极大地简化了这个过程。它自动化了序列化,管理 persist_dir 的内容和 loader_fn 函数的 [pickling](https://docs.pythonlang.cn/3/library/pickle.html)。

MLflow 的关键组件和 VectorDB 记录

  1. persist_dir:存储检索器对象(包括向量数据库)的目录。

  2. loader_fn:用于从其存储位置加载检索器对象的函数。

重要注意事项

  • VectorDB 记录:MLflow 通过其 langchain 数据风味,确实会管理检索器对象中的向量数据库。然而,向量数据库本身并未作为单独的实体在 MLflow 中进行显式记录。

  • 运行时 VectorDB 维护:在训练和运行时环境中保持向量数据库的一致性至关重要。虽然 MLflow 管理检索器对象的序列化,但确保在运行时可以访问相同的向量数据库对于保持性能一致性仍然至关重要。

记录 LangChain RetrievalQA Chain 的示例

python
import os
import tempfile

from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS

import mlflow

assert (
"OPENAI_API_KEY" in os.environ
), "Please set the OPENAI_API_KEY environment variable."

with tempfile.TemporaryDirectory() as temp_dir:
persist_dir = os.path.join(temp_dir, "faiss_index")

# Create the vector db, persist the db to a local fs folder
loader = TextLoader("tests/langchain/state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
db = FAISS.from_documents(docs, embeddings)
db.save_local(persist_dir)

# Create the RetrievalQA chain
retrievalQA = RetrievalQA.from_llm(llm=OpenAI(), retriever=db.as_retriever())

# Log the retrievalQA chain
def load_retriever(persist_directory):
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.load_local(persist_directory, embeddings)
return vectorstore.as_retriever()

with mlflow.start_run() as run:
logged_model = mlflow.langchain.log_model(
retrievalQA,
name="retrieval_qa",
loader_fn=load_retriever,
persist_dir=persist_dir,
)

# Load the retrievalQA chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(
loaded_model.predict(
[{"query": "What did the president say about Ketanji Brown Jackson"}]
)
)

上面示例的输出如下所示

python
[" The president said..."]

在 MLflow 中记录和评估 LangChain Retriever

MLflow 中的 langchain 数据风味将其功能扩展到包括检索器对象的记录和单独评估。此功能对于评估检索器检索文档的质量特别有价值,而无需通过大型语言模型 (LLM) 进行处理。

记录单个检索器的目的

  • 独立评估:允许评估检索器在获取相关文档方面的性能,而独立于它们随后在 LLM 中的使用。
  • 质量保证:有助于评估检索器在获取准确且符合上下文的文档方面的有效性。

在 MLflow 中记录检索器的要求

  • persist_dir:指定存储检索器对象的位置。
  • loader_fn:详细说明用于从存储位置加载检索器对象的函数。
  • 这些要求与记录 RetrievalQA 链的要求一致,确保了流程的一致性。

记录 LangChain Retriever 的示例

python
import os
import tempfile

from langchain.document_loaders import TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS

import mlflow

assert (
"OPENAI_API_KEY" in os.environ
), "Please set the OPENAI_API_KEY environment variable."

with tempfile.TemporaryDirectory() as temp_dir:
persist_dir = os.path.join(temp_dir, "faiss_index")

# Create the vector database and persist it to a local filesystem folder
loader = TextLoader("tests/langchain/state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
db = FAISS.from_documents(docs, embeddings)
db.save_local(persist_dir)

# Define a loader function to recall the retriever from the persisted vectorstore
def load_retriever(persist_directory):
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.load_local(persist_directory, embeddings)
return vectorstore.as_retriever()

# Log the retriever with the loader function
with mlflow.start_run() as run:
logged_model = mlflow.langchain.log_model(
db.as_retriever(),
name="retriever",
loader_fn=load_retriever,
persist_dir=persist_dir,
)

# Load the retriever chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(
loaded_model.predict(
[{"query": "What did the president say about Ketanji Brown Jackson"}]
)
)

上面示例的输出如下所示

python
[
[
{
"page_content": "Tonight. I call...",
"metadata": {"source": "/state.txt"},
},
{
"page_content": "A former top...",
"metadata": {"source": "/state.txt"},
},
]
]

MLflow Langchain 自动记录

有关如何为 Langchain 模型启用自动记录的更多详细信息,请参阅 MLflow Langchain 自动记录 文档。