使用 MLflow 和 LlamaIndex 工作流构建高级 RAG

使用各种数据源增强 LLM 是一项构建 LLM 应用程序的强大策略。然而,随着系统的复杂性增加,原型设计和迭代改进这些更复杂的系统变得具有挑战性。
LlamaIndex Workflow 是构建此类复合系统的绝佳框架。与 MLflow 结合使用时,Workflow API 为开发周期带来了效率和稳健性,从而可以轻松调试、实验跟踪和评估,以实现持续改进。
在这篇博客中,我们将通过使用 LlamaIndex 的 Workflow API 和 MLflow 构建一个复杂的聊天机器人,完成整个过程。
什么是 LlamaIndex Workflow?
LlamaIndex Workflow 是一个事件驱动的编排框架,用于设计动态 AI 应用程序。LlamaIndex Workflow 的核心包括
-
Steps(步骤)是执行单元,代表工作流中不同的操作。 -
Events(事件)触发这些步骤,充当控制工作流流程的信号。 -
Workflow(工作流)将这两者作为一个 Python 类连接起来。每个步骤都作为工作流类的方法实现,定义了输入和输出事件。
这种简单而强大的抽象允许您将复杂的任务分解为可管理的步骤,从而实现更大的灵活性和可扩展性。作为一个体现事件驱动设计的框架,使用 Workflow API 可以直观地设计并行和异步执行流程,显著提高长时间运行任务的效率,并有助于提供可投入生产的扩展性。
为什么要在 LlamaIndex Workflow 中使用 MLflow?
Workflow 提供了极大的灵活性来设计几乎任意的执行流程。然而,伴随这种巨大能力而来的是巨大的责任。如果不妥善管理您的更改,它可能会变成一个充满不确定状态和令人困惑的配置的混乱局面。在经历了数十次更改后,您可能会问自己:“我的工作流到底是如何运行的?”
MLflow 在整个端到端开发周期中为 LlamaIndex 工作流带来了强大的 MLOps 支撑。
-
实验跟踪:MLflow 允许您记录各种组件,如步骤、提示、LLM 和工具,从而可以轻松地迭代改进系统。
-
可复现性:MLflow 会打包环境信息,如全局配置(
Settings)、库版本和元数据,以确保在 ML 生命周期的不同阶段部署一致。 -
跟踪:调试复杂的事件驱动工作流中的问题是很繁琐的。MLflow Tracing 是一种可投入生产的观测解决方案,它与 LlamaIndex 原生集成,为您提供了对工作流中每个内部阶段的可观测性。
-
评估:衡量是改进模型的关键任务。MLflow Evaluation 是评估 LLM 应用程序质量、速度和成本的绝佳工具。它与 MLflow 的实验跟踪功能紧密集成,简化了迭代改进的过程。
让我们开始构建!🛠️
策略:使用多种检索方法的混合方法
检索增强生成 (RAG) 是一个强大的框架,但检索步骤通常会成为瓶颈,因为基于嵌入的检索不一定总能捕获最相关的上下文。虽然存在许多提高检索质量的技术,但没有一种解决方案是普遍适用的。因此,有效的策略是将多种检索方法结合起来。
我们在这里探讨的概念是并行运行几种检索方法:(1)标准向量搜索,(2)基于关键字的搜索 (BM25),以及(3)网络搜索。然后合并检索到的上下文,并过滤掉不相关的数据以提高整体质量。

我们如何将这个概念付诸实践?让我们深入研究并使用 LlamaIndex Workflow 和 MLflow 来构建这个混合 RAG。
1. 设置仓库
示例代码,包括环境设置脚本,可在 GitHub 仓库中获取。它包含一个完整的工作流定义、一个实践笔记本和一个用于运行实验的示例数据集。要将其克隆到您的工作环境中,请使用以下命令
git clone https://github.com/mlflow/mlflow.git
克隆仓库后,通过运行以下命令设置虚拟环境
cd mlflow/examples/llama_index/workflow
chmod +x install.sh
./install.sh
安装完成后,在 Poetry 环境中使用以下命令启动 Jupyter Notebook
poetry run jupyter notebook
接下来,打开位于根目录下的 Tutorial.ipynb 笔记本。在整篇博客中,我们将浏览此笔记本以指导您完成开发过程。
2. 启动 MLflow 实验
MLflow 实验是您跟踪模型开发所有方面的地方,包括模型定义、配置、参数、依赖项版本等。让我们首先创建一个名为“LlamaIndex Workflow RAG”的新 MLflow 实验
import mlflow
mlflow.set_experiment("LlamaIndex Workflow RAG")
此时,实验还没有任何记录的数据。要查看 MLflow UI 中的实验,请打开一个新的终端并运行 mlflow ui 命令,然后在浏览器中导航到提供的 URL
poetry run mlflow ui

