从零开始使用 LangGraph、LLaMA3 和 Elasticsearch 向量存储构建本地代理的教程
2024年9月5日 | by mebius
作者:来自 ElasticPratik Rana
在本教程中,我们将了解如何使用 LangGraph、LLaMA3 和 Elasticsearch Vector Store 从头开始创建可靠的代理。我们将结合 3 篇高级 RAG 论文中的想法:
- 用于路由的自适应 RAG:根据内容将问题引导到向量存储或网络搜索
- 纠正性 RAG 回退机制:使用该机制,当问题与向量存储不相关时,我们将引入回退检索,改为使用网络搜索。。
- 用于自我校正的自我 RAG:此外,我们还将添加自我校正功能,以检查生成的内容是否存在幻觉或不相关的情况,如果不合适,我们将再次回退到网络搜索。
因此,我们的目标是构建一个复杂的 RAG 流程,并展示其在我们的系统上的可靠性和本地执行。
更多阅读 “Elasticsearch:基于 Langchain 的 Elasticsearch Agent 对文档的搜索”。
背景信息
什么是 LLM 代理?
LLM 驱动的代理可以描述为一个利用大型语言模型 (LLM) 推理问题、制定解决问题的计划并使用一组工具执行这些计划的系统。
本质上,这些代理具有复杂的推理能力、记忆和执行任务的手段。
以 LLM 为核心控制器构建代理是一个令人兴奋的概念。几个概念验证演示,如 AutoGPT、GPT-Engineer 和 BabyAGI,都是鼓舞人心的例子。LLM 的潜力不仅限于生成精心编写的文本、故事、论文和程序;它们可以被视为强大的通用问题解决方案。
代理系统概述
在 LLM 驱动的自主代理系统中,LLM 充当代理的大脑,并由几个关键组件补充:
规划
- 子目标和分解(Subgoal and decomposition):代理将大任务分解为更小、更易于管理的子目标,从而高效处理复杂任务。
-
反思和改进(Reflection and refinement):代理对过去的行为进行自我批评和自我反思,从错误中吸取教训,改进未来的步骤,从而提高最终结果的质量。
记忆
- 短期记忆 (short-term memory – STM):充当代理当前行为和想法的动态存储库,类似于其 “思路 – train of thought”,因为它会努力实时响应用户的查询。它允许代理保持对正在进行的交互的上下文理解,从而实现无缝和连贯的通信。
- 长期记忆 (long-term memory – LTM):充当综合日志,记录代理与用户在数周甚至数月的长期交互。它捕获对话历史记录,保存从过去交流中收集到的宝贵背景和见解。这个积累的知识库增强了代理提供个性化和知情响应的能力,借鉴过去的经验来丰富其与用户的交互。
- 混合记忆:它结合了 STM 和 LTM 的优势,以增强代理的认知能力。STM 确保代理可以快速访问和操作最近的数据,在对话或任务中保持上下文。 LTM 通过存储过去的交互、学习模式和特定领域的信息来扩展代理的知识库,使其能够提供更明智的响应并随着时间的推移做出更好的决策。
工具使用
在 LLM(大型语言模型)代理的上下文中,工具是指代理可以利用来执行特定任务或增强其功能的外部资源、服务或 API(应用程序编程接口)。这些工具充当补充组件,将 LLM 代理的功能扩展到其固有的语言生成功能之外。
工具还可以包括数据库、知识库和外部模型。
例如,代理可以使用 RAG 管道来生成上下文相关的响应、使用代码解释器来应对编程挑战、使用 API 来进行互联网搜索,甚至可以使用简单的 API 服务,例如用于天气更新或即时消息应用程序的服务。
LLM 代理的类型和用例
- 对话代理(Conversational Agents):让用户参与自然语言对话,以提供信息、回答问题并协助完成任务。他们利用 LLM 生成类似人类的响应。
- 任务导向型代理(Task-orientedAgents):通过了解用户需求并执行相关操作,专注于完成特定任务或目标。示例包括虚拟助手和自动化工具。
- 创意代理(Creative Agents):生成原创内容,如艺术品、音乐或写作。他们使用 LLM 来了解人类的偏好和艺术风格,制作吸引观众的内容。
- 协作代理(Collaborative Agents):通过促进沟通和合作与人类合作以实现共同目标。LLM 帮助这些代理协助决策、生成报告和提供见解。
方法:ReAct/Langchain 代理与 LangGraph?
现在,让我们考虑使用代理来构建一个纠正性 RAG(检索增强生成)系统,由上图中可以看到的中间蓝色组件表示。当人们想到代理时,他们经常提到 “ReAct”——一种流行的构建代理框架(不要与 React.js 框架混淆)。ReAct 代理中的典型流程如下所示:
- LLM(语言学习模型)通过选择一个动作、观察结果、反思它,然后选择下一个动作来规划。ReAct 代理通常利用聊天记录或向量存储等记忆,并可以利用各种工具。如果我们要将此流程实现为 ReAct 代理,它将看起来像这样:
- 代理将收到问题并执行操作,例如使用其向量存储来检索相关文档。
- 然后,它将观察检索到的文档并决定对其进行评分。代理将返回其操作阶段并选择评分工具。
- 此过程将循环重复,遵循定义的轨迹,直到任务完成。
这就是基于 ReAct 的代理通常的工作方式。
但是,这种方法可能非常复杂,涉及大量决策。相反,我们将使用不同的方法来实现这个系统。我们不会让代理在循环的每一步都做出决策,而是提前定义一个“控制流 – control flow”。作为工程师,我们可以规划出我们希望代理每次运行时遵循的确切步骤顺序,从而有效地将规划责任从 LLM 身上移开。这种预定义的控制流允许 LLM 专注于每个步骤中的特定任务。
在内存方面,我们可以使用所谓的 “图形状态” 来保存控制流中的信息,使其与 RAG 流程相关(例如,文档和问题)。对于工具的使用,每个图形节点可以使用不同的工具:Vectorstore 检索节点(以灰色表示)将使用检索器工具,Grade Documents 节点(以蓝色表示)将使用分级工具,Web 搜索节点(以红色表示)将使用 Web 搜索工具:
这种方法简化了 LLM 的决策,使得系统更加可靠,尤其是在使用较小的 LLM 时。
先决条件
在深入研究代码之前,我们需要设置必要的工具:
1)Elasticsearch:在本教程中,我们将使用 Elasticsearch 作为我们的数据存储,因为它提供的不仅仅是一个向量数据库,还可以提供卓越的搜索体验。Elasticsearch 提供完整的向量数据库、多种检索方法(文本、稀疏和密集向量、混合)以及选择机器学习模型架构的灵活性。它是世界上下载次数最多的数据库,这是有原因的!要继续,你需要部署一个 Elasticsearch 集群,这可以在 3 分钟内完成,这是我们 14 天免费试用的一部分(无需信用卡)。单击此处开始。
2)Ollama:Ollama 是一个使用开源大型语言模型 (LLM) 简化本地开发的平台。它将运行 LLM 所需的一切(模型权重和配置)打包到单个 Modelfile 中,类似于 Docker 对容器的工作方式。你可以单击此处为你的机器下载 Ollama。
- 安装后,通过运行以下命令进行验证:
% ollama --version
ollama version is 0.3.4
- 接下来,安装 llama3 模型,它将作为本教程的本地 LLM:
% ollama pull llama3
这里需要注意的一件小事是,llama3 带有一种特殊的提示格式,需要注意。
3)Tavily Search:Tavily 的 SearchAPI 是专为 AI 代理 (LLM) 设计的专用搜索引擎,能够以惊人的速度提供实时、准确和真实的结果。要在教程中使用此 API,你需要在 Tavily 平台上注册并获取 API 密钥。好消息是,这个强大的工具是免费使用的。你可以单击此处开始使用。
太棒了!现在你的环境已经准备好了,我们可以继续进行有趣的部分 – 编写我们的 Python 代码!
Python 代码
1)安装所需的软件包:首先,通过运行以下命令安装所有必需的软件包:
%pip install langchain-nomic langchain_community tiktoken langchainhub langchain-elasticsearch langchain langgraph tavily-python gpt4all langchain-text-splitters
2)设置本地 LLM 和 Tavily Search API:安装完成后,将变量 local_llm 设置为 “llama3”。这将定义你在本教程中使用的本地 LLM。如果你想在系统上尝试其他本地 LLM,请稍后随意更改此参数,并在环境变量中定义在先决条件中获得的 Tavily Search API 密钥,如下所示:
%pip install langchain-nomic langchain_community tiktoken langchainhub langchain-elasticsearch langchain langgraph tavily-python gpt4all langchain-text-splitters
1. 索引
首先,我们需要将目标数据加载、处理并索引到我们的向量存储中。在本教程中,我们将索引来自以下相应博客文章的文档:
- “LLM Powered Autonomous Agents | Lil’Log“,
- “Prompt Engineering | Lil’Log“,
- “Adversarial Attacks on LLMs | Lil’Log“,
进入我们的向量存储,然后将其添加为我们的 RAG 实现的数据源,因为索引是我们 RAG 流的关键组件,没有它我们将无法检索文档。
### Index
from langchain_community.document_loaders import WebBaseLoader
from langchain_nomic.embeddings import NomicEmbeddings
from langchain_elasticsearch import ElasticsearchStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
urls = [
"https://lilianweng.github.io/posts/2023-06-23-agent/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
"https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=250, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)
documents=doc_splits
embeddings=NomicEmbeddings(model="nomic-embed-text-v1.5", inference_mode="local")
db = ElasticsearchStore.from_documents(
documents,
embeddings,
es_url="https://pratikrana23.es.us-central1.gcp.cloud.es.io",
es_user="elastic",
es_password="9Y9Xwz0J65gPCbJeUoSPdzHO",
index_name="rag-elastic",
)
retriever = db.as_retriever()
代码说明:
- 定义了一个 URL 列表,指向 Lilian Weng 网站上的三篇不同的博客文章。
urls = [
"https://lilianweng.github.io/posts/2023-06-23-agent/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
"https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
- 使用 WebBaseLoader 加载每个 URL 的内容,并将结果存储在文档列表中。
docs = [WebBaseLoader(url).load() for url in urls]
- 加载的文档将存储为列表列表(每个列表包含一个或多个文档)。使用列表推导式将这些列表展平为单个列表。
docs_list = [item for sublist in docs for item in sublist]
- RecursiveCharacterTextSplitter 初始化时指定了块大小(250 个字符),并且不重叠。这用于将文档分割成更小的块。
- 分割后的块存储在 documents 变量中。
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=250, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)
- 创建 NomicEmbeddings 实例来为文档块生成嵌入。使用的模型指定为 “nomic-embed-text-v1.5”,并在本地进行推理。
embeddings=NomicEmbeddings(model="nomic-embed-text-v1.5", inference_mode="local")
- 文档及其嵌入都存储在 Elasticsearch 数据库中。提供了连接详细信息(URL、username、password)和索引名称。
db = ElasticsearchStore.from_documents(
documents,
embeddings,
es_url="url",
es_user="username",
es_password="password",
index_name="rag-elastic",
)
- 最后,从 Elasticsearch 数据库创建一个检索器对象,可用于根据文档的嵌入查询和检索文档。
retriever = db.as_retriever()
2. 检索评分器 -Retrieval Grader
一旦我们将各自的文档索引到数据存储中,就需要创建一个评分器来评估我们检索到的文档与给定用户问题的相关性。现在,llama3 就派上用场了,我将 local_llm 设置为 llama3,llama 具有 “json” 模式,这确认 LLM 的输出也是 json,所以我的提示基本上是说对文档进行评分并返回带有分数 yes/no的 json
### Retrieval Grader
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
# LLM
llm = ChatOllama(model=local_llm, format="json", temperature=0)
prompt = PromptTemplate(
template="""system You are a grader assessing relevance
of a retrieved document to a user question. If the document contains keywords related to the user question,
grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. n
Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. n
Provide the binary score as a JSON with a single key 'score' and no premable or explanation.
user
Here is the retrieved document: nn {document} nn
Here is the user question: {question} n assistant
""",
input_variables=["question", "document"],
)
retrieval_grader = prompt | llm | JsonOutputParser()
question = "agent memory"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))
代码说明:
-
Imports:
- langchain_community.chat_models 中的 ChatOllama:该类用于以对话方式与 LLaMA 语言模型交互,允许以结构化格式进行输入和输出。
- langchain_core.output_parsers 中的 JsonOutputParser:该解析器用于从 LLM 的响应中提取 JSON 格式的输出。
- langchain_core.prompts 中的 PromptTemplate:该类用于定义用于生成发送到 LLM 的提示的模板。
-
LLM 初始化:
- ChatOllama 模型使用特定配置进行实例化。该模型设置为以 JSON 格式输出响应,温度为 0,这意味着输出是确定性的(无随机性)。
llm = ChatOllama(model=local_llm, format="json", temperature=0)
-
Prompt Template:
- 定义了一个 PromptTemplate,它设置了将发送给 LLM 的指令。此提示指示 LLM 充当评分者,评估检索到的文档是否与用户的问题相关。
- 评分者的任务很简单:如果文档包含与用户问题相关的关键字,它应该返回一个二进制分数(yes 或 no)来表示相关性。
- 响应应采用 JSON 格式,并带有一个关键分数。
prompt = PromptTemplate(
template="""system You are a grader assessing relevance
of a retrieved document to a user question. If the document contains keywords related to the user question,
grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. n
Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. n
Provide the binary score as a JSON with a single key 'score' and no premable or explanation.
user
Here is the retrieved document: nn {document} nn
Here is the user question: {question} n assistant
""",
input_variables=["question", "document"],
)
-
Retrieval Grader Pipeline:
- retrieval_grader 是通过将提示、llm 和 JsonOutputParser 链接在一起创建的。这形成了一个管道,其中用户的问题和文档首先由 PromptTemplate 格式化,然后由 LLM 处理,最后由 JsonOutputParser 解析输出。
retrieval_grader = prompt | llm | JsonOutputParser()
- 示例用法:
- 定义一个示例问题(“agent memory”)。
- 使用 retriever.invoke(question) 方法获取与问题相关的文档。
- 提取第二个检索到的文档(docs[1])的内容。
- 然后使用问题和文档作为输入调用 retrieval_grader 管道。输出是 JSON 格式的二进制分数,指示文档是否相关。
question = "agent memory"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))
3. 生成器
接下来,我们需要编写一个代码,该代码可以使用从检索到的文档中的上下文生成对用户问题的简洁答案。
### Generate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
# Prompt
prompt = PromptTemplate(
template="""system You are an assistant for question-answering tasks.
Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.
Use three sentences maximum and keep the answer concise user
Question: {question}
Context: {context}
Answer: assistant""",
input_variables=["question", "document"],
)
llm = ChatOllama(model=local_llm, temperature=0)
# Post-processing
def format_docs(docs):
return "nn".join(doc.page_content for doc in docs)
# Chain
rag_chain = prompt | llm | StrOutputParser()
# Run
question = "agent memory"
docs = retriever.invoke(question)
generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)
代码说明:
- prompt:这是一个 PromptTemplate 对象,它定义发送给语言模型 (LLM) 的提示的结构。提示指示 LLM 充当回答问题的助手。向 LLM 提供问题和上下文(检索到的文档),并指示其用三句或更少的句子生成简洁的答案。如果 LLM 不知道答案,则指示它简单地说它不知道。
prompt = PromptTemplate(
template="""...""",
input_variables=["question", "context"],
)
- llm:这将使用温度为 0 的 ChatOllama 模型初始化 LLM,从而确保输出更加确定且随机性更低。
llm = ChatOllama(model=local_llm, temperature=0)
- format_docs(docs):此函数获取文档对象列表并将其内容 (page_content) 连接成单个字符串,每个文档的内容由双换行符 (nn) 分隔。然后,此格式化字符串将用作提示中的上下文。
def format_docs(docs):
retutgcodern "nn".join(doc.page_content for doc in docs)
- rag_chain:这会创建一个处理链,结合 prompt、LLM(llm)和 StrOutputParser。prompt 中填充 question 和 context,发送到 LLM 进行处理,并使用 StrOutputParser 将输出解析为字符串。
rag_chain = prompt | llm | StrOutputParser()
- 运行链:
question = "agent memory"
docs = retriever.invoke(question)
generation = rag_chain.invoke({"context": format_docs(docs), "question": question})
print(generation)
- question:用户的问题,在本例中为 “agent memory.”。
- docs:使用 retriever.invoke(question) 函数检索的文档列表,该函数检索与问题相关的文档。
- format_docs(docs):将检索到的文档格式化为单个上下文字符串,以双换行符分隔。
- rag_chain.invoke({“context”: format_docs(docs), “question”: question}):此行执行链。它将格式化的上下文和问题传递到 rag_chain,后者通过 LLM 处理输入并返回生成的答案。
- print(generation):将生成的答案输出到控制台。
4. 幻觉评分器和答案评分器
此代码片段定义了两个独立的评分器 – 一个用于评估生成的答案中的幻觉,另一个用于评估答案在解决问题中的有用性。两个评分器都使用语言模型 (LLM) 根据特定标准提供二进制分数(“yes” 或 “no”)
### Hallucination Grader
# LLM
llm = ChatOllama(model=local_llm, format="json", temperature=0)
# Prompt
prompt = PromptTemplate(
template=""" system You are a grader assessing whether
an answer is grounded in / supported by a set of facts. Give a binary 'yes' or 'no' score to indicate
whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a
single key 'score' and no preamble or explanation. user
Here are the facts:
n ------- n
{documents}
n ------- n
Here is the answer: {generation} assistant""",
input_variables=["generation", "documents"],
)
hallucination_grader = prompt | llm | JsonOutputParser()
hallucination_grader.invoke({"documents": docs, "generation": generation})
### Answer Grader
# LLM
llm = ChatOllama(model=local_llm, format="json", temperature=0)
# Prompt
prompt = PromptTemtgcodeplate(
template="""system You are a grader assessing whether an
answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is
useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
user Here is the answer:
n ------- n
{generation}
n ------- n
Here is the question: {question} assistant""",
input_variables=["generation", "question"],
)
answer_grader = prompt | llm | JsonOutputParser()
answer_grader.invoke({"question": question, "generation": generation})
幻觉评分器
代码说明:
- LLM 初始化:
- llm:以 JSON 输出格式和温度 0 初始化 ChatOllama 语言模型,使模型的输出具有确定性。
llm = ChatOllama(model=local_llm, format="json", temperature=0)
- Prompt 创建:
- Prompt:创建 PromptTemplate 来定义发送给 LLM 的提示的结构。提示指示 LLM 评估给定的答案(generation)是否基于一组事实(documents)或由一组事实(documents)支持。指示模型以 JSON 格式输出二进制分数(“yes” 或 “no”),表明根据提供的文档,答案是否符合事实。
prompt = PromptTemplate(
template="""...""",
input_variables=["generation", "documents"],
)
- 幻觉评分器设置:
- hallucination_grader:这是结合 prompt、LLM 和 JsonOutputParser 的管道。Prompt 中填充了输入变量(generation 和 documents),由 LLM 处理,输出由 JsonOutputParser 解析为 JSON 格式。
hallucination_grader = prompt | llm | JsonOutputParser()
- 运行幻觉评分器:
- hallucination_grader.invoke(…):通过传入 documents(事实)和 generation(正在评估的答案)来执行幻觉评分器。然后,LLM 评估答案是否基于提供的事实,并以 JSON 格式返回二进制分数。
答案评分器
代码说明:
- LLM 初始化:
- llm:与幻觉分级器类似,这会使用相同的设置初始化 ChatOllama 模型以获得确定性输出。
llm = ChatOllama(model=local_llm, format="json", temperature=0)
- Prompt 创建:
- prompt:创建 PromptTemplate 来评估答案的有用性。此提示指示 LLM 评估给定的答案(generation)是否有用来解决特定问题(question)。同样,LLM 以 JSON 格式输出二进制分数(“yes” 或 “no”),表明答案是否有用。
prompt = PromptTemplate(
template="""...""",
input_variables=["generation", "question"],
)
- 答案评分器设置:
- answer_grader:该管道结合了 prompt、LLM 和 JsonOutputParser,类似于幻觉评分器。
answer_grader = prompt | llm | JsonOutputParser()
- 运行答案评分器:
- answer_grader.invoke(…):通过传入 question 和 generation(正在评估的答案)来执行答案评分器。LLM 评估答案是否有助解决问题,并以 JSON 格式返回二进制分数。
answer_grader.invoke({"question": question, "generation": generation})
5. 路由器 – router
此代码片段定义了一个 “路由器” 系统,旨在确定用户的问题是否应定向到向量库或网络搜索以进行进一步的信息检索。以下是每个部分的详细说明:
### Router
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
# LLM
llm = ChatOllama(model=local_llm, format="json", temperature=0)
prompt = PromptTemplate(
template="""system You are an expert at routing a
user question to a vectorstore or web search. Use the vectorstore for questions on LLM agents,
prompt engineering, and adversarial attacks. You do not need to be stringent with the keywords
in the question related to these topics. Otherwise, use web-search. Give a binary choice 'web_search'
or 'vectorstore' based on the question. Return the a JSON with a single key 'datasource' and
no premable or explanation. Question to route: {question} assistant""",
input_variables=["question"],
)
question_router = prompt | llm | JsonOutputParser()
question = "llm agent memory"
docs = retriever.get_relevant_documents(question)
doc_txt = docs[1].page_content
print(question_router.invoke({"question": question}))
代码说明:
- LLM 初始化:
- llm:使用 JSON 输出格式和 temperature 0 初始化 ChatOllama 语言模型,确保模型获得确定性(非随机)结果。
llm = ChatOllama(model=local_llm, format="json", temperature=0)
- Prompt 创建:
- Prompt:创建 PromptTemplate 来定义发送给 LLM 的输入提示的结构。此提示指示 LLM 充当专家,将用户问题路由到适当的数据源:向量库或网络搜索。决定基于问题的内容:
- 如果问题与 “LLM agents”、“prompt engineering” 或 “adversarial attacks” 等主题有关,则应将其路由到向量库。
- 否则,应将问题路由到网络搜索。
- Prompt:创建 PromptTemplate 来定义发送给 LLM 的输入提示的结构。此提示指示 LLM 充当专家,将用户问题路由到适当的数据源:向量库或网络搜索。决定基于问题的内容:
prompt = PromptTemplate(
template="""...""",
input_variables=["question"],
)
LLM 被要求返回一个二元选择:“vectorstore” 或 “web_search”。响应应为 JSON 格式,且只有一个键 “datasource”。
- 路由器设置:
- question_router:这是一条结合了 prompt、LLM 和 JsonOutputParser 的处理链。Prompt 中填充 question,由 LLM 处理以做出路由决策,输出由 JsonOutputParser 解析为 JSON 格式。
question_router = prompt | llm | JsonOutputParser()
- 运行路由器:
- question:用户的查询,在本例中为 “llm agent memory”。
- docs:使用 retriever.get_relevant_documents(question) 函数检索的文档列表,该函数获取与问题相关的文档。这部分代码似乎检索文档,但并不直接参与路由决策。
- question_router.invoke({“question”: question}):此行执行路由器。问题被传递给 question_router,后者通过 LLM 对其进行处理并返回一个带有键 “datasource” 的 JSON 对象,该键指示问题是否应路由到 “vectorstore” 或 “web_search”。
- print(question_router.invoke(…)):将路由决策(“vectorstore” 或 “web_search”)输出到控制台。
6. Web 搜索
代码设置了一个 Web 搜索工具,可用于查询 Web 并检索有限数量的搜索结果(在本例中为 3)。这在你想要将外部 Web 搜索功能集成到系统中的场景中非常有用,使其能够从互联网获取信息并使用该信息进行进一步处理或决策。
### Search
from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults(k=3)
代码说明:
- Imports:
- TavilySearchResults:这是从 langchain_community.tools.tavily_search 模块导入的类。它用于执行网络搜索并检索搜索结果。
from langchain_community.tools.tavily_search import TavilySearchResults
- Web 搜索工具初始化:
- web_search_tool:此变量是 TavilySearchResults 类的一个实例。它表示配置为执行网络搜索的工具。
- k=3:此参数指定该工具应返回任何给定查询的前 3 个搜索结果。k 值决定搜索工具获取和处理多少个结果。
web_search_tool = TavilySearchResults(k=3)
7. 控制流程 – control flow
此代码定义了一个有状态的、基于图形的工作流,用于处理用户查询。它检索文档、生成答案、评估相关性并根据当前状态路由流程。该系统高度模块化,允许独立定义和控制流程中的每个步骤,使其灵活且可扩展,适用于涉及文档检索、问答以及确保生成内容的质量和相关性的各种用例。
from pprint import pprint
from typing import List
import time
from langchain_core.documents import Document
from typing_extensions import TypedDict
from langgraph.graph import END, StateGraph
### State
class GraphState(TypedDict):
"""
Represents the state of our graph.
Attributes:
question: question
generation: LLM generation
web_search: whether to add search
documents: list of documents
"""
question: str
generation: str
web_search: str
documents: List[str]
### Nodes
def retrieve(state):
"""
Retrieve documents from vectorstore
Args:
state (dict): The current graph state
Returns:
state (dict): New key added to state, documents, that contains retrieved documents
"""
print("---RETRIEVE---")
question = state["question"]
# Retrieval
documents = retriever.invoke(question)
return {"documents": documents, "question": question}
def generate(state):
"""
Generate answer using RAG on retrieved documents
Args:
state (dict): The current graph state
Returns:
state (dict): New key added to state, generation, that contains LLM generation
"""
print("---GENERATE---")
question = state["question"]
documents = state["documents"]
# RAG generation
generation = rag_chain.invoke({"context": documents, "question": question})
return {"documents": documents, "question": question, "generation": generation}
def grade_documents(state):
"""
Determines whether the retrieved documents are relevant to the question
If any document is not relevant, we will set a flag to run web search
Args:
state (dict): The current graph state
Returns:
state (dict): Filtered out irrelevant documents and updated web_search state
"""
print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
question = state["question"]
documents = state["documents"]
# Score each doc
filtered_docs = []
web_search = "No"
for d in documents:
score = retrieval_grader.invoke(
{"question": question, "document": d.page_content}
)
grade = score["score"]
# Document relevant
if grade.lower() == "yes":
print("---GRADE: DOCUMENT RELEVANT---")
filtered_docs.append(d)
# Document not relevant
else:
print("---GRADE: DOCUMENT NOT RELEVANT---")
# We do not include the document in filtered_docs
# We set a flag to indicate that we want to run web search
web_search = "Yes"
continue
return {"documents": filtered_docs, "question": question, "web_search": web_search}
def web_search(state):
"""
Web search based based on the question
Args:
state (dict): The current graph state
Returns:
state (dict): Appended web results to documents
"""
print("---WEB SEARCH---")
question = state["question"]
documents = state["documents"]
# Web search
docs = web_search_tool.invoke({"query": question})
web_results = "n".join([d["content"] for d in docs])
web_results = Document(page_content=web_results)
if documents is not None:
documents.append(web_results)
else:
documents = [web_results]
return {"documents": documents, "question": question}
### Conditional edge
def route_question(state):
"""
Route question to web search or RAG.
Args:
state (dict): The current graph state
Returns:
str: Next node to call
"""
print("---ROUTE QUESTION---")
question = state["question"]
print(question)
source = question_router.invoke({"question": question})
print(source)
print(source["datasource"])
if source["datasource"] == "web_search":
print("---ROUTE QUESTION TO WEB SEARCH---")
return "websearch"
elif source["datasource"] == "vectorstore":
print("---ROUTE QUESTION TO RAG---")
return "vectorstore"
def decide_to_generate(state):
"""
Determines whether to generate an answer, or add web search
Args:
state (dict): The current graph state
Returns:
str: Binary decision for next node to call
"""
print("---ASSESS GRADED DOCUMENTS---")
state["question"]
web_search = state["web_search"]
state["documents"]
if web_search == "Yes":
# All documents have been filtered check_relevance
# We will re-generate a new query
print(
"---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---"
)
return "websearch"
else:
# We have relevant documents, so generate answer
print("---DECISION: GENERATE---")
return "generate"
### Conditional edge
def grade_generation_v_documents_and_question(state):
"""
Determines whether the generation is grounded in the document and answers question.
Args:
state (dict): The current graph state
Returns:
str: Decision for next node to call
"""
print("---CHECK HALLUCINATIONS---")
question = state["question"]
documents = state["documents"]
generation = state["generation"]
score = hallucination_grader.invoke(
{"documents": documents, "generation": generation}
)
# Debug print to see what `score` contains
# pprint(score)
# Check if 'score' key exists in the score dictionary
if "score" not in score:
print("Error: 'score' key not found in the result")
return "error" # Or handle the error appropriately
grade = score["score"]
# Check hallucination
if grade == "yes":
print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
# Check question-answering
print("---GRADE GENERATION vs QUESTION---")
score = answer_grader.invoke({"question": question, "generation": generation})
# Debug print to see wtgcodehat `score` contains
pprint(score)
if "score" not in score:
print("Error: 'score' key not found in the result")
return "error" # Or handle the error appropriately
grade = score["score"]
if grade == "yes":
print("---DECISION: GENERATION ADDRESSES QUESTION---")
return "useful"
else:
print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
return "not useful"
else:
pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
return "not supported"
workflow = StateGraph(GraphState)
# Define the nodes
workflow.add_node("websearch", web_search) # web search
workflow.add_node("retrieve", retrieve) # retrieve
workflow.add_node("grade_documents", grade_documents) # grade documents
workflow.add_node("generate", generate) # generatae
状态定义 – state definition
class GraphState(TypedDict):
question: str
generation: str
web_search: str
documents: List[str]
- GraphState:定义图形将管理的状态结构的 TypedDict。它包括:
- question:用户的查询。
- generation:LLM 生成的答案。
- web_search:指示是否应添加网络搜索的标志。
- documents:在过程中检索到的文档列表。
节点函数 – node function
以下每个函数代表图中的一个节点,执行工作流中的特定任务。
- retrieve(state)
- 目的:根据用户的问题从向量存储中检索文档。
- 返回:使用检索到的文档更新状态。
- generate(state)
- 目的:使用检索增强生成 (RAG) 模型对检索到的文档生成答案。
- 返回:使用生成的答案更新状态。
def generate(state):
print("---GENERATE---")
question = state["question"]
documents = state["documents"]
generation = rag_chain.invoke({"context": documents, "question": question})
return {"documents": documents, "question": question, "generation": generation}
- grade_documents(state)
- 目的:对每个检索到的文档与问题的相关性进行评分,并过滤掉不相关的文档。如果任何文档不相关,它会设置一个标志来表明需要进行网络搜索。
- 返回:使用过滤后的文档和网络搜索标志更新状态。
def grade_documents(state):
print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
question = state["question"]
documents = state["documents"]
filtered_docs = []
web_search = "No"
for d in documents:
score = retrieval_grader.invoke({"question": question, "document": d.page_content})
grade = score["score"]
if grade.lower() == "yes":
print("---GRADE: DOCUMENT RELEVANT---")
filtered_docs.append(d)
else:
print("---GRADE: DOCUMENT NOT RELEVANT---")
web_search = "Yes"
continue
return {"documents": filtered_docs, "question": question, "web_search": web_search}
- web_search(state)
- 目的:根据用户的问题进行网络搜索,并将结果附加到文档列表中。
-
返回:使用网络搜索结果更新状态。
def web_search(state):
print("---WEB SEARCH---")
question = state["question"]
documents = state["documents"]
docs = web_search_tool.invoke({"query": question})
web_results = "n".join([d["content"] for d in docs])
web_results = Document(page_content=web_results)
if documents is not None:
documents.append(web_results)
else:
documents = [web_results]
return {"documents": documents, "question": question}
条件边缘 – conditional edges
这些函数根据当前状态确定工作流程中的下一步。
- route_question(state)
- 目的:根据问题内容,将其路由到 Web 搜索或 vectorstore 检索。
- 返回:要执行的下一个节点,即 “websearch” 或 “vectorstore”。
def route_question(state):
print("---ROUTE QUESTION---")
question = state["question"]
source = question_router.invoke({"question": question})
if source["datasource"] == "web_search":
print("---ROUTE QUESTION TO WEB SEARCH---")
return "websearch"
elif source["datasource"] == "vectorstore":
print("---ROUTE QUESTION TO RAG---")
return "vectorstore"
- decide_to_generate(state)
- 目的:根据已评分文档的相关性决定是否生成答案或执行网络搜索。
- 返回:要执行的下一个节点,可以是 “websearch” 或 “generate”。
def decide_to_generate(state):
print("---ASSESS GRADED DOCUMENTS---")
web_search = state["web_search"]
if web_search == "Yes":
print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---")
return "websearch"
else:
print("---DECISION: GENERATE---")
return "generate"
- grade_generation_v_documents_and_question(state)
- 目的:对生成的答案进行幻觉评分(无论其是否基于提供的文档)并检查答案是否解决了用户的问题。
- 返回:根据答案是否有根据和有用,执行下一个节点。
def grade_generation_v_documents_and_question(state):
print("---CHECK HALLUCINATIONS---")
question = state["question"]
documents = state["documents"]
generation = state["generation"]
score = hallucination_grader.invoke({"documents": documents, "generation": generation})
if "score" not in score:
print("Error: 'score' key not found in the result")
return "error"
grade = score["score"]
if grade == "yes":
print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
score = answer_grader.invoke({"question": question, "generation": generation})
if "score" not in score:
print("Error: 'score' key not found in the result")
return "error"
grade = score["score"]
if grade == "yes":
print("---DECISION: GENERATION ADDRESSES QUESTION---")
return "useful"
else:
print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
return "not useful"
else:
pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
return "not supported"
工作流定义
- StateGraph:初始化将管理状态转换的图表。
- add_node:将节点(functions)添加到图表中,将每个节点与可用于在工作流中调用它的名称相关联。
workflow = StateGraph(GraphState)
workflow.add_node("websearch", web_search)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)
8. 构建图表
此代码使用状态图构建有状态工作流的逻辑和流程。它根据每个步骤的条件和结果确定流程应如何从一个节点(操作)移动到下一个节点。
- 工作流首先决定是从向量存储中检索文档还是根据用户的问题执行网络搜索。
- 然后,它评估检索到的文档的相关性,如果文档不相关,则决定是生成答案还是进行进一步的网络搜索。
- 最后,它生成答案并检查答案是否得到良好支持且有用,根据结果重复步骤或结束工作流。
这种结构确保工作流是动态的,能够根据每个阶段的结果进行调整,并最终旨在为用户的问题提供得到良好支持且相关的答案。
# Build graph
workflow.set_conditional_entry_point(
route_question,
{
"websearch": "websearch",
"vectorstore": "retrieve",
},
)
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
"grade_documents",
decide_to_generate,
{
"websearch": "websearch",
"generate": "generate",
},
)
workflow.add_edge("websearch", "generate")
workflow.add_conditional_edges(
"generate",
grade_generation_v_documents_and_question,
{
"not supported": "generate",
"useful": END,
"not useful": "websearch",
},
)
代码说明:
- 设置条件入口点
- set_conditional_entry_point:此方法根据条件决策设置工作流的起点。
- route_question:确定问题是否应路由到 Web 搜索或 vectorstore 检索的函数。
- “websearch”:“websearch”:如果 route_question 决定问题应路由到 Web 搜索,则工作流从 Websearch 节点开始。
- “vectorstore”:“retrieve”:如果 route_question 决定问题应路由到 vectorstore,则工作流从检索节点开始。
workflow.set_conditional_entry_point(
route_question,
{
"websearch": "websearch",
"vectorstore": "retrieve",
},
)
- 在节点之间添加边缘(edge)
- add_edge:此方法在工作流中创建从一个节点到另一个节点的直接转换。
- “retrieve”->“grade_documents”:在检索节点中检索到文档后,工作流移动到 grade_documents 节点,在该节点评估检索到的文档的相关性。
workflow.add_edge("retrieve", "grade_documents")
- 添加条件边缘
- add_conditional_edges:此方法根据决策函数的结果在节点之间创建条件转换。
- “grade_documents”:评估检索到的文档的相关性的节点。
- decide_to_generate:根据文档的相关性决定下一步的函数。
- “websearch”:“websearch”:如果 decide_to_generate 确定需要进行网络搜索(因为文档不相关),则工作流将转换到 websearch 节点。
- “generate”:“generate”:如果文档相关,则工作流将转换到生成节点,在该节点使用文档生成答案。
workflow.add_conditional_edges(
"grade_documents",
decide_to_generate,
{
"websearch": "websearch",
"generate": "generate",
},
)
- 在节点之间添加边缘(edge)
- “websearch” -> “generate”:执行网络搜索后,工作流程转到生成节点,使用网络搜索的结果生成答案。
workflow.add_edge("websearch", "generate")
- 添加条件边缘以进行最终决策
- “generate”:使用 documents(检索或从网络搜索中)生成答案的节点。
- grade_generation_v_documents_and_question:检查生成的答案是否基于文档并与问题相关的函数。
- “notsupported”:“generate”:如果生成的答案没有得到文档的充分支持,工作流将循环回到生成节点以尝试生成更好的答案。
- “useful”:END:如果生成的答案既基于文档又解决了问题,则工作流结束(END)。
- “notuseful”:“websearch”:如果生成的答案基于文档但未充分解决问题,则工作流将转换回 websearch 节点以收集更多信息并重试。
workflow.add_conditional_edges(
"generate",
grade_generation_v_documents_and_question,
{
"not supported": "generate",
"useful": END,
"not useful": "websearch",
},
)
全部完成!!
现在我们的实现已经完成,让我们通过编译和执行整个图表来测试它,好处是这也会打印出我们进行的步骤:
- 测试 1:让我们写一个与我们在数据存储中创建索引的博客文章相关的问题?
from pprint import pprint
# Compile
app = workflow.compile()
inputs = {"question": "What is agent memory?"}
for output in app.stream(inputs):
for key, value in output.items():
pprint(f"Finished running: {key}:")
pprint(value["generation"])
- 测试 2:让我们写另一个与时事相关的问题,即与我们从博客文章中索引的数据完全无关的问题?
from pprint import pprint
# Compile
app = workflow.compile()
inputs = {"question": "Who are the LA Lakers expected to draft first in the NBA draft?"}
for output in app.stream(inputs):
for key, value in output.items():
pprint(f"Finished running: {key}:")
pprint(value["generation"])
你在这两个测试的输出中看到了什么?
对于测试 1
输出显示了工作流程的逐步执行以及每个阶段做出的决策:
---ROUTE QUESTION---
What is agent memory?
{'datasource': 'vectorstore'}
vectorstore
---ROUTE QUESTION TO RAG---
---RETRIEVE---
'Finished running: retrieve:'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
'Finished running: grade_documents:'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
{'score': 'yes'}
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
('Based on the provided context, Agent Memory refers to a long-term memory '
"module that records a comprehensive list of agents' experiences in natural "
'language. This allows the agent to retain and recall information over '
'extended periods, leveraging an external vector store and fast retrieval.')
-
Routing the Question:
-
Output:
---ROUTE QUESTION---
-
Question:
"What is agent memory?"
-
Decision: The workflow determines that the question should be routed to the
vectorstore
based on the question’s content. -
Result:
{'datasource': 'vectorstore'}
and---ROUTE QUESTION TO RAG---
.
-
Output:
-
Retrieving Documents:
-
Output:
---RETRIEVE---
- The workflow retrieves documents related to the question from the vectorstore.
-
Output:
-
Grading Document Relevance:
-
Output:
---CHECK DOCUMENT RELEVANCE TO QUESTION---
- The workflow grades each retrieved document to determine if it is relevant to the question.
-
Results: All retrieved documents are graded as relevant (
---GRADE: DOCUMENT RELEVANT---
repeated four times).
-
Output:
-
Deciding to Generate an Answer:
-
Output:
---ASSESS GRADED DOCUMENTS---
- Since the documents are relevant, the workflow decides to proceed with generating an answer (
---DECISION: GENERATE---
).
-
Output:
-
Generating the Answer:
-
Output:
---GENERATE---
- The workflow generates an answer using the relevant documents.
-
Output:
-
Checking for Hallucinations:
-
Output:
---CHECK HALLUCINATIONS---
- The workflow checks if the generated answer is grounded in the documents.
-
Result: The answer is grounded (
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
).
-
Output:
-
Grading the Answer Against the Question:
-
Output:
---GRADE GENERATION vs QUESTION---
- The workflow evaluates whether the generated answer addresses the question.
-
Result: The answer is useful (
{'score': 'yes'}
and---DECISION: GENERATION ADDRESSES QUESTION---
).
-
Output:
-
Final Output:
-
Output:
'Finished running: generate:'
- Generated Answer:
-
Output:
'Based on the provided context, Agent Memory refers to a long-term memory '
"module that records a comprehensive list of agents' experiences in natural "
'language. This allows the agent to retain and recall information over '
'extended periods, leveraging an external vector store and fast retrieval.'
对于测试 2
此输出遵循与上一个示例相同的工作流程,但涉及与 NBA 选秀和洛杉矶湖人队相关的不同问题。以下是此运行期间发生的情况的细分:
---ROUTE QUESTION---
Who are the LA Lakers expected to draft first in the NBA draft?
{'datasource': 'web_search'}
web_search
---ROUTE QUESTION TO WEB SEARCH---
---WEB SEARCH---
'Finished running: websearch:'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
{'score': 'yes'}
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
('According to the provided context, the LA Lakers are expected to draft '
'Dalton Knecht at number 17 overall in the first round of the NBA draft.')
-
Routing the Question:
-
Output:
---ROUTE QUESTION---
-
Question:
"Who are the LA Lakers expected to draft first in the NBA draft?"
-
Decision: The workflow determines that the question should be routed to a web search (
'datasource': 'web_search'
), as it likely requires up-to-date information that isn’t stored in the vectorstore. -
Result:
web_search
and---ROUTE QUESTION TO WEB SEARCH---
.
-
Output:
-
Web Search:
-
Output:
---WEB SEARCH---
- The workflow performs a web search to gather the most current and relevant information regarding the Laker’s draft picks.
-
Result:
'Finished running: websearch:'
indicates that the web search step is complete.
-
Output:
-
Generating the Answer:
-
Output:
---GENERATE---
- Using the information retrieved from the web search, the workflow generates an answer to the question.
-
Output:
-
Checking for Hallucinations:
-
Output:
---CHECK HALLUCINATIONS---
- The workflow checks if the generated answer is grounded in the retrieved web search documents.
-
Result: The answer is well-supported (
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
).
-
Output:
-
Grading the Answer Against the Question:
-
Output:
---GRADE GENERATION vs QUESTION---
- The workflow evaluates whether the generated answer directly addresses the question.
-
Result: The answer is useful and relevant (
{'score': 'yes'}
and---DECISION: GENERATION ADDRESSES QUESTION---
).
-
Output:
-
Final Output:
-
Output:
'Finished running: generate:'
- Generated Answer:
-
Output:
'According to the provided context, the LA Lakers are expected to draft '
'Dalton Knecht at number 17 overall in the first round of the NBA draft.'
测试 1 与测试 2 的工作流程要点
- 路由到网络搜索:工作流程正确识别出问题需要最新信息,因此它将查询定向到网络搜索而不是向量存储。
- 答案生成:工作流程成功地利用了来自网络的最新信息,生成了有关湖人队预期选秀权的连贯且相关的回答。
- 有根据且有用的答案:工作流程验证了生成的答案既基于搜索结果,又直接解决了问题。
结论
在相对较短的时间内,我们成功构建了一个复杂的检索增强生成 (RAG) 工作流,其中包括路由、检索、评分和各种决策点,例如回退到网络搜索和对生成内容进行双标准评分。特别令人印象深刻的是,这个复杂的 RAG 流程结合了多篇研究论文中的概念,可以在本地机器上可靠地运行。实现这一点的关键在于定义明确的控制流,这确保了本地代理平稳有效地运行。
我们鼓励你尝试不同的查询和实现,因为这种方法为创建更高级的 RAG 代理提供了强大的基础。希望这可以作为开发你自己的 RAG 工作流的有用指南。
准备好自己尝试一下了吗?开始免费试用。
Elasticsearch 集成了 LangChain、Cohere 等工具。加入我们的 Beyond RAG Basics 网络研讨会,构建你的下一个 GenAI 应用程序!
原文:Llama3, LangGraph and Elasticsearch: Build a local agent for vector search — Search Labs
文章来源于互联网:从零开始使用 LangGraph、LLaMA3 和 Elasticsearch 向量存储构建本地代理的教程
相关推荐: Elasticsearch:使用 semantic_text 简化语义搜索
作者:来自 ElasticCarlos Delgado, Mike Pellegrini semantic_text – 你知道,用于语义搜索! 你是否想开始使用语义搜索来搜索数据,但专注于模型和结果而不是技术细节?我们引入了 semantic_text 字段…