微信扫码
添加专属顾问
我要投稿
在信息爆炸的今天,如何快速准确地获取知识?本文介绍了一款基于LangGraph的RAG多智能体工具,它能够高效处理复杂问题,整合多源信息,并通过迭代步骤得出精准答案。核心内容:1. 传统问答系统的局限性与多智能体RAG的优势2. 多智能体RAG的功能:路由和工具使用、规划子步骤、反思和错误纠正、共享全局状态3. 系统架构与文档处理,构建智能问答的“大脑”
在当今信息爆炸的时代,快速准确地获取知识变得尤为重要。传统的问答系统虽然能够处理一些简单问题,但在面对复杂问题时往往显得力不从心。为了解决这一痛点,我们开发了一款基于 LangGraph 的 RAG 多智能体工具,它能够高效地处理复杂问题,整合多源信息,并通过迭代步骤得出精准答案。今天,就让我们深入了解一下这个强大的工具。
在项目开发初期,我们发现传统的“简单 RAG”方法存在诸多不足。简单 RAG 无法拆解复杂问题,只能在单一层面处理查询,无法深入分析每个步骤并得出统一结论;它缺乏对幻觉(即模型生成错误信息)或错误处理的能力,无法通过验证步骤纠正错误;此外,简单 RAG 系统也无法根据工作流条件动态使用工具、调用外部 API 或与数据库交互。
为了解决这些问题,我们引入了多智能体 RAG 研究系统。基于智能体的框架能够实现以下功能:
我们的系统包含两个核心部分:研究者子图和主图。研究者子图负责生成用于检索和重排向量数据库中 top-k 文档的不同查询;主图则包含主要工作流程,例如分析用户查询、生成完成任务所需的步骤、生成回应,并通过人工参与机制检查幻觉。
对于结构复杂的 PDF 文档,尤其是包含复杂布局的表格,选择合适的解析工具至关重要。许多库在处理复杂页面布局或表格结构的 PDF 时精度不足。为此,我们采用了 Docling 这一开源库,它能够高效地解析文档,并将内容导出为所需格式。Docling 支持从 PDF、DOCX、PPTX、XLSX、图片、HTML、AsciiDoc 和 Markdown 等多种常用文档格式读取和导出 Markdown 和 JSON 格式。它对 PDF 文档有全面的理解,包括表格结构、阅读顺序和页面布局,还支持对扫描 PDF 的 OCR 功能。
以下是使用 Docling 将 PDF 转换为 Markdown 格式的代码示例:
from docling.document_converter import DocumentConverter
logger.info("Starting document processing.")
converter = DocumentConverter()
markdown_document = converter.convert(source).document.export_to_markdown()
我们使用 Chroma 构建向量数据库,将句子存储为向量嵌入,并在数据库中进行搜索。我们将持久化数据库存储在本地目录 “db_vector” 中。通过 OpenAI 的嵌入模型,我们将文档列表转换为向量,并存储在 Chroma 中。
以下是构建向量数据库的代码:
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embd = OpenAIEmbeddings()
vectorstore_from_documents = Chroma.from_documents(
documents=docs_list,
collection_name="rag-chroma-google-v1",
embedding=embd,
persist_directory='db_vector'
)
LangGraph 的核心概念之一是状态。每个图执行都会创建一个状态,该状态在图的节点执行时传递,并在每个节点执行后更新内部状态。
我们定义了两个类:Router 和 GradeHallucinations,分别用于存储用户查询的分类结果和回应中幻觉的存在与否。基于这些状态,我们构建了输入状态(InputState)和智能体状态(AgentState),其中 AgentState 包含用户查询的分类、研究计划的步骤列表、智能体可以引用的检索文档列表,以及幻觉的二进制评分。
以下是状态类的定义代码:
from pydantic import BaseModel, Field
from typing import Literal, TypedDict
class Router(TypedDict):
"""Classify user query."""
logic: str
type: Literal["more-info", "environmental", "general"]
class GradeHallucinations(BaseModel):
"""Binary score for hallucination present in generation answer."""
binary_score: str = Field(description="Answer is grounded in the facts, '1' or '0'")
这一步会更新智能体状态中的 Router 对象,其类型变量包含 “more-info”、“environmental” 或 “general” 中的一个值。根据这个信息,工作流将被路由到合适的节点,例如 “create_research_plan”、“ask_for_more_info” 或 “respond_to_general_query”。
以下是实现代码:
async def analyze_and_route_query(
state: AgentState, *, config: RunnableConfig
) -> dict[str, Router]:
"""Analyze the user's query and determine the appropriate routing."""
model = ChatOpenAI(model=GPT_4o, temperature=TEMPERATURE, streaming=True)
messages = [
{"role": "system", "content": ROUTER_SYSTEM_PROMPT}
] + state.messages
logging.info("---ANALYZE AND ROUTE QUERY---")
response = cast(
Router, await model.with_structured_output(Router).ainvoke(messages)
)
return {"router": response}
如果查询分类返回 “environmental”,用户的请求与文档相关,工作流将到达 “create_research_plan” 节点,该节点的功能是为回答与环境相关的查询创建逐步研究计划。
以下是实现代码:
async def create_research_plan(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[str] | str]:
"""Create a step-by-step research plan for answering an environmental-related query."""
class Plan(TypedDict):
"""Generate research plan."""
steps: list[str]
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
messages = [
{"role": "system", "content": RESEARCH_PLAN_SYSTEM_PROMPT}
] + state.messages
logging.info("---PLAN GENERATION---")
response = cast(Plan, await model.with_structured_output(Plan).ainvoke(messages))
return {"steps": response["steps"], "documents": "delete"}
这一步会从研究计划中取出第一个步骤,并调用研究者子图来执行研究。研究者子图会返回一系列文档片段,我们将在后续步骤中进一步处理。
以下是实现代码:
async def conduct_research(state: AgentState) -> dict[str, Any]:
"""Execute the first step of the research plan."""
result = await researcher_graph.ainvoke({"question": state.steps[0]}) # graph call directly
docs = result["documents"]
step = state.steps[0]
logging.info(f"\n{len(docs)} documents retrieved in total for the step: {step}.")
return {"documents": result["documents"], "steps": state.steps[1:]}
研究者子图包含查询生成和文档检索两个关键步骤。查询生成步骤会根据研究计划中的问题生成多个搜索查询,以帮助回答问题。文档检索步骤则使用混合搜索和 Cohere 重排技术,从向量数据库中检索相关文档。
以下是查询生成的代码:
async def generate_queries(
state: ResearcherState, *, config: RunnableConfig
) -> dict[str, list[str]]:
"""Generate search queries based on the question."""
class Response(TypedDict):
queries: list[str]
logger.info("---GENERATE QUERIES---")
model = ChatOpenAI(model="gpt-4o-mini-2024-07-18", temperature=0)
messages = [
{"role": "system", "content": GENERATE_QUERIES_SYSTEM_PROMPT},
{"role": "human", "content": state.question},
]
response = cast(Response, await model.with_structured_output(Response).ainvoke(messages))
queries = response["queries"]
queries.append(state.question)
logger.info(f"Queries: {queries}")
return {"queries": response["queries"]}
以下是文档检索和重排的代码:
def _setup_vectorstore() -> Chroma:
"""Set up and return the Chroma vector store instance."""
embeddings = OpenAIEmbeddings()
return Chroma(
collection_name=VECTORSTORE_COLLECTION,
embedding_function=embeddings,
persist_directory=VECTORSTORE_DIRECTORY
)
# Create base retrievers
retriever_bm25 = BM25Retriever.from_documents(documents, search_kwargs={"k": TOP_K})
retriever_vanilla = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": TOP_K})
retriever_mmr = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": TOP_K})
ensemble_retriever = EnsembleRetriever(
retrievers=[retriever_vanilla, retriever_mmr, retriever_bm25],
weights=ENSEMBLE_WEIGHTS,
)
# Set up Cohere re-ranking
compressor = CohereRerank(top_n=2, model="rerank-english-v3.0")
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=ensemble_retriever,
)
这一步通过检查研究计划中是否还有剩余步骤来确定研究过程是否完成。如果还有步骤,工作流将返回 “conduct_research” 节点继续执行;如果没有剩余步骤,则进入 “respond” 节点生成最终回应。
以下是实现代码:
def check_finished(state: AgentState) -> Literal["respond", "conduct_research"]:
"""Determine if the research process is complete."""
if len(state.steps or []) > 0:
return "conduct_research"
else:
return "respond"
这一步根据研究过程中检索到的文档和对话历史,生成对用户查询的最终回应。它利用语言模型将所有相关信息整合成一个全面的答案。
以下是实现代码:
async def respond(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
"""Generate the final response to the user's query."""
model = ChatOpenAI(model="gpt-4o-2024-08-06", temperature=0)
context = format_docs(state.documents)
prompt = RESPONSE_SYSTEM_PROMPT.format(context=context)
messages = [{"role": "system", "content": prompt}] + state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}
这一步会分析语言模型生成的回应,判断其是否得到了检索到的文档事实的支持,并给出一个二进制评分结果。如果评分表明回应可能包含幻觉,工作流将被中断,并提示用户决定是否重新生成回应或结束流程。
以下是实现代码:
async def check_hallucinations(
state: AgentState, *, config: RunnableConfig
) -> dict[str, Any]:
"""Analyze the response for hallucinations."""
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
system_prompt = CHECK_HALLUCINATIONS.format(
documents=state.documents,
generation=state.messages[-1]
)
messages = [
{"role": "system", "content": system_prompt}
] + state.messages
logging.info("---CHECK HALLUCINATIONS---")
response = cast(GradeHallucinations, await model.with_structured_output(GradeHallucinations).ainvoke(messages))
return {"hallucination": response}
如果语言模型的回应未得到事实支持,可能包含幻觉,此时工作流将暂停,并将控制权交给用户。用户可以选择仅重新执行最后的生成步骤,而无需重新启动整个工作流,或者选择结束流程。这种人工参与机制确保了用户对整个过程的控制,避免了不必要的循环或不期望的操作。
以下是实现代码:
def human_approval(state: AgentState):
_binary_score = state.hallucination.binary_score
if _binary_score == "1":
return"END"
else:
retry_generation = interrupt(
{
"question": "Is this correct?",
"llm_output": state.messages[-1]
}
)
if retry_generation == "y":
print("Continue with retry...")
return"respond"
else:
return"END"
为了验证系统的性能,我们使用了一份关于谷歌环境可持续性战略的年度报告进行了测试。这份报告包含了丰富的数据和复杂的表格结构,非常适合用来测试系统的多步骤处理能力和文档解析功能。
我们提出了一个复杂的问题:“检索新加坡第二个数据中心 2019 年和 2022 年的 PUE 效率值,以及 2023 年亚太地区的区域平均 CFE 值。”
系统成功地将这个问题拆解为多个步骤,并生成了相应的查询:
通过检索和重排文档,系统最终给出了准确的答案:
为了进一步验证系统的可靠性,我们将同样的问题提交给了 ChatGPT。结果发现,ChatGPT 返回的值是错误的,明显出现了幻觉现象。这表明,在处理复杂问题时,简单的语言模型可能会生成不准确的信息,而我们的多智能体 RAG 系统通过幻觉检查步骤能够有效避免这种情况。
尽管多智能体 RAG 在性能上有显著提升,但在实际应用中仍面临一些挑战:
总的来说,多智能体 RAG 是人工智能领域的一项重大突破。它将大型语言模型的能力与自主推理和信息检索相结合,引入了一种新的智能和灵活性标准。随着人工智能的不断发展,多智能体 RAG 将在各个行业中发挥基础性作用,彻底改变我们使用技术的方式。
关注我们,一起进步,一起成长!
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-10-27
2024-09-04
2024-07-18
2024-05-05
2024-06-20
2024-06-13
2024-07-09
2024-07-09
2024-05-19
2024-07-07
2025-04-26
2025-04-25
2025-04-22
2025-04-22
2025-04-20
2025-04-19
2025-04-18
2025-04-16