本文主要介绍了Dify中高质量索引模式时,如何通过线程池执行器来处理chunk的过程。源码位置:difyapicoreindexing_runner.pyIndexingRunner._load
。核心思想:假设一个数据集中有一个文档,该文档可以拆分为12个段(segment)。如果chunk_size=10,那么分为2批提交给线程池执行器进行处理。
一.线程池处理chunk
1.方法处理过程
这段代码的目的是通过多线程并发处理文档集合中的每个块,提高处理效率。它创建了一个包含最多10个线程的线程池,并将文档集合按块拆分后提交给线程池执行器处理。最终,它收集所有任务的结果并累加到 tokens
变量中。这种方式可以显著加快大规模文档集合的处理速度。

if dataset.indexing_technique == 'high_quality':
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:# 线程池执行器
futures = []
for i in range(0, len(documents), chunk_size):# 遍历文档
chunk_documents = documents[i:i + chunk_size]# 块文档
futures.append(executor.submit(self._process_chunk, current_app._get_current_object(), index_processor,
chunk_documents, dataset,
dataset_document, embedding_model_instance,
embedding_model_type_instance))# 提交任务
for future in futures:# 遍历futures
tokens += future.result()# 令牌
2.判断条件
这段代码是用来并行处理文档集合的一部分。它使用了Python的 concurrent.futures
模块来创建一个线程池执行器,以便在多个线程中并发执行任务。下面是详细解释每一行代码的作用:
if dataset.indexing_technique == 'high_quality':
检查数据集的索引技术是否为 "high_quality"。只有在这种情况下,下面的并行处理代码才会被执行。
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
3.创建线程池执行器
使用 ThreadPoolExecutor
创建一个包含最多10个线程的线程池执行器。max_workers=10
表示线程池中最多可以有10个并发线程。
futures = []
4.初始化 futures 列表
用于存储每个提交的任务的 future 对象。
for i in range(0, len(documents), chunk_size):
5.遍历文档
通过步长 chunk_size
遍历文档集合 documents
。i
是起始索引。
chunk_documents = documents[i:i + chunk_size]
6.块文档
从文档集合中提取一块文档,这块文档的大小为 chunk_size
。这部分文档会被单独处理。
futures.append(executor.submit(self._process_chunk, current_app._get_current_object(), index_processor,
chunk_documents, dataset,
dataset_document, embedding_model_instance,
embedding_model_type_instance))
7.提交任务
使用 executor.submit()
方法提交一个任务给线程池执行器。每个任务调用 self._process_chunk
方法,并传入一系列参数。返回 future 对象会被添加到 futures
列表中。传递给 _process_chunk
参数包括:
for future in futures:
8.遍历 futures
遍历所有已提交任务的 future 对象。
tokens += future.result()
9.累加结果
调用 future.result()
方法获取任务的结果,并将结果累加到 tokens
变量中。future.result()
会阻塞当前线程,直到任务完成并返回结果。
二._process_chunk方法
1.方法处理过程
这段代码的目的是在处理文档块时,计算文档的 tokens 数量,加载索引,并更新数据库中的文档段状态。它首先检查文档是否处于暂停状态,然后计算 tokens 数量。如果数据集的索引技术是 "high_quality" 或存在嵌入模型类型实例,则会进行 tokens 计算。随后,加载文档块的索引,并更新相关文档段的状态,最后将所有更改提交到数据库并返回 tokens 数量。整个过程在 Flask 应用的上下文中运行,以确保能够访问和操作数据库。
def _process_chunk(self, flask_app, index_processor, chunk_documents, dataset, dataset_document,
embedding_model_instance, embedding_model_type_instance):# 处理块
with flask_app.app_context():
# check document is paused# 检查文档是否暂停
self._check_document_paused_status(dataset_document.id)
tokens = 0
if dataset.indexing_technique == 'high_quality' or embedding_model_type_instance:
tokens += sum(
embedding_model_type_instance.get_num_tokens(
embedding_model_instance.model,
embedding_model_instance.credentials,
[document.page_content]
)
for document in chunk_documents
)# 计算tokens的数量
# load index# 加载索引
index_processor.load(dataset, chunk_documents, with_keywords=False)
document_ids = [document.metadata['doc_id'] for document in chunk_documents]# 文档id
db.session.query(DocumentSegment).filter(
DocumentSegment.document_id == dataset_document.id,
DocumentSegment.index_node_id.in_(document_ids),
DocumentSegment.status == "indexing"
).update({
DocumentSegment.status: "completed",
DocumentSegment.enabled: True,
DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
})# 更新文档段
db.session.commit()# 提交
return tokens
这段代码定义了一个名为 _process_chunk
的方法,用于处理文档集合的一个块。该方法在 Flask 应用的上下文中运行,计算块中文档的 tokens 数量,加载索引,并更新数据库中的文档段状态。以下是详细的代码解释:
def _process_chunk(self, flask_app, index_processor, chunk_documents, dataset, dataset_document,
embedding_model_instance, embedding_model_type_instance):# 处理块
2.方法定义
定义一个名为 _process_chunk
的方法。
-
参数:
-
self
:类实例的引用。 -
flask_app
:Flask 应用对象。 -
index_processor
:索引处理器对象。 -
chunk_documents
:要处理的文档块。 -
dataset
:数据集对象。 -
dataset_document
:数据集中的文档对象。 -
embedding_model_instance
:嵌入模型实例。 -
embedding_model_type_instance
:嵌入模型类型实例。
3.Flask 应用上下文
with flask_app.app_context():
在 Flask 应用上下文中运行代码。这使得代码可以访问 Flask 的应用配置和数据库连接。
4.检查文档是否暂停
# check document is paused# 检查文档是否暂停
self._check_document_paused_status(dataset_document.id)
5.计算 tokens 数量
调用 _check_document_paused_status
方法,检查 dataset_document.id
是否处于暂停状态。
tokens = 0
if dataset.indexing_technique == 'high_quality' or embedding_model_type_instance:
tokens += sum(
embedding_model_type_instance.get_num_tokens(
embedding_model_instance.model,
embedding_model_instance.credentials,
[document.page_content]
)
for document in chunk_documents
)# 计算tokens的数量
-
初始化
tokens
变量为 0。 -
如果数据集的索引技术是 "high_quality" 或存在嵌入模型类型实例,则计算块中文档的 tokens 数量。
-
使用
embedding_model_type_instance.get_num_tokens
方法获取每个文档的 tokens 数量,并累加到tokens
变量中。
6.加载索引
# load index# 加载索引
index_processor.load(dataset, chunk_documents, with_keywords=False)
调用 index_processor.load
方法,加载数据集和块文档的索引,with_keywords=False
表示不使用关键字。

7.提取文档 ID
document_ids = [document.metadata['doc_id'] for document in chunk_documents]# 文档id
从文档的元数据中提取 doc_id
,并生成一个文档 ID 列表。
8.更新文档段状态
db.session.query(DocumentSegment).filter(
DocumentSegment.document_id == dataset_document.id,
DocumentSegment.index_node_id.in_(document_ids),
DocumentSegment.status == "indexing"
).update({
DocumentSegment.status: "completed",
DocumentSegment.enabled: True,
DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
})# 更新文档段
-
查询条件:查找
DocumentSegment
表中document_id
为dataset_document.id
,index_node_id
在document_ids
列表中的记录,且状态为 "indexing" 的记录。 -
更新字段:
-
DocumentSegment.status
:更新状态为 "completed"。 -
DocumentSegment.enabled
:设置为True
。 -
DocumentSegment.completed_at
:设置完成时间为当前 UTC 时间。
9.提交事务
db.session.commit()# 提交
将所有更改提交到数据库。
10.返回 tokens 数量
return tokens
返回累加后的 tokens 数量。