AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


基于知识图谱和文档树增强的RAG实验记录
发布日期:2024-07-22 08:46:47 浏览次数: 1844


引言
DRAGON BOAT FESTIVAL

本篇是近期看了一些大模型相关的资料,也跑了很多开源方案,结合一个比较有趣的切入点,做的一个实验记录。

前置环境

unsetunsetopenai keyunsetunset

从非本地化部署llm模型测试,有一个openai key似乎必不可少,不管是作为benchmark的计量单位,或考虑中英文的输入输出,都相对更加方便,不过自3月新用户注册不再赠送5美金之后,走正常请求的方式就不再友好,所以我这里选用了中转的方式,具体调用逻辑如下:

目前使用情况来讲,除了扣费很快,速度与直连没什么区别,扣费规则各家都不一样,这里不好做评价。当然,国内有非常优秀的平替产品,如文心千帆、通义千问和智谱AI,我使用最多的是智谱AI,其次通义千问,因为智谱有封装好的SDK能直接调用,通义千问的modelscope接口同样能很快捷的使用embedding和chat model,这是给我观感最好的,其次才是准确性问题。


unsetunsetneo4junsetunset

关于图数据库,直接docker pull neo4j后的镜像为官方镜像,但启动后用langchain的neo4j接口去连会报错为Could not use APOC procedures。APOC是一个为Neo4j提供额外的过程和功能的插件,它扩展了Cypher查询语言的能力,而官方镜像并没有安装该插件,所以需要在启动后的极简容器里手动安装,但我尝试后发现这并不是一个很快能解决的过程,除了权限问题外,还有各种依赖,于是就去dockerhub找到了如下镜像:

该镜像启动后是有APOC的,然后还需要修改neo4j.conf文件中找到dbms.security.procedures.unrestricted配置项,即:

dbms.security.procedures.unrestricted=apoc.export.file,apoc.import.file
修改为
dbms.security.procedures.unrestricted=apoc.export.file,apoc.import.file,apoc.meta.data,...

正常来讲,重启后就可以直连了。并且都是带界面的:

unsetunsetpython包unsetunset

pip install -U langchain umap-learn scikit-learn langchain_community tiktoken langchain-openai langchainhub chromadb langchain-anthropic langchain_experimental wikipedia 


实验过程

本次实验是根据Wikipedia提取到的内容进行抽取数据元组,将其存入图数据库中,以备后续的RAG综合调用,整个过程可看成如下图:

所以下面开始准备数据源。

unsetunset图数据制作unsetunset

在大模型之前,知识图谱构建是挺繁琐的一个过程,虽然可能结果就是一个层层交叉的三元组,但从一段文字需要经历的过程可从如下图表示:

在构建本体的时候我们一定要接受本体是变化的,就像数据库本身的表结构也可能会更新,所以设计之初就需要考虑鲁棒性和扩展性,而在大模型时代,不管是zero-shot还是few-shot的大模型,我记得在去年2023年的时候,论文里的对比实验就已经超越了如实体抽取等算法,那简单构建一个KG不再是困难的一件事,但大模型短期内还无法处理长文本或整个图谱,所以图谱的存储是一个很重要的方向。能预测到它和向量数据库一样,会成为未来大模型生态圈里一个非常重要的组件,从上层应用角度,可以自由选择是否启用。

这里,我选用了langchain官方文档中提到的Diffbot(Diffbot | ?️? LangChain) 感觉准确率不错,而且用起来相对顺手:

from langchain_community.document_loaders import WikipediaLoader
from langchain_experimental.graph_transformers.diffbot import DiffbotGraphTransformer


query = "Jackie Chan"
raw_documents = WikipediaLoader(query=query).load()

diffbot_api_key = "Diffbot-token"
os.environ["DIFFBOT_API_KEY"] = diffbot_api_key
diffbot_nlp = DiffbotGraphTransformer(diffbot_api_key=diffbot_api_key)

graph_documents = diffbot_nlp.convert_to_graph_documents(raw_documents)

