AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


从PDF到智能回答:LLama3和Adaptive RAG在问答系统中的应用
发布日期:2024-05-10 10:00:04 浏览次数: 1899 来源:旭阳录


引言

在这篇文章里,我将向你展示如何利用 LangGraph、Adaptive Rag 和 LLama3 构建一个完全本地化的聊天机器人,无论是用于商务还是个人目的,它都将是一个强大的助手。

在这篇文章里,我将向你展示如何利用 LangGraph、Adaptive Rag 和 LLama3 构建一个完全本地化的聊天机器人,无论是用于商务还是个人目的,它都将是一个强大的助手。

什么是 Adaptive RAG

Adaptive Rag 是一个创新的框架,通过一个分类器根据查询的复杂度动态选择最合适的策略来处理查询。这种自适应方法能够根据每个查询的特定需求定制检索过程,实现计算效率与准确性的平衡。

Adaptive RAG 的工作原理是什么

Adaptive Rag 框架利用一个分类器根据查询的复杂度动态选择最适合大语言模型的策略。这一过程始于一个经过训练,能够使用自动注释的数据集将查询分类到不同复杂度等级的小型模型。这些数据集是通过结合不同模型的预测结果和现有数据中的固有偏差来创建的。

一旦分类器预测出一个传入查询的复杂度,Adaptive-RAG 框架便决定是否使用迭代检索、一步检索或非检索大语言模型来提供答案。

通过为复杂查询分配更多资源,这种动态选择方法提高了效率,并通过将最佳策略与每个任务相匹配,提高了准确性。

这个框架能够通过为每个查询分配一个复杂性标签来决定最有效的处理策略。这种灵活性使得系统比固定的、一刀切的方法具有更好的性能,结果是一个更高效、更快速且能够以精确和速度处理各种查询复杂性的问答框架。

Llama 3 的 8B 和 70B 模型在“指导”模式下与其他模型相比如何?

为特定聊天用例精细调整预训练模型,引入了一种创新的指令调整方法。这种方法结合了监督式微调 (SFT)、拒绝采样、邻近策略优化 (PPO) 和直接策略优化 (DPO)。

通过 PPO 和 DPO 学习优先级排名,Meta 能够更好地选择如何生成答案,在推理和编码任务中大幅提高性能。

更详细 llama3 介绍:

开始编码