3. 选择您的 LLM 和嵌入模型
现在,将您首选的 LLM 和嵌入模型设置到 LlamaIndex 的 Settings 对象中。这些模型将在整个 LlamaIndex 组件中使用。
对于本次演示,我们将使用 OpenAI 模型,但您可以按照笔记本中的说明轻松切换到不同的 LLM 提供商或本地模型。
import getpass
import os
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter OpenAI API Key")
from llama_index.core import Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
# LlamaIndex by default uses OpenAI APIs for LLMs and embeddings models. You can use the default
# model (`gpt-3.5-turbo` and `text-embeddings-ada-002` as of Oct 2024), but we recommend using the
# latest efficient models instead for getting better results with lower cost.
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large")
Settings.llm = OpenAI(model="gpt-4o-mini")
💡 MLflow 会在记录模型时自动将 Settings 配置记录到您的 MLflow 实验中,确保可复现性并降低不同阶段环境之间出现差异的风险。
4. 设置网络搜索 API
稍后在这篇博客中,我们将为 QA 机器人添加网络搜索功能。我们将使用 Tavily AI,这是一个针对 LLM 应用优化并与 LlamaIndex 原生集成的搜索 API。请访问 他们的网站免费获取 API 密钥,或者使用与 LlamaIndex 集成的其他搜索引擎,例如 GoogleSearchToolSpec。
获取 API 密钥后,将其设置为环境变量
os.environ["TAVILY_AI_API_KEY"] = getpass.getpass("Enter Tavily AI API Key")
5. 设置文档索引以用于检索
下一步是从 MLflow 文档构建一个用于检索的文档索引。data 目录中的 urls.txt 文件包含 MLflow 文档页面的列表。可以使用网页读取器工具将这些页面加载为文档对象。
from llama_index.readers.web import SimpleWebPageReader
with open("data/urls.txt", "r") as file:
urls = [line.strip() for line in file if line.strip()]
documents = SimpleWebPageReader(html_to_text=True).load_data(urls)
接下来,将这些文档摄取到向量数据库中。在本教程中,我们将使用 Qdrant 向量存储,如果自托管则是免费的。如果您的机器上安装了 Docker,可以通过运行官方 Docker 容器来启动 Qdrant 数据库
$ docker pull qdrant/qdrant
$ docker run -p 6333:6333 -p 6334:6334 \
-v $(pwd)/.qdrant_storage:/qdrant/storage:z \
qdrant/qdrant
容器运行后,您可以创建一个连接到 Qdrant 数据库的索引对象
import qdrant_client
from llama_index.vector_stores.qdrant import QdrantVectorStore
client = qdrant_client.QdrantClient(host="localhost", port=6333)
vector_store = QdrantVectorStore(client=client, collection_name="mlflow_doc")
from llama_index.core import StorageContext, VectorStoreIndex
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents=documents,
storage_context=storage_context
)
当然,您可以在此处使用您首选的向量存储。LlamaIndex 支持多种向量数据库,例如 FAISS、Chroma 和 Databricks Vector Search。如果选择其他选项,请遵循相关的 LlamaIndex 文档并相应地更新 workflow/workflow.py 文件。
除了评估向量搜索检索外,我们稍后还将评估基于关键字的检索器 (BM25)。让我们设置本地文档存储以在工作流中启用 BM25 检索。
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers.bm25 import BM25Retriever
splitter = SentenceSplitter(chunk_size=512)
nodes = splitter.get_nodes_from_documents(documents)
bm25_retriever = BM25Retriever.from_defaults(nodes=nodes)
bm25_retriever.persist(".bm25_retriever")
6. 定义工作流
现在环境和数据源都已准备就绪,我们可以构建工作流并对其进行实验。完整的工作流代码定义在 workflow 目录中。让我们探讨一下实现的一些关键组件。
事件
workflow/events.py 文件定义了工作流中使用的所有事件。这些是简单的 Pydantic 模型,在工作流步骤之间传递信息。例如,VectorSearchRetrieveEvent 通过传递用户的查询来触发向量搜索步骤。
class VectorSearchRetrieveEvent(Event):
"""Event for triggering VectorStore index retrieval step."""
query: str
提示
在整个工作流执行过程中,我们会多次调用 LLM。这些 LLM 调用的提示模板定义在 workflow/prompts.py 文件中。
工作流类
主要的工作流类定义在 workflow/workflow.py 中。让我们分解一下它是如何工作的。
构造函数接受一个 retrievers 参数,该参数指定工作流中要使用的检索方法。例如,如果传递了 ["vector_search", "bm25"],工作流将执行向量搜索和基于关键字的搜索,跳过网络搜索。
💡 动态决定使用哪些检索器使我们能够实验不同的检索策略,而无需复制几乎相同的模型代码。
class HybridRAGWorkflow(Workflow):
VALID_RETRIEVERS = {"vector_search", "bm25", "web_search"}
def __init__(self, retrievers=None, **kwargs):
super().__init__(**kwargs)
self.llm = Settings.llm
self.retrievers = retrievers or []
if invalid_retrievers := set(self.retrievers) - self.VALID_RETRIEVERS:
raise ValueError(f"Invalid retrievers specified: {invalid_retrievers}")
self._use_vs_retriever = "vector_search" in self.retrievers
self._use_bm25_retriever = "bm25" in self.retrievers
self._use_web_search = "web_search" in self.retrievers
if self._use_vs_retriever:
qd_client = qdrant_client.QdrantClient(host=_QDRANT_HOST, port=_QDRANT_PORT)
vector_store = QdrantVectorStore(client=qd_client, collection_name=_QDRANT_COLLECTION_NAME)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
self.vs_retriever = index.as_retriever()
if self._use_bm25_retriever:
self.bm25_retriever = BM25Retriever.from_persist_dir(_BM25_PERSIST_DIR)
if self._use_web_search:
self.tavily_tool = TavilyToolSpec(api_key=os.environ.get("TAVILY_AI_API_KEY"))
工作流首先执行一个步骤,该步骤以 StartEvent 作为输入,即本例中的 route_retrieval 步骤。此步骤检查 retrievers 参数并触发必要的检索步骤。通过使用上下文对象的 send_event() 方法,可以从这个单一的步骤中并行地分派多个事件。
# If no retriever is specified, proceed directly to the final query step with an empty context
if len(self.retrievers) == 0:
return QueryEvent(context="")
# Trigger the retrieval steps based on the configuration
if self._use_vs_retriever:
ctx.send_event(VectorSearchRetrieveEvent(query=query))
if self._use_bm25_retriever:
ctx.send_event(BM25RetrieveEvent(query=query))
if self._use_web_search:
ctx.send_event(TransformQueryEvent(query=query))
检索步骤很简单。然而,网络搜索步骤更为复杂,因为它包含一个额外的步骤,使用 LLM 将用户的问题转换成适合搜索的查询。
来自所有检索步骤的结果在 gather_retrieval_results 步骤中进行聚合。在这里,使用 ctx.collect_events() 方法来轮询异步执行步骤的结果。
results = ctx.collect_events(ev, [RetrievalResultEvent] * len(self.retrievers))
传递来自多个检索器的所有结果通常会导致上下文很大,其中包含不相关或重复的内容。为了解决这个问题,我们需要过滤和选择最相关的结果。虽然基于分数的方法很常见,但网络搜索结果不返回相似性分数。因此,我们使用 LLM 对不相关的结果进行排序和过滤。rerank 步骤通过利用与 RankGPT 的内置重排序集成来实现此目的。
reranker = RankGPTRerank(llm=self.llm, top_n=5)
reranked_nodes = reranker.postprocess_nodes(ev.nodes, query_str=query)
reranked_context = "\n".join(node.text for node in reranked_nodes)
最后,重新排序的上下文与用户查询一起传递给 LLM,以生成最终答案。结果以带有 result 键的 StopEvent 返回。
@step
async def query_result(self, ctx: Context, ev: QueryEvent) -> StopEvent:
"""Get result with relevant text."""
query = await ctx.get("query")
prompt = FINAL_QUERY_TEMPLATE.format(context=ev.context, query=query)
response = self.llm.complete(prompt).text
return StopEvent(result=response)
现在,让我们实例化工作流并运行它。
# Workflow with VS + BM25 retrieval
from workflow.workflow import HybridRAGWorkflow
workflow = HybridRAGWorkflow(retrievers=["vector_search", "bm25"], timeout=60)
response = await workflow.run(query="Why use MLflow with LlamaIndex?")
print(response)
7. 在 MLflow 实验中记录工作流
现在我们希望使用各种不同的检索策略运行工作流,并评估每种策略的性能。但在运行评估之前,我们将使用 MLflow 记录模型,以便在 MLflow 实验中跟踪模型及其性能。
对于 LlamaIndex Workflow,我们使用新的 来自代码的模型 (Model-from-code) 方法,它将模型记录为独立的 Python 脚本。这种方法避免了与序列化方法(如 pickle)相关的风险和不稳定性,而是依赖代码作为模型定义的单一事实来源。当与 MLflow 的免环境冻结功能结合使用时,它提供了一种持久化模型的可靠方法。有关更多详细信息,请参阅 MLflow 文档。
💡 在 workflow 目录中,有一个 model.py 脚本,它导入 HybridRAGWorkflow 并在记录期间通过 model_config 参数使用动态配置进行实例化。这种设计允许您跟踪具有不同配置的模型,而无需重复模型定义。
我们将启动一个 MLflow Run,并使用 mlflow.llama_index.log_model() API 使用不同的配置记录模型脚本 model.py。
# Different configurations we will evaluate. We don't run evaluation for all permutation
# for demonstration purpose, but you can add as many patterns as you want.
run_name_to_retrievers = {
# 1. No retrievers (prior knowledge in LLM).
"none": [],
# 2. Vector search retrieval only.
"vs": ["vector_search"],
# 3. Vector search and keyword search (BM25)
"vs + bm25": ["vector_search", "bm25"],
# 4. All retrieval methods including web search.
"vs + bm25 + web": ["vector_search", "bm25", "web_search"],
}
# Create an MLflow Run and log model with each configuration.
models = []
for run_name, retrievers in run_name_to_retrievers.items():
with mlflow.start_run(run_name=run_name):
model_info = mlflow.llama_index.log_model(
# Specify the model Python script.
llama_index_model="workflow/model.py",
# Specify retrievers to use.
model_config={"retrievers": retrievers},
# Define dependency files to save along with the model
code_paths=["workflow"],
# Subdirectory to save artifacts (not important)
artifact_path="model",
)
models.append(model_info)
现在再次打开 MLflow UI,这次它应该显示记录了 4 个 MLflow Run,具有不同的 retrievers 参数值。单击每个 Run 名称并导航到“Artifacts”选项卡,您可以看到 MLflow 记录了模型和各种元数据,例如依赖项版本和设置。

