利用各种数据源增强 LLM 是构建 LLM 应用的强大策略。然而,随着系统变得越来越复杂,对这些复杂系统进行原型设计和迭代改进也变得充满挑战。
LlamaIndex Workflow 是构建此类复合系统的绝佳框架。结合 MLflow,Workflow API 为开发周期带来效率和鲁棒性,支持轻松调试、实验跟踪和评估以实现持续改进。
在本篇博客中,我们将带您一起探索使用 LlamaIndex 的 Workflow API 和 MLflow 构建一个复杂聊天机器人的过程。
什么是 LlamaIndex Workflow?
LlamaIndex Workflow 是一个事件驱动的编排框架,用于设计动态 AI 应用程序。LlamaIndex Workflow 的核心组成部分包括
-
步骤 (Steps)
是执行单元,代表工作流中的不同动作。 -
事件 (Events)
触发这些步骤,充当控制工作流流程的信号。 -
工作流 (Workflow)
将这两者连接起来,形成一个 Python 类。每个步骤都作为工作流类的一个方法来实现,并定义了输入和输出事件。
这种简单而强大的抽象允许您将复杂任务分解为可管理的步骤,从而实现更大的灵活性和可扩展性。作为一个体现事件驱动设计的框架,使用 Workflow
API 可以直观地设计并行和异步执行流程,显着提高长时间运行任务的效率,并有助于提供生产就绪的可扩展性。
为什么将 MLflow 与 LlamaIndex Workflow 结合使用?
Workflow 提供了极大的灵活性来设计几乎任意的执行流程。然而,巨大的力量伴随着巨大的责任。如果未能妥善管理您的更改,它可能会变成一个充满不确定状态和混乱配置的无序混乱。在经过几十次更改后,您可能会问自己:“我的工作流究竟是怎么工作的?”
MLflow 为 LlamaIndex Workflows 在端到端开发周期中带来了强大的 MLOps 支持。
-
实验跟踪 (Experiment Tracking):MLflow 允许您记录各种组件,如步骤、提示、LLMs 和工具,从而轻松迭代改进系统。
-
可重现性 (Reproducibility):MLflow 打包环境信息,例如全局配置 (
Settings
)、库版本和元数据,以确保在 ML 生命周期不同阶段的一致部署。 -
追踪 (Tracing):在复杂的事件驱动工作流中调试问题很麻烦。MLflow Tracing 是一种生产就绪的可观测性解决方案,它与 LlamaIndex 原生集成,让您能够观察工作流中每个内部阶段的情况。
-
评估 (Evaluation):衡量是改进模型的关键任务。MLflow Evaluation 是一个很好的工具,用于评估 LLM 应用程序的质量、速度和成本。它与 MLflow 的实验跟踪功能紧密集成,简化了进行迭代改进的过程。
开始构建!🛠️
策略:使用多种检索方法的混合方法
检索增强生成 (RAG) 是一个强大的框架,但检索步骤通常会成为瓶颈,因为基于嵌入的检索可能无法始终捕获最相关的上下文。虽然存在许多提高检索质量的技术,但没有一种解决方案是通用的。因此,一种有效的策略是结合多种检索方法。
我们在此探索的概念是并行运行几种检索方法:(1) 标准向量搜索,(2) 基于关键词的搜索 (BM25),以及 (3) 网络搜索。然后将检索到的上下文合并,并过滤掉不相关的数据以提高整体质量。
我们如何将这个概念变为现实?让我们深入了解并使用 LlamaIndex Workflow 和 MLflow 构建这个混合 RAG。
1. 设置代码仓库
示例代码,包括环境设置脚本,可在GitHub 仓库中找到。它包含完整的工作流定义、一个实践性笔记本和用于运行实验的示例数据集。要将其克隆到您的工作环境,请使用以下命令
git clone https://github.com/mlflow/mlflow.git
克隆仓库后,通过运行以下命令设置虚拟环境
cd mlflow/examples/llama_index/workflow
chmod +x install.sh
./install.sh
安装完成后,使用以下命令在 Poetry 环境中启动 Jupyter Notebook
poetry run jupyter notebook
接下来,打开位于根目录的 Tutorial.ipynb
笔记本。在本篇博客中,我们将逐步讲解此笔记本,指导您完成开发过程。
2. 启动一个 MLflow 实验
一个 MLflow 实验 是您跟踪模型开发所有方面的地方,包括模型定义、配置、参数、依赖版本等。让我们首先创建一个名为“LlamaIndex Workflow RAG”的新 MLflow 实验
import mlflow
mlflow.set_experiment("LlamaIndex Workflow RAG")
此时,实验还没有任何记录的数据。要在 MLflow UI 中查看实验,请打开一个新的终端并运行 mlflow ui
命令,然后导航到浏览器中提供的 URL
poetry run mlflow ui
3. 选择您的 LLM 和 Embedding 模型
现在,将您首选的 LLM 和 embedding 模型设置到 LlamaIndex 的 Settings 对象中。这些模型将在 LlamaIndex 的各个组件中使用。
为了进行演示,我们将使用 OpenAI 模型,但您可以按照笔记本中的说明轻松切换到不同的 LLM 提供商或本地模型。
import getpass
import os
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter OpenAI API Key")
from llama_index.core import Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
# LlamaIndex by default uses OpenAI APIs for LLMs and embeddings models. You can use the default
# model (`gpt-3.5-turbo` and `text-embeddings-ada-002` as of Oct 2024), but we recommend using the
# latest efficient models instead for getting better results with lower cost.
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large")
Settings.llm = OpenAI(model="gpt-4o-mini")
💡 MLflow 在记录模型时会自动将 Settings
配置记录到您的 MLflow 实验中,确保可重现性并降低环境之间出现差异的风险。
4. 设置网络搜索 API
在本篇博客的稍后部分,我们将为问答机器人添加网络搜索功能。我们将使用 Tavily AI,这是一个针对 LLM 应用优化的搜索 API,并与 LlamaIndex 原生集成。访问他们的网站以获取免费使用层级的 API 密钥,或者使用与 LlamaIndex 集成的不同搜索引擎,例如GoogleSearchToolSpec。
获取 API 密钥后,将其设置为环境变量
os.environ["TAVILY_AI_API_KEY"] = getpass.getpass("Enter Tavily AI API Key")
5. 设置用于检索的文档索引
下一步是构建一个用于从 MLflow 文档中检索的文档索引。data
目录中的 urls.txt
文件包含 MLflow 文档页面的列表。可以使用网页读取工具将这些页面加载为文档对象。
from llama_index.readers.web import SimpleWebPageReader
with open("data/urls.txt", "r") as file:
urls = [line.strip() for line in file if line.strip()]
documents = SimpleWebPageReader(html_to_text=True).load_data(urls)
接下来,将这些文档摄取到向量数据库中。在本教程中,我们将使用 Qdrant 向量存储,如果自托管则是免费的。如果您的机器上安装了 Docker,您可以通过运行官方 Docker 容器来启动 Qdrant 数据库
$ docker pull qdrant/qdrant
$ docker run -p 6333:6333 -p 6334:6334 \
-v $(pwd)/.qdrant_storage:/qdrant/storage:z \
qdrant/qdrant
容器运行后,您可以创建一个连接到 Qdrant 数据库的索引对象
import qdrant_client
from llama_index.vector_stores.qdrant import QdrantVectorStore
client = qdrant_client.QdrantClient(host="localhost", port=6333)
vector_store = QdrantVectorStore(client=client, collection_name="mlflow_doc")
from llama_index.core import StorageContext, VectorStoreIndex
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents=documents,
storage_context=storage_context
)
当然,您可以在此处使用您首选的向量存储。LlamaIndex 支持多种向量数据库,例如 FAISS、Chroma 和 Databricks Vector Search。如果您选择其他替代方案,请遵循相关的 LlamaIndex 文档并相应地更新 workflow/workflow.py
文件。
除了评估向量搜索检索之外,我们稍后还将评估基于关键词的检索器 (BM25)。让我们设置本地文档存储,以便在工作流中启用 BM25 检索。
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers.bm25 import BM25Retriever
splitter = SentenceSplitter(chunk_size=512)
nodes = splitter.get_nodes_from_documents(documents)
bm25_retriever = BM25Retriever.from_defaults(nodes=nodes)
bm25_retriever.persist(".bm25_retriever")
6. 定义工作流
现在环境和数据源已准备就绪,我们可以构建工作流并进行实验。完整的工作流代码定义在 workflow
目录中。让我们探讨实现中的一些关键组件。
事件
workflow/events.py
文件定义了工作流中使用的所有事件。这些是简单的 Pydantic 模型,用于在工作流步骤之间传递信息。例如,VectorSearchRetrieveEvent
通过传递用户查询来触发向量搜索步骤。
class VectorSearchRetrieveEvent(Event):
"""Event for triggering VectorStore index retrieval step."""
query: str
提示
在整个工作流执行过程中,我们会多次调用 LLMs。这些 LLM 调用的提示模板定义在 workflow/prompts.py
文件中。
工作流类
主要工作流类定义在 workflow/workflow.py
中。让我们分解一下它的工作原理。
构造函数接受一个 retrievers
参数,该参数指定工作流中使用的检索方法。例如,如果传递 ["vector_search", "bm25"]
,则工作流执行向量搜索和基于关键词的搜索,跳过网络搜索。
💡 动态决定使用哪些检索器,使我们无需复制几乎相同的模型代码,即可尝试不同的检索策略。
class HybridRAGWorkflow(Workflow):
VALID_RETRIEVERS = {"vector_search", "bm25", "web_search"}
def __init__(self, retrievers=None, **kwargs):
super().__init__(**kwargs)
self.llm = Settings.llm
self.retrievers = retrievers or []
if invalid_retrievers := set(self.retrievers) - self.VALID_RETRIEVERS:
raise ValueError(f"Invalid retrievers specified: {invalid_retrievers}")
self._use_vs_retriever = "vector_search" in self.retrievers
self._use_bm25_retriever = "bm25" in self.retrievers
self._use_web_search = "web_search" in self.retrievers
if self._use_vs_retriever:
qd_client = qdrant_client.QdrantClient(host=_QDRANT_HOST, port=_QDRANT_PORT)
vector_store = QdrantVectorStore(client=qd_client, collection_name=_QDRANT_COLLECTION_NAME)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
self.vs_retriever = index.as_retriever()
if self._use_bm25_retriever:
self.bm25_retriever = BM25Retriever.from_persist_dir(_BM25_PERSIST_DIR)
if self._use_web_search:
self.tavily_tool = TavilyToolSpec(api_key=os.environ.get("TAVILY_AI_API_KEY"))
工作流通过执行一个以 StartEvent
作为输入的步骤开始,在本例中是 route_retrieval
步骤。此步骤检查 retrievers
参数并触发必要的检索步骤。通过使用上下文对象的 send_event()
方法,可以从这一个步骤中并行调度多个事件。
# If no retriever is specified, proceed directly to the final query step with an empty context
if len(self.retrievers) == 0:
return QueryEvent(context="")
# Trigger the retrieval steps based on the configuration
if self._use_vs_retriever:
ctx.send_event(VectorSearchRetrieveEvent(query=query))
if self._use_bm25_retriever:
ctx.send_event(BM25RetrieveEvent(query=query))
if self._use_web_search:
ctx.send_event(TransformQueryEvent(query=query))
检索步骤很简单。然而,网络搜索步骤更高级,因为它包含一个额外的步骤,使用 LLM 将用户的问题转换为适合搜索的查询。
所有检索步骤的结果在 gather_retrieval_results
步骤中汇总。在这里,使用 ctx.collect_events()
方法轮询异步执行的步骤的结果。
results = ctx.collect_events(ev, [RetrievalResultEvent] * len(self.retrievers))
传递来自多个检索器的所有结果通常会导致大量上下文,其中包含不相关或重复的内容。为了解决这个问题,我们需要过滤并选择最相关的结果。虽然基于分数的做法很常见,但网络搜索结果不返回相似度分数。因此,我们使用 LLM 对结果进行排序和过滤掉不相关的结果。通过利用与 RankGPT 的内置 reranker 集成,rerank 步骤实现了这一点。
reranker = RankGPTRerank(llm=self.llm, top_n=5)
reranked_nodes = reranker.postprocess_nodes(ev.nodes, query_str=query)
reranked_context = "\n".join(node.text for node in reranked_nodes)
最后,将重新排序后的上下文与用户查询一起传递给 LLM,以生成最终答案。结果以带有 result
键的 StopEvent
形式返回。
@step
async def query_result(self, ctx: Context, ev: QueryEvent) -> StopEvent:
"""Get result with relevant text."""
query = await ctx.get("query")
prompt = FINAL_QUERY_TEMPLATE.format(context=ev.context, query=query)
response = self.llm.complete(prompt).text
return StopEvent(result=response)
现在,让我们实例化工作流并运行它。
# Workflow with VS + BM25 retrieval
from workflow.workflow import HybridRAGWorkflow
workflow = HybridRAGWorkflow(retrievers=["vector_search", "bm25"], timeout=60)
response = await workflow.run(query="Why use MLflow with LlamaIndex?")
print(response)
7. 在 MLflow 实验中记录工作流
现在我们想使用各种不同的检索策略运行工作流并评估每个策略的性能。然而,在运行评估之前,我们将在 MLflow 中记录模型,以在 MLflow 实验 中跟踪模型及其性能。
对于 LlamaIndex Workflow,我们使用新的Model-from-code方法,该方法将模型记录为独立的 Python 脚本。这种方法避免了与 pickle 等序列化方法相关的风险和不稳定性,而是依靠代码作为模型定义的单一事实来源。当与 MLflow 的环境冻结功能结合使用时,它提供了一种可靠的方式来持久化模型。更多详细信息,请参阅MLflow 文档。
💡 在 workflow
目录中,有一个 model.py
脚本,它导入 HybridRAGWorkflow
并使用在记录期间通过 model_config
参数传递的动态配置对其进行实例化。这种设计允许您跟踪具有不同配置的模型,而无需复制模型定义。
我们将启动一个 MLflow Run,并使用 mlflow.llama_index.log_model() API 记录具有不同配置的模型脚本 model.py
。
# Different configurations we will evaluate. We don't run evaluation for all permutation
# for demonstration purpose, but you can add as many patterns as you want.
run_name_to_retrievers = {
# 1. No retrievers (prior knowledge in LLM).
"none": [],
# 2. Vector search retrieval only.
"vs": ["vector_search"],
# 3. Vector search and keyword search (BM25)
"vs + bm25": ["vector_search", "bm25"],
# 4. All retrieval methods including web search.
"vs + bm25 + web": ["vector_search", "bm25", "web_search"],
}
# Create an MLflow Run and log model with each configuration.
models = []
for run_name, retrievers in run_name_to_retrievers.items():
with mlflow.start_run(run_name=run_name):
model_info = mlflow.llama_index.log_model(
# Specify the model Python script.
llama_index_model="workflow/model.py",
# Specify retrievers to use.
model_config={"retrievers": retrievers},
# Define dependency files to save along with the model
code_paths=["workflow"],
# Subdirectory to save artifacts (not important)
artifact_path="model",
)
models.append(model_info)
现在再次打开 MLflow UI,这次应该显示记录了 4 个具有不同 retrievers
参数值的 MLflow Runs。通过点击每个 Run 名称并导航到“Artifacts”选项卡,您可以看到 MLflow 记录了模型和各种元数据,例如依赖版本和设置。
8. 启用 MLflow Tracing
在运行评估之前,还有最后一个步骤:启用 MLflow Tracing。我们稍后将深入探讨此功能及其在此处使用的原因,但现在您可以通过一个简单的单行命令启用它。MLflow 将自动跟踪每次 LlamaIndex 执行。
mlflow.llama_index.autolog()
9. 使用不同的检索器策略评估工作流
示例仓库包含一个示例评估数据集 mlflow_qa_dataset.csv
,其中包含 30 对与 MLflow 相关的问答对。
import pandas as pd
eval_df = pd.read_csv("data/mlflow_qa_dataset.csv")
display(eval_df.head(3))
要评估工作流,请使用 mlflow.evaluate() API,它需要 (1) 您的数据集,(2) 记录的模型,以及 (3) 您想计算的指标。
from mlflow.metrics import latency
from mlflow.metrics.genai import answer_correctness
for model_info in models:
with mlflow.start_run(run_id=model_info.run_id):
result = mlflow.evaluate(
# Pass the URI of the logged model above
model=model_info.model_uri,
data=eval_df,
# Specify the column for ground truth answers.
targets="ground_truth",
# Define the metrics to compute.
extra_metrics=[
latency(),
answer_correctness("openai:/gpt-4o-mini"),
],
# The answer_correctness metric requires "inputs" column to be
# present in the dataset. We have "query" instead so need to
# specify the mapping in `evaluator_config` parameter.
evaluator_config={"col_mapping": {"inputs": "query"}},
)
在本例中,我们使用两个指标评估模型
- 延迟 (Latency):测量执行单个查询工作流所需的时间。
- 答案正确性 (Answer Correctness):根据真实答案评估答案的准确性,由 OpenAI GPT-4o 模型按 1-5 分制评分。
这些指标仅用于演示目的——您可以添加其他指标,如毒性或忠实度,甚至创建自己的指标。有关内置指标的完整列表以及如何定义自定义指标,请参阅MLflow 文档。
评估过程将需要几分钟。完成后,您可以在 MLflow UI 中查看结果。打开实验页面,点击 Run 列表上方的图表图标 📈。
*💡 评估结果可能会因模型设置和一些随机性而有所不同。
第一行显示答案正确性指标的柱状图,而第二行显示延迟结果。表现最佳的组合是“Vector Search + BM25”。有趣的是,添加网络搜索不仅显着增加了延迟,还降低了答案正确性。
为什么会发生这种情况?似乎启用网络搜索的模型提供的一些答案偏离了主题。例如,在回答关于启动模型注册表的问题时,网络搜索模型提供了一个与模型部署不相关的答案,而“vs + bm25”模型提供了正确的回应。
这个错误的答案从何而来?这似乎是一个检索器问题,因为我们只更改了检索策略。然而,很难从最终结果中看出每个检索器返回了什么。要更深入地了解幕后发生的情况,MLflow Tracing 是完美的解决方案。
10. 使用 MLflow Trace 检查质量问题
MLflow Tracing 是一项新功能,为 LLM 应用程序带来了可观测性。它与 LlamaIndex 无缝集成,记录工作流执行期间中间步骤的所有输入、输出和元数据。由于我们在开始时调用了 mlflow.llama_index.autolog(),因此每个 LlamaIndex 操作都已被跟踪并记录在 MLflow 实验中。
要检查评估中特定问题的跟踪,请导航到实验页面上的“Traces”选项卡。查找请求列中包含特定问题且运行名称为“vs + bm25 + web.”的行。点击请求 ID 链接即可打开 Trace UI,在那里您可以查看执行中每个步骤的详细信息,包括输入、输出、元数据和延迟。
在本例中,我们通过检查 reranker 步骤发现了问题。网络搜索检索器返回了与模型服务相关的不相关上下文,并且 reranker 错误地将其排名为最相关的。通过此洞察,我们可以确定潜在的改进措施,例如改进 reranker 以更好地理解 MLflow 主题,提高网络搜索精度,甚至完全移除网络搜索检索器。
结论
在本篇博客中,我们探讨了 LlamaIndex 和 MLflow 的结合如何提升检索增强生成 (RAG) 工作流的开发水平,汇集了强大的模型管理和可观测性功能。通过集成多种检索策略(如向量搜索、BM25 和网络搜索),我们展示了灵活检索如何增强 LLM 驱动应用程序的性能。
- 实验跟踪 (Experiment Tracking) 使我们能够组织和记录不同的工作流配置,确保可重现性并使我们能够跟踪跨多个运行的模型性能。
- MLflow Evaluate 使我们能够轻松记录和评估使用不同检索器策略的工作流,使用延迟和答案正确性等关键指标来比较性能。
- MLflow UI 为我们提供了清晰的可视化界面,展示了各种检索策略如何影响准确性和延迟,帮助我们确定最有效的配置。
- MLflow Tracing 与 LlamaIndex 集成,提供了对工作流每个步骤的详细可观测性,用于诊断质量问题,例如搜索结果的重新排名不正确。
有了这些工具,您就拥有了一个完整的框架,用于构建、记录和优化 RAG 工作流。随着 LLM 技术的不断发展,跟踪、评估和微调模型性能各个方面的能力将至关重要。我们强烈鼓励您进一步实验,看看如何根据您自己的应用程序调整这些工具。
要继续学习,请查阅以下资源
- 了解有关MLflow LlamaIndex 集成的更多信息。
- 在MLflow 中的 LLMs 处发现更多 MLflow LLM 功能。
- 使用MLflow Deployment 将您的工作流部署到服务终结点。
- 查看更多来自 LlamaIndex 的 Workflow 示例。