微信扫码
与创始人交个朋友
我要投稿
本文主要介绍了Dify中高质量索引模式时,如何通过线程池执行器来处理chunk的过程。源码位置:dify\api\core\indexing_runner.py\IndexingRunner._load
。核心思想:假设一个数据集中有一个文档,该文档可以拆分为12个段(segment)。如果chunk_size=10,那么分为2批提交给线程池执行器进行处理。
这段代码的目的是通过多线程并发处理文档集合中的每个块,提高处理效率。它创建了一个包含最多10个线程的线程池,并将文档集合按块拆分后提交给线程池执行器处理。最终,它收集所有任务的结果并累加到 tokens
变量中。这种方式可以显著加快大规模文档集合的处理速度。
if dataset.indexing_technique == 'high_quality':
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:# 线程池执行器
futures = []
for i in range(0, len(documents), chunk_size):# 遍历文档
chunk_documents = documents[i:i + chunk_size]# 块文档
futures.append(executor.submit(self._process_chunk, current_app._get_current_object(), index_processor,
chunk_documents, dataset,
dataset_document, embedding_model_instance,
embedding_model_type_instance))# 提交任务
for future in futures:# 遍历futures
tokens += future.result()# 令牌
这段代码是用来并行处理文档集合的一部分。它使用了Python的 concurrent.futures
模块来创建一个线程池执行器,以便在多个线程中并发执行任务。下面是详细解释每一行代码的作用:
if dataset.indexing_technique == 'high_quality':
检查数据集的索引技术是否为 "high_quality"。只有在这种情况下,下面的并行处理代码才会被执行。
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
使用 ThreadPoolExecutor
创建一个包含最多10个线程的线程池执行器。max_workers=10
表示线程池中最多可以有10个并发线程。
futures = []
用于存储每个提交的任务的 future 对象。
for i in range(0, len(documents), chunk_size):
通过步长 chunk_size
遍历文档集合 documents
。i
是起始索引。
chunk_documents = documents[i:i + chunk_size]
从文档集合中提取一块文档,这块文档的大小为 chunk_size
。这部分文档会被单独处理。
futures.append(executor.submit(self._process_chunk, current_app._get_current_object(), index_processor,
chunk_documents, dataset,
dataset_document, embedding_model_instance,
embedding_model_type_instance))
使用 executor.submit()
方法提交一个任务给线程池执行器。每个任务调用 self._process_chunk
方法,并传入一系列参数。返回 future 对象会被添加到 futures
列表中。传递给 _process_chunk
参数包括:
for future in futures:
遍历所有已提交任务的 future 对象。
tokens += future.result()
调用 future.result()
方法获取任务的结果,并将结果累加到 tokens
变量中。future.result()
会阻塞当前线程,直到任务完成并返回结果。
这段代码的目的是在处理文档块时,计算文档的 tokens 数量,加载索引,并更新数据库中的文档段状态。它首先检查文档是否处于暂停状态,然后计算 tokens 数量。如果数据集的索引技术是 "high_quality" 或存在嵌入模型类型实例,则会进行 tokens 计算。随后,加载文档块的索引,并更新相关文档段的状态,最后将所有更改提交到数据库并返回 tokens 数量。整个过程在 Flask 应用的上下文中运行,以确保能够访问和操作数据库。
def _process_chunk(self, flask_app, index_processor, chunk_documents, dataset, dataset_document,
embedding_model_instance, embedding_model_type_instance):# 处理块
with flask_app.app_context():
# check document is paused# 检查文档是否暂停
self._check_document_paused_status(dataset_document.id)
tokens = 0
if dataset.indexing_technique == 'high_quality' or embedding_model_type_instance:
tokens += sum(
embedding_model_type_instance.get_num_tokens(
embedding_model_instance.model,
embedding_model_instance.credentials,
[document.page_content]
)
for document in chunk_documents
)# 计算tokens的数量
# load index# 加载索引
index_processor.load(dataset, chunk_documents, with_keywords=False)
document_ids = [document.metadata['doc_id'] for document in chunk_documents]# 文档id
db.session.query(DocumentSegment).filter(
DocumentSegment.document_id == dataset_document.id,
DocumentSegment.index_node_id.in_(document_ids),
DocumentSegment.status == "indexing"
).update({
DocumentSegment.status: "completed",
DocumentSegment.enabled: True,
DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
})# 更新文档段
db.session.commit()# 提交
return tokens
这段代码定义了一个名为 _process_chunk
的方法,用于处理文档集合的一个块。该方法在 Flask 应用的上下文中运行,计算块中文档的 tokens 数量,加载索引,并更新数据库中的文档段状态。以下是详细的代码解释:
def _process_chunk(self, flask_app, index_processor, chunk_documents, dataset, dataset_document,
embedding_model_instance, embedding_model_type_instance):# 处理块
定义一个名为 _process_chunk
的方法。
参数:
self
:类实例的引用。
flask_app
:Flask 应用对象。
index_processor
:索引处理器对象。
chunk_documents
:要处理的文档块。
dataset
:数据集对象。
dataset_document
:数据集中的文档对象。
embedding_model_instance
:嵌入模型实例。
embedding_model_type_instance
:嵌入模型类型实例。
with flask_app.app_context():
在 Flask 应用上下文中运行代码。这使得代码可以访问 Flask 的应用配置和数据库连接。
# check document is paused# 检查文档是否暂停
self._check_document_paused_status(dataset_document.id)
调用 _check_document_paused_status
方法,检查 dataset_document.id
是否处于暂停状态。
tokens = 0
if dataset.indexing_technique == 'high_quality' or embedding_model_type_instance:
tokens += sum(
embedding_model_type_instance.get_num_tokens(
embedding_model_instance.model,
embedding_model_instance.credentials,
[document.page_content]
)
for document in chunk_documents
)# 计算tokens的数量
初始化 tokens
变量为 0。
如果数据集的索引技术是 "high_quality" 或存在嵌入模型类型实例,则计算块中文档的 tokens 数量。
使用 embedding_model_type_instance.get_num_tokens
方法获取每个文档的 tokens 数量,并累加到 tokens
变量中。
# load index# 加载索引
index_processor.load(dataset, chunk_documents, with_keywords=False)
调用 index_processor.load
方法,加载数据集和块文档的索引,with_keywords=False
表示不使用关键字。
document_ids = [document.metadata['doc_id'] for document in chunk_documents]# 文档id
从文档的元数据中提取 doc_id
,并生成一个文档 ID 列表。
db.session.query(DocumentSegment).filter(
DocumentSegment.document_id == dataset_document.id,
DocumentSegment.index_node_id.in_(document_ids),
DocumentSegment.status == "indexing"
).update({
DocumentSegment.status: "completed",
DocumentSegment.enabled: True,
DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
})# 更新文档段
查询条件:查找 DocumentSegment
表中 document_id
为 dataset_document.id
,index_node_id
在 document_ids
列表中的记录,且状态为 "indexing" 的记录。
更新字段:
DocumentSegment.status
:更新状态为 "completed"。
DocumentSegment.enabled
:设置为 True
。
DocumentSegment.completed_at
:设置完成时间为当前 UTC 时间。
db.session.commit()# 提交
将所有更改提交到数据库。
return tokens
返回累加后的 tokens 数量。
53AI,企业落地应用大模型首选服务商
产品:大模型应用平台+智能体定制开发+落地咨询服务
承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-11-15
dify案例分享-基于jina和http实现36氪新闻热榜文章
2024-11-10
安装 Dify 并集成 Ollama 和 Xinference
2024-11-05
【附案例源码】把阿里云验证码短信接入到dify中
2024-11-05
Dify 中的 Bearer Token 与 API-Key 鉴权方式
2024-11-01
Gitee AI 入驻 Dify,成倍加速 AI 应用开发落地
2024-10-27
免费使用 Mistral AI 并将其添加到 Dify 中使用
2024-10-26
一文带你了解Make、Coze、Dify 三大自动化平台的优缺点
2024-10-25
看一看 48Kstar 的Dify,是如何将 AI 研发的难度从天花板拉到地板之下的
2024-04-25
2024-04-24
2024-07-20
2024-05-08
2024-05-07
2024-07-16
2024-05-09
2024-04-25
2024-06-21
2024-07-08