8. 启用 MLflow 跟踪
在运行评估之前,最后一步是启用 MLflow 跟踪。我们稍后将深入探讨此功能以及我们在此处执行此操作的原因,但目前,您可以通过一个简单的单行命令启用它。MLflow 将自动跟踪每次 LlamaIndex 执行。
mlflow.llama_index.autolog()
9. 使用不同的检索器策略评估工作流
示例仓库包含一个示例评估数据集 mlflow_qa_dataset.csv,其中包含 30 个与 MLflow 相关的问答对。
import pandas as pd
eval_df = pd.read_csv("data/mlflow_qa_dataset.csv")
display(eval_df.head(3))
要评估工作流,请使用 mlflow.evaluate() API,它需要(1)您的数据集,(2)记录的模型,以及(3)您想要计算的指标。
from mlflow.metrics import latency
from mlflow.metrics.genai import answer_correctness
for model_info in models:
with mlflow.start_run(run_id=model_info.run_id):
result = mlflow.evaluate(
# Pass the URI of the logged model above
model=model_info.model_uri,
data=eval_df,
# Specify the column for ground truth answers.
targets="ground_truth",
# Define the metrics to compute.
extra_metrics=[
latency(),
answer_correctness("openai:/gpt-4o-mini"),
],
# The answer_correctness metric requires "inputs" column to be
# present in the dataset. We have "query" instead so need to
# specify the mapping in `evaluator_config` parameter.
evaluator_config={"col_mapping": {"inputs": "query"}},
)
在此示例中,我们使用以下两个指标评估模型
- 延迟:衡量执行工作流处理单个查询所需的时间。
- 答案正确性:根据地面实况评估答案的准确性,使用 OpenAI GPT-4o 模型按 1-5 分制评分。
这些指标仅用于演示目的——您可以添加额外的指标,如毒性或忠实度,甚至创建自己的指标。请参阅 MLflow 文档,了解全套内置指标以及如何定义自定义指标。
评估过程需要几分钟时间。完成后,您可以在 MLflow UI 中查看结果。打开“实验”页面,然后单击 Run 列表上方的图表图标 📈。