它分免费版和商用版,如果选择商用,价格比较贵, 平替的策略也有,比如LLMGraphTransformer, 其利用openai的抽取能力,源码中用了两段prompt为unstructured_promptsystem_prompt,两者的构建都非常有意思,这里引用出前者的代码:

        "You are a top-tier algorithm designed for extracting information in "
        "structured formats to build a knowledge graph. Your task is to identify "
        "the entities and relations requested with the user prompt from a given "
        "text. You must generate the output in a JSON format containing a list "
        'with JSON objects. Each object should have the keys: "head", '
        '"head_type", "relation", "tail", and "tail_type". The "head" '
        "key must contain the text of the extracted entity with one of the types "
        "from the provided list in the user prompt.",
        f'The "head_type" key must contain the type of the extracted head entity, '
        f"which must be one of the types from {node_labels_str}."
        if node_labels
        else "",
        f'The "relation" key must contain the type of relation between the "head" '
        f'and the "tail", which must be one of the relations from {rel_types_str}.'
        if rel_types
        else "",
        f'The "tail" key must represent the text of an extracted entity which is '
        f'the tail of the relation, and the "tail_type" key must contain the type '
        f"of the tail entity from {node_labels_str}."
        if node_labels
        else "",
        "Attempt to extract as many entities and relations as you can. Maintain "
        "Entity Consistency: When extracting entities, it's vital to ensure "
        'consistency. If an entity, such as "John Doe", is mentioned multiple '
        "times in the text but is referred to by different names or pronouns "
        '(e.g., "Joe", "he"), always use the most complete identifier for '
        "that entity. The knowledge graph should be coherent and easily "
        "understandable, so maintaining consistency in entity references is "
        "crucial.",
        "IMPORTANT NOTES:\n- Don't add any explanation and text.",

抽取完成后,进行入库:

# connect to our neo4j database
from langchain_community.graphs import Neo4jGraph

url = ""
username = ""
password = ""

graph = Neo4jGraph(url=url, username=username, password=password)

graph.add_graph_documents(graph_documents)

------------------------------------visual code-----------------------------------------
default_cypher = "MATCH (s)-[r:!MENTIONS]->(t) RETURN s,r,t LIMIT 50"

def showGraph(cypher: str = default_cypher):
    driver = GraphDatabase.driver(
        uri = url,
        auth = (username,password))
    session = driver.session()
    widget = GraphWidget(graph = session.run(cypher).graph())
    widget.node_label_mapping = 'id'
    display(widget)
    return widget

showGraph()

入库后,可根据需要,在当前单元格内直接对数据库内容进行可视化,我这里可视化了之前周杰伦的wiki词条,不过之后都是成龙,因为在我将前者数据丢给智谱ai的时候,竟然发现有很多违禁词,emmm,不知道哪些。


unsetunset构建文档树unsetunset

?** 论文名**《RAPTOR Recursive Abstractive Processing for Tree-Organized Retrieval》
?** 地址**https://arxiv.org/pdf/2401.18059
⛳** Official repo**https://github.com/parthsarthi03/raptor

RAPTOR(Recursive Abstractive Processing for Tree-Organized Retrieval)是一种创建新的检索增强型语言模型,它通过嵌入、聚类和摘要文本模块来构建一个从底层到高层具有不同摘要层的树状结构。这种方法允許模型在推理时从这棵树中检索信息,实现跨文本的不同抽象层的整合。RAPTOR的相关性创新在于它构建了文本摘要的方法,以不同尺度检索上下文的能力,并在多个任务上展示超越传统检索增强语言模型的性能。它主要做如下七步:

  1. 文本分割
  2. 文本向量表示
  3. 文本聚类
  4. 文本摘要
  5. 创建树节点
  6. 递归分聚类以及摘要
  7. 文档检索

以下为具体代码:

from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import umap
from sklearn.mixture import GaussianMixture

RANDOM_SEED = 224  # 固定种子

