微信扫码
与创始人交个朋友
我要投稿
本文主要介绍了Dify中高质量索引模式时,如何通过线程池执行器来处理chunk的过程。源码位置:dify\api\core\indexing_runner.py\IndexingRunner._load
。核心思想:假设一个数据集中有一个文档,该文档可以拆分为12个段(segment)。如果chunk_size=10,那么分为2批提交给线程池执行器进行处理。
这段代码的目的是通过多线程并发处理文档集合中的每个块,提高处理效率。它创建了一个包含最多10个线程的线程池,并将文档集合按块拆分后提交给线程池执行器处理。最终,它收集所有任务的结果并累加到 tokens
变量中。这种方式可以显著加快大规模文档集合的处理速度。
if dataset.indexing_technique == 'high_quality':
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:# 线程池执行器
futures = []
for i in range(0, len(documents), chunk_size):# 遍历文档
chunk_documents = documents[i:i + chunk_size]# 块文档
futures.append(executor.submit(self._process_chunk, current_app._get_current_object(), index_processor,
chunk_documents, dataset,
dataset_document, embedding_model_instance,
embedding_model_type_instance))# 提交任务
for future in futures:# 遍历futures
tokens += future.result()# 令牌
这段代码是用来并行处理文档集合的一部分。它使用了Python的 concurrent.futures
模块来创建一个线程池执行器,以便在多个线程中并发执行任务。下面是详细解释每一行代码的作用:
if dataset.indexing_technique == 'high_quality':
检查数据集的索引技术是否为 "high_quality"。只有在这种情况下,下面的并行处理代码才会被执行。
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
使用 ThreadPoolExecutor
创建一个包含最多10个线程的线程池执行器。max_workers=10
表示线程池中最多可以有10个并发线程。
futures = []
用于存储每个提交的任务的 future 对象。
for i in range(0, len(documents), chunk_size):
通过步长 chunk_size
遍历文档集合 documents
。i
是起始索引。
chunk_documents = documents[i:i + chunk_size]
从文档集合中提取一块文档,这块文档的大小为 chunk_size
。这部分文档会被单独处理。
futures.append(executor.submit(self._process_chunk, current_app._get_current_object(), index_processor,
chunk_documents, dataset,
dataset_document, embedding_model_instance,
embedding_model_type_instance))
使用 executor.submit()
方法提交一个任务给线程池执行器。每个任务调用 self._process_chunk
方法,并传入一系列参数。返回 future 对象会被添加到 futures
列表中。传递给 _process_chunk
参数包括:
for future in futures:
遍历所有已提交任务的 future 对象。
tokens += future.result()
调用 future.result()
方法获取任务的结果,并将结果累加到 tokens
变量中。future.result()
会阻塞当前线程,直到任务完成并返回结果。
这段代码的目的是在处理文档块时,计算文档的 tokens 数量,加载索引,并更新数据库中的文档段状态。它首先检查文档是否处于暂停状态,然后计算 tokens 数量。如果数据集的索引技术是 "high_quality" 或存在嵌入模型类型实例,则会进行 tokens 计算。随后,加载文档块的索引,并更新相关文档段的状态,最后将所有更改提交到数据库并返回 tokens 数量。整个过程在 Flask 应用的上下文中运行,以确保能够访问和操作数据库。
def _process_chunk(self, flask_app, index_processor, chunk_documents, dataset, dataset_document,
embedding_model_instance, embedding_model_type_instance):# 处理块
with flask_app.app_context():
# check document is paused# 检查文档是否暂停
self._check_document_paused_status(dataset_document.id)
tokens = 0
if dataset.indexing_technique == 'high_quality' or embedding_model_type_instance:
tokens += sum(
embedding_model_type_instance.get_num_tokens(
embedding_model_instance.model,
embedding_model_instance.credentials,
[document.page_content]
)
for document in chunk_documents
)# 计算tokens的数量
# load index# 加载索引
index_processor.load(dataset, chunk_documents, with_keywords=False)
document_ids = [document.metadata['doc_id'] for document in chunk_documents]# 文档id
db.session.query(DocumentSegment).filter(
DocumentSegment.document_id == dataset_document.id,
DocumentSegment.index_node_id.in_(document_ids),
DocumentSegment.status == "indexing"
).update({
DocumentSegment.status: "completed",
DocumentSegment.enabled: True,
DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
})# 更新文档段
db.session.commit()# 提交
return tokens
这段代码定义了一个名为 _process_chunk
的方法,用于处理文档集合的一个块。该方法在 Flask 应用的上下文中运行,计算块中文档的 tokens 数量,加载索引,并更新数据库中的文档段状态。以下是详细的代码解释:
def _process_chunk(self, flask_app, index_processor, chunk_documents, dataset, dataset_document,
embedding_model_instance, embedding_model_type_instance):# 处理块
定义一个名为 _process_chunk
的方法。
参数:
self
:类实例的引用。
flask_app
:Flask 应用对象。
index_processor
:索引处理器对象。
chunk_documents
:要处理的文档块。
dataset
:数据集对象。
dataset_document
:数据集中的文档对象。
embedding_model_instance
:嵌入模型实例。
embedding_model_type_instance
:嵌入模型类型实例。
with flask_app.app_context():
在 Flask 应用上下文中运行代码。这使得代码可以访问 Flask 的应用配置和数据库连接。
# check document is paused# 检查文档是否暂停
self._check_document_paused_status(dataset_document.id)
调用 _check_document_paused_status
方法,检查 dataset_document.id
是否处于暂停状态。
tokens = 0
if dataset.indexing_technique == 'high_quality' or embedding_model_type_instance:
tokens += sum(
embedding_model_type_instance.get_num_tokens(
embedding_model_instance.model,
embedding_model_instance.credentials,
[document.page_content]
)
for document in chunk_documents
)# 计算tokens的数量
初始化 tokens
变量为 0。
如果数据集的索引技术是 "high_quality" 或存在嵌入模型类型实例,则计算块中文档的 tokens 数量。
使用 embedding_model_type_instance.get_num_tokens
方法获取每个文档的 tokens 数量,并累加到 tokens
变量中。
# load index# 加载索引
index_processor.load(dataset, chunk_documents, with_keywords=False)
调用 index_processor.load
方法,加载数据集和块文档的索引,with_keywords=False
表示不使用关键字。
document_ids = [document.metadata['doc_id'] for document in chunk_documents]# 文档id
从文档的元数据中提取 doc_id
,并生成一个文档 ID 列表。
db.session.query(DocumentSegment).filter(
DocumentSegment.document_id == dataset_document.id,
DocumentSegment.index_node_id.in_(document_ids),
DocumentSegment.status == "indexing"
).update({
DocumentSegment.status: "completed",
DocumentSegment.enabled: True,
DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
})# 更新文档段
查询条件:查找 DocumentSegment
表中 document_id
为 dataset_document.id
,index_node_id
在 document_ids
列表中的记录,且状态为 "indexing" 的记录。
更新字段:
DocumentSegment.status
:更新状态为 "completed"。
DocumentSegment.enabled
:设置为 True
。
DocumentSegment.completed_at
:设置完成时间为当前 UTC 时间。
db.session.commit()# 提交
将所有更改提交到数据库。
return tokens
返回累加后的 tokens 数量。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-01-17
从 Dify 到 Rill-Flow:大模型应用平台的进化之路
2025-01-13
前后端源码部署:Dify v0.15.0 升级 v1.0.0-beta.1 的尝试
2025-01-11
Dify v1.0.0-beta:插件开启公测
2025-01-07
Dify v0.15.0:全新父子检索策略 - 更精准,更全面的知识检索
2024-12-27
【场景驱动】企业的哪些重复性任务,最适合用Coze循环节点来解决?——慢慢学AI146
2024-12-24
Coze,Dify,FastGPT,哪个更强?全方位对比分析来了!
2024-12-19
打开日本市场背后,Dify 是怎么做 AI 全球化的?
2024-12-15
有了 NewAPI 之后,Dify 的可玩儿性又高了
2024-04-25
2024-04-24
2024-07-16
2024-07-20
2024-05-08
2024-05-07
2024-05-09
2024-06-21
2024-12-24
2024-04-25