MLflow 中的 LangChain(实验性)
langchain
特性目前正在积极开发中,并标记为实验性。公共 API 正在演进,并且正在添加新功能以增强其功能。
概述
LangChain 是一个用于构建由语言模型驱动的应用的 Python 框架。它提供了独特的功能,用于开发利用语言模型进行推理和生成响应的上下文感知应用。与 MLflow 的这种集成简化了复杂 NLP 应用的开发和部署。
LangChain 的技术精髓
- 上下文感知应用:LangChain 专注于将语言模型连接到各种上下文源,使它们能够产生更相关和更准确的输出。
- 推理能力:它利用语言模型的能力来对给定的上下文进行推理,并基于此采取适当的行动。
- 灵活的链式组合:LangChain Expression Language (LCEL) 允许轻松地从基本组件构建复杂的链,支持流式传输、并行处理和日志记录等功能。
使用 LangChain 构建链
- 基本组件:LangChain 简化了将提示模板、模型和输出解析器等组件连接在一起以创建复杂工作流的过程。
- 示例 - 笑话生成器:
- 一个基本链可以接受一个主题,并结合提示模板、ChatOpenAI 模型和输出解析器来生成一个笑话。
- 组件使用
|
运算符进行链式连接,类似于 Unix 管道,允许一个组件的输出输入到下一个组件。
- 高级用例:
- LangChain 还支持更复杂的设置,例如检索增强生成 (RAG) 链,它们可以在回答问题时添加上下文。
与 MLflow 集成
- 简化的日志记录和加载:MLflow 的
langchain
特性提供了log_model()
和load_model()
等函数,可以在 MLflow 生态系统内轻松记录和检索 LangChain 模型。 - 简化的部署:在 MLflow 中记录的 LangChain 模型可以被解释为通用的 Python 函数,从而简化了它们在各种应用中的部署和使用。通过将依赖管理直接集成到您记录的模型中,您可以部署您的应用,并知道用于训练模型的环境也将用于服务它。
- 多功能模型交互:这种集成允许开发者利用 LangChain 的独特功能与 MLflow 强大的模型追踪和管理能力相结合。
- 自动日志记录:MLflow 的
langchain
特性提供 LangChain 模型的自动日志记录,这会自动记录用于推理的工件、指标和模型。
langchain
模型特性通过 mlflow.langchain.save_model()
和 mlflow.langchain.log_model()
函数支持以 MLflow 格式记录 LangChain 模型。使用这些函数还会向它们生成的 MLflow 模型添加 python_function
特性,允许模型通过 mlflow.pyfunc.load_model()
被解释为用于推理的通用 Python 函数。
您还可以使用 mlflow.langchain.load_model()
函数将带有 langchain
特性的已保存或已记录的 MLflow 模型作为模型属性的字典加载。
基本示例:在 MLflow 中记录 LangChain LLMChain
import os
from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
import mlflow
# Ensure the OpenAI API key is set in the environment
assert (
"OPENAI_API_KEY" in os.environ
), "Please set the OPENAI_API_KEY environment variable."
# Initialize the OpenAI model and the prompt template
llm = OpenAI(temperature=0.9)
prompt = PromptTemplate(
input_variables=["product"],
template="What is a good name for a company that makes {product}?",
)
# Create the LLMChain with the specified model and prompt
chain = LLMChain(llm=llm, prompt=prompt)
# Log the LangChain LLMChain in an MLflow run
with mlflow.start_run():
logged_model = mlflow.langchain.log_model(chain, "langchain_model")
# Load the logged model using MLflow's Python function flavor
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
# Predict using the loaded model
print(loaded_model.predict([{"product": "colorful socks"}]))
示例输出如下所示
["\n\nColorful Cozy Creations."]
简单 LLMChain 示例展示了什么
- 集成灵活性:该示例强调了如何轻松地在 MLflow 中记录由 OpenAI 模型和自定义提示模板组成的 LangChain LLMChain。
- 简化的模型管理:通过 MLflow 的
langchain
特性,该链被记录,实现了版本控制、追踪和轻松检索。 - 易于部署:使用 MLflow 的
pyfunc
模块加载已记录的 LangChain 模型,展示了在 MLflow 中部署 LangChain 模型的直接流程。 - 实际应用:最后的预测步骤展示了模型在实际场景中的功能,根据给定产品生成公司名称。
使用 MLflow 记录 LangChain Agent
Agent 是什么?
LangChain 中的 Agent 利用语言模型动态确定和执行一系列动作,这与链中硬编码的序列不同。要了解更多关于 Agent 的信息并在 LangChain 中查看更多示例,您可以阅读 LangChain 关于 Agent 的文档。
Agent 的关键组件
Agent
- 驱动决策的核心链,利用语言模型和提示。
- 接收工具描述、用户目标和先前执行的步骤等输入。
- 输出下一组动作 (AgentActions) 或最终响应 (AgentFinish)。
工具
- Agent 调用以完成任务的函数。
- 提供适当的工具并准确描述它们对于有效使用至关重要。
工具集
- 为特定任务量身定制的工具集合。
- LangChain 提供一系列内置工具集,并支持创建自定义工具集。
AgentExecutor
- 执行 Agent 决策的运行时环境。
- 处理工具错误和 Agent 输出解析等复杂情况。
- 确保全面的日志记录和可观测性。
额外的 Agent 运行时
- 除了 AgentExecutor,LangChain 还支持实验性运行时,如 Plan-and-execute Agent、Baby AGI 和 Auto GPT。
- 也简化了自定义运行时逻辑的创建。
记录 LangChain Agent 的示例
此示例说明了在 MLflow 中记录 LangChain Agent 的过程,突出了 LangChain 复杂的 Agent 功能与 MLflow 强大的模型管理的集成。
import os
from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.llms import OpenAI
import mlflow
# Note: Ensure that the package 'google-search-results' is installed via pypi to run this example
# and that you have a accounts with SerpAPI and OpenAI to use their APIs.
# Ensuring necessary API keys are set
assert (
"OPENAI_API_KEY" in os.environ
), "Please set the OPENAI_API_KEY environment variable."
assert (
"SERPAPI_API_KEY" in os.environ
), "Please set the SERPAPI_API_KEY environment variable."
# Load the language model for agent control
llm = OpenAI(temperature=0)
# Next, let's load some tools to use. Note that the `llm-math` tool uses an LLM, so we need to pass that in.
tools = load_tools(["serpapi", "llm-math"], llm=llm)
# Finally, let's initialize an agent with the tools, the language model, and the type of agent we want to use.
agent = initialize_agent(
tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True
)
# Log the agent in an MLflow run
with mlflow.start_run():
logged_model = mlflow.langchain.log_model(agent, "langchain_model")
# Load the logged agent model for prediction
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
# Generate an inference result using the loaded model
question = "What was the high temperature in SF yesterday in Fahrenheit? What is that number raised to the .023 power?"
answer = loaded_model.predict([{"input": question}])
print(answer)
上述示例的输出如下所示
["1.1044000282035853"]
简单 Agent 示例展示了什么
- 复杂 Agent 记录:演示了如何将利用多种工具和语言模型的 LangChain 复杂 Agent 记录到 MLflow 中。
- 高级工具集成:展示了如何在 LangChain Agent 中使用“serpapi”和“llm-math”等额外工具,强调了该框架集成复杂功能的能力。
- Agent 初始化和使用:详细介绍了使用特定工具和模型设置初始化 LangChain Agent 的过程,以及如何使用它执行复杂的查询。
- 高效的模型管理和部署:说明了使用 MLflow 管理和部署复杂 LangChain Agent 的便捷性,从记录到预测。
使用 LangChain 和生成式 AI LLM 实现实时流式输出
通过 predict_stream
API 的流式响应仅在 MLflow 版本 >= 2.12.2 中可用。 MLflow 的先前版本不支持流式响应。
流式输出功能概述
MLflow 中集成 LangChain 使得能够从支持此功能的各种生成式 AI 语言模型 (LLM) 获取实时流式输出。此功能对于需要即时、增量响应的应用至关重要,有助于实现动态交互,例如对话式 Agent 或实时内容生成。
支持的流式模型
LangChain 旨在与任何提供流式输出功能的 LLM 无缝协作。这包括来自 OpenAI 等提供商的某些模型(例如特定版本的 ChatGPT),以及来自支持类似功能的其他供应商的其他 LLM。
使用 predict_stream
实现流式输出
MLflow pyfunc LangChain 特性中的 predict_stream
方法旨在处理同步输入并以流式方式提供输出。此方法通过在模型的响应可用时立即传递部分响应,而不是等待整个响应生成完成,特别有助于保持良好的用户体验。
示例用法
以下示例演示了如何使用 MLflow 中管理的 LangChain 模型设置和使用 predict_stream
函数,突出了实时响应生成能力
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_openai import OpenAI
import mlflow
template_instructions = "Provide brief answers to technical questions about {topic} and do not answer non-technical questions."
prompt = PromptTemplate(
input_variables=["topic"],
template=template_instructions,
)
chain = LLMChain(llm=OpenAI(temperature=0.05), prompt=prompt)
with mlflow.start_run():
model_info = mlflow.langchain.log_model(chain, "tech_chain")
# Assuming the model is already logged in MLflow and loaded
loaded_model = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
# Simulate a single synchronous input
input_data = "Hello, can you explain streaming outputs?"
# Generate responses in a streaming fashion
response_stream = loaded_model.predict_stream(input_data)
for response_part in response_stream:
print("Streaming Response Part:", response_part)
# Each part of the response is handled as soon as it is generated
与回调函数的高级集成
LangChain 的架构还支持在流式输出上下文中使用回调函数。这些回调函数可以通过在流式处理过程中触发操作来增强功能,例如记录中间响应或在交付之前修改它们。
回调处理程序的大多数用途涉及记录链或 Retriever 中对各种服务和工具调用的踪迹。为了简单起见,下面展示了一个简单的 stdout
回调处理程序。实际应用中的回调处理程序必须是 LangChain 的 BaseCallbackHandler
类的子类。
from langchain_core.callbacks import StdOutCallbackHandler
handler = StdOutCallbackHandler()
# Attach callback to enhance the streaming process
response_stream = loaded_model.predict_stream(input_data, callback_handlers=[handler])
for enhanced_response in response_stream:
print("Enhanced Streaming Response:", enhanced_response)
这些示例和解释展示了开发者如何在 MLflow 中利用 LangChain 模型的实时流式输出功能,从而创建高度响应和交互式的应用。
使用 MLflow 增强对 RetrievalQA 链的管理
LangChain 与 MLflow 的集成引入了一种更有效的方式来管理和利用 RetrievalQA 链,这是 LangChain 能力的关键方面。这些链巧妙地将数据检索与问答过程相结合,利用了语言模型的优势。
关于 RetrievalQA 链的关键见解
-
RetrievalQA 链功能:这些链代表了 LangChain 的一个复杂特性,其中信息检索与基于语言模型的问答无缝融合。它们在需要语言模型查阅特定数据或文档以获得准确响应的场景中表现出色。
-
Retrieval 对象的角色:RetrievalQA 链的核心是 retriever 对象,负责根据查询获取相关文档或数据。
RAG 流程详细概述
-
文档加载器:促进从各种来源加载文档,拥有 100 多种加载器和集成。
-
文档转换器:通过转换和分割文档使其成为可管理的片段,从而准备文档用于检索。
-
文本嵌入模型:生成文本的语义嵌入,提高数据检索的相关性和效率。
-
向量存储:专门用于存储和促进文本嵌入搜索的数据库。
-
Retriever:采用各种检索技术,从简单的语义搜索到更复杂的方法,如 Parent Document Retriever 和 Ensemble Retriever。
使用 MLflow 阐明向量数据库管理
-
传统的 LangChain 序列化:LangChain 通常需要手动管理 retriever 对象的序列化,包括处理向量数据库。
-
MLflow 的简化:MLflow 中的
langchain
特性极大地简化了此过程。它自动执行序列化,管理persist_dir
的内容以及loader_fn
函数的 pickle 化。
关键的 MLflow 组件和 VectorDB 日志记录
-
persist_dir:指定 retriever 对象(包括向量数据库)的存储位置。
-
loader_fn:详细说明用于从存储位置加载 retriever 对象的函数。
重要考虑事项
-
VectorDB 日志记录:MLflow 通过其
langchain
特性管理作为 retriever 对象一部分的向量数据库。但是,向量数据库本身不会在 MLflow 中作为单独的实体被显式记录。 -
运行时 VectorDB 维护:保持训练和运行时环境之间向量数据库的一致性至关重要。虽然 MLflow 管理 retriever 对象的序列化,但确保在运行时可以访问相同的向量数据库对于保持一致的性能仍然至关重要。
记录 LangChain RetrievalQA 链的示例
import os
import tempfile
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
import mlflow
assert (
"OPENAI_API_KEY" in os.environ
), "Please set the OPENAI_API_KEY environment variable."
with tempfile.TemporaryDirectory() as temp_dir:
persist_dir = os.path.join(temp_dir, "faiss_index")
# Create the vector db, persist the db to a local fs folder
loader = TextLoader("tests/langchain/state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
db = FAISS.from_documents(docs, embeddings)
db.save_local(persist_dir)
# Create the RetrievalQA chain
retrievalQA = RetrievalQA.from_llm(llm=OpenAI(), retriever=db.as_retriever())
# Log the retrievalQA chain
def load_retriever(persist_directory):
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.load_local(persist_directory, embeddings)
return vectorstore.as_retriever()
with mlflow.start_run() as run:
logged_model = mlflow.langchain.log_model(
retrievalQA,
artifact_path="retrieval_qa",
loader_fn=load_retriever,
persist_dir=persist_dir,
)
# Load the retrievalQA chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(
loaded_model.predict(
[{"query": "What did the president say about Ketanji Brown Jackson"}]
)
)
上述示例的输出如下所示
[" The president said..."]
在 MLflow 中记录和评估 LangChain Retriever
MLflow 中的 langchain
特性扩展了其功能,包括对 retriever 对象的记录和独立评估。此功能对于在无需通过大型语言模型 (LLM) 处理的情况下评估 retriever 检索到的文档质量特别有价值。
记录单个 Retriever 的目的
- 独立评估:允许评估 retriever 在获取相关文档方面的性能,独立于它们随后在 LLM 中的使用。
- 质量保证:有助于评估 retriever 在获取准确且符合上下文的文档方面的有效性。
在 MLflow 中记录 Retriever 的要求
- persist_dir:指定 retriever 对象的存储位置。
- loader_fn:详细说明用于从其存储位置加载 retriever 对象的函数。
- 这些要求与记录 RetrievalQA 链的要求一致,确保过程的一致性。
记录 LangChain Retriever 的示例
import os
import tempfile
from langchain.document_loaders import TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
import mlflow
assert (
"OPENAI_API_KEY" in os.environ
), "Please set the OPENAI_API_KEY environment variable."
with tempfile.TemporaryDirectory() as temp_dir:
persist_dir = os.path.join(temp_dir, "faiss_index")
# Create the vector database and persist it to a local filesystem folder
loader = TextLoader("tests/langchain/state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
db = FAISS.from_documents(docs, embeddings)
db.save_local(persist_dir)
# Define a loader function to recall the retriever from the persisted vectorstore
def load_retriever(persist_directory):
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.load_local(persist_directory, embeddings)
return vectorstore.as_retriever()
# Log the retriever with the loader function
with mlflow.start_run() as run:
logged_model = mlflow.langchain.log_model(
db.as_retriever(),
artifact_path="retriever",
loader_fn=load_retriever,
persist_dir=persist_dir,
)
# Load the retriever chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(
loaded_model.predict(
[{"query": "What did the president say about Ketanji Brown Jackson"}]
)
)
上述示例的输出如下所示
[
[
{
"page_content": "Tonight. I call...",
"metadata": {"source": "/state.txt"},
},
{
"page_content": "A former top...",
"metadata": {"source": "/state.txt"},
},
]
]
MLflow Langchain 自动日志记录
请参阅MLflow Langchain 自动日志记录文档,了解有关如何为 Langchain 模型启用自动日志记录的更多详细信息。