# 全局聚类嵌入
def global_cluster_embeddings(
    embeddings: np.ndarray,
    dim: int,
    n_neighbors: Optional[int] = None,
    metric: str = "cosine",
)
 -> np.ndarray:

    """
    使用UMAP对嵌入进行全局降维处理。
    参数:
    - embeddings: 输入嵌入,形式为numpy数组。
    - dim: 降维后的目标维度。
    - n_neighbors: 可选;考虑每个点的邻居数量。
                   如果不提供,默认为嵌入数量的平方根。
    - metric: 使用UMAP的距离度量。
    返回:
    - 降维到指定维度的嵌入的numpy数组。
    """

    if n_neighbors is None:
        n_neighbors = int((len(embeddings) - 1) ** 0.5)
    return umap.UMAP(
        n_neighbors=n_neighbors, n_components=dim, metric=metric
    ).fit_transform(embeddings)

# 局部聚类嵌入
def local_cluster_embeddings(
    embeddings: np.ndarray, dim: int, num_neighbors: int = 10, metric: str = "cosine"
)
 -> np.ndarray:

    """
    使用UMAP对嵌入进行局部降维处理,通常在全局聚类之后进行。
    参数:
    - embeddings: 输入嵌入,形式为numpy数组。
    - dim: 降维后的目标维度。
    - num_neighbors: 考虑每个点的邻居数量。
    - metric: 使用UMAP的距离度量。
    返回:
    - 降维到指定维度的嵌入的numpy数组。
    """

    return umap.UMAP(
        n_neighbors=num_neighbors, n_components=dim, metric=metric
    ).fit_transform(embeddings)

# 确定最佳聚类数
def get_optimal_clusters(
    embeddings: np.ndarray, max_clusters: int = 50, random_state: int = RANDOM_SEED
)
 -> int:

    """
    使用高斯混合模型(Gaussian Mixture Model)和贝叶斯信息准则(Bayesian Information Criterion, BIC)确定最佳聚类数。
    参数:
    - embeddings: 输入嵌入,形式为numpy数组。
    - max_clusters: 考虑的最大聚类数。
    - random_state: 种子,用于可重复性。
    返回:
    - 找到的最佳聚类数的整数表示。
    """

    max_clusters = min(max_clusters, len(embeddings))
    n_clusters = np.arange(1, max_clusters)
    bics = []
    for n in n_clusters:
        gm = GaussianMixture(n_components=n, random_state=random_state)
        gm.fit(embeddings)
        bics.append(gm.bic(embeddings))
    return n_clusters[np.argmin(bics)]

# 嵌入文本
def embed(texts):
    """
    此函数假设存在一个名为 `embd` 的对象,该对象具有一个名为 `embed_documents` 的方法,该方法接受文本列表并返回它们的嵌入。
    参数:
    - texts: List[str],要嵌入的文本列表。
    返回:
    - numpy.ndarray: 给定文本文档的嵌入数组。
    """

    text_embeddings = embd.embed_documents(texts)
    text_embeddings_np = np.array(text_embeddings)
    return text_embeddings_np
 
def embed_cluster_texts(texts):
    """
    将文本列表嵌入并聚类,返回一个包含文本、嵌入向量和聚类标签的DataFrame。此函数将嵌入生成和聚类合并为一个步骤。它假设已定义一个先前的 `perform_clustering` 函数,该函数对嵌入执行聚类。
    参数:
    - texts: List[str],要处理的文本文档列表。
    返回:
    - pandas.DataFrame: 包含原始文本、它们的嵌入向量和分配的聚类标签的DataFrame。
    """

    text_embeddings_np = embed(texts)  # 生成嵌入向量
    cluster_labels = perform_clustering(
        text_embeddings_np, 100.1
    )  # 对嵌入向量执行聚类
    df = pd.DataFrame()  # 初始化DataFrame以存储结果
    df["text"] = texts  # 存储原始文本
    df["embd"] = list(text_embeddings_np)  # 将嵌入向量作为列表存储在DataFrame中
    df["cluster"] = cluster_labels  # 存储聚类标签
    return df
 
