AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


Dify中高质量索引模式时,通过线程池处理chunk过程
发布日期:2024-07-14 00:47:54 浏览次数: 1893 来源:NLP工程化


本文主要介绍了Dify中高质量索引模式时,如何通过线程池执行器来处理chunk的过程。源码位置:dify\api\core\indexing_runner.py\IndexingRunner._load。核心思想:假设一个数据集中有一个文档,该文档可以拆分为12个段(segment)。如果chunk_size=10,那么分为2批提交给线程池执行器进行处理。

一.线程池处理chunk

1.方法处理过程

这段代码的目的是通过多线程并发处理文档集合中的每个块,提高处理效率。它创建了一个包含最多10个线程的线程池,并将文档集合按块拆分后提交给线程池执行器处理。最终,它收集所有任务的结果并累加到 tokens 变量中。这种方式可以显著加快大规模文档集合的处理速度。

if dataset.indexing_technique == 'high_quality':
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:# 线程池执行器
futures = []
for i in range(0, len(documents), chunk_size):# 遍历文档
chunk_documents = documents[i:i + chunk_size]# 块文档
futures.append(executor.submit(self._process_chunk, current_app._get_current_object(), index_processor,
chunk_documents, dataset,
dataset_document, embedding_model_instance,
embedding_model_type_instance))# 提交任务

for future in futures:# 遍历futures
tokens += future.result()# 令牌

2.判断条件

这段代码是用来并行处理文档集合的一部分。它使用了Python的 concurrent.futures 模块来创建一个线程池执行器,以便在多个线程中并发执行任务。下面是详细解释每一行代码的作用:

if dataset.indexing_technique == 'high_quality':

检查数据集的索引技术是否为 "high_quality"。只有在这种情况下,下面的并行处理代码才会被执行。

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:

3.创建线程池执行器

使用 ThreadPoolExecutor 创建一个包含最多10个线程的线程池执行器。max_workers=10 表示线程池中最多可以有10个并发线程。

futures = []

4.初始化 futures 列表

用于存储每个提交的任务的 future 对象。

for i in range(0, len(documents), chunk_size):

5.遍历文档

通过步长 chunk_size 遍历文档集合 documentsi 是起始索引。

chunk_documents = documents[i:i + chunk_size]

6.块文档

从文档集合中提取一块文档,这块文档的大小为 chunk_size。这部分文档会被单独处理。

futures.append(executor.submit(self._process_chunk, current_app._get_current_object(), index_processor,
chunk_documents, dataset,
dataset_document, embedding_model_instance,
embedding_model_type_instance))

7.提交任务

使用 executor.submit() 方法提交一个任务给线程池执行器。每个任务调用 self._process_chunk 方法,并传入一系列参数。返回 future 对象会被添加到 futures 列表中。传递给 _process_chunk 参数包括:

for future in futures:

8.遍历 futures

遍历所有已提交任务的 future 对象。

tokens += future.result()

9.累加结果

调用 future.result() 方法获取任务的结果,并将结果累加到 tokens 变量中。future.result() 会阻塞当前线程,直到任务完成并返回结果。

二._process_chunk方法

1.方法处理过程

这段代码的目的是在处理文档块时,计算文档的 tokens 数量,加载索引,并更新数据库中的文档段状态。它首先检查文档是否处于暂停状态,然后计算 tokens 数量。如果数据集的索引技术是 "high_quality" 或存在嵌入模型类型实例,则会进行 tokens 计算。随后,加载文档块的索引,并更新相关文档段的状态,最后将所有更改提交到数据库并返回 tokens 数量。整个过程在 Flask 应用的上下文中运行,以确保能够访问和操作数据库。

def _process_chunk(self, flask_app, index_processor, chunk_documents, dataset, dataset_document,
embedding_model_instance, embedding_model_type_instance)
:
# 处理块
with flask_app.app_context():
# check document is paused# 检查文档是否暂停
self._check_document_paused_status(dataset_document.id)

