微信扫码
与创始人交个朋友
我要投稿
深入揭秘LangGraph,掌握构建动态订单管理系统的关键技巧。核心内容:1. LangGraph库及其在订单管理中的应用2. 详细步骤:从Python环境设置到工作流图构建3. 节点、边的定义与可视化测试工作流技巧
在这个极为详细的教程中,我们将探索 **LangGraph** — 一个用于协调复杂多步骤工作流的强大库,适用于大型语言模型(LLMs) — 并将其应用于一个常见的电子商务问题:根据用户的查询决定是否下单或取消订单。到本博客结束时,您将了解如何:
我们将逐步进行,详细解释每个概念 — 非常适合初学者和希望使用 LLMs 构建动态或循环工作流的人,我还提供了数据集的链接供您尝试。
什么是 LangGraph?
问题陈述:订单管理
导入说明
数据加载和状态定义
创建工具和 LLM 集成
定义工作流节点
构建工作流图
可视化和测试工作流
结论
LangGraph 是一个库,它为 LangChain 工作流带来了基于图的方式。传统的管道通常线性地从一个步骤移动到另一个步骤,但现实世界的任务常常需要分支、条件逻辑,甚至循环(重试失败的步骤、澄清用户输入等)。
LangGraph 的主要特点:
在此场景中,用户的查询可以是关于下新订单或取消现有订单:
order_id
并将订单标记为已取消。因为我们需要分支(决定“PlaceOrder”与“CancelOrder”),我们将使用 LangGraph 创建一个条件流程:
order_id
并调用取消工具。下面是您提供的代码的确切第一部分,展示了导入和环境设置。我们在代码后添加了注释以解释每个部分。
### Import required libraries
import os
import pandas as pd
import random
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.runnables.graph import MermaidDrawMethod
from IPython.display import display, Image
from typing importLiteral
from langchain_core.prompts import ChatPromptTemplate
from typing importDict, TypedDict
### Load environment variables
os.environ["OPENAI_API_KEY"] = ""
langchain_core.tools, langchain_openai, ToolNode, 等:
tool
(一个装饰器)将 Python 函数转换为 LLM 可以调用的“工具”。ChatOpenAI
是我们与 GPT 模型进行交互的 LLM 客户端。ToolNode
是来自 langgraph.prebuilt
的一个预构建节点,负责工具执行。StateGraph
、MessagesState
、START
、END
来自 langgraph.graph
——它们对于定义我们的工作流程至关重要。MermaidDrawMethod
帮助将工作流程可视化为 Mermaid.js 图表。数据链接:Data
在下一个代码片段中,我们加载 CSV 文件(用于库存和客户)并转换它们为字典。我们还定义了我们的状态类型字典。
### Load datasets
inventory_df = pd.read_csv("inventory.csv")
customers_df = pd.read_csv("customers.csv")
### Convert datasets to dictionaries
inventory = inventory_df.set_index("item_id").T.to_dict()
customers = customers_df.set_index("customer_id").T.to_dict()
classState(TypedDict):
query: str
category: str
next_node: str
item_id: str
order_status: str
cost: str
payment_status: str
location: str
quantity: int
CSV 到字典:
inventory
和 customers
是以 item_id
或 customer_id
为键的字典。这使得查找像 inventory[item_51]
这样的操作变得简单。状态:
query
、category
、item_id
等。category
通常是“PlaceOrder”或“CancelOrder”。next_node
可以存储下一个节点名称,尽管我们依赖图的边缘进行转换。现在我们定义我们的LLM和工具。这里的主要工具是cancel_order
,它使用LLM从查询中提取order_id
。
@tool
defcancel_order(query: str) -> dict:
"""Simulate order cancelling"""
order_id = llm.with_structured_output(method='json_mode').invoke(f'Extract order_id from the following text in json format: {query}')['order_id']
#amount = query.get("amount")
ifnot order_id:
return {"error": "Missing 'order_id'."}
return {"order_status": "Order stands cancelled"}
### Initialize LLM and bind tools
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
tools_2 = [cancel_order]
llm_with_tools_2 = llm.bind_tools(tools_2)
tool_node_2 = ToolNode(tools_2)
@tool
:
cancel_order
函数现在是LLM可以调用的工具,如果它决定需要取消订单的话。提取order_id
:
llm.with_structured_output(method='json_mode')
来指示LLM返回JSON。然后我们解析出'order_id'
。LLM初始化:
model="gpt-4o-mini"
是选择的模型,temperature=0
用于确定性响应。绑定和ToolNode
:
llm.bind_tools(tools_2)
将我们的LLM与cancel_order
工具连接起来。ToolNode
是一个专门的节点,可以自动处理这些绑定的工具。我们现在将开始逐个定义节点。
模型调用节点
这些节点可以调用模型
def call_model_2(state: MessagesState):
"""Use the LLM to decide the next step."""
messages = state["messages"]
response = llm_with_tools_2.invoke(str(messages))
return {"messages": [response]}
defcall_tools_2(state: MessagesState) -> Literal["tools_2", END]:
"""Route workflow based on tool calls."""
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
return"tools_2"
return END
call_model_2
: 接收对话(messages
)并将其传递给带工具的LLM。如果LLM触发了工具调用,我们将在call_tools_2
中检测到。call_tools_2
: 检查LLM是否请求了工具调用(tool_calls
)。如果是,我们将路由到"tools_2"
,即ToolNode
;否则,我们结束工作流。在这里,我们定义一个节点来对查询进行分类:
def categorize_query(state: MessagesState) -> MessagesState:
"""Categorize user query into PlaceOrder or CancelOrder"""
prompt = ChatPromptTemplate.from_template(
"Categorize user query into PlaceOrder or CancelOrder"
"Respond with either 'PlaceOrder', 'CancelOrder' Query: {state}"
)
chain = prompt | ChatOpenAI(temperature=0)
category = chain.invoke({"state": state}).content
return {"query":state,"category": category}
"category"
。def check_inventory(state: MessagesState) -> MessagesState:
"""Check if the requested item is in stock."""
item_id = llm.with_structured_output(method='json_mode').invoke(f'Extract item_id from the following text in json format: {state}')['item_id']
quantity = llm.with_structured_output(method='json_mode').invoke(f'Extract quantity from the following text in json format: {state}')['quantity']
if not item_id or not quantity:
return {"error": "Missing 'item_id' or 'quantity'."}
if inventory.get(item_id, {}).get("stock", 0) >= quantity:
print("IN STOCK")
return {"status": "In Stock"}
return {"query":state,"order_status": "Out of Stock"}
item_id
和 quantity
。inventory[item_id]["stock"]
以确认可用性。我们为特定客户定义了一个计算运费的节点
def compute_shipping(state: MessagesState) -> MessagesState:
"""Calculate shipping costs."""
item_id = llm.with_structured_output(method='json_mode').invoke(f'Extract item_id from the following text in json format: {state}')['item_id']
quantity = llm.with_structured_output(method='json_mode').invoke(f'Extract quantity from the following text in json format: {state}')['quantity']
customer_id = llm.with_structured_output(method='json_mode').invoke(f'Extract customer_id from the following text in json format: {state}')['customer_id']
location = customers[customer_id]['location']
ifnot item_id ornot quantity ornot location:
return {"error": "Missing 'item_id', 'quantity', or 'location'."}
weight_per_item = inventory[item_id]["weight"]
total_weight = weight_per_item * quantity
rates = {"local": 5, "domestic": 10, "international": 20}
cost = total_weight * rates.get(location, 10)
print(cost,location)
return {"query":state,"cost": f"${cost:.2f}"}
customers
字典中查找他们的 location。我们将定义一个用于处理支付的节点:
def process_payment(state: State) -> State:
"""Simulate payment processing."""
cost = llm.with_structured_output(method='json_mode').invoke(f'Extract cost from the following text in json format: {state}')
if not cost:
return {"error": "Missing 'amount'."}
print(f"PAYMENT PROCESSED: {cost} and order successfully placed!")
payment_outcome = random.choice(["Success", "Failed"])
return {"payment_status": payment_outcome}
random.choice
来模拟成功或失败。我们现在定义一个用于路由查询的节点:
def route_query_1(state: State) -> str:
"""Route the query based on its category."""
print(state)
if state["category"] == "PlaceOrder":
return "PlaceOrder"
elif state["category"] == "CancelOrder":
return "CancelOrder"
下面,我们创建一个 StateGraph
,添加节点,并定义边和条件边。
### Create the workflow
workflow = StateGraph(MessagesState)
#Add nodes
workflow.add_node("RouteQuery", categorize_query)
workflow.add_node("CheckInventory", check_inventory)
workflow.add_node("ComputeShipping", compute_shipping)
workflow.add_node("ProcessPayment", process_payment)
workflow.add_conditional_edges(
"RouteQuery",
route_query_1,
{
"PlaceOrder": "CheckInventory",
"CancelOrder": "CancelOrder"
}
)
workflow.add_node("CancelOrder", call_model_2)
workflow.add_node("tools_2", tool_node_2)
### Define edges
workflow.add_edge(START, "RouteQuery")
workflow.add_edge("CheckInventory", "ComputeShipping")
workflow.add_edge("ComputeShipping", "ProcessPayment")
workflow.add_conditional_edges("CancelOrder", call_tools_2)
workflow.add_edge("tools_2", "CancelOrder")
workflow.add_edge("ProcessPayment", END)
StateGraph(MessagesState)
:
MessagesState
来保存对话数据。节点:
RouteQuery
是分类用户意图的入口节点。条件边:
workflow.add_conditional_edges("RouteQuery", route_query_1, ...)
确保我们在“下单”的情况下转到 CheckInventory,或者在“取消订单”的情况下转到 CancelOrder。循环:
call_tools_2
)。如果是,我们转到 tools_2
(ToolNode
);在工具被调用后,它会返回到“取消订单”,给 LLM 机会产生进一步的动作或结束。结束:
END
,结束“下单”路径。下一个代码片段将工作流编译成一个代理,渲染为Mermaid图,并使用示例查询进行测试。
### Compile the workflow
agent = workflow.compile()
### Visualize the workflow
mermaid_graph = agent.get_graph()
mermaid_png = mermaid_graph.draw_mermaid_png(draw_method=MermaidDrawMethod.API)
display(Image(mermaid_png))
### Query the workflow
user_query = "I wish to cancel order_id 223"
for chunk in agent.stream(
{"messages": [("user", user_query)]},
stream_mode="values",
):
chunk["messages"][-1].pretty_print()
auser_query = "customer_id: customer_14 : I wish to place order for item_51 with order quantity as 4 and domestic"
for chunk in agent.stream(
{"messages": [("user", auser_query)]},
stream_mode="values",
):
chunk["messages"][-1].pretty_print()
编译:
agent = workflow.compile()
将我们的节点/边定义转换为可执行的代理。可视化:
mermaid_png
),可以在Jupyter笔记本中显示,以便进行调试或演示。测试查询:
CancelOrder
。通过利用 LangGraph,我们构建了一个动态的分支工作流,该工作流根据用户意图来下单或取消订单。我们展示了:
categorize_query
) 对查询进行分类。cancel_order
) 并将其集成到工作流中。这种方法是可扩展的:您可以添加更多步骤(例如,地址验证、促销代码)或额外的分支(例如,更新现有订单),而无需重写单体脚本。如果您需要 循环 来重试失败的付款或验证用户确认,LangGraph 也可以处理。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-01-22
LangChain实战 | OutputParser:让大模型输出从 “鸡肋” 变 “瑰宝” 的关键!
2025-01-21
Ambient Agent: 让 AI 主动工作的新范式
2025-01-19
LangChain实战 | 实现一个检索增强生成系统(RAG)
2025-01-19
LangChain:构建智能语言模型应用的开源框架
2025-01-17
报告分享|谷歌 AI Agent 白皮书宣告 2025 年迈入 Agent 时代
2025-01-17
从零开始,用LangChain构建你的第一个智能应用
2025-01-16
深度解析两种增强的AI Agent反思模式
2025-01-07
Agent 最全 Playbook:场景、记忆和交互创新
2024-10-10
2024-04-08
2024-08-18
2024-06-03
2024-09-04
2024-07-13
2024-06-24
2024-04-08
2024-04-17
2024-07-10
2025-02-05
2024-12-02
2024-11-25
2024-10-30
2024-10-11
2024-08-18
2024-08-16
2024-08-04