def fmt_txt(df: pd.DataFrame) -> str:
    unique_txt = df["text"].tolist()
    return "--- --- \n --- --- ".join(unique_txt)
 
def embed_cluster_summarize_texts(
    texts: List[str], level: int
)
 -> Tuple[pd.DataFrame, pd.DataFrame]:

    """
    嵌入、聚类并总结文本列表。此函数首先为文本生成嵌入向量,根据相似性对它们进行聚类,扩展聚类分配以便于处理,然后总结每个聚类中的内容。
    参数:
    - texts: 要处理的文本文档列表。
    - level: 一个整数参数,可能定义处理的深度或细节。
    返回:
    - 包含两个DataFrame的元组:
      1. 第一个DataFrame(`df_clusters`)包括原始文本、它们的嵌入向量和聚类分配。
      2. 第二个DataFrame(`df_summary`)包含每个聚类的摘要、指定的详细程度和聚类标识符。
    """

    # 嵌入和聚类文本,结果是一个包含'text'、'embd'和'cluster'列的DataFrame
    df_clusters = embed_cluster_texts(texts)
    # 准备扩展DataFrame以便于更简单地处理聚类
    expanded_list = []

    # 将DataFrame条目扩展为文档-聚类配对,以便直接处理
    for index, row in df_clusters.iterrows():
        for cluster in row["cluster"]:
            expanded_list.append(
                {"text": row["text"], "embd": row["embd"], "cluster": cluster}
            )

    # 从扩展列表创建新的DataFrame
    expanded_df = pd.DataFrame(expanded_list)

    # 检索用于处理的唯一聚类标识符
    all_clusters = expanded_df["cluster"].unique()

    template = """Here is a sub-set of LangChain Expression Langauge doc. 
    
    LangChain Expression Langauge provides a way to compose chain in LangChain.
    
    Give a detailed summary of the documentation provided.
    
    Documentation:
    {context}
    """

    template_length = len(template)
    # 假设ChatPromptTemplate和StrOutputParser等是已经定义好的类或函数
    prompt = ChatPromptTemplate.from_template(template)
    chain = prompt | model | StrOutputParser()
    # 为每个聚类格式化文本以进行总结
    summaries = []
    for i in all_clusters:
        df_cluster = expanded_df[expanded_df["cluster"] == i]
        formatted_txt = fmt_txt(df_cluster)
        summaries.append(chain.invoke({"context": formatted_txt}))
    # 创建一个DataFrame来存储摘要,以及它们对应的聚类和级别
    df_summary = pd.DataFrame(
        {
            "summaries": summaries,
            "level": [level] * len(summaries),
            "cluster": list(all_clusters),
        }
    )

    return df_clusters, df_summary

def recursive_embed_cluster_summarize(
    texts: List[str], level: int = 1, n_levels: int = 3
)
 -> Dict[int, Tuple[pd.DataFrame, pd.DataFrame]]:

    """
    Recursively embeds, clusters, and summarizes texts up to a specified level or until
    the number of unique clusters becomes 1, storing the results at each level.
    Parameters:
    - texts: List[str], texts to be processed.
    - level: int, current recursion level (starts at 1).
    - n_levels: int, maximum depth of recursion.
    Returns:
    - Dict[int, Tuple[pd.DataFrame, pd.DataFrame]], a dictionary where keys are the recursion
      levels and values are tuples containing the clusters DataFrame and summaries DataFrame at that level.
    """

    results = {}  # Dictionary to store results at each level

    # Perform embedding, clustering, and summarization for the current level
    df_clusters, df_summary = embed_cluster_summarize_texts(texts, level)

    # Store the results of the current level
    results[level] = (df_clusters, df_summary)

    # Determine if further recursion is possible and meaningful
    unique_clusters = df_summary["cluster"].nunique()
    if level < n_levels and unique_clusters > 1:
        # Use summaries as the input texts for the next level of recursion
        new_texts = df_summary["summaries"].tolist()
        next_level_results = recursive_embed_cluster_summarize(
            new_texts, level + 1, n_levels
        )
        # Merge the results from the next level into the current results dictionary
        results.update(next_level_results)

    return results