tokens = 0
if dataset.indexing_technique == 'high_quality' or embedding_model_type_instance:
tokens += sum(
embedding_model_type_instance.get_num_tokens(
embedding_model_instance.model,
embedding_model_instance.credentials,
[document.page_content]
)
for document in chunk_documents
)# 计算tokens的数量

# load index# 加载索引
index_processor.load(dataset, chunk_documents, with_keywords=False)

document_ids = [document.metadata['doc_id'] for document in chunk_documents]# 文档id
db.session.query(DocumentSegment).filter(
DocumentSegment.document_id == dataset_document.id,
DocumentSegment.index_node_id.in_(document_ids),
DocumentSegment.status == "indexing"
).update({
DocumentSegment.status: "completed",
DocumentSegment.enabled: True,
DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
})# 更新文档段

db.session.commit()# 提交

return tokens

这段代码定义了一个名为 _process_chunk 的方法,用于处理文档集合的一个块。该方法在 Flask 应用的上下文中运行,计算块中文档的 tokens 数量,加载索引,并更新数据库中的文档段状态。以下是详细的代码解释:

def _process_chunk(self, flask_app, index_processor, chunk_documents, dataset, dataset_document,
embedding_model_instance, embedding_model_type_instance)
:
# 处理块

2.方法定义

定义一个名为 _process_chunk 的方法。

  • 参数

    • self:类实例的引用。

    • flask_app:Flask 应用对象。

    • index_processor:索引处理器对象。

    • chunk_documents:要处理的文档块。

    • dataset:数据集对象。

    • dataset_document:数据集中的文档对象。

    • embedding_model_instance:嵌入模型实例。

    • embedding_model_type_instance:嵌入模型类型实例。

3.Flask 应用上下文

with flask_app.app_context():

在 Flask 应用上下文中运行代码。这使得代码可以访问 Flask 的应用配置和数据库连接。

4.检查文档是否暂停

# check document is paused# 检查文档是否暂停
self._check_document_paused_status(dataset_document.id)

5.计算 tokens 数量

调用 _check_document_paused_status 方法,检查 dataset_document.id 是否处于暂停状态。

tokens = 0
if dataset.indexing_technique == 'high_quality' or embedding_model_type_instance:
tokens += sum(
embedding_model_type_instance.get_num_tokens(
embedding_model_instance.model,
embedding_model_instance.credentials,
[document.page_content]
)
for document in chunk_documents
)# 计算tokens的数量
  • 初始化 tokens 变量为 0。

  • 如果数据集的索引技术是 "high_quality" 或存在嵌入模型类型实例,则计算块中文档的 tokens 数量。

  • 使用 embedding_model_type_instance.get_num_tokens 方法获取每个文档的 tokens 数量,并累加到 tokens 变量中。

6.加载索引

# load index# 加载索引
index_processor.load(dataset, chunk_documents, with_keywords=False)

调用 index_processor.load 方法,加载数据集和块文档的索引,with_keywords=False 表示不使用关键字。

7.提取文档 ID

document_ids = [document.metadata['doc_id'] for document in chunk_documents]# 文档id

从文档的元数据中提取 doc_id,并生成一个文档 ID 列表。

8.更新文档段状态

db.session.query(DocumentSegment).filter(
DocumentSegment.document_id == dataset_document.id,
DocumentSegment.index_node_id.in_(document_ids),
DocumentSegment.status == "indexing"
).update({
DocumentSegment.status: "completed",
DocumentSegment.enabled: True,
DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
})# 更新文档段
  • 查询条件:查找 DocumentSegment 表中 document_iddataset_document.idindex_node_iddocument_ids 列表中的记录,且状态为 "indexing" 的记录。

  • 更新字段

    • DocumentSegment.status:更新状态为 "completed"。

    • DocumentSegment.enabled:设置为 True

    • DocumentSegment.completed_at:设置完成时间为当前 UTC 时间。

9.提交事务

db.session.commit()# 提交

将所有更改提交到数据库。

10.返回 tokens 数量

return tokens

返回累加后的 tokens 数量。



53AI,企业落地应用大模型首选服务商

产品:大模型应用平台+智能体定制开发+落地咨询服务

承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询