AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


LangGraph AI智能体如何通过知识图谱实现更强智能?以供应链管理系统为例
发布日期:2025-01-18 09:45:27 浏览次数: 1550 来源:活水智能

点击上方↗️活水智能关注 + 星标?

作者:Kshitij Kutumbe

编译:活水智能

 

使用图数据库与LLM的示意图:NebulaGraph

检索增强生成(Retrieval-Augmented Generation,RAG)通过将大型语言模型(LLM)与外部知识结合,改变了我们利用LLM的方式。一个典型的RAG系统会将文档(或其他数据)索引为向量嵌入,通过相似性搜索检索相关信息,并将其注入LLM的上下文中。这种方式在处理非结构化文本时非常强大。然而,现实中的企业数据通常包含结构化成分,例如数值属性、聚合和关系,而这些是文本嵌入无法很好处理的。

在本文中,我们将探讨如何通过集成知识图谱(特别是Neo4j)和结构化工具来处理更复杂的查询,同时保留基于嵌入的非结构化文本搜索的优势。阅读完本文后,您将学会如何构建一个能够有效处理非结构化和结构化查询的强大RAG应用程序。本文包含了学习和实验这种方法所需的所有内容,包括数据、完整代码和详细解释。

数据链接:数据集(https://www.kaggle.com/datasets/kshitijkutumbe/supply-chain-dataset-dummy/data)

创建本文中使用的Neo4j实例的链接:图实例(https://console.neo4j.io/)

理解RAG应用

一个典型的RAG应用包括:

  • • 向量存储/嵌入:将非结构化文本转换为向量嵌入。

  • • 检索:使用相似性搜索找到最相关的文档或知识片段。

  • • 生成:使用LLM处理用户查询并生成格式化答案。

然而,纯粹基于嵌入的方法的弱点在于处理结构化或关系型数据(例如数值过滤、排序、聚合)。这是知识图谱的优势所在。

文本嵌入的局限性

文本嵌入在RAG应用中被广泛使用,但在处理结构化数据方面存在显著局限性。这些局限性会阻碍查询的准确性和有效性,尤其是在处理现实世界数据场景时。

过滤和排序

文本嵌入擅长理解语义内容,但它们无法自然地处理过滤或排序等操作。例如,当您想要检索供应能力大于40,000的供应商时,文本嵌入无法胜任,因为它们并非为处理数值过滤设计的。它们无法对数值字段应用“>”或“<”这样的条件,而这些条件在许多数据驱动的查询中至关重要。

聚合

文本嵌入在执行聚合操作时也存在困难。例如,像“欧洲有多少供应商?”这样的查询需要对数据进行计数或分组。文本嵌入本身并不提供直接的方式来聚合数据。尽管它们在语义搜索方面表现出色,但像计数、求和或分组这样的任务需要结构化操作,而嵌入无法直接完成。

数据复杂性

在实际生产系统中,数据通常具有超越简单关联的复杂关系。嵌入旨在捕获语义含义,但可能会忽略实体之间复杂的关系连接。例如,在知识图谱中,供应商、位置和产品等实体是相互关联的,需要同时理解实体及其关系。嵌入无法有效处理这些复杂性,从而难以提取基于关系的准确洞察。

为什么需要知识图谱

知识图谱是一种强大的数据结构,它以捕获不同实体之间关系的方式来表示知识。知识图谱不是以孤立的表格或列表形式表示数据,而是通过节点(表示实体)和边(表示实体之间的关系)来建模数据。这种方法特别适合捕获现实世界数据的复杂互联特性。

例如,在商业环境中,供应商、位置和产品等实体可以被建模为节点,节点之间通过 “供应” 或 “位于” 等关系连接。这种结构使我们能更直观地理解数据中不同元素的关系,非常适合需要导航复杂数据连接的任务。

存储结构化数据

在知识图谱中,每个节点可以具有属性,例如供应能力、名称或位置,这些属性描述了它所表示的实体。节点之间的关系也可以具有属性,从而支持丰富的上下文数据表示。例如,一个供应商节点可能与一个位置节点通过 “位于” 关系连接,关系中包含城市或国家等属性。

使用Cypher查询

Neo4j是一种流行的图数据库,它使用Cypher(一种声明性查询语言)与图进行交互。Cypher专为图设计,使得执行涉及过滤、聚合和路径匹配的复杂查询变得更容易。例如,您可以使用Cypher查找具有特定供应能力的供应商,或者查找图中两个位置之间的最短路径。

结合非结构化和结构化数据

知识图谱的一大优势是能够结合结构化和非结构化数据。通过为节点添加嵌入属性,您可以将向量嵌入(表示非结构化数据,如文本描述)与图中的结构化数据一起存储。这使您能够在执行语义搜索(例如查找与“原材料”相关的供应商)的同时,执行传统的结构化查询(如“查找供应能力大于40,000的供应商”)。这种在单一数据库中融合结构化和非结构化数据的方法,使得能够运行全面的查询并做出更明智的决策。

将知识图谱与语言模型结合

将知识图谱(KGs)与语言模型(LLMs)集成的关键在于充分利用两者的优势。LLMs擅长理解复杂语言、解释上下文并生成连贯的类人响应。然而,当需要执行结构化查询(如过滤、排序或对大型数据集进行聚合)时,LLMs并不是最佳选择。这时候,像Neo4j这样的知识图谱通过提供高效的结构化数据查询能力脱颖而出。

通过将生成数据库查询(例如Neo4j的Cypher)的复杂性卸载到专用工具函数上,您可以确保应用程序保持稳健、可靠和准确。这种任务分工使LLMs能够专注于其自然语言处理的优势,而知识图谱则高效处理结构化数据。结果是,您可以构建一个结合两者优势的系统,无缝处理非结构化语言数据和结构化、基于关系的查询。

项目概述:供应商管理

我们将实现一个系统来存储、搜索和检索来自世界各地的供应商,每个供应商具有以下属性:

  • • 名称

  • • 位置

  • • 供应能力(数值)

  • • 简短描述

系统必须能够处理:

  • • 基于供应能力的过滤。

  • • 计数和分组。

  • • 基于描述的向量相似性搜索。

  • • 聚合(例如按位置分组)。

安装所需库

在笔记本或终端中安装以下库:

!pip install --quiet neo4j pyvis langchain-community langchain-openai langgraph

这些库包括:

  • • neo4j:用于连接Neo4j的Python驱动。

  • • pyvis:用于可视化图(可选)。

  • • langchain-community:LangChain相关的社区工具。

  • • langchain-openai:用于使用OpenAI的LLM和嵌入。

  • • langgraph:用于构建带图逻辑的状态机(例如ReAct智能体)。

配置环境变量

为了安全地连接Neo4j和OpenAI,我们将所需的凭据存储为环境变量。这使得代码可以访问敏感信息而无需硬编码。以下是设置方式:

import os

NEO4J_URI = "neo4j+s://<your-instance-id>.databases.neo4j.io"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "<your-password>"
OPENAI_API_KEY = "<your-openai-key>"

os.environ["NEO4J_URI"] = NEO4J_URI
os.environ["NEO4J_USERNAME"] = NEO4J_USER
os.environ["NEO4J_PASSWORD"] = NEO4J_PASSWORD
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

此代码将Neo4j和OpenAI的凭据设置为环境变量,确保它们可以在整个应用程序中安全访问。

连接到Neo4j

我们需要连接到一个Neo4j实例,以便能够执行数据导入和查询,以下代码可以实现这一点。

from langchain_community.graphs import Neo4jGraph

graph = Neo4jGraph(refresh_schema=False)

Neo4jGraph负责建立连接,并可以选择性地刷新模式。

在Neo4j中构建知识图谱

使用CSV进行数据导入

您有两个CSV文件:

  1. 1. nodes.csv,包含列如id、location、name、description。

  2. 2. relationships.csv,包含列如START_ID、END_ID、TYPE。

以下是数据导入脚本:

import csv
import numpy as np
from neo4j import GraphDatabase

NODES_CSV = "nodes.csv"
RELATIONSHIPS_CSV = "relationships.csv"
def get_label_for_type(node_type):
    mapping = {
        "Supplier": "Supplier",
        "Manufacturer": "Manufacturer",
        "Distributor": "Distributor",
        "Retailer": "Retailer",
        "Product": "Product"
    }
    return mapping.get(node_type, "Entity")


def ingest_nodes(driver):
    with driver.session() as session:
        with open(NODES_CSV, mode='r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                node_id = row['id:ID']
                name = row['name']
                node_type = row['type']
                location = row['location']
                supply_capacity = np.random.randint(1000, 50001)
                description = row['description']
                label = get_label_for_type(node_type)
                if location.strip():
                    query = f"""
                    MERGE (n:{label} {{id:$id}})
                    SET n.name = $name, n.location = $location, 
                        n.description = $description, n.supply_capacity = $supply_capacity
                    """
                    params = {
                        "id": node_id,
                        "name": name,
                        "location": location,
                        "description": description,
                        "supply_capacity": supply_capacity
                    }
                else:
                    query = f"""
                    MERGE (n:{label} {{id:$id}})
                    SET n.name = $name
                    """
                    params = {"id": node_id, "name": name}
                session.run(query, params)


def ingest_relationships(driver):
    with driver.session() as session:
        with open(RELATIONSHIPS_CSV, mode='r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                start_id = row[':START_ID']
                end_id = row[':END_ID']
                rel_type = row[':TYPE']
                product = row['product']
                if product.strip():
                    query = f"""
                    MATCH (start {{id:$start_id}})
                    MATCH (end {{id:$end_id}})
                    MERGE (start)-[r:{rel_type} {{product:$product}}]->(end)
                    """
                    params = {
                        "start_id": start_id,
                        "end_id": end_id,
                        "product": product
                    }
                else:
                    query = f"""
                    MATCH (start {{id:$start_id}})
                    MATCH (end {{id:$end_id}})
                    MERGE (start)-[r:{rel_type}]->(end)
                    """
                    params = {
                        "start_id": start_id,
                        "end_id": end_id
                    }
                session.run(query, params)
  • • ingest_nodes函数从CSV中读取供应商数据并在Neo4j中创建节点。

  • • 每个节点表示一个供应商,具有名称、位置和供应能力等属性。

  • • MERGE命令确保节点被创建或更新而不会重复。

  • • ingest_relationships函数从CSV中读取关系并在节点之间创建连接。

  • • 在供应商和产品节点之间创建关系,关系具有产品等属性。

  • • MERGE命令确保关系唯一。

创建索引

在Neo4j中创建索引或约束对于性能和数据唯一性至关重要:

def create_indexes(driver):
    with driver.session() as session:
        for label in ["Supplier", "Manufacturer", "Distributor", "Retailer", "Product"]:
            session.run(f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{label}) REQUIRE n.id IS UNIQUE")
  • • 此代码通过在每种节点类型(例如供应商、制造商)的id属性上创建唯一约束,确保图中的每个节点都有唯一标识符。

  • • CREATE CONSTRAINT语句优化了查询并避免了重复数据。

运行数据导入

以下代码将之前定义的所有内容整合在一起,并启动数据导入过程到我们的Neo4j实例中。

driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
create_indexes(driver)
ingest_nodes(driver)
ingest_relationships(driver)
print("Data ingestion complete.")
driver.close()

导入完成后,您可以在Neo4j AuraDB实例中可视化模式。

通过上述导入脚本创建的实际模式的截图

在Neo4j中存储向量嵌入

尽管我们已经有了结构化数据,但我们仍希望为描述添加文本嵌入。通过在Neo4j中存储向量嵌入,您可以执行语义查询(例如,查找与查询文本相似的供应商)。

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Neo4jVector

embedding = OpenAIEmbeddings(model="text-embedding-3-small")
neo4j_vector = Neo4jVector.from_existing_graph(
    embedding=embedding,
    index_name="supply_chain",
    node_label="Supplier",
    text_node_properties=["description"],
    embedding_node_property="embedding",
)

通过此配置:

  • • node_label="Supplier"将嵌入存储限制为供应商节点。

  • • text_node_properties=["description"]指定哪些属性会被嵌入。

  • • embedding_node_property="embedding"是存储生成嵌入向量的位置。

现在,我们可以在Neo4j中执行语义相似性查询,将结构化数据与非结构化文本搜索相结合。

处理结构化查询的工具

为了处理结构化查询(如计数或列出具有数值过滤的供应商),我们定义了一些工具。每个工具本质上是LLM可以调用的一个函数。我们使用Pydantic指定预期输入,确保清晰性和类型检查。

供应商计数工具

我们需要一个工具来根据可选的过滤条件(如最小供应能力、最大供应能力或按属性分组)统计供应商数量。以下代码使用Pydantic定义了输入架构,并实现了查询Neo4j并返回供应商计数的函数。

输入架构

我们需要指定输入架构以确保正确的信息被发送到工具。

from pydantic import BaseModel, Field
from typing import Optional, Dict, List

class SupplierCountInput(BaseModel):
    min_supply_amount: Optional[int] = Field(
        description="Minimum supply amount of the suppliers"
    )
    max_supply_amount: Optional[int] = Field(
        description="Maximum supply amount of the suppliers"
    )
    grouping_key: Optional[str] = Field(
        description="The key to group by the aggregation", 
        enum=["supply_capacity", "location"]
    )

SupplierCountInput定义了工具的输入结构,包括按供应能力过滤供应商的可选字段。它还提供了按属性(例如位置)分组结果的能力。

函数实现

以下代码块中定义了返回供应商计数所需的函数:

import re
from langchain_core.tools import tool

def extract_param_name(filter: str) -> str:
    pattern = r'\$\w+'
    match = re.search(pattern, filter)
    if match:
        return match.group()[1:]
    return None

@tool("supplier-count", args_schema=SupplierCountInput)
def supplier_count(
    min_supply_amount: Optional[int],
    max_supply_amount: Optional[int],
    grouping_key: Optional[str],
) -> List[Dict]:
    """Calculate the count of Suppliers based on particular filters"""
    filters = [
        ("t.supply_capacity >= $min_supply_amount", min_supply_amount),
        ("t.supply_capacity <= $max_supply_amount", max_supply_amount)
    ]
    params = {
        extract_param_name(condition): value
        for condition, value in filters
        if value is not None
    }
    where_clause = " AND ".join(
        [condition for condition, value in filters if value is not None]
    )
    cypher_statement = "MATCH (t:Supplier) "
    if where_clause:
        cypher_statement += f"WHERE {where_clause} "
    return_clause = (
        f"t.{grouping_key}, count(t) AS supplier_count" 
        if grouping_key 
        else "count(t) AS supplier_count"
    )
    cypher_statement += f"RETURN {return_clause}"
    print(cypher_statement)  # Debugging output
    return graph.query(cypher_statement, params=params)

工作原理

  • • extract_param_name:此辅助函数从过滤条件中提取参数名称(例如$min_supply_amount)。

  • • supplier_count:此工具函数构造一个Cypher查询,根据给定的过滤条件统计供应商数量,并可选择按属性(如位置)分组结果。

  • • 它动态构建查询,添加适当的过滤条件,并使用Neo4j的查询系统执行查询。

供应商列表工具

我们需要一个工具来列出供应商,可选地按供应能力排序、按能力过滤,并在提供description时执行向量搜索。

输入架构

我们需要指定输入架构以确保正确的信息被发送到工具。

class SupplierListInput(BaseModel):
    sort_by: str = Field(description="How to sort Suppliers by supply capacity", enum=['supply_capacity'])
    k: Optional[int] = Field(description="Number of Suppliers to return")
    description: Optional[str] = Field(description="Description of the Suppliers")
    min_supply_amount: Optional[int] = Field(description="Minimum supply amount of the suppliers")
    max_supply_amount: Optional[int] = Field(description="Maximum supply amount of the suppliers")

SupplierListInput定义了此工具的输入架构,包含按供应能力过滤、按指定键(如supply_capacity)排序以及可选的描述(用于基于向量的搜索)的参数。

函数实现

以下代码块中定义了返回供应商列表所需的函数:

@tool("supplier-list", args_schema=SupplierListInput)
def supplier_list(
    sort_by: str = "supply_capacity",
    k : int = 4,
    description: Optional[str] = None,
    min_supply_amount: Optional[int] = None,
    max_supply_amount: Optional[int] = None,
) -> List[Dict]:
    """List suppliers based on particular filters"""

    # Handle vector-only search when no prefiltering is applied
    if description and not min_supply_amount and not max_supply_amount:
        return neo4j_vector.similarity_search(description, k=k)
    filters = [
        ("t.supply_capacity >= $min_supply_amount", min_supply_amount),
        ("t.supply_capacity <= $max_supply_amount", max_supply_amount)
    ]
    params = {
        key.split("$")[1]: value for key, value in filters if value is not None
    }
    where_clause = " AND ".join([condition for condition, value in filters if value is not None])
    cypher_statement = "MATCH (t:Supplier) "
    if where_clause:
        cypher_statement += f"WHERE {where_clause} "
    # Sorting and returning
    cypher_statement += " RETURN t.name AS name, t.location AS location, t.description as description, t.supply_capacity AS supply_capacity ORDER BY "
    if description:
        cypher_statement += (
            "vector.similarity.cosine(t.embedding, $embedding) DESC "
        )
        params["embedding"] = embedding.embed_query(description)
    elif sort_by == "supply_capacity":
        cypher_statement += "t.supply_capacity DESC "
    else:
        # Fallback or other possible sorting
        cypher_statement += "t.year DESC "
    cypher_statement += " LIMIT toInteger($limit)"
    params["limit"] = k or 4
    print(cypher_statement)  # Debugging output
    data = graph.query(cypher_statement, params=params)
    return data

supplier_list:此工具列出供应商,可选地按供应能力过滤,并按供应能力或其他标准排序。如果提供了描述,则使用Neo4j的向量存储执行向量相似性搜索以找到与描述匹配的供应商。该函数还处理查询的构建并在Neo4j中执行。

关键点

  • • 向量搜索:如果仅提供描述,则完全依赖于Neo4j的向量存储。

  • • 结构化+向量结合:如果还提供了其他过滤条件,则它会构建一个Cypher查询,按向量相似性或供应能力排序。

  • • 鲁棒性:通过严格编码查询的构建方式,减少了错误并保持了清晰性。

集成LangChain和LangGraph

在本节中,我们将创建一个ReAct风格的智能体,该智能体使用LangGraph决定何时调用工具(如supplier-countsupplier-list)。LangChain用于管理LLM接口,而LangGraph定义了智能体的流程。

构建智能体

以下是帮助我们创建绑定到大型语言模型的工具的代码片段:

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage


llm = ChatOpenAI(model='gpt-4-turbo')
tools = [supplier_count, supplier_list]
llm_with_tools = llm.bind_tools(tools)
sys_msg = SystemMessage(content="You are a helpful assistant tasked with finding and explaining relevant information about Supply chain")
  • • llm_with_tools:将ChatOpenAI模型绑定到工具(supplier_countsupplier_list),使LLM能够在操作期间根据需要调用这些工具。

  • • sys_msg:为智能体设置初始系统消息,指示其角色。

使用LangGraph定义流程

我们定义了两个节点:

  1. 1. assistant:使用LLM解析消息并决定是否需要调用工具。

  2. 2. tools:执行任何工具请求。

  3. 3. 条件边缘检查LLM的最后一条消息是否包含工具调用。如果是,则运行工具;如果不是,则终止。

from langgraph.graph import StateGraph, START, MessagesState
from langgraph.prebuilt import tools_condition, ToolNode
from IPython.display import Image, display

def assistant(state: MessagesState):
   return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}


builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
# Define edges: 
builder.add_edge(START, "assistant")
# If there's a tool call, go to 'tools'; else finish
builder.add_conditional_edges("assistant", tools_condition)
builder.add_edge("tools", "assistant")
react_graph = builder.compile()
display(Image(react_graph.get_graph(xray=True).draw_mermaid_png()))
使用上述代码创建的ReAct智能体的示意图

演示与测试

我们将向智能体发送各种查询,并观察其如何决定调用工具(supplier-countsupplier-list)。

统计供应商

当LLM识别到需要基于给定过滤条件统计供应商数量时,会调用supplier-count工具。然后将结果返回给用户。

from langchain_core.messages import HumanMessage

messages = [
    HumanMessage(
        content="How many suppliers have supply capacity more than 20000 and is located in Oslo?"
    )
]
result = react_graph.invoke({"messages": messages})
for m in result["messages"]:
    m.pretty_print()
使用上述代码的输出截图
  1. 1. LLM识别到需要统计供应商数量 → 调用supplier-count

  2. 2. 工具返回供应商数量。

  3. 3. LLM将此信息传达给用户。

列出供应商

当查询要求列出供应能力高于某一阈值的供应商时,会调用supplier-list工具。智能体处理请求并检索相关供应商。

messages = [
    HumanMessage(
        content="What are the suppliers having capacity above 40000?"
    )
]
result = react_graph.invoke({"messages": messages})
for m in result["messages"]:
    m.pretty_print()
使用上述代码的输出截图

在这里,LLM调用supplier-list以检索供应能力高于40,000的供应商。

组合查询

如果您提供的查询同时包含描述和数值过滤条件,智能体将结合向量相似性和Neo4j中的结构化查询:

messages = [
    HumanMessage(
        content="Find suppliers that deal with steel and have at least 20000 supply capacity."
    )
]
result = react_graph.invoke({"messages": messages})
for m in result["messages"]:
    m.pretty_print()

工具将为供应能力>=20000构建一个WHERE子句,同时使用描述中包含“dealing with steel”的向量属性进行语义匹配。

使用上述代码的输出截图

结论

通过超越仅基于嵌入的查询,并结合知识图谱和结构化工具,我们解锁了强大的协同效应:

  • • 精确性:数值过滤、计数和分组由Neo4j的Cypher查询完美处理。

  • • 语义相关性:文本嵌入在语义匹配描述方面仍然表现出色。

  • • 稳定性:我们将基于生成的查询创建降至最低,而是依赖确定性的函数调用。这种方法大大减少了生产中的错误。

  • • 可扩展性:随着模式的增长,您可以通过更多的专业工具扩展您的工具集。LLM负责协调这些工具,而不是生成复杂的查询。

您刚刚构建了一个功能齐全的RAG系统,将结构化和非结构化数据结合到一个管道中,确保两全其美——通过文本嵌入实现语义搜索,并通过Neo4j的知识图谱实现强大、精确的查询。

本代码中使用的框架

LangChain:https://python.langchain.com/docs/introduction/

LangGraph:https://www.langchain.com/langgraph

欢迎根据您的实际用例扩展这些想法:添加更多用于高级分析的工具,集成额外的向量索引,或应用复杂的关系遍历(例如供应链路径分析)。可能性是无限的,而现在您已经掌握了构建可靠、生产级RAG应用程序的蓝图。



学习资源

若要了解更多知识图谱或neo4j图数据库相关教学,你可以查看公众号的其他文章:


活水智能成立于北京,致力于通过AI教育、AI软件和社群提高知识工作者生产力。中国AIGC产业联盟理事。

活水现有AI线下工作坊等10+门课程,15+AI软件上线,多款产品在研。知识星球中拥有2600+成员,涵盖大厂程序员、公司高管、律师等各类知识工作者。在北、上、广、深、杭、重庆等地均有线下组织。

欢迎加入我们的福利群,每周都有一手信息、优惠券发放、优秀同学心得分享,还有赠书活动~

??????

 点击阅读原文,立即入群


53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询