在我们开始使用 LangGraph、Adaptive Rag 对文本数据进行操作之前,我们需要导入各种库和包。这里是一份库及其用途的清单:

  • Langchain:提供 Langchain 功能访问的主要库。

  • LangChain_Community:包含实现 LangChain 核心定义的基础接口的第三方集成。

  • langchain_core:编译 LCEL 序列为优化的执行计划,具有自动并行化、流式处理、跟踪和异步支持。

  • Chroma:用于存储文本嵌入的向量存储库的一部分。

  • LangGraph:一个 alpha 阶段库,用于构建带有 LLM 的有状态、多参与者应用程序。

  • Streamlit:允许您在几分钟内将 Python 脚本转换为交互式网络应用程序。

  • gpt4all:一个生态系统,用于在消费级 CPU 上本地运行强大且定制的大语言模型。

  • tavily-python:为 LLM 和 RAG 优化的搜索引擎。

  • TextSplitter:用于将大型文档拆分为更小、更易管理的块的工具。

  • Ollama:允许您本地运行开源大语言模型,例如 Llama 3。

    from langchain.text_splitter import RecursiveCharacterTextSplitterfrom langchain_community.vectorstores import Chromafrom langchain_community.embeddings import GPT4AllEmbeddingsfrom langchain.prompts import PromptTemplatefrom langchain_community.chat_models import ChatOllamafrom langchain_core.output_parsers import JsonOutputParserfrom langchain.prompts import PromptTemplatefrom langchain_community.chat_models import ChatOllamafrom langchain_core.output_parsers import JsonOutputParserfrom langchain import hubfrom langchain_community.chat_models import ChatOllamafrom langchain_core.output_parsers import StrOutputParserfrom typing_extensions import TypedDictfrom typing import Listfrom langchain.schema import Documentfrom langgraph.graph import END, StateGraphfrom langchain_community.tools.tavily_search import TavilySearchResultsfrom langchain.schema import Documentfrom langchain_community.document_loaders import PyPDFLoaderimport streamlit as stimport os

    我们设置了一个名为 local_llm 的变量并赋值为 'llama3',紧接着,配置了名为 Tavily API的环境变量并附上了 API 密钥。

    利用 Streamlit 的 st.title 函数,我们为网页设置了标题。之后,我们创建了一个文本输入框供用户输入问题,并在网页旁边添加了一个文件上传栏。

    通过这行代码,我们在侧边栏中加入了一个文件上传工具,设置为只接受 PDF 文件。最后,我们加上了一个名为“处理”的按钮,用于处理上传的 PDF 文件。

      local_llm = "llama3"tavily_api_key = os.environ['TAVILY_API_KEY'] = 'API_KEY'st.title("Multi-PDF ChatBot using LLAMA3 & Adaptive RAG")user_input = st.text_input("Question:", placeholder="Ask about your PDF", key='input')
      with st.sidebar:uploaded_files = st.file_uploader("Upload your file", type=['pdf'], accept_multiple_files=True)process = st.button("Process")if process:if not uploaded_files:st.warning("Please upload at least one PDF file.")st.stop()

      我们创建了一个名为 temp_dir 的变量,用来指向计算机上用于存储临时文件的目录路径。然后,检查该目录是否存在,如果不存在,则创建该目录。

      接下来,我们开始遍历用户上传的每个文件。对于每个文件,我们通过拼接临时目录路径和文件名来构建文件保存的完整路径。然后在指定的路径下打开文件,并将上传的文件内容写入磁盘。

      之后,我们用保存文件的路径初始化了一个 PyPDFLoader 实例。最后,我们使用该加载器读取 PDF 文件,并将其内容存储在名为 Data 的变量中。

        # Ensure the temp directory existstemp_dir = 'Users/home/temp/'if not os.path.exists(temp_dir):os.makedirs(temp_dir)
        # Process each uploaded filefor uploaded_file in uploaded_files:temp_file_path = os.path.join(temp_dir, uploaded_file.name)
        # Save the file to diskwith open(temp_file_path, "wb") as file:file.write(uploaded_file.getbuffer())# Use getbuffer() for Streamlit's UploadedFile
        # Load the PDF using PyPDFLoadertry:loader = PyPDFLoader(temp_file_path)data = loader.load()# Assuming loader.load() is the correct method callst.write(f"Data loaded for {uploaded_file.name}")except Exception as e:st.error(f"Failed to load {uploaded_file.name}: {str(e)}")

        我们创建了一个名为 RecursiveCharacterTextSplitter 的实例,配置其 chunk_size 为 250,chunk_overlap 为零。我们将使用 split_text 方法,它接收一个表示文本的字符串输入,并返回一系列字符串,每个字符串代表分割后的一个数据块。现在我们得到了数据块,就让我们将它们存储在向量数据库中。我使用的是 GPT4AllEmbeddings;你可以根据自己的偏好选择。

          text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=250, chunk_overlap=0)text_chunks = text_splitter.split_documents(data)
          # Add to vectorDBvectorstore = Chroma.from_documents(documents=text_chunks,collection_name="rag-chroma",embedding=GPT4AllEmbeddings(),)retriever = vectorstore.as_retriever()llm = ChatOllama(model=local_llm, format="json", temperature=0)

          我们利用 PromptTemplate 创建了一个字符串提示模板,指导专家系统如何决定用户的问题应该被引导至向量存储还是网络搜索。然后,我们建立了一个管道,它使用之前定义的提示作为输入,通过一个未指定的大语言模型处理,并定义了一个关于 LLM 代理记忆的示例问题。最后,管道提取第二个检索到的文档的内容。

             prompt = PromptTemplate(template="""You are an expert at routing a user question to a vectorstore or web search. \\nUse the vectorstore for questions on LLMagents, prompt engineering, and adversarial attacks. \\nYou do not need to be stringent with the keywords in the question related to these topics. \\nOtherwise, use web-search. Give a binary choice 'web_search' or 'vectorstore' based on the question. \\nReturn the a JSON with a single key 'datasource' and no premable or explaination. \\nQuestion to route: {question}""",input_variables=["question"],)
            question_router = prompt | llm | JsonOutputParser()question = "llm agent memory"docs = retriever.get_relevant_documents(question)doc_txt = docs[1].page_contentquestion_router.invoke({"question": question})llm = ChatOllama(model=local_llm, format="json", temperature=0)

            我们还使用 PromptTemplate 对文档与用户问题的相关性进行评分,以确定文档是否包含与问题相关的关键词,并提供一个表示相关性的二元得分(‘是’或‘否’),该得分以一个简单的包含 score 键的 JSON 格式返回。

              prompt = PromptTemplate(template="""You are a grader assessing relevance of a retrieved document to a user question. \\nHere is the retrieved document: \\n\\n {document} \\n\\nHere is the user question: {question} \\nIf the document contains keywords related to the user question, grade it as relevant. \\nIt does not need to be a stringent test. The goal is to filter out erroneous retrievals. \\nGive a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \\nProvide the binary score as a JSON with a single key 'score' and no premable or explaination.""",input_variables=["question", "document"],)
              retrieval_grader = prompt | llm | JsonOutputParser()question = "agent memory"docs = retriever.get_relevant_documents(question)doc_txt = docs[1].page_contentst.write(retrieval_grader.invoke({"question": question, "document": doc_txt}))

              我们使用 LangChain hub 来获取提示。接着,我们定义了一个名为 format_docs 的函数,它接受一系列文档对象作为输入。然后,我们创建了一个名为 rag_chain 的管道,并将用户的问题设置为“代理记忆”。

              最终,它打印出链生成的输出。这个输出预期是语言模型对输入问题的响应,经过链的处理和格式化。

                ### Generateprompt = hub.pull("rlm/rag-prompt")
                # LLMllm = ChatOllama(model=local_llm, temperature=0)
                # Post-processingdef format_docs(docs):return "\\n\\n".join(doc.page_content for doc in docs)
                # Chainrag_chain = prompt | llm | StrOutputParser()
                # Runquestion = "agent memory"generation = rag_chain.invoke({"context": docs, "question": question})print(generation)

                我们定义了一个 PromptTemplate,以帮助评估一组给定的事实是否支撑了一个答案。

                这需要展示一系列标记为“事实”的文档,随后是需要根据这些事实评估的“答案”。

                评分员被指示提供一个简单的‘是’或‘否’得分,指示答案是否得到事实的支持。这个决定应该以一个包含单个 score 键的 JSON 对象返回。

                  ### Hallucination Grader# LLMllm = ChatOllama(model=local_llm, format="json", temperature=0)
                  # Promptprompt = PromptTemplate(template="""You are a grader assessing whether an answer is grounded in / supported by a set of facts. \\nHere are the facts:\\n ------- \\n{documents}\\n ------- \\nHere is the answer: {generation}Give a binary score 'yes' or 'no' score to indicate whether the answer is grounded in / supported by a set of facts. \\nProvide the binary score as a JSON with a single key 'score' and no preamble or explanation.""",input_variables=["generation", "documents"],)
                  hallucination_grader = prompt | llm | JsonOutputParser()hallucination_grader.invoke({"documents": docs, "generation": generation})

                  我们还定义了一个 PromptTemplate,用于评估给定答案在解决特定问题方面的实用性。这个模板展示了答案和相关问题,由线条分隔,引导评分员评估答案的相关性和实用性。

                  评分员的任务是提供一个简单的‘是’或‘否’判断,表示答案的实用性,这应该以一个包含单个 score 键的 JSON 对象返回。

                    ### Answer Grader
                    # LLMllm = ChatOllama(model=local_llm, format="json", temperature=0)
                    # Promptprompt = PromptTemplate(template="""You are a grader assessing whether an answer is useful to resolve a question. \\nHere is the answer:\\n ------- \\n{generation}\\n ------- \\nHere is the question: {question}Give a binary score 'yes' or 'no' to indicate whether the answer is useful to resolve a question. \\nProvide the binary score as a JSON with a single key 'score' and no preamble or explanation.""",input_variables=["generation", "question"],)
                    answer_grader = prompt | llm | JsonOutputParser()answer_grader.invoke({"question": question,"generation": generation})

                    我们还定义了一个 PromptTemplate,用于重写问题以提高其在向量存储中的检索适用性。

                      ### Question Re-writer
                      # LLMllm = ChatOllama(model=local_llm, temperature=0)
                      # Promptre_write_prompt = PromptTemplate(template="""You a question re-writer that converts an input question to a better version that is optimized \\n for vectorstore retrieval. Look at the initial and formulate an improved question. \\n Here is the initial question: \\n\\n {question}. Improved question with no preamble: \\n """,input_variables=["generation", "question"],)
                      question_rewriter = re_write_prompt | llm | StrOutputParser()question_rewriter.invoke({"question": question})

                      我们添加了网络搜索工具 tavily 来帮助提取相关主题内容。

                        web_search_tool = TavilySearchResults(k=3,tavily_api_key=tavily_api_key)

                        我们为图定义了状态结构。在这个例子中,我们的状态包括用户的问题、问题的生成和一个文档。

                          class GraphState(TypedDict):"""Represents the state of our graph.
                          Attributes:question: questiongeneration: LLM generationdocuments: list of documents"""question : strgeneration : strdocuments : List[str]

                          我们创建了一个名为 retrieve 的函数,它以包含问题的当前状态作为输入。然后,它使用检索器基于提供的问题获取相关文档。检索到的文档连同原始问题一起添加到状态中。

                          我们还创建了一个名为 generate 的函数来改进问题并生成答案。它使用一个检索增强生成(RAG)模型来生成答案。

                          然后我们创建了一个名为 Grade documents 的函数,用于评估检索到的文档与原始问题的相关性。它遍历每个文档,使用检索评分员对其相关性进行评分。被认为是相关的文档被保留,而不相关的则从状态中过滤掉。

                          最后,我们创建了一个名为 transform_query 的函数,以改进原始问题以便更好地检索。它以原始问题和可能的检索文档作为输入。使用问题重写生成原始问题的更好表达版本。

                            def retrieve(state):"""Retrieve documents
                            Args:state (dict): The current graph state
                            Returns:state (dict): New key added to state, documents, that contains retrieved documents"""print("---RETRIEVE---")question = state["question"]
                            # Retrievaldocuments = retriever.get_relevant_documents(question)return {"documents": documents, "question": question}
                            def generate(state):"""Generate answer
                            Args:state (dict): The current graph state
                            Returns:state (dict): New key added to state, generation, that contains LLM generation"""print("---GENERATE---")question = state["question"]documents = state["documents"]
                            # RAG generationgeneration = rag_chain.invoke({"context": documents, "question": question})return {"documents": documents, "question": question, "generation": generation}
                            def grade_documents(state):"""Determines whether the retrieved documents are relevant to the question.
                            Args:state (dict): The current graph state
                            Returns:state (dict): Updates documents key with only filtered relevant documents"""
                            print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")question = state["question"]documents = state["documents"]
                            # Score each docfiltered_docs = []for d in documents:score = retrieval_grader.invoke({"question": question, "document": d.page_content})grade = score['score']if grade == "yes":print("---GRADE: DOCUMENT RELEVANT---")filtered_docs.append(d)else:print("---GRADE: DOCUMENT NOT RELEVANT---")continuereturn {"documents": filtered_docs, "question": question}
                            def transform_query(state):"""Transform the query to produce a better question.
                            Args:state (dict): The current graph state
                            Returns:state (dict): Updates question key with a re-phrased question"""
                            print("---TRANSFORM QUERY---")question = state["question"]documents = state["documents"]
                            # Re-write questionbetter_question = question_rewriter.invoke({"question": question})return {"documents": documents, "question": better_question}

                            我们有一个基于重述问题的网络搜索函数。它使用网络搜索工具检索网络结果,并将它们格式化为单个文档。

                            我们还有一个名为 route_question 的函数,根据问题的来源决定是将问题引导到网络搜索还是 RAG。它调用一个问题路由器来确定问题的来源,无论是来自网络搜索还是向量存储。根据来源,它返回调用下一个节点的相应节点。

                            然后我们创建了一个名为 decide_to_generate 的函数,这个函数基于过滤文档的相关性决定是生成答案还是重新生成问题。如果所有文档被认为是不相关的,它决定重新生成一个新查询。否则,如果存在相关文档,则生成答案。

                            最后,我们有 grade_generate_v_documents_and_question 函数,这个函数通过检查生成是否基于提供的文档并回答了原始问题来评估生成答案的质量。首先检查生成是否基于提供的文档。如果是,它进一步评估生成的答案是否基于评估解决了原始问题。根据评估,它决定生成是有用还是无用。

                              def web_search(state):"""Web search based on the re-phrased question.
                              Args:state (dict): The current graph state
                              Returns:state (dict): Updates documents key with appended web results"""
                              print("---WEB SEARCH---")question = state["question"]
                              # Web searchdocs = web_search_tool.invoke({"query": question})web_results = "\\n".join([d["content"] for d in docs])web_results = Document(page_content=web_results)
                              return {"documents": web_results, "question": question}
                              ### Edges ###
                              def route_question(state):"""Route question to web search or RAG.
                              Args:state (dict): The current graph state
                              Returns:str: Next node to call"""
                              print("---ROUTE QUESTION---")question = state["question"]print(question)source = question_router.invoke({"question": question})print(source)print(source['datasource'])if source['datasource'] == 'web_search':print("---ROUTE QUESTION TO WEB SEARCH---")return "web_search"elif source['datasource'] == 'vectorstore':print("---ROUTE QUESTION TO RAG---")return "vectorstore"
                              def decide_to_generate(state):"""Determines whether to generate an answer, or re-generate a question.
                              Args:state (dict): The current graph state
                              Returns:str: Binary decision for next node to call"""
                              print("---ASSESS GRADED DOCUMENTS---")question = state["question"]filtered_documents = state["documents"]
                              if not filtered_documents:# All documents have been filtered check_relevance# We will re-generate a new queryprint("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---")return "transform_query"else:# We have relevant documents, so generate answerprint("---DECISION: GENERATE---")return "generate"
                              def grade_generation_v_documents_and_question(state):"""Determines whether the generation is grounded in the document and answers question.
                              Args:state (dict): The current graph state
                              Returns:str: Decision for next node to call"""
                              print("---CHECK HALLUCINATIONS---")question = state["question"]documents = state["documents"]generation = state["generation"]
                              score = hallucination_grader.invoke({"documents": documents, "generation": generation})grade = score['score']
                              # Check hallucinationif grade == "yes":print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")# Check question-answeringprint("---GRADE GENERATION vs QUESTION---")score = answer_grader.invoke({"question": question,"generation": generation})grade = score['score']if grade == "yes":print("---DECISION: GENERATION ADDRESSES QUESTION---")return "useful"else:print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")return "not useful"else:pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")return "not supported"

                              我们已经定义了我们需要的所有节点。现在,我们可以定义工作流并将节点添加到其中。现在连接相应的节点并设置入口点。这是工作流开始的节点。

                              这个图将包括五个节点:检索器、生成器、文档评分员、查询转换器和网络搜索,以及 1 个边缘将是决定生成。

                                workflow = StateGraph(GraphState)
                                # Define the nodesworkflow.add_node("web_search", web_search) # web searchworkflow.add_node("retrieve", retrieve) # retrieveworkflow.add_node("grade_documents", grade_documents) # grade documentsworkflow.add_node("generate", generate) # generataeworkflow.add_node("transform_query", transform_query) # transform_query
                                # Build graphworkflow.set_conditional_entry_point(route_question,{"web_search": "web_search","vectorstore": "retrieve",},)workflow.add_edge("web_search", "generate")workflow.add_edge("retrieve", "grade_documents")workflow.add_conditional_edges("grade_documents",decide_to_generate,{"transform_query": "transform_query","generate": "generate",},)workflow.add_edge("transform_query", "retrieve")workflow.add_conditional_edges("generate",grade_generation_v_documents_and_question,{"not supported": "generate","useful": END,"not useful": "transform_query",},)
                                # Compileapp = workflow.compile()

                                现在,让我们执行这个过程。首先,让我们输入一个问题,该问题将执行一个查找向量数据并回答问题的管道。

                                  inputs = {"question": user_input}for output in app.stream(inputs):for key, value in output.items():# Nodest.write(f"Node '{key}':")# Optional: print full state at each node# pprint.pprint(value["keys"], indent=2, width=80, depth=None)print("\\n---\\n")
                                  # Final generationst.write(value["generation"])

                                  总结:

                                  我使用 LLama3 实现了 Adaptive RAG 示例。

                                  这次,只执行了一个简单的示例,但根据问题的不同,可能会执行更多迭代处理,例如查询转换。

                                  在实际操作中,需要考虑各种事项,例如调整参数和限制循环次数。

                                  然而,根据查询确定并执行路线在质量和效率方面是有意义的。

                                  资源:

                                  • Github:https://github.com/mcks2000/llm_notebooks/tree/main/rag/lang_graph

                                  • Tavily KEY 申请地址:https://app.tavily.com/home


                                  53AI,企业落地应用大模型首选服务商

                                  产品:大模型应用平台+智能体定制开发+落地咨询服务

                                  承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

                                  联系我们

                                  售前咨询
                                  186 6662 7370
                                  预约演示
                                  185 8882 0121

                                  微信扫码

                                  与创始人交个朋友

                                  回到顶部

                                   
                                  扫码咨询