*💡 评估结果可能因模型设置和一些随机性而异。
第一行显示答案正确性指标的条形图,第二行显示延迟结果。性能最佳的组合是“向量搜索 + BM25”。有趣的是,添加网络搜索不仅显著增加了延迟,还降低了答案的正确性。
为什么会发生这种情况?看起来启用网络搜索的某些答案与主题无关。例如,对于有关启动模型注册表的问题,网络搜索模型提供了与模型部署无关的答案,而“vs + bm25”模型则提供了正确的响应。

这个不正确的答案来自哪里?这似乎是一个检索器问题,因为我们只更改了检索策略。然而,从最终结果中很难看出每个检索器返回了什么。要深入了解幕后发生的事情,MLflow Tracing 是完美的解决方案。
10. 使用 MLflow 跟踪检查质量问题
MLflow Tracing 是一项新功能,为 LLM 应用程序带来了可观测性。它与 LlamaIndex 无缝集成,记录了工作流执行期间每个中间步骤的所有输入、输出和元数据。由于我们在开始时调用了 mlflow.llama_index.autolog(),因此每次 LlamaIndex 操作都已被跟踪并记录在 MLflow 实验中。
要检查评估中特定问题的跟踪,请导航到实验页面上的“Traces”选项卡。查找请求列中包含特定问题且 Run 名称为“vs + bm25 + web”的行。单击请求 ID 链接会打开 Trace UI,您可以在其中查看执行中每个步骤的详细信息,包括输入、输出、元数据和延迟。

