使用 MLflow 和 LlamaIndex 工作流构建高级 RAG
使用各种数据源增强大型语言模型 (LLM) 是构建 LLM 应用程序的强大策略。然而,随着系统变得越来越复杂,原型设计和迭代改进这些复杂系统变得具有挑战性。
LlamaIndex Workflow 是构建此类复合系统的绝佳框架。结合 MLflow,Workflow API 在开发周期中带来了效率和健壮性,从而实现轻松调试、实验跟踪和评估以持续改进。
在本博客中,我们将逐步介绍使用 LlamaIndex 的 Workflow API 和 MLflow 构建一个复杂聊天机器人的过程。
什么是 LlamaIndex Workflow?
LlamaIndex Workflow 是一个事件驱动的编排框架,用于设计动态 AI 应用程序。LlamaIndex Workflow 的核心包括:
-
步骤 (Steps)
是执行单元,代表工作流中的不同操作。 -
事件 (Events)
触发这些步骤,充当控制工作流流程的信号。 -
工作流 (Workflow)
将这两者作为 Python 类连接起来。每个步骤都作为工作流类的一个方法实现,并定义了输入和输出事件。
这种简单而强大的抽象允许您将复杂任务分解为可管理的小步骤,从而实现更大的灵活性和可伸缩性。作为一个体现事件驱动设计的框架,使用 Workflow
API 可以直观地设计并行和异步执行流,显著提高长时间运行任务的效率,并有助于提供生产就绪的可伸缩性。
为什么要将 MLflow 与 LlamaIndex Workflow 结合使用?
Workflow 提供了极大的灵活性,可以设计几乎任意的执行流程。然而,权力越大,责任越大。如果不正确管理您的更改,它可能会变成一个混乱的、不确定状态和令人困惑的配置。经过几十次更改后,您可能会问自己,“我的工作流是如何工作的?”
MLflow 在 LlamaIndex 工作流的整个端到端开发周期中提供了强大的 MLOps 支撑。
-
实验跟踪:MLflow 允许您记录各种组件,如步骤、提示、LLM 和工具,从而轻松迭代改进系统。
-
可复现性:MLflow 打包了环境信息,例如全局配置 (
Settings
)、库版本和元数据,以确保 ML 生命周期不同阶段的一致部署。 -
追踪:在复杂的事件驱动工作流中调试问题非常麻烦。MLflow Tracing 是一个生产就绪的可观察性解决方案,它与 LlamaIndex 原生集成,让您能够观察工作流中的每个内部阶段。
-
评估:衡量是改进模型的关键任务。MLflow Evaluation 是评估 LLM 应用程序质量、速度和成本的绝佳工具。它与 MLflow 的实验跟踪功能紧密集成,简化了迭代改进的过程。
让我们开始构建!🛠️
策略:使用多种检索方法的混合方法
检索增强生成 (RAG) 是一个强大的框架,但检索步骤通常会成为瓶颈,因为基于嵌入的检索可能无法始终捕获最相关的上下文。虽然存在许多提高检索质量的技术,但没有一种解决方案是普遍适用的。因此,有效的策略是结合多种检索方法。
我们将在此处探讨的概念是并行运行多种检索方法:(1) 标准向量搜索,(2) 基于关键词的搜索 (BM25),以及 (3) 网络搜索。然后合并检索到的上下文,并过滤掉不相关的数据以提高整体质量。
我们如何将这个概念变为现实?让我们深入了解并使用 LlamaIndex Workflow 和 MLflow 构建这个混合 RAG。
1. 设置存储库
示例代码,包括环境设置脚本,可在 GitHub 存储库中找到。它包含完整的工作流定义、一个动手实践的笔记本和一个用于运行实验的示例数据集。要将其克隆到您的工作环境,请使用以下命令:
git clone https://github.com/mlflow/mlflow.git
克隆存储库后,通过运行以下命令设置虚拟环境:
cd mlflow/examples/llama_index/workflow
chmod +x install.sh
./install.sh
安装完成后,使用以下命令在 Poetry 环境中启动 Jupyter Notebook:
poetry run jupyter notebook
接下来,打开位于根目录中的 Tutorial.ipynb
笔记本。在整个博客中,我们将逐步介绍此笔记本,以指导您完成开发过程。
2. 启动 MLflow 实验
MLflow 实验是您跟踪模型开发所有方面的地方,包括模型定义、配置、参数、依赖版本等。让我们首先创建一个名为“LlamaIndex Workflow RAG”的新 MLflow 实验。
import mlflow
mlflow.set_experiment("LlamaIndex Workflow RAG")
此时,实验还没有任何记录数据。要在 MLflow UI 中查看实验,请打开一个新的终端并运行 mlflow ui
命令,然后导航到浏览器中提供的 URL:
poetry run mlflow ui
3. 选择您的 LLM 和嵌入模型
现在,将您首选的 LLM 和嵌入模型设置到 LlamaIndex 的 Settings 对象。这些模型将在 LlamaIndex 组件中全程使用。
在此演示中,我们将使用 OpenAI 模型,但您可以按照笔记本中的说明轻松切换到不同的 LLM 提供商或本地模型。
import getpass
import os
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter OpenAI API Key")
from llama_index.core import Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
# LlamaIndex by default uses OpenAI APIs for LLMs and embeddings models. You can use the default
# model (`gpt-3.5-turbo` and `text-embeddings-ada-002` as of Oct 2024), but we recommend using the
# latest efficient models instead for getting better results with lower cost.
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large")
Settings.llm = OpenAI(model="gpt-4o-mini")
💡 MLflow 在记录模型时会自动将 Settings
配置记录到您的 MLflow 实验中,确保可复现性并减少环境之间差异的风险。
4. 设置网络搜索 API
在本博客的稍后部分,我们将为 QA 机器人添加网络搜索功能。我们将使用 Tavily AI,这是一个为 LLM 应用程序优化并与 LlamaIndex 原生集成的搜索 API。访问他们的网站以获取免费试用版 API 密钥,或者使用与 LlamaIndex 集成的其他搜索引擎,例如 GoogleSearchToolSpec。
获取 API 密钥后,将其设置为环境变量:
os.environ["TAVILY_AI_API_KEY"] = getpass.getpass("Enter Tavily AI API Key")
5. 设置用于检索的文档索引
下一步是为从 MLflow 文档检索构建文档索引。data
目录中的 urls.txt
文件包含 MLflow 文档页面的列表。这些页面可以使用网页阅读器实用程序作为文档对象加载。
from llama_index.readers.web import SimpleWebPageReader
with open("data/urls.txt", "r") as file:
urls = [line.strip() for line in file if line.strip()]
documents = SimpleWebPageReader(html_to_text=True).load_data(urls)
接下来,将这些文档摄取到向量数据库中。在本教程中,我们将使用 Qdrant 向量存储,如果自托管则免费。如果您的机器上安装了 Docker,可以通过运行官方 Docker 容器来启动 Qdrant 数据库:
$ docker pull qdrant/qdrant
$ docker run -p 6333:6333 -p 6334:6334 \
-v $(pwd)/.qdrant_storage:/qdrant/storage:z \
qdrant/qdrant
容器运行后,您可以创建一个连接到 Qdrant 数据库的索引对象:
import qdrant_client
from llama_index.vector_stores.qdrant import QdrantVectorStore
client = qdrant_client.QdrantClient(host="localhost", port=6333)
vector_store = QdrantVectorStore(client=client, collection_name="mlflow_doc")
from llama_index.core import StorageContext, VectorStoreIndex
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents=documents,
storage_context=storage_context
)
当然,您可以在此处使用您首选的向量存储。LlamaIndex 支持多种向量数据库,例如 FAISS、Chroma 和 Databricks Vector Search。如果您选择替代方案,请按照相关的 LlamaIndex 文档更新 workflow/workflow.py
文件。
除了评估向量搜索检索外,我们稍后还将评估基于关键词的检索器 (BM25)。让我们设置本地文档存储,以在工作流中启用 BM25 检索。
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers.bm25 import BM25Retriever
splitter = SentenceSplitter(chunk_size=512)
nodes = splitter.get_nodes_from_documents(documents)
bm25_retriever = BM25Retriever.from_defaults(nodes=nodes)
bm25_retriever.persist(".bm25_retriever")
6. 定义工作流
现在环境和数据源都已准备就绪,我们可以构建工作流并进行实验。完整的工作流代码在 workflow
目录中定义。让我们探讨一些关键的实现组件。
事件
workflow/events.py
文件定义了工作流中使用的所有事件。这些是简单的 Pydantic 模型,用于在工作流步骤之间传递信息。例如,VectorSearchRetrieveEvent
通过传递用户的查询来触发向量搜索步骤。
class VectorSearchRetrieveEvent(Event):
"""Event for triggering VectorStore index retrieval step."""
query: str
提示
在整个工作流执行过程中,我们多次调用 LLM。这些 LLM 调用的提示模板定义在 workflow/prompts.py
文件中。
工作流类
主工作流类在 workflow/workflow.py
中定义。让我们分解一下它的工作原理。
构造函数接受一个 retrievers
参数,该参数指定了工作流中要使用的检索方法。例如,如果传递了 ["vector_search", "bm25"]
,工作流将执行向量搜索和基于关键词的搜索,跳过网络搜索。
💡 动态决定使用哪些检索器使我们能够试验不同的检索策略,而无需复制几乎相同的模型代码。
class HybridRAGWorkflow(Workflow):
VALID_RETRIEVERS = {"vector_search", "bm25", "web_search"}
def __init__(self, retrievers=None, **kwargs):
super().__init__(**kwargs)
self.llm = Settings.llm
self.retrievers = retrievers or []
if invalid_retrievers := set(self.retrievers) - self.VALID_RETRIEVERS:
raise ValueError(f"Invalid retrievers specified: {invalid_retrievers}")
self._use_vs_retriever = "vector_search" in self.retrievers
self._use_bm25_retriever = "bm25" in self.retrievers
self._use_web_search = "web_search" in self.retrievers
if self._use_vs_retriever:
qd_client = qdrant_client.QdrantClient(host=_QDRANT_HOST, port=_QDRANT_PORT)
vector_store = QdrantVectorStore(client=qd_client, collection_name=_QDRANT_COLLECTION_NAME)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
self.vs_retriever = index.as_retriever()
if self._use_bm25_retriever:
self.bm25_retriever = BM25Retriever.from_persist_dir(_BM25_PERSIST_DIR)
if self._use_web_search:
self.tavily_tool = TavilyToolSpec(api_key=os.environ.get("TAVILY_AI_API_KEY"))
工作流从执行一个以 StartEvent
作为输入的步骤开始,在本例中是 route_retrieval
步骤。此步骤检查 retrievers
参数并触发必要的检索步骤。通过使用上下文对象的 send_event()
方法,可以从此单个步骤并行分派多个事件。
# If no retriever is specified, proceed directly to the final query step with an empty context
if len(self.retrievers) == 0:
return QueryEvent(context="")
# Trigger the retrieval steps based on the configuration
if self._use_vs_retriever:
ctx.send_event(VectorSearchRetrieveEvent(query=query))
if self._use_bm25_retriever:
ctx.send_event(BM25RetrieveEvent(query=query))
if self._use_web_search:
ctx.send_event(TransformQueryEvent(query=query))
检索步骤很简单。然而,网络搜索步骤更为高级,因为它包含一个额外的步骤,用于使用 LLM 将用户的问题转换为适合搜索的查询。
所有检索步骤的结果都在 gather_retrieval_results
步骤中聚合。在这里,使用 ctx.collect_events()
方法轮询异步执行步骤的结果。
results = ctx.collect_events(ev, [RetrievalResultEvent] * len(self.retrievers))
传递来自多个检索器的所有结果通常会导致上下文过大,其中包含不相关或重复的内容。为了解决这个问题,我们需要过滤并选择最相关的结果。虽然基于分值的方法很常见,但网络搜索结果不返回相似度分值。因此,我们使用 LLM 对不相关结果进行排序和过滤。重新排名步骤通过利用与 RankGPT 的内置重新排名器集成来实现这一点。
reranker = RankGPTRerank(llm=self.llm, top_n=5)
reranked_nodes = reranker.postprocess_nodes(ev.nodes, query_str=query)
reranked_context = "\n".join(node.text for node in reranked_nodes)
最后,将重新排名后的上下文与用户查询一起传递给 LLM,以生成最终答案。结果以带有 result
键的 StopEvent
返回。
@step
async def query_result(self, ctx: Context, ev: QueryEvent) -> StopEvent:
"""Get result with relevant text."""
query = await ctx.get("query")
prompt = FINAL_QUERY_TEMPLATE.format(context=ev.context, query=query)
response = self.llm.complete(prompt).text
return StopEvent(result=response)
现在,让我们实例化工作流并运行它。
# Workflow with VS + BM25 retrieval
from workflow.workflow import HybridRAGWorkflow
workflow = HybridRAGWorkflow(retrievers=["vector_search", "bm25"], timeout=60)
response = await workflow.run(query="Why use MLflow with LlamaIndex?")
print(response)
7. 在 MLflow 实验中记录工作流
现在我们希望使用各种不同的检索策略运行工作流并评估它们的性能。然而,在运行评估之前,我们将模型记录在 MLflow 中,以便在 MLflow 实验中跟踪模型及其性能。
对于 LlamaIndex Workflow,我们使用新的代码模型方法,该方法将模型记录为独立的 Python 脚本。这种方法避免了像 pickle 这样的序列化方法相关的风险和不稳定,而是依靠代码作为模型定义的唯一真相来源。当与 MLflow 的环境冻结功能结合使用时,它提供了一种可靠的模型持久化方式。有关更多详细信息,请参阅 MLflow 文档。
💡 在 workflow
目录中,有一个 model.py
脚本,它导入 HybridRAGWorkflow
并使用通过日志记录期间通过 model_config
参数传递的动态配置对其进行实例化。这种设计允许您跟踪具有不同配置的模型,而无需复制模型定义。
我们将启动一个 MLflow 运行,并使用 mlflow.llama_index.log_model() API 记录具有不同配置的模型脚本 model.py
。
# Different configurations we will evaluate. We don't run evaluation for all permutation
# for demonstration purpose, but you can add as many patterns as you want.
run_name_to_retrievers = {
# 1. No retrievers (prior knowledge in LLM).
"none": [],
# 2. Vector search retrieval only.
"vs": ["vector_search"],
# 3. Vector search and keyword search (BM25)
"vs + bm25": ["vector_search", "bm25"],
# 4. All retrieval methods including web search.
"vs + bm25 + web": ["vector_search", "bm25", "web_search"],
}
# Create an MLflow Run and log model with each configuration.
models = []
for run_name, retrievers in run_name_to_retrievers.items():
with mlflow.start_run(run_name=run_name):
model_info = mlflow.llama_index.log_model(
# Specify the model Python script.
llama_index_model="workflow/model.py",
# Specify retrievers to use.
model_config={"retrievers": retrievers},
# Define dependency files to save along with the model
code_paths=["workflow"],
# Subdirectory to save artifacts (not important)
artifact_path="model",
)
models.append(model_info)
现在再次打开 MLflow UI,这次它应该显示记录了 4 个 MLflow 运行,其 retrievers
参数值不同。通过点击每个运行名称并导航到“Artifacts”选项卡,您可以看到 MLflow 记录了模型和各种元数据,例如依赖版本和设置。
8. 启用 MLflow Tracing
在运行评估之前,还有一个最后一步:启用 MLflow Tracing。我们稍后会深入探讨此功能以及我们为什么在此处进行此操作,但现在,您可以使用简单的单行命令启用它。MLflow 将自动跟踪每个 LlamaIndex 执行。
mlflow.llama_index.autolog()
9. 使用不同的检索策略评估工作流
示例存储库包含一个示例评估数据集 mlflow_qa_dataset.csv
,其中包含 30 对与 MLflow 相关的问题-答案对。
import pandas as pd
eval_df = pd.read_csv("data/mlflow_qa_dataset.csv")
display(eval_df.head(3))
要评估工作流,请使用 mlflow.evaluate() API,该 API 需要 (1) 您的数据集,(2) 已记录的模型,以及 (3) 您要计算的指标。
from mlflow.metrics import latency
from mlflow.metrics.genai import answer_correctness
for model_info in models:
with mlflow.start_run(run_id=model_info.run_id):
result = mlflow.evaluate(
# Pass the URI of the logged model above
model=model_info.model_uri,
data=eval_df,
# Specify the column for ground truth answers.
targets="ground_truth",
# Define the metrics to compute.
extra_metrics=[
latency(),
answer_correctness("openai:/gpt-4o-mini"),
],
# The answer_correctness metric requires "inputs" column to be
# present in the dataset. We have "query" instead so need to
# specify the mapping in `evaluator_config` parameter.
evaluator_config={"col_mapping": {"inputs": "query"}},
)
在此示例中,我们使用两个指标评估模型:
- 延迟:衡量执行单个查询的工作流所需的时间。
- 答案正确性:根据真实情况评估答案的准确性,由 OpenAI GPT-4o 模型以 1-5 分制评分。
这些指标仅用于演示目的——您可以添加其他指标,如毒性或忠实度,甚至创建自己的指标。有关完整的内置指标以及如何定义自定义指标,请参阅 MLflow 文档。
评估过程将需要几分钟。完成后,您可以在 MLflow UI 中查看结果。打开实验页面并单击运行列表上方的图表图标 📈。
*💡 评估结果可能因模型设置和一些随机性而有所不同。
第一行显示答案正确性指标的条形图,而第二行显示延迟结果。表现最佳的组合是“向量搜索 + BM25”。有趣的是,添加网络搜索不仅显著增加了延迟,而且还降低了答案正确性。
为什么会发生这种情况?启用网络搜索的模型的一些答案似乎偏离了主题。例如,在回答关于启动模型注册表的问题时,网络搜索模型提供了关于模型部署的不相关答案,而“vs + bm25”模型提供了正确答案。
这个不正确的答案是从哪里来的?这似乎是一个检索器问题,因为我们只更改了检索策略。但是,很难从最终结果中看到每个检索器返回了什么。为了更深入地了解幕后发生的事情,MLflow Tracing 是完美的解决方案。
10. 使用 MLflow Trace 检查质量问题
MLflow Tracing 是一个为 LLM 应用程序带来可观测性的新功能。它与 LlamaIndex 无缝集成,记录工作流执行期间的所有输入、输出和中间步骤的元数据。由于我们在开始时调用了 mlflow.llama_index.autolog()
,因此每个 LlamaIndex 操作都已在 MLflow 实验中进行跟踪和记录。
要检查评估中特定问题的跟踪,请导航到实验页面上的“Traces”选项卡。查找请求列中包含特定问题且运行名称为“vs + bm25 + web”的行。单击请求 ID 链接将打开跟踪 UI,您可以在其中查看有关执行中每个步骤的详细信息,包括输入、输出、元数据和延迟。
在这种情况下,我们通过检查重新排名步骤发现了问题。网络搜索检索器返回了与模型服务相关的无关上下文,并且重新排名器错误地将其排名为最相关。通过这种洞察,我们可以确定潜在的改进,例如改进重新排名器以更好地理解 MLflow 主题,提高网络搜索精度,甚至完全删除网络搜索检索器。
结论
在本博客中,我们探讨了 LlamaIndex 和 MLflow 的组合如何提升检索增强生成 (RAG) 工作流的开发,将强大的模型管理和可观测性功能结合在一起。通过集成多种检索策略(例如向量搜索、BM25 和网络搜索),我们演示了灵活检索如何增强 LLM 驱动应用程序的性能。
- 实验跟踪使我们能够组织和记录不同的工作流配置,确保可复现性并使我们能够跟踪多个运行中的模型性能。
- MLflow Evaluate 使我们能够轻松记录和评估具有不同检索器策略的工作流,使用延迟和答案正确性等关键指标来比较性能。
- MLflow UI 为我们提供了关于各种检索策略如何影响准确性和延迟的清晰可视化,帮助我们识别最有效的配置。
- MLflow Tracing 与 LlamaIndex 集成,提供了对工作流每个步骤的详细可观测性,用于诊断质量问题,例如搜索结果的错误重新排名。
有了这些工具,您就拥有了一个用于构建、记录和优化 RAG 工作流的完整框架。随着 LLM 技术的不断发展,跟踪、评估和微调模型性能的各个方面的能力将变得至关重要。我们强烈鼓励您进一步实验,看看如何将这些工具应用于您自己的应用程序。
要继续学习,请探索以下资源:
- 了解有关 MLflow LlamaIndex 集成的更多信息。
- 在 MLflow 中的 LLM 发现其他 MLflow LLM 功能。
- 使用 MLflow 部署将您的工作流部署到服务端点。
- 查看 LlamaIndex 的更多 Workflow 示例。