微信扫码
添加专属顾问
我要投稿
本文介绍LightRAG文档插入的实现逻辑。通过本文的分析,可以对文档插入的实现过程有一个比较清晰的理解。可以根据本文的分析,进一步分析文档插入的细节。
下面的代码是使用LightRAG的最简单的流程。打开一个文本文件,插入到LightRAG中,然后对文档进行对话。
WORKING_DIR = "./dickens"
if not os.path.exists(WORKING_DIR):
os.mkdir(WORKING_DIR)
# 构建LightRAG对象
rag = LightRAG(
working_dir=WORKING_DIR,
llm_model_func=gpt_4o_mini_complete, # Use gpt_4o_mini_complete LLM model
# llm_model_func=gpt_4o_complete # Optionally, use a stronger model
)
# 打开文档,并插入到LightRAG中
with open("./dickens/book.txt", "r", encoding="utf-8") as f:
rag.insert(f.read())
# 执行原始的本地查询
print(
rag.query("What are the top themes in this story?", param=QueryParam(mode="naive"))
)
文档插入的实现函数如下:
class LightRAG:
def insert(self, string_or_strings):
loop = always_get_an_event_loop()
return loop.run_until_complete(self.ainsert(string_or_strings))
该函数总体实现逻辑:
(1)为每个文档生成唯一ID并创建文档字典,移除首尾空白
(2)检查哪些文档是最新的,只保留最新的文档,若没有最新的文档,则告警并返回
(3)对文件内容进行分块,并检查哪些分块是最新的,若没有最新的分块,则直接返回;
(4)将分块保存到分块存储字典中,并将分块向量化后保存到向量数据库中
(5)在所有分块中提取实体和关系,若没有提取到最新的实体和关系,则直接返回。
(6)将实体和实体的关系保存到图数据库中。
async def ainsert(self, string_or_strings):
# 定义异步插入方法,接收字符串或字符串列表参数
update_storage = False
# 初始化存储更新标志为False
try:
if isinstance(string_or_strings, str):
string_or_strings = [string_or_strings]
# 如果输入是单个字符串,转换为列表形式
new_docs = {
compute_mdhash_id(c.strip(), prefix="doc-"): {"content": c.strip()}
for c in string_or_strings
}
# 为每个文档生成唯一ID并创建文档字典,移除首尾空白
_add_doc_keys = await self.full_docs.filter_keys(list(new_docs.keys()))
# 检查哪些文档ID是新的(未存储过的)
new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
# 只保留新文档
if not len(new_docs):
logger.warning("All docs are already in the storage")
return
# 如果没有新文档,记录警告并返回
update_storage = True
logger.info(f"[New Docs] inserting {len(new_docs)} docs")
# 设置更新标志,记录新文档数量
inserting_chunks = {}
# 初始化分块字典
for doc_key, doc in tqdm_async(new_docs.items(), desc="Chunking documents", unit="doc"):
# 遍历每个文档,显示进度条
chunks = {
compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp,
"full_doc_id": doc_key,
}
for dp in chunking_by_token_size(
doc["content"],
overlap_token_size=self.chunk_overlap_token_size,
max_token_size=self.chunk_token_size,
tiktoken_model=self.tiktoken_model_name,
)
}
# 对文档内容进行分块,为每个分块生成ID,并关联原始文档ID
inserting_chunks.update(chunks)
# 将当前文档的分块添加到总分块字典中
_add_chunk_keys = await self.text_chunks.filter_keys(list(inserting_chunks.keys()))
# 检查哪些分块ID是新的
inserting_chunks = {k: v for k, v in inserting_chunks.items() if k in _add_chunk_keys}
# 只保留新分块
if not len(inserting_chunks):
logger.warning("All chunks are already in the storage")
return
# 如果没有新分块,记录警告并返回
logger.info(f"[New Chunks] inserting {len(inserting_chunks)} chunks")
# 记录新分块数量
await self.chunks_vdb.upsert(inserting_chunks)
# 将分块存入向量数据库
logger.info("[Entity Extraction]...")
maybe_new_kg = await extract_entities(
inserting_chunks,
knowledge_graph_inst=self.chunk_entity_relation_graph,
entity_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
global_config=asdict(self),
)
# 从分块中提取实体和关系
if maybe_new_kg is None:
logger.warning("No new entities and relationships found")
return
# 如果没有找到新实体和关系,记录警告并返回
self.chunk_entity_relation_graph = maybe_new_kg
# 更新知识图谱
await self.full_docs.upsert(new_docs)
await self.text_chunks.upsert(inserting_chunks)
# 将完整文档和分块存入各自的存储
finally:
if update_storage:
await self._insert_done()
# 如果进行了存储更新,执行清理工作
本文分析了LightRAG的文档插入过程的总体实现过程。通过本文的分析可知,LightRAG首先会对文档进行切割并计算嵌入向量,除此之外,还会从文档中提取实体和实体之间的关系。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-03-10
博查正式发布语义排序模型(bocha-semantic-reranker)
2025-03-10
AI比人类更需要搜索引擎,我们专门为AI设计了一个搜索引擎
2025-03-10
超越 RAG:Memobase 为 AI 应用注入长期记忆
2025-03-10
【AI落地应用实战】RAGFlow + 知识图谱 + Deepseek 初步探索
2025-03-10
milvus lite快速实践-了解RAG落地背后的机制
2025-03-09
为什么RAG系统要拥抱向量检索?揭示关键字检索的致命弱点!
2025-03-09
不要盲目再使用DeepSeek R1和QWQ这些推理模型做RAG了
2025-03-07
r1-reasoning-rag:一种新的 RAG 思路
2024-09-04
2024-10-27
2024-07-18
2024-05-05
2024-06-20
2024-06-13
2024-07-09
2024-07-09
2024-05-19
2024-07-07
2025-03-05
2025-03-03
2025-03-02
2025-02-28
2025-02-24
2025-02-23
2025-02-15
2025-02-12