Question Generation for Retrieval Evaluation
MLflow provides an advanced framework for constructing Retrieval-Augmented Generation (RAG) models. RAG is a cutting-edge approach that combines the strengths of retrieval models (models that choose and rank relevant chunks of a document based on the user's question) with generative models. It effectively merges the capabilities of searching and generating text to provide responses that are contextually relevant and coherent, allowing the generated text to reference existing documents. RAG leverages the retriever to find context documents, and this novel approach has revolutionized various NLP tasks.
Naturally, we want to evaluate the retriever system of a RAG model in order to compare and judge its performance. To evaluate a retriever system, we first need a test set of questions about the documents. These questions need to be diverse, relevant, and coherent. Manually generating questions can be challenging, because it requires you to first understand the documents and then spend a lot of time coming up with questions for them.
We want to make this process simpler by utilizing a Large Language Model (LLM) to generate questions for this test set. This tutorial walks through how to generate the questions and how to analyze their diversity and relevance.
Step 1: Install and Load Packages
We also define utility functions for caching the LLM responses to save cost. You can skip reading the implementation details in the next cell.
%pip install beautifulsoup4 langchain openai pandas seaborn scikit-learn
import json
import os
# For cost-saving, create a cache for the LLM responses
import threading
# For data analysis and visualization
import matplotlib.pyplot as plt
import numpy as np
import openai
import pandas as pd
# For scraping
import requests
import seaborn as sns
from bs4 import BeautifulSoup
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
class Cache:
    def __init__(self, persist_path, cache_loading_fn):
        """
        The cache_loading_fn should be a function that takes arbitrary
        serializable arguments and returns a serializable value.
          value = cache_loading_fn(**kwargs)
        For example, for openai.chat.completions.create(...), the
        cache_loading_fn should be:
          def cache_loading_fn(**kwargs):
            result = openai.chat.completions.create(**kwargs)
            return result.model_dump()
        """
        self._cache = self._get_or_create_cache_dict(persist_path)
        self._persist_path = persist_path
        self._cache_loading_fn = cache_loading_fn
        self._cache_lock = threading.Lock()

    @classmethod
    def _get_or_create_cache_dict(cls, persist_path):
        if os.path.exists(persist_path):
            # File exists, load it as a JSON string into a dict
            with open(persist_path) as f:
                cache = json.load(f)
        else:
            # File does not exist, create an empty dict
            cache = {}
        return cache

    def _save_to_file(self):
        with open(self._persist_path, "w") as file:
            json.dump(self._cache, file)

    def _update_cache(self, key, value):
        with self._cache_lock:
            self._cache[key] = value
            self._save_to_file()

    def get_from_cache_or_load_cache(self, **kwargs):
        key = json.dumps(kwargs)

        with self._cache_lock:
            value = self._cache.get(key, None)

        if value is None:
            value = self._cache_loading_fn(**kwargs)
            self._update_cache(key, value)
        else:
            print("Loaded from cache")

        return value


def chat_completion_create_fn(**kwargs):
    result = openai.chat.completions.create(**kwargs)
    # openai>=1.0 responses are pydantic models; model_dump() converts them to a JSON-serializable dict
    return result.model_dump()


def cached_openai_ChatCompletion_create(**kwargs):
    cache = kwargs.pop("cache")
    return cache.get_from_cache_or_load_cache(**kwargs)


def embeddings_embed_documents_fn(**kwargs):
    chunk = kwargs.get("chunk")
    return embeddings.embed_documents([chunk])


def cached_langchain_openai_embeddings(**kwargs):
    cache = kwargs.pop("cache")
    return cache.get_from_cache_or_load_cache(**kwargs)
Step 2: Set the OpenAI Key
The question generation system can be built with any LLM. We chose to use OpenAI here, so we need its API key.
openai.api_key = "<redacted>"
os.environ["OPENAI_API_KEY"] = openai.api_key
# Other configurations
# Choose a seed for reproducible results
SEED = 2023
# For cost-saving purposes, choose a path to persist the responses for LLM calls
CACHE_PATH = "_cache.json"
EMBEDDINGS_CACHE_PATH = "_embeddings_cache.json"
# To avoid re-running the scraping process, choose a path to save the scraped docs
SCRAPPED_DATA_PATH = "mlflow_docs_scraped.csv"
# Choose a path to save the generated dataset
OUTPUT_DF_PATH = "question_answer_source.csv"
cache = Cache(CACHE_PATH, chat_completion_create_fn)
embeddings_cache = Cache(EMBEDDINGS_CACHE_PATH, embeddings_embed_documents_fn)
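As a quick illustration of how these cache-backed wrappers are used later (a minimal sketch, not part of the original notebook; the prompt below is a placeholder), they take the same keyword arguments as the underlying OpenAI call plus a cache argument:
# Illustrative usage sketch: the first call hits the OpenAI API; repeating it with
# identical kwargs is served from _cache.json and prints "Loaded from cache".
# The prompt and model choice here are placeholders.
demo_response = cached_openai_ChatCompletion_create(
    messages=[{"role": "user", "content": "What is MLflow?"}],
    model="gpt-4o-mini",
    temperature=0.0,
    cache=cache,
)
print(demo_response["choices"][0]["message"]["content"])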
Step 3: Decide a Chunk Size
CHUNK_SIZE = 1500
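If you want a rough sense of how this setting affects chunk granularity before committing to it, a minimal sketch like the one below can help (the sample text and the alternative sizes are arbitrary illustrations, not part of the tutorial):
# Illustrative only: compare how many chunks a few candidate sizes produce,
# using the same CharacterTextSplitter configuration as Step 4.
sample_text = (
    "MLflow is an open source platform for managing the end-to-end machine learning lifecycle. "
    * 50
)
for candidate_size in (500, CHUNK_SIZE, 3000):
    splitter = CharacterTextSplitter(chunk_size=candidate_size, separator=" ")
    print(candidate_size, "->", len(splitter.split_text(sample_text)), "chunks")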
Step 4: Prepare Document Data
Scrape the documents from the MLflow website
page = requests.get("https://mlflow.org.cn/docs/latest/index.html")
soup = BeautifulSoup(page.content, "html.parser")

mainLocation = "https://mlflow.org.cn/docs/latest/"
header = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11",
    "Accept-Language": "en-US,en;q=0.8",
    "Connection": "keep-alive",
}

data = []
for a_link in soup.find_all("a"):
    document_url = mainLocation + a_link["href"]
    page = requests.get(document_url, headers=header)
    soup = BeautifulSoup(page.content, "html.parser")
    file_to_store = a_link.get("href")
    if soup.find("div", {"class": "rst-content"}):
        data.append(
            [
                file_to_store,
                soup.find("div", {"class": "rst-content"}).text.replace("\n", " "),
            ]
        )

df = pd.DataFrame(data, columns=["source", "text"])
df.to_csv(SCRAPPED_DATA_PATH, index=False)
df = pd.read_csv(SCRAPPED_DATA_PATH)
Select a subset of the documents and split them into chunks
# For demonstration purposes, let's pick 5 popular MLflow documentation pages from the dataset
mask = df["source"].isin(
    {
        "tracking.html",
        "models.html",
        "model-registry.html",
        "search-runs.html",
        "projects.html",
    }
)
sub_df = df[mask]

# Split documents into chunks
text_splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, separator=" ")


def get_chunks(input_row):
    new_rows = []
    chunks = text_splitter.split_text(input_row["text"])
    for i, chunk in enumerate(chunks):
        new_rows.append({"chunk": chunk, "source": input_row["source"], "chunk_index": i})
    return new_rows


expanded_df = pd.DataFrame(columns=["chunk", "source", "chunk_index"])
for index, row in sub_df.iterrows():
    new_rows = get_chunks(row)
    expanded_df = pd.concat([expanded_df, pd.DataFrame(new_rows)], ignore_index=True)
expanded_df.head(3)
| | chunk | source | chunk_index |
|---|---|---|---|
| 0 | Documentation MLflow Tracking MLflow Tracking ... | tracking.html | 0 |
| 1 | tags Concepts MLflow Tracking is organized arou... | tracking.html | 1 |
| 2 | runs into experiments, which group together run... | tracking.html | 2 |
# For cost-saving purposes, let's pick the first 3 chunks from each doc
# To generate questions with more chunks, change the start index and end index in iloc[]
start, end = 0, 3
filtered_df = (
    expanded_df.groupby("source").apply(lambda x: x.iloc[start:end]).reset_index(drop=True)
)
filtered_df.head(3)
| | chunk | source | chunk_index |
|---|---|---|---|
| 0 | Documentation MLflow Model Registry MLflow Mod... | model-registry.html | 0 |
| 1 | Once logged, this model can then be registered... | model-registry.html | 1 |
| 2 | associated with registered models and model ve... | model-registry.html | 2 |
filtered_df["chunk"][0]
'Documentation MLflow Model Registry MLflow Model Registry The MLflow Model Registry component is a centralized model store, set of APIs, and UI, to collaboratively manage the full lifecycle of an MLflow Model. It provides model lineage (which MLflow experiment and run produced the model), model versioning, model aliasing, model tagging, and annotations. Table of Contents Concepts Model Registry Workflows UI Workflow Register a Model Find Registered Models Deploy and Organize Models API Workflow Adding an MLflow Model to the Model Registry Deploy and Organize Models with Aliases and Tags Fetching an MLflow Model from the Model Registry Serving an MLflow Model from Model Registry Promoting an MLflow Model across environments Adding or Updating an MLflow Model Descriptions Renaming an MLflow Model Listing and Searching MLflow Models Deleting MLflow Models Registering a Model Saved Outside MLflow Registering an Unsupported Machine Learning Model Transitioning an MLflow Model’s Stage Archiving an MLflow Model Concepts The Model Registry introduces a few concepts that describe and facilitate the full lifecycle of an MLflow Model. ModelAn MLflow Model is created from an experiment or run that is logged with one of the model flavor’s mlflow.<model_flavor>.log_model() methods. Once logged, this model can then be registered with the Model Registry. Registered ModelAn MLflow Model can be registered with the Model Registry. A registered model has a unique name, contains versions,'
Step 5: Generate Questions
The prompt below instructs the LLM to generate a question for each given chunk, and to also generate an answer to that question to make human verification easier. In addition, it returns the result in a structured format.
This example uses OpenAI's gpt-4o-mini model to generate the questions; you can replace it with whichever LLM best fits your use case.
def get_raw_response(content):
    prompt = f"""Please generate a question asking for the key information in the given paragraph.
    Also answer the questions using the information in the given paragraph.
    Please ask the specific question instead of the general question, like
    'What is the key information in the given paragraph?'.
    Please generate the answer using as much information as possible.
    If you are unable to answer it, please generate the answer as 'I don't know.'
    The answer should be informative and should be more than 3 sentences.
    Paragraph: {content}
    Please call the submit_function function to submit the generated question and answer.
    """

    messages = [{"role": "user", "content": prompt}]

    submit_function = {
        "name": "submit_function",
        "description": "Call this function to submit the generated question and answer.",
        "parameters": {
            "type": "object",
            "properties": {
                "question": {
                    "type": "string",
                    "description": "The question asking for the key information in the given paragraph.",
                },
                "answer": {
                    "type": "string",
                    "description": "The answer to the question using the information in the given paragraph.",
                },
            },
            "required": ["question", "answer"],
        },
    }

    return cached_openai_ChatCompletion_create(
        messages=messages,
        model="gpt-4o-mini",
        functions=[submit_function],
        function_call="auto",
        temperature=0.0,
        seed=SEED,
        cache=cache,
    )


def generate_question_answer(content):
    if content is None or len(content) == 0:
        return "", "N/A"

    response = get_raw_response(content)

    try:
        func_args = json.loads(response["choices"][0]["message"]["function_call"]["arguments"])
        question = func_args["question"]
        answer = func_args["answer"]
        return question, answer
    except Exception as e:
        return str(e), "N/A"
queries = []
get_raw_response(filtered_df["chunk"][0])
{'id': 'chatcmpl-8NPsIJQZYDP4aqiWEUlUyLakv3lyR', 'object': 'chat.completion', 'created': 1700591698, 'model': 'gpt-3.5-turbo-0613', 'choices': [{'index': 0, 'message': {'role': 'assistant', 'content': None, 'function_call': {'name': 'submit_function', 'arguments': '{ "question": "What is the purpose of the MLflow Model Registry?", "answer": "The purpose of the MLflow Model Registry is to provide a centralized model store, set of APIs, and UI to collaboratively manage the full lifecycle of an MLflow Model. It allows for model lineage, versioning, aliasing, tagging, and annotations." }'}}, 'finish_reason': 'function_call'}], 'usage': {'prompt_tokens': 490, 'completion_tokens': 81, 'total_tokens': 571}}
# The requests sometimes get rate-limited; you can re-execute this cell without losing the existing results.
n = len(filtered_df)
for i, row in filtered_df.iterrows():
    chunk = row["chunk"]
    question, answer = generate_question_answer(chunk)
    print(f"{i + 1}/{n}: {question}")
    queries.append(
        {
            "question": question,
            "answer": answer,
            "chunk": chunk,
            "chunk_id": row["chunk_index"],
            "source": row["source"],
        }
    )
Loaded from cache 1/15: What is the purpose of the MLflow Model Registry? Loaded from cache 2/15: What are the key features of a registered model in the Model Registry? Loaded from cache 3/15: What can you do with tags in MLflow? Loaded from cache 4/15: What is the purpose of an MLflow Model? Loaded from cache 5/15: What are the flavors defined in the MLmodel file for the mlflow.sklearn library? Loaded from cache 6/15: What are the fields that can be contained in the MLmodel YAML format? Loaded from cache 7/15: What is an MLflow Project? Loaded from cache 8/15: What can you specify for the entry points in a MLproject file? Loaded from cache 9/15: What are the project environments supported by MLflow? Loaded from cache 10/15: What does the MLflow UI and API support in terms of searching runs? Loaded from cache 11/15: What are the key information in the given paragraph? 12/15: What are some examples of entity names that contain special characters? 13/15: What is the purpose of MLflow Tracking? 14/15: What information does each run record in MLflow Tracking? 15/15: How can you create an experiment in MLflow?
Occasionally the LLM may fail to generate a question. We can inspect the data above to see whether any error records exist. If there are any, remove them.
result_df = pd.DataFrame(queries)
result_df = result_df[result_df["answer"] != "N/A"]
def add_to_output_df(result_df=pd.DataFrame({})):
    """
    This function adds the records in result_df to the existing records saved at OUTPUT_DF_PATH,
    removes the duplicate rows, and saves the new collection of records back to OUTPUT_DF_PATH.
    """
    if os.path.exists(OUTPUT_DF_PATH):
        all_result_df = pd.read_csv(OUTPUT_DF_PATH)
    else:
        all_result_df = pd.DataFrame({})
    all_result_df = (
        pd.concat([all_result_df, result_df], ignore_index=True)
        .drop_duplicates()
        .sort_values(by=["source", "chunk_id"])
        .reset_index(drop=True)
    )
    all_result_df.to_csv(OUTPUT_DF_PATH, index=False)
    return all_result_df
all_result_df = add_to_output_df(result_df)
all_result_df.head(3)
| | question | answer | chunk | chunk_id | source |
|---|---|---|---|---|---|
| 0 | What is the purpose of the MLflow Model Regis... | The purpose of the MLflow Model Registry is t... | Documentation MLflow Model Registry MLflow Mo... | 0 | model-registry.html |
| 1 | What is the purpose of registering a model wi... | The purpose of registering a model with the M... | Once logged, this model can then be registere... | 1 | model-registry.html |
| 2 | What can you do with registered models and mo... | With registered models and model versions, yo... | associated with registered models and model v... | 2 | model-registry.html |
Quality Analysis of the Generated Questions (Optional)
If you want to compare the quality of questions generated by different prompts, you can analyze question quality both manually and in aggregate. We want to assess the questions along two dimensions: their diversity and relevance.
https://github.com/mlflow/mlflow/blob/master/examples/llms/question_generation/question_answer_source.csv is a pre-generated dataset containing 56 questions. If you want to jump straight to this section, you can download it, point OUTPUT_DF_PATH at it, and load it to run the rest of the notebook.
Note: there is no definitive way to analyze the quality of generated questions, so this is only one approach you can take to get a sense of how diverse and relevant your generated questions are.
all_result_df = add_to_output_df()
Evaluate the Diversity of the Questions
Diversity of the questions matters because we want the questions to cover the majority of the document content. We also want to be able to evaluate the retriever with different forms of questioning, and to have both harder and easier questions. None of this is easy to analyze directly, so we chose to analyze it through question length and latent-space embeddings.
Length
Length gives a sense of how diverse the questions are. Some questions may be wordy while others get straight to the point. It also helps us identify problems with the generated questions.
# Length
questions = all_result_df["question"].to_list()
question_len = pd.DataFrame([len(q) for q in questions], columns=["length"])
question_len.hist(bins=5)
plt.title("Histogram of Question Lengths")
plt.xlabel("Question Length")
plt.ylabel("Frequency")
plt.show()
Beyond the visual representation, we also want to look at more concrete percentile values.
# Calculating percentile values
p10 = int(question_len["length"].quantile(0.10))
p90 = int(question_len["length"].quantile(0.90))
print("p10-p90 range is", p90 - p10)
p10-p90 range is 46
There are also a few queries that are longer than normal. However, these look fine.
[q for q in questions if len(q) > 100]
['What is a common configuration for lowering the total memory pressure for pytorch models within transformers pipelines?', 'How can you get all active runs from experiments IDs 3, 4, and 17 that used a CNN model with 10 layers and had a prediction accuracy of 94.5% or higher?', 'What interfaces does the MLflow client use to record MLflow entities and artifacts when running MLflow on a local machine with a SQLAlchemy-compatible database?']
Latent Space
Latent-space embeddings carry the semantic information of a question. They can be used to assess how diverse two questions are and how semantically different they are. To do so, we need to map the high-dimensional embedding space down to a lower-dimensional one. We use PCA and t-SNE to map the embeddings into two dimensions for visualization.
We append five benchmark queries to help visualize how diverse the questions are. The first four of these questions are semantically similar and all about MLflow, while the last one is different and about RAG.
benchmark_questions = [
    "What is MLflow?",
    "What is MLflow about?",
    "What is MLflow Tracking?",
    "What is MLflow Evaluation?",
    "Why is RAG so popular?",
]
questions_to_embed = questions + benchmark_questions
We apply PCA first to reduce the embeddings to 10 dimensions, and then t-SNE to reduce them to 2 dimensions, as recommended by sklearn because of the computational complexity of t-SNE.
# Apply embeddings
embeddings = OpenAIEmbeddings()
question_embeddings = embeddings.embed_documents(questions_to_embed)
# PCA on embeddings to reduce to 10-dim
pca = PCA(n_components=10)
question_embeddings_reduced = pca.fit_transform(question_embeddings)
# TSNE on embeddings to reduce to 2-dim
tsne = TSNE(n_components=2, random_state=SEED)
lower_dim_embeddings = tsne.fit_transform(question_embeddings_reduced)
Now that we have two-dimensional embeddings representing the semantics of the questions, we can visualize them with a scatter plot, distinguishing the generated questions from the benchmark questions.
labels = np.concatenate(
    [
        np.full(len(lower_dim_embeddings) - len(benchmark_questions), "generated"),
        np.full(len(benchmark_questions), "benchmark"),
    ]
)
data = pd.DataFrame(
    {"x": lower_dim_embeddings[:, 0], "y": lower_dim_embeddings[:, 1], "label": labels}
)
sns.scatterplot(data=data, x="x", y="y", hue="label")
Looking at the orange points on the scatter plot, there is one point that sits further away from the rest: that is the one benchmark question about RAG. The plot gives a sense of how diverse the generated questions are.
Evaluate Document Relevance
Another important dimension to consider is how relevant the questions are to the documents we provided. We want to know whether the questions generated by the LLM actually refer to the provided text, or whether it is hallucinating unrelated questions. We evaluate relevance by first manually checking some questions against their document chunks, and then defining a relevance metric for quantitative analysis.
Manually Check Document Relevance
Qualitatively check, by hand, whether the questions are relevant to the documents.
all_result_df.sample(3)
| | question | answer | chunk | chunk_id | source |
|---|---|---|---|---|---|
| 27 | What is an MLflow Project? | An MLflow Project is a format for packaging d... | Documentation MLflow Projects MLflow Projects... | 0 | projects.html |
| 54 | What information does autologging capture whe... | Autologging captures the following informatio... | launching short-lived MLflow runs that ... | 21 | tracking.html |
| 38 | What is the syntax for searching runs using ... | The syntax for searching runs using MLflow ... | Documentation Search Runs Search Runs MLfl... | 0 | search-runs.html |
Embedding Cosine Similarity
Chunk and query embeddings live in the same latent space, and a retriever model extracts the chunk embeddings that are similar to the query embedding. Relevance, from the retriever's point of view, is therefore defined by the distance between embeddings in this latent space.
Cosine similarity is a measure of vector similarity that can be used to quantify the distance between the chunk and query embeddings. It approaches 1 when the question and the chunk are similar, and goes to 0 when they are different.
We can use the cosine similarity score directly as a measure of relevance.
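As a tiny worked example of the metric (illustrative 3-dimensional vectors, not real embeddings), vectors pointing in a similar direction score close to 1 while orthogonal vectors score 0:
# Worked example with made-up vectors: cosine similarity = dot(x, y) / (|x| * |y|)
a = np.array([1.0, 0.9, 0.0])
b = np.array([0.9, 1.0, 0.1])
c = np.array([0.0, 0.0, 1.0])
print(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))  # ~0.99: similar direction
print(np.dot(a, c) / (np.linalg.norm(a) * np.linalg.norm(c)))  # 0.0: orthogonal, unrelated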
embedded_queries = all_result_df.copy()
embedded_queries["chunk_emb"] = all_result_df["chunk"].apply(
    lambda x: np.squeeze(cached_langchain_openai_embeddings(chunk=x, cache=embeddings_cache))
)
embedded_queries["question_emb"] = all_result_df["question"].apply(
    lambda x: np.squeeze(cached_langchain_openai_embeddings(chunk=x, cache=embeddings_cache))
)


def cossim(x, y):
    return np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y))


embedded_queries["cossim"] = embedded_queries.apply(
    lambda row: cossim(row["question_emb"], row["chunk_emb"]), axis=1
)
After scoring each question for its relative relevance, we can evaluate the generated questions as a whole.
scores = embedded_queries["cossim"].to_list()
plt.hist(scores, bins=5)
(array([ 1., 8., 15., 20., 12.]), array([0.72730601, 0.76292693, 0.79854785, 0.83416876, 0.86978968, 0.9054106 ]), <BarContainer object of 5 artists>)
There are a few lower scores. Let's take a look at them.
mask = embedded_queries["cossim"] < 0.75
lower_cossim = embedded_queries[mask]
for i, row in lower_cossim.iterrows():
    print(f"Question: {i}")
    print(row["question"])
    print("Chunk:")
    print(row["chunk"])
    print("cossim:")
    print(row["cossim"])
Question: 45 What is the purpose of the 'experimentIds' variable in the given paragraph? Chunk: API. List<Long> experimentIds = Arrays.asList("1", "2", "4", "8"); List<RunInfo> searchResult = client.searchRuns(experimentIds, "metrics.accuracy_score < 99.90"); Previous Next © MLflow Project, a Series of LF Projects, LLC. All rights reserved. cossim: 0.7273060141018568
Manually checking these lower-relevance questions reveals that some chunks carry little information or consist mainly of code, so the questions generated for them may be less useful. You can choose to filter out such questions as needed.
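If you decide to drop them, a one-line filter over the computed scores is enough (a minimal sketch; the 0.75 cutoff simply reuses the ad-hoc threshold from the inspection above):
# Optional sketch: keep only questions whose question/chunk cosine similarity is
# at least 0.75, preserving the original dataset columns.
filtered_questions_df = embedded_queries[embedded_queries["cossim"] >= 0.75][
    ["question", "answer", "chunk", "chunk_id", "source"]
].reset_index(drop=True)
print(f"Kept {len(filtered_questions_df)} of {len(embedded_queries)} questions")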