使用 MLflow 和 LlamaIndex 工作流构建高级 RAG
使用各种数据源增强大型语言模型(LLMs)是构建 LLM 应用的有效策略。然而,随着系统变得日益复杂,原型开发和迭代改进这些复杂系统也变得更具挑战性。
LlamaIndex Workflow 是构建此类复合系统的优秀框架。与 MLflow 结合使用,Workflow API 在开发周期中带来了效率和鲁棒性,从而简化了调试、实验追踪和评估,以实现持续改进。
在本博客中,我们将详细介绍如何使用 LlamaIndex 的 Workflow API 和 MLflow 构建一个复杂的聊天机器人。
什么是 LlamaIndex Workflow?
LlamaIndex Workflow 是一个事件驱动的编排框架,用于设计动态 AI 应用程序。LlamaIndex Workflow 的核心包括:
-
Steps
(步骤)是执行单元,代表工作流中的不同操作。 -
Events
(事件)触发这些步骤,作为控制工作流流程的信号。 -
Workflow
(工作流)将这两者连接为一个 Python 类。每个步骤都作为工作流类的一个方法实现,并定义了输入和输出事件。
这种简单而强大的抽象允许您将复杂任务分解为可管理的步骤,从而实现更大的灵活性和可伸缩性。作为一个体现事件驱动设计的框架,使用 Workflow
API 可以直观地设计并行和异步执行流,显著提高长时间运行任务的效率,并有助于提供生产就绪的可伸缩性。
为什么将 MLflow 与 LlamaIndex Workflow 结合使用?
Workflow 提供了极大的灵活性,可以设计几乎任意的执行流程。然而,能力越大,责任越大。如果不妥善管理您的更改,它可能会变成一个由不确定状态和混乱配置组成的混乱局面。在经历几十次更改后,您可能会问自己:“我的工作流到底是怎么运行的?”
MLflow 为 LlamaIndex Workflows 在端到端开发周期中带来了强大的 MLOps 支撑。
-
实验追踪:MLflow 允许您记录各种组件,如步骤、提示词、LLM 和工具,从而方便系统进行迭代改进。
-
可复现性:MLflow 打包了环境信息,例如全局配置(
Settings
)、库版本和元数据,以确保在机器学习生命周期的不同阶段实现一致的部署。 -
追踪:在复杂的事件驱动工作流中调试问题十分繁琐。MLflow Tracing 是一种生产就绪的可观测性解决方案,与 LlamaIndex 原生集成,让您能够观察工作流中的每个内部阶段。
-
评估:衡量是改进模型的关键任务。MLflow Evaluation 是评估 LLM 应用质量、速度和成本的强大工具。它与 MLflow 的实验追踪功能紧密集成,简化了迭代改进的过程。
开始构建!🛠️
策略:采用多种检索方法的混合方法
检索增强生成(RAG)是一个强大的框架,但检索步骤常常成为瓶颈,因为基于嵌入的检索可能无法始终捕获最相关的上下文。尽管存在许多技术可以提高检索质量,但没有一种解决方案是普遍适用的。因此,一种有效的策略是结合多种检索方法。
我们在此探讨的概念是并行运行多种检索方法:(1) 标准向量搜索,(2) 基于关键词的搜索(BM25),以及 (3) 网页搜索。然后合并检索到的上下文,并过滤掉不相关的数据以提高整体质量。
我们如何将这一概念变为现实?让我们深入探索,使用 LlamaIndex Workflow 和 MLflow 构建这个混合 RAG。
1. 设置代码库
包含环境设置脚本的示例代码可在GitHub 仓库中找到。它包含了完整的工作流定义、一个实践笔记本和一个用于运行实验的示例数据集。要将其克隆到您的工作环境,请使用以下命令:
git clone https://github.com/mlflow/mlflow.git
克隆仓库后,通过运行以下命令设置虚拟环境:
cd mlflow/examples/llama_index/workflow
chmod +x install.sh
./install.sh
安装完成后,在 Poetry 环境中启动 Jupyter Notebook,使用以下命令:
poetry run jupyter notebook
接下来,打开位于根目录下的 Tutorial.ipynb
笔记本。在本博客中,我们将全程讲解此笔记本,引导您完成开发过程。
2. 启动 MLflow 实验
一个 MLflow 实验 是您追踪模型开发所有方面的地方,包括模型定义、配置、参数、依赖版本等。让我们首先创建一个名为“LlamaIndex Workflow RAG”的新 MLflow 实验。
import mlflow
mlflow.set_experiment("LlamaIndex Workflow RAG")
此时,实验尚未记录任何数据。要在 MLflow UI 中查看实验,请打开一个新终端并运行 mlflow ui
命令,然后导航到浏览器中提供的 URL。
poetry run mlflow ui
3. 选择您的 LLM 和嵌入模型
现在,将您偏好的 LLM 和嵌入模型设置到 LlamaIndex 的 Settings 对象中。这些模型将在 LlamaIndex 组件中全程使用。
为了本次演示,我们将使用 OpenAI 模型,但您可以根据笔记本中的说明轻松切换到不同的 LLM 提供商或本地模型。
import getpass
import os
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter OpenAI API Key")
from llama_index.core import Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
# LlamaIndex by default uses OpenAI APIs for LLMs and embeddings models. You can use the default
# model (`gpt-3.5-turbo` and `text-embeddings-ada-002` as of Oct 2024), but we recommend using the
# latest efficient models instead for getting better results with lower cost.
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large")
Settings.llm = OpenAI(model="gpt-4o-mini")
💡 MLflow 在记录模型时会自动将 Settings
配置记录到您的 MLflow 实验中,确保可复现性并减少环境差异的风险。
4. 设置网页搜索 API
在本博客的稍后部分,我们将为 QA 机器人添加网页搜索功能。我们将使用 Tavily AI,这是一个为 LLM 应用优化的搜索 API,并与 LlamaIndex 原生集成。访问其网站可免费获取 API 密钥,或者使用其他与 LlamaIndex 集成的搜索引擎,例如GoogleSearchToolSpec。
获取 API 密钥后,将其设置为环境变量:
os.environ["TAVILY_AI_API_KEY"] = getpass.getpass("Enter Tavily AI API Key")
5. 设置文档索引以进行检索
下一步是为从 MLflow 文档中检索而构建文档索引。data
目录中的 urls.txt
文件包含 MLflow 文档页面的列表。这些页面可以使用网页读取器工具加载为文档对象。
from llama_index.readers.web import SimpleWebPageReader
with open("data/urls.txt", "r") as file:
urls = [line.strip() for line in file if line.strip()]
documents = SimpleWebPageReader(html_to_text=True).load_data(urls)
接下来,将这些文档摄入到向量数据库中。在本教程中,我们将使用 Qdrant 向量存储,如果自托管,它是免费的。如果您的机器上安装了 Docker,可以通过运行官方 Docker 容器来启动 Qdrant 数据库:
$ docker pull qdrant/qdrant
$ docker run -p 6333:6333 -p 6334:6334 \
-v $(pwd)/.qdrant_storage:/qdrant/storage:z \
qdrant/qdrant
容器运行后,您可以创建一个连接到 Qdrant 数据库的索引对象:
import qdrant_client
from llama_index.vector_stores.qdrant import QdrantVectorStore
client = qdrant_client.QdrantClient(host="localhost", port=6333)
vector_store = QdrantVectorStore(client=client, collection_name="mlflow_doc")
from llama_index.core import StorageContext, VectorStoreIndex
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents=documents,
storage_context=storage_context
)
当然,您可以在此处使用您偏好的向量存储。LlamaIndex 支持多种向量数据库,例如 FAISS、Chroma 和 Databricks Vector Search。如果您选择其他选项,请遵循相关的 LlamaIndex 文档并相应地更新 workflow/workflow.py
文件。
除了评估向量搜索检索之外,我们稍后还将评估基于关键词的检索器(BM25)。让我们设置本地文档存储,以在工作流中启用 BM25 检索。
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers.bm25 import BM25Retriever
splitter = SentenceSplitter(chunk_size=512)
nodes = splitter.get_nodes_from_documents(documents)
bm25_retriever = BM25Retriever.from_defaults(nodes=nodes)
bm25_retriever.persist(".bm25_retriever")
6. 定义工作流
现在环境和数据源已准备就绪,我们可以构建工作流并进行实验。完整的工作流代码定义在 workflow
目录中。让我们探讨一下实现中的一些关键组件。
事件
workflow/events.py
文件定义了工作流中使用的所有事件。这些是简单的 Pydantic 模型,用于在工作流步骤之间传递信息。例如,VectorSearchRetrieveEvent
通过传递用户查询来触发向量搜索步骤。
class VectorSearchRetrieveEvent(Event):
"""Event for triggering VectorStore index retrieval step."""
query: str
提示词
在工作流执行过程中,我们会多次调用 LLM。这些 LLM 调用的提示词模板定义在 workflow/prompts.py
文件中。
工作流类
主要工作流类定义在 workflow/workflow.py
中。让我们分解一下它的工作原理。
构造函数接受一个 retrievers
参数,该参数指定了工作流中要使用的检索方法。例如,如果传递 ["vector_search", "bm25"]
,则工作流将执行向量搜索和基于关键词的搜索,跳过网页搜索。
💡 动态决定使用哪些检索器,使我们能够在不复制代码的情况下,尝试不同的检索策略。
class HybridRAGWorkflow(Workflow):
VALID_RETRIEVERS = {"vector_search", "bm25", "web_search"}
def __init__(self, retrievers=None, **kwargs):
super().__init__(**kwargs)
self.llm = Settings.llm
self.retrievers = retrievers or []
if invalid_retrievers := set(self.retrievers) - self.VALID_RETRIEVERS:
raise ValueError(f"Invalid retrievers specified: {invalid_retrievers}")
self._use_vs_retriever = "vector_search" in self.retrievers
self._use_bm25_retriever = "bm25" in self.retrievers
self._use_web_search = "web_search" in self.retrievers
if self._use_vs_retriever:
qd_client = qdrant_client.QdrantClient(host=_QDRANT_HOST, port=_QDRANT_PORT)
vector_store = QdrantVectorStore(client=qd_client, collection_name=_QDRANT_COLLECTION_NAME)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
self.vs_retriever = index.as_retriever()
if self._use_bm25_retriever:
self.bm25_retriever = BM25Retriever.from_persist_dir(_BM25_PERSIST_DIR)
if self._use_web_search:
self.tavily_tool = TavilyToolSpec(api_key=os.environ.get("TAVILY_AI_API_KEY"))
工作流通过执行一个以 StartEvent
为输入的步骤开始,在本例中是 route_retrieval
步骤。此步骤检查 retrievers
参数并触发必要的检索步骤。通过使用上下文对象的 send_event()
方法,可以从这一个步骤并行调度多个事件。
# If no retriever is specified, proceed directly to the final query step with an empty context
if len(self.retrievers) == 0:
return QueryEvent(context="")
# Trigger the retrieval steps based on the configuration
if self._use_vs_retriever:
ctx.send_event(VectorSearchRetrieveEvent(query=query))
if self._use_bm25_retriever:
ctx.send_event(BM25RetrieveEvent(query=query))
if self._use_web_search:
ctx.send_event(TransformQueryEvent(query=query))
检索步骤很简单。然而,网页搜索步骤更为高级,因为它包含一个额外的步骤,使用 LLM 将用户的提问转换为适合搜索的查询。
所有检索步骤的结果都在 gather_retrieval_results
步骤中聚合。在这里,ctx.collect_events()
方法用于轮询异步执行步骤的结果。
results = ctx.collect_events(ev, [RetrievalResultEvent] * len(self.retrievers))
传递来自多个检索器的所有结果常常导致上下文庞大,包含不相关或重复的内容。为了解决这个问题,我们需要过滤并选择最相关的结果。虽然基于分值的方法很常见,但网页搜索结果不返回相似度分值。因此,我们使用 LLM 来排序并过滤掉不相关结果。重排序步骤通过利用与 RankGPT 的内置重排序器集成来实现这一点。
reranker = RankGPTRerank(llm=self.llm, top_n=5)
reranked_nodes = reranker.postprocess_nodes(ev.nodes, query_str=query)
reranked_context = "\n".join(node.text for node in reranked_nodes)
最后,重排序后的上下文与用户查询一起传递给 LLM,以生成最终答案。结果以包含 result
键的 StopEvent
返回。
@step
async def query_result(self, ctx: Context, ev: QueryEvent) -> StopEvent:
"""Get result with relevant text."""
query = await ctx.get("query")
prompt = FINAL_QUERY_TEMPLATE.format(context=ev.context, query=query)
response = self.llm.complete(prompt).text
return StopEvent(result=response)
现在,让我们实例化工作流并运行它。
# Workflow with VS + BM25 retrieval
from workflow.workflow import HybridRAGWorkflow
workflow = HybridRAGWorkflow(retrievers=["vector_search", "bm25"], timeout=60)
response = await workflow.run(query="Why use MLflow with LlamaIndex?")
print(response)
7. 在 MLflow 实验中记录工作流
现在我们希望使用各种不同的检索策略运行工作流并评估其性能。然而,在运行评估之前,我们将把模型记录到 MLflow 中,以便在 MLflow 实验中追踪模型及其性能。
对于 LlamaIndex Workflow,我们使用新的Model-from-code 方法,该方法将模型记录为独立的 Python 脚本。这种方法避免了与 pickle 等序列化方法相关的风险和不稳定性,而是依赖代码作为模型定义的唯一真相来源。当与 MLflow 的环境冻结功能结合使用时,它提供了一种可靠的方式来持久化模型。有关更多详细信息,请参阅MLflow 文档。
💡 在 workflow
目录中,有一个 model.py
脚本,它导入了 HybridRAGWorkflow
并在记录时通过 model_config
参数传入动态配置来实例化它。这种设计允许您追踪具有不同配置的模型,而无需重复模型定义。
我们将启动一个 MLflow Run,并使用 mlflow.llama_index.log_model() API 记录 model.py
模型脚本及其不同的配置。
# Different configurations we will evaluate. We don't run evaluation for all permutation
# for demonstration purpose, but you can add as many patterns as you want.
run_name_to_retrievers = {
# 1. No retrievers (prior knowledge in LLM).
"none": [],
# 2. Vector search retrieval only.
"vs": ["vector_search"],
# 3. Vector search and keyword search (BM25)
"vs + bm25": ["vector_search", "bm25"],
# 4. All retrieval methods including web search.
"vs + bm25 + web": ["vector_search", "bm25", "web_search"],
}
# Create an MLflow Run and log model with each configuration.
models = []
for run_name, retrievers in run_name_to_retrievers.items():
with mlflow.start_run(run_name=run_name):
model_info = mlflow.llama_index.log_model(
# Specify the model Python script.
llama_index_model="workflow/model.py",
# Specify retrievers to use.
model_config={"retrievers": retrievers},
# Define dependency files to save along with the model
code_paths=["workflow"],
# Subdirectory to save artifacts (not important)
artifact_path="model",
)
models.append(model_info)
现在再次打开 MLflow UI,这次应该显示已记录了 4 个具有不同 retrievers
参数值的 MLflow Runs。通过点击每个 Run 名称并导航到“Artifacts”选项卡,您可以看到 MLflow 记录了模型以及各种元数据,例如依赖版本和设置。
8. 启用 MLflow 追踪
在运行评估之前,还有最后一步:启用 MLflow 追踪。我们稍后会深入探讨此功能及其原因,但现在,您可以通过一个简单的单行命令来启用它。MLflow 将自动追踪每个 LlamaIndex 执行。
mlflow.llama_index.autolog()
9. 使用不同的检索器策略评估工作流
示例仓库包含一个名为 mlflow_qa_dataset.csv
的评估数据集,其中包含 30 对与 MLflow 相关的问答对。
import pandas as pd
eval_df = pd.read_csv("data/mlflow_qa_dataset.csv")
display(eval_df.head(3))
要评估工作流,请使用 mlflow.evaluate() API,它需要 (1) 您的数据集,(2) 已记录的模型,以及 (3) 您要计算的指标。
from mlflow.metrics import latency
from mlflow.metrics.genai import answer_correctness
for model_info in models:
with mlflow.start_run(run_id=model_info.run_id):
result = mlflow.evaluate(
# Pass the URI of the logged model above
model=model_info.model_uri,
data=eval_df,
# Specify the column for ground truth answers.
targets="ground_truth",
# Define the metrics to compute.
extra_metrics=[
latency(),
answer_correctness("openai:/gpt-4o-mini"),
],
# The answer_correctness metric requires "inputs" column to be
# present in the dataset. We have "query" instead so need to
# specify the mapping in `evaluator_config` parameter.
evaluator_config={"col_mapping": {"inputs": "query"}},
)
在此示例中,我们使用两个指标评估模型:
- 延迟:衡量执行单个查询的工作流所需的时间。
- 答案正确性:根据事实真相评估答案的准确性,由 OpenAI GPT-4o 模型以 1-5 分制评分。
这些指标仅用于演示目的——您可以添加额外的指标,例如毒性(toxicity)或忠实度(faithfulness),甚至可以创建自己的指标。有关完整的内置指标集以及如何定义自定义指标,请参阅 MLflow 文档。
评估过程将需要几分钟。完成后,您可以在 MLflow UI 中查看结果。打开实验页面,点击运行列表上方的图表图标 📈。
*💡 评估结果可能会因模型设置和一些随机性而有所不同。
第一行显示答案正确性指标的条形图,第二行显示延迟结果。表现最佳的组合是“向量搜索 + BM25”。有趣的是,添加网页搜索不仅显著增加了延迟,还降低了答案正确性。
为什么会这样?似乎启用网页搜索的模型给出的一些答案偏离了主题。例如,在回答关于启动模型注册表的问题时,网页搜索模型提供了与模型部署无关的答案,而“vs + bm25”模型则提供了正确的回复。
这个不正确的答案从何而来?这似乎是一个检索器问题,因为我们只更改了检索策略。然而,很难从最终结果中看出每个检索器返回了什么。为了更深入地了解幕后发生的事情,MLflow Tracing 是完美的解决方案。
10. 使用 MLflow Trace 检查质量问题
MLflow Tracing 是一项新功能,为 LLM 应用带来了可观测性。它与 LlamaIndex 无缝集成,记录工作流执行过程中所有输入、输出以及中间步骤的元数据。由于我们在一开始调用了 mlflow.llama_index.autolog()
,因此每个 LlamaIndex 操作都被追踪并记录在 MLflow 实验中。
要检查评估中特定问题的追踪,请导航到实验页面上的“Traces”选项卡。查找请求列中包含特定问题且运行名称为“vs + bm25 + web”的行。点击请求 ID 链接将打开 Trace UI,您可以在其中查看执行中每个步骤的详细信息,包括输入、输出、元数据和延迟。
在这种情况下,我们通过检查重排序器(reranker)步骤发现了问题。网页搜索检索器返回了与模型服务不相关的上下文,而重排序器错误地将其排名为最相关的。有了这一洞察,我们可以确定潜在的改进措施,例如优化重排序器以更好地理解 MLflow 主题,提高网页搜索的精度,甚至完全移除网页搜索检索器。
结论
在本博客中,我们探讨了 LlamaIndex 和 MLflow 的结合如何提升检索增强生成(RAG)工作流的开发,汇集了强大的模型管理和可观测性能力。通过集成多种检索策略(例如向量搜索、BM25 和网页搜索),我们展示了灵活的检索如何增强 LLM 驱动应用程序的性能。
- 实验追踪使我们能够组织和记录不同的工作流配置,确保可复现性,并使我们能够追踪多次运行中的模型性能。
- MLflow Evaluate 使我们能够轻松记录和评估具有不同检索器策略的工作流,并使用延迟和答案正确性等关键指标来比较性能。
- MLflow UI 为我们提供了清晰的可视化,展示了各种检索策略如何影响准确性和延迟,帮助我们确定最有效的配置。
- MLflow Tracing 与 LlamaIndex 集成,提供了工作流每个步骤的详细可观测性,用于诊断质量问题,例如搜索结果的错误重排序。
有了这些工具,您就拥有了一个完整的框架,用于构建、记录和优化 RAG 工作流。随着 LLM 技术的不断发展,追踪、评估和微调模型性能各个方面的能力将变得至关重要。我们强烈鼓励您进一步实验,看看这些工具如何能根据您的应用进行定制。
如需继续学习,请探索以下资源:
- 了解更多关于MLflow LlamaIndex 集成的信息。
- 在MLflow 中的 LLM 发现更多 MLflow LLM 功能。
- 使用MLflow Deployment 将您的工作流部署到服务端点。
- 查看 LlamaIndex 的更多工作流示例。