AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


LightRAG实现原理分析-文档的解析和插入
发布日期:2025-01-04 08:17:23 浏览次数: 1574 来源:大数据架构师修行之路


概述

本文介绍LightRAG文档插入的实现逻辑。通过本文的分析,可以对文档插入的实现过程有一个比较清晰的理解。可以根据本文的分析,进一步分析文档插入的细节。


文档查询的基本使用

下面的代码是使用LightRAG的最简单的流程。打开一个文本文件,插入到LightRAG中,然后对文档进行对话。

WORKING_DIR = "./dickens"

if not os.path.exists(WORKING_DIR):
    os.mkdir(WORKING_DIR)
# 构建LightRAG对象    
rag = LightRAG(
    working_dir=WORKING_DIR,
    llm_model_func=gpt_4o_mini_complete,  # Use gpt_4o_mini_complete LLM model
    # llm_model_func=gpt_4o_complete  # Optionally, use a stronger model
)
# 打开文档,并插入到LightRAG中
with open("./dickens/book.txt""r"encoding="utf-8"as f:
    rag.insert(f.read())

# 执行原始的本地查询
print(
    rag.query("What are the top themes in this story?"param=QueryParam(mode="naive"))
)


文档插入过程实现分析

文档插入的实现函数如下:

class LightRAG:
    def insert(selfstring_or_strings):
        loop = always_get_an_event_loop()
        return loop.run_until_complete(self.ainsert(string_or_strings))


ainsert()函数实现分析

该函数总体实现逻辑:

(1)为每个文档生成唯一ID并创建文档字典,移除首尾空白

(2)检查哪些文档是最新的,只保留最新的文档,若没有最新的文档,则告警并返回

(3)对文件内容进行分块,并检查哪些分块是最新的,若没有最新的分块,则直接返回;

(4)将分块保存到分块存储字典中,并将分块向量化后保存到向量数据库中

(5)在所有分块中提取实体和关系,若没有提取到最新的实体和关系,则直接返回。

(6)将实体和实体的关系保存到图数据库中。


实现代码如下

async def ainsert(selfstring_or_strings):
    # 定义异步插入方法,接收字符串或字符串列表参数
    
    update_storage = False
    # 初始化存储更新标志为False
    
    try:
        if isinstance(string_or_stringsstr):
            string_or_strings = [string_or_strings]
        # 如果输入是单个字符串,转换为列表形式

        new_docs = {
            compute_mdhash_id(c.strip(), prefix="doc-"): {"content"c.strip()}
            for c in string_or_strings
        }
        # 为每个文档生成唯一ID并创建文档字典,移除首尾空白

        _add_doc_keys = await self.full_docs.filter_keys(list(new_docs.keys()))
        # 检查哪些文档ID是新的(未存储过的)

        new_docs = {kv for kv in new_docs.items() if k in _add_doc_keys}
        # 只保留新文档

        if not len(new_docs):
            logger.warning("All docs are already in the storage")
            return
        # 如果没有新文档,记录警告并返回

        update_storage = True
        logger.info(f"[New Docs] inserting {len(new_docs)} docs")
        # 设置更新标志,记录新文档数量

        inserting_chunks = {}
        # 初始化分块字典

        for doc_keydoc in tqdm_async(new_docs.items(), desc="Chunking documents"unit="doc"):
            # 遍历每个文档,显示进度条
            
            chunks = {
                compute_mdhash_id(dp["content"], prefix="chunk-"): {
                    **dp,
                    "full_doc_id"doc_key,
                }
                for dp in chunking_by_token_size(
                    doc["content"],
                    overlap_token_size=self.chunk_overlap_token_size,
                    max_token_size=self.chunk_token_size,
                    tiktoken_model=self.tiktoken_model_name,
                )
            }
            # 对文档内容进行分块,为每个分块生成ID,并关联原始文档ID

            inserting_chunks.update(chunks)
            # 将当前文档的分块添加到总分块字典中

        _add_chunk_keys = await self.text_chunks.filter_keys(list(inserting_chunks.keys()))
        # 检查哪些分块ID是新的

     inserting_chunks = {kv for kv in inserting_chunks.items() if k in _add_chunk_keys}
        # 只保留新分块

        if not len(inserting_chunks):
            logger.warning("All chunks are already in the storage")
            return
        # 如果没有新分块,记录警告并返回

        logger.info(f"[New Chunks] inserting {len(inserting_chunks)} chunks")
        # 记录新分块数量

        await self.chunks_vdb.upsert(inserting_chunks)
        # 将分块存入向量数据库

        logger.info("[Entity Extraction]...")
        maybe_new_kg = await extract_entities(
            inserting_chunks,
            knowledge_graph_inst=self.chunk_entity_relation_graph,
            entity_vdb=self.entities_vdb,
            relationships_vdb=self.relationships_vdb,
            global_config=asdict(self),
        )
        # 从分块中提取实体和关系

        if maybe_new_kg is None:
            logger.warning("No new entities and relationships found")
            return
        # 如果没有找到新实体和关系,记录警告并返回

        self.chunk_entity_relation_graph = maybe_new_kg
        # 更新知识图谱

        await self.full_docs.upsert(new_docs)
        await self.text_chunks.upsert(inserting_chunks)
        # 将完整文档和分块存入各自的存储

    finally:
        if update_storage:
            await self._insert_done()
        # 如果进行了存储更新,执行清理工作


小结

本文分析了LightRAG的文档插入过程的总体实现过程。通过本文的分析可知,LightRAG首先会对文档进行切割并计算嵌入向量,除此之外,还会从文档中提取实体和实体之间的关系。


53AI,企业落地应用大模型首选服务商

产品:大模型应用平台+智能体定制开发+落地咨询服务

承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询