在这种情况下,我们通过检查 reranker 步骤确定了问题。网络搜索检索器返回了与模型服务无关的上下文,并且重排序器错误地将其排为最相关。有了这些见解,我们可以确定潜在的改进措施,例如改进重排序器以更好地理解 MLflow 主题、提高网络搜索的精度,甚至完全移除网络搜索检索器。
结论
在这篇博客中,我们探讨了 LlamaIndex 和 MLflow 的组合如何提升检索增强生成 (RAG) 工作流的开发,将强大的模型管理和可观测性功能结合在一起。通过集成多种检索策略(如向量搜索、BM25 和网络搜索),我们展示了灵活的检索如何增强 LLM 驱动的应用程序的性能。
- 实验跟踪使我们能够组织和记录不同的工作流配置,确保可复现性并使我们能够跟踪多个 Run 中模型的性能。
- MLflow Evaluate 使我们能够轻松地使用不同检索策略记录和评估工作流,使用延迟和答案正确性等关键指标来比较性能。
- MLflow UI 为我们清晰地展示了各种检索策略如何影响准确性和延迟,帮助我们确定最有效的配置。
- MLflow Tracing 与 LlamaIndex 集成,为诊断质量问题(例如搜索结果的错误重排序)提供了工作流每个步骤的详细可观测性。
有了这些工具,您就拥有了一个用于构建、记录和优化 RAG 工作流的完整框架。随着 LLM 技术不断发展,跟踪、评估和微调模型性能的每个方面将变得至关重要。我们强烈建议您进一步实验,看看如何根据您自己的应用程序定制这些工具。
要继续学习,请浏览以下资源
- 了解有关 MLflow LlamaIndex 集成的更多信息。
- 在 MLflow 中的 LLM 处发现更多 MLflow LLM 功能。
- 使用 MLflow 部署将您的工作流部署到服务终端节点。
- 查看 LlamaIndex 提供的更多 Workflow 示例。