进行调用:

# Build document tree
doc_text = [d.page_content for d in raw_documents]
leaf_texts = doc_text
results = recursive_embed_cluster_summarize(leaf_texts, level=1, n_levels=3)
results[2]


------------------------------------print-----------------------------------------

(                                                text  \
 0   dent\n\nThe Jackie Chan Stunt Team, also know...   
 1   Jackie Chan Adventures is an animated televis...   
 2   The provided documentation is about Jaycee Ch...   
 3   earch for Lin, who has been taken to the Towe...   
 4   The provided documentation is for a film call...   
 
                                                 embd cluster  
 0  [-0.046987526-0.020250408-0.0124886910.0...     [0]  
 1  [-0.057788752-0.030920357-0.0472585070.0...     [0]  
 2  [-0.057758134-0.0341271-0.06603754-0.021...     [0]  
 3  [-0.014306396-0.045166010.028220890.0230...     [0]  
 4  [-0.022697797-0.031102212-0.0413122660.0...     [0]  ,
                                            summaries  level  cluster
 0   The provided documentation contains informati...      2        0)


unsetunsetRAG chain + RAPTORunsetunset

输出成龙(Jackie chan)的事业成就:

from langchain import hub
from langchain_core.runnables import RunnablePassthrough

# Prompt
prompt = hub.pull("rlm/rag-prompt")


# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


# Chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

# Question
originalAns_achievement = rag_chain.invoke("Tell me about Jackie Chan's career achievements.")

------------------------------------print-----------------------------------------

 Jackie Chan is a renowned Hong Kong actor, director, writer, producer, martial artist, and stuntman known for his slapstick acrobatic fighting style, comic timing, and innovative stunts. He has starred in over 150 films and is one of the most influential action film stars of all time. His popular films include Snake in the Eagle's Shadow, Drunken Master, Police Story, and Rush Hour, among others. Chan has also had a successful Hollywood career with films like Shanghai Noon and Shanghai Knights. Additionally, he has released over 20 albums and sung over 100 songs in five languages. Chan has received various awards and honors for his work and has had a significant impact on the film industry.

谁和Jackie chan一起工作?Joe Hisaishi和其是同事嘛?

originalAns_colleagues = rag_chain.invoke("Who work with Jackie Chan?")
rag_chain.invoke("Is Joe Hisaishi colleague of Jackie Chan?")
for i in range(len(doc_text)):
    if "Hisaishi" in doc_text[i]:
        print(i)

------------------------------------print-----------------------------------------


unsetunsetRAG chain + KGunsetunset

输出当前graph结构:

graph.schema

------------------------------------print-----------------------------------------
Node properties:
Person {id: STRING, name: STRING, dateOfBirth: STRING, positionHeld: STRING, age: STRING, academicDegree: STRING, dateOfDeath: STRING, causeOfDeath: STRING}
Location {id: STRING, name: STRING}
Organization {id: STRING, name: STRING, foundingDate: STRING}
Skill {id: STRING, name: STRING}
Money {id: STRING, name: STRING}
Award {id: STRING, name: STRING}
Relationship properties:
PLACE_OF_BIRTH {evidence: STRING}
PERSON_LOCATION {evidence: STRING, isCurrent: STRING, startTime: STRING, isNotCurrent: STRING}
...... # 省略

这里可以使用自定义查询方式:

entity_chain = prompt | llm.with_structured_output(Entities)
# with_structured_output:新版langchain方法,返回格式化为与给定架构匹配的输出的模型包装器。->  https://blog.langchain.dev/tool-calling-with-langchain/

# Fulltext index query
def structured_retriever(question: str) -> str:
    result = ""
    entities = entity_chain.invoke({"question": question})
    for entity in entities.names:
        response = graph.query(
            """CALL db.index.fulltext.queryNodes('entity', $query, {limit:2})
            YIELD node,score
            CALL {
              WITH node
              MATCH (node)-[r:!MENTIONS]->(neighbor)
              RETURN node.id + ' - ' + type(r) + ' -> ' + neighbor.id AS output
              UNION ALL
              WITH node
              MATCH (node)<-[r:!MENTIONS]-(neighbor)
              RETURN neighbor.id + ' - ' + type(r) + ' -> ' +  node.id AS output
            }
            RETURN output LIMIT 50
            """
,
            {"query": generate_full_text_query(entity)},
        )
        result += "\n".join([el['output'for el in response])
    return result

基于此调用:

structured_retriever("Who is jackie chan?")

------------------------------------print-----------------------------------------
Jackie Chan - RULED -> Hong Kong
Jackie Chan - BELONGED_TO -> Golden Harvest
...
Jackie Chan - INFLUENCED -> Hollywood Action Films

但坏消息是,with_structured_output是一种tools call,不走openai中转方式的话,得再手动实现一下,从功能实现和官方文档中感觉实现还是比较简单的,或者直接再加一层prompt,让它按格式输出也行。

不过以上是想手动实现,其实整个大接口,langchain也对此做了封装,即GraphCypherQAChain

from langchain.chains import GraphCypherQAChain

chain = GraphCypherQAChain.from_llm(
    ZhipuAILLM or OpenAI
)

基于此调用:

result_kg_colleagues = chain("Who work with jackie chan?")

------------------------------------print-----------------------------------------
> Entering new GraphCypherQAChain chain...
Generated Cypher:
 MATCH (p:Person)-[:EMPLOYEE_OR_MEMBER_OF]->(o:Organization) WHERE p.name = 'Jackie Chan' RETURN o.name
Full Context:
[{'o.name''Communist Party of China'}, {'o.name'"Chinese People's Political Consultative Conference"}]

> Finished chain.
result_kg_colleagues

------------------------------------print-----------------------------------------
{'query''Who work with jackie chan?',
 'result'" I don't know the answer based on the provided information.",
 'intermediate_steps': [{'query'" MATCH (p:Person)-[:EMPLOYEE_OR_MEMBER_OF]->(o:Organization) WHERE p.name = 'Jackie Chan' RETURN o.name"},
  {'context': [{'o.name''Communist Party of China'},
    {'o.name'"Chinese People's Political Consultative Conference"}]}]}

然后结合RAG一起,做增强回答:

messages = [
    SystemMessage(
        content="You are a helpful assistant who generates information grounded with facts. Please enhance the original answer with complementary entity and relationship information from the knowledge graph to generate the final answer."
    ),
    HumanMessage(
        content= f"{originalAns_colleagues} + {result_kg_colleagues}"
    ),
]
final_ans_colleague = chat.invoke(messages)
final_ans_colleague

------------------------------------print-----------------------------------------
System: Based on the knowledge graph, Jackie Chan has worked with many people throughout his career, including actors, directors, and stuntmen. Some of his most notable collaborations include working with Chris Tucker in the Rush Hour series, and Jaycee Chan, his son, who is also a Chinese actor and singer. Additionally, Chan has starred in various films directed by himself, such as The Fearless Hyena and Project A. Throughout his career, Chan has also worked with many other actors, directors, and stuntmen in various films, including Police Story, Drunken Master, and Snake in the Eagle's Shadow. Jackie Chan has also been a member of the Communist Party of China and the Chinese People's Political Consultative Conference.

最后,如果想将RAG + KG + LLM + RAPTOR进行组合,当然是可以,最简单能:

chain = (
    RunnableParallel(
        {
            "context": _search_query | retriever,
            "question": RunnablePassthrough(),
        }
    )
    | prompt
    | llm
    | StrOutputParser()
)

但我试了效果不好,系统大概率报System: I'm sorry, but the information you provided contains an error. Jackie Chan has not... ,不知道是我llm api选用问题,还是retriever改得有问题,另外search_query顺便加入了history chat,果然整得太复杂,想从demo级转向商业级还是需要从长计议,所以,本篇实验到此结束。


53AI,企业落地应用大模型首选服务商

产品:大模型应用平台+智能体定制开发+落地咨询服务

承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询