微信扫码
与创始人交个朋友
我要投稿
备注:这是春节期间研究的一个笔记,先当个存稿发出来,为这一些系列加热一下。
相信大家对langchain都很熟悉了,看起来有些复杂,实则很简单的一个LLM应用开发框架。我从去年初,刚开始做RAG项目时开始接触这个框架,那时的langchain还做的相对较轻,框架的设计思想和编程技巧也用的比较友好,当时我们的项目为了跟langchain接口兼容,对一些接口做了重载实现,以适应我们项目的一些额外需求。最近又重新学习了langchain的一些资料,发现变化非常大,其中LangGraph借鉴了图的思路,可以用于构建一些带流程的LLM应用,如multi agents,也可以用于构建rag应用,实现更灵活的业务逻辑。因此,我打算写几篇系列来学习总结一下这个LangGraph。
LangGraph是一套在langchain框架之上的一套开发组件,可以使用LCEL(Langchain Expression Language)轻松的开发带有状态的,可控循环流程的LLM应用,例如多agent应用。它借鉴了NetworkX框架的设计思路,把一个应用流程定义成一个图,其中节点(node)可以代表一个agent tool,或者一次function call,亦或者一次大模型调用。而边(edge),则代表节点的执行顺序(数据流向),并且可以通过设置conditional edge,来控制流程分支。
我们要实现一个能够判断用户意图是闲聊还是知识库问答(QA)场景的RAG应用。我们可以通过以下的流程来实现。
State 将用于Graph图中,用来保存整个流程图中的消息,可以理解为节点之间通信的消息list。每个节点在被执行的时候,会传入这个消息列表参数,节点可以取出里面的消息然后进行处理,处理完成之后,只需按 {"messages": [response]}这种格式作为函数返回值,就会自动的append进这个消息列表中,从而继续被下一次执行的节点当作参数传入。这个State也可以存放其他的变量,不仅仅是messsage
import operator
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
这里把知识库检索当作一个工具调用,意图识别节点根据用户请求判断是否需要调用工具,如果需要则通过function call的方式来实现知识库检索。为了简化测试,我找了一篇《中华人民共和国道路交通安全法实施条例》网页作为知识内容加载到Chroma向量数据库中,使用最基本的文本分片建索引。
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain.tools.retriever import create_retriever_tool
from langgraph.prebuilt import ToolExecutor
## 入库的知识内容
urls = [
"https://www.gov.cn/gongbao/content/2019/content_5468932.htm",
]
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=500, chunk_overlap=50
)
doc_splits = text_splitter.split_documents(docs_list)
## 文档加入到向量数据库中
vectorstore = Chroma.from_documents(
documents=doc_splits,
collection_name="rag-chroma",
embedding=OpenAIEmbeddings(),
)
##创建retriever对象
retriever = vectorstore.as_retriever()
## 创建tool
tool = create_retriever_tool(
retriever,
"retrieve_transportation_rules",
"搜索并返回中国道路交通安全管理条例相关的问题",
)
tools = [tool]
tool_executor = ToolExecutor(tools)
节点代表着一个执行步骤,可以是一次大模型调用,也可以是一次函数调用。
先导包
import json
import operator
from typing import Annotated, Sequence, TypedDict
from langchain import hub
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain.tools.render import format_tool_to_openai_function
from langchain_core.utils.function_calling import convert_to_openai_tool
from langchain_core.messages import BaseMessage, FunctionMessage
from langchain.output_parsers.openai_tools import PydanticToolsParser
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolInvocation
from langchain_core.output_parsers import StrOutputParser
from langchain_core.utils.function_calling import convert_to_openai_function
from langchain_core.prompts import ChatPromptTemplate
定义一个意图识别agent节点,这个agent会调用大模型判断是否需要调用知识检索工具retriever。
def intention_agent(state):
"""
Args:
state (messages): The current state
Returns:
dict: The updated state with the agent response apended to messages
"""
print("---CALL intention detecting---")
messages = state["messages"]
functions = [convert_to_openai_function(t) for t in tools]
model = ChatOpenAI(temperature=0, streaming=True)
model = model.bind_functions(functions)
response = model.invoke(messages)
# We return a list, because this will get added to the existing list
return {"messages": [response]}
定义一个知识检索节点,这个节点会调用调用知识检索工具
def retrieve(state):
"""
Args:
state (messages): The current state
Returns:
dict: The updated state with retrieved docs
"""
print("---EXECUTE RETRIEVAL---")
messages = state["messages"]
# 最后一条消息,一定来自于上一个节点的输出消息,包含function call所需要的参数
last_message = messages[-1]
# 从消息里取出function call的函数名和参数,构建一个ToolInvocation
action = ToolInvocation(
tool=last_message.additional_kwargs["function_call"]["name"],
tool_input=json.loads(
last_message.additional_kwargs["function_call"]["arguments"]
),
)
# 执行调用结果,并最终返回FunctionMessage对象
response = tool_executor.invoke(action)
function_message = FunctionMessage(content=str(response), name=action.tool)
return {"messages": [function_message]}
定义一个QA 知识问答生成节点,用于将query,召回的知识context,组合成prompt扔给大模型回答。
# 定义一个QA repsonse生成节点
def generate(state):
print("---GENERATE---")
messages = state["messages"]
question = messages[0].content
last_message = messages[-1]
question = messages[0].content
docs = last_message.content
# Prompt
prompt = hub.pull("rlm/rag-prompt")
model = ChatOpenAI(temperature=0, streaming=True)
# Chain
rag_chain = prompt | model | StrOutputParser()
# Run
response = rag_chain.invoke({"context": docs, "question": question})
return {"messages": [response]}
再定义一个闲聊问答生成节点,用于将query直接扔给大模型回答。
# 定义一个chat response生成节点
def chat_generate(state):
print("---CHAT---")
messages = state["messages"]
question = messages[0].content
last_message = messages[-1]
question = messages[0].content
# Prompt
prompt = ChatPromptTemplate.from_messages([
("system", "You are helpful assistant."),
("user", "{question}")
])
# Chain
rag_chain = prompt | model | StrOutputParser()
# Run
response = rag_chain.invoke({"question": question})
return {"messages": [response]}
边edge相当于节点之间的执行顺序。有两种,一种是普通edge,还有一种条件edge。前者就是用于组织两个节点的上下游关系,后者是加上分支条件,可以根据自定义的条件判断函数的返回结果连接不同的节点,类似于switch的功能。
我们先加一个条件判断函数,用于conditional edge的判断
# 判断是否是走知识问答
def should_qa(state):
print("---INTENTION DETECTION---")
messages = state["messages"]
last_message = messages[-1]
# If there is no function call, then we finish
if "function_call" not in last_message.additional_kwargs:
print("---DECISION: chat---")
return "chat"
# Otherwise there is a function call, so we continue
else:
print("---DECISION: qa---")
return "qa"
图的定义逻辑是:
先创建一个状态图对象,构造参数是第一步创建的状态类。
from langgraph.graph import END, StateGraph
# 定义一个图对象
workflow = StateGraph(AgentState)
使用add_node 方法往加入节点
# 往图里加入节点
workflow.add_node("intention_agent", intention_agent) # intention_agent
workflow.add_node("retrieve", retrieve) # retrieval
workflow.add_node("generate", generate) # generate
workflow.add_node("chat_generate", chat_generate) # generate
使用 set_entry_point 方法设置流程的起点
# 设置初始起点
workflow.set_entry_point("intention_agent")
使用 add_conditional_edges 方法来设置条件判断的分支
# 加入条件边,通过should_qa的返回值来判断下一个节点
workflow.add_conditional_edges(
"intention_agent", ##上游节点
should_qa, ## 判断函数
{
"qa": "retrieve", ##根据判断函数的返回值,调用不同的下游节点
"chat": "chat_generate",
},
)
使用 add_edge 方法来设置上下游节点的关系,END节点代表流程的结束
# retrieve -> generate 节点的流向
workflow.add_edge("retrieve", "generate")
# generate ->end 节点的流向
workflow.add_edge("generate", END)
# chat_generate ->end 节点的流向
workflow.add_edge("chat_generate", END)
最后compile 一下
# Compile
app = workflow.compile()
from langchain_core.messages import HumanMessage
inputs = {
"messages": [
HumanMessage(
content="⼩型微型⾮营运载客汽车的年检规则是?"
)
]
}
for output in app.stream(inputs):
for key, value in output.items():
if key in ['generate','chat_generate']:
print(f"Output from node '{key}':")
print(value['messages'][0])
通过打印输出,看到整个调用步骤,最终使用QA问答,从“generate”节点生成了答复。
我们换一个问题“电动汽车有什么优势?”,可以看到,这时走的时chat,从“chat_generate”生成的答复
本文对LangGraph的使用有一个基本介绍,并且用它来实现一个带意图判断的RAG应用。跟传统的方式比起来,使用LangGraph来开发业务,可以用工作流的模式来形象思考,先设计DAG图,定义好节点和边,然后再写代码实现节点和边就行,从业务流程到转换为代码实现的过程非常直观,代码结构也相当更清晰。特别是如果我们构建一个流程相对比较复杂的,Multi-Agent应用,里面需要用到顺序,循环,分叉等各种复杂的流程,这时候用图的构建方式来做就会体现出很大的优势。都说一张图胜过千言万语,所以个人还是很看好这种框架,近期准备采用LangGraph做一个相对复杂的Multi-Agent应用,后续还会研究几个更复杂的场景,出几篇文章总结,感谢各位看官点赞?
53AI,企业落地应用大模型首选服务商
产品:大模型应用平台+智能体定制开发+落地咨询服务
承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-05-14
2024-04-26
2024-03-30
2024-04-12
2024-05-10
2024-07-18
2024-05-22
2024-05-28
2024-04-25
2024-04-26