思考在什么情况下会使用到高质量索引模式呢?第1种情况是在知识库中上传文档,文档被拆分为段落后需要进行编码(增加);第2种情况是在召回测试的时候,需要对query
进行编码(查询);第3种情况是当文档中的段落增加和更新时需要进行编码(增加和更新)。索引模式是针对知识库设置的,即对该知识库中所有的文档都生效。本文主要解释Dify
中的高质量索引模式实现过程。
一.高质量索引模式操作
调用 大模型(比如OpenAI
) 的嵌入接口进行处理,以在用户查询时提供更高的准确度。可在模型供应商中设置默认的系统推理模型、Embedding
模型和Rerank
模型。

在datasets
数据表中会记录使用的embedding_model
、embedding_model_provider
、retrieval_model
。其中retrieval_model
内容格式如下:
{
"top_k": 2,
"search_method": "semantic_search",
"reranking_model": {
"reranking_model_name": "",
"reranking_provider_name": ""
},
"score_threshold": null,
"reranking_enable": false,
"score_threshold_enabled": false
}
semantic_search
就是向量检索,即通过生成查询嵌入并查询与其向量表示最相似的文本分段。除此外还有全文检索(索引文档中的所有词汇,从而允许用户查询任意词汇,并返回包含这些词汇的文本片段)和混合检索(同时执行全文检索和向量检索,并应用重排序步骤,从两类查询结果中选择匹配用户问题的最佳结果,需配置Rerank
模型API
)。

二.触发异步任务
源码位置:difyapiservicesdataset_service.pysave_document_with_dataset_id()
在知识库中上传文件提交后,就会触发文档索引任务,或重复文档索引任务。如下所示:

document = Document.query.filter_by(
dataset_id=dataset.id,
tenant_id=current_user.current_tenant_id,
data_source_type='upload_file',
enabled=True,
name=file_name
).first()# 根据条件判断documents是否存在文件记录,如果存在就是重复文件,否则就是新的文件
document_indexing_task
和duplicate_document_indexing_task
都是异步任务,它们被用来处理文档的索引。两者区别如下所示:
document_indexing_task
:这个任务用于处理新上传的文档的索引。当新的文档被添加到数据集中时,这个任务会被触发。它接收两个参数:数据集的ID和新添加的文档的ID列表。这个任务会对每个新添加的文档进行索引处理。
duplicate_document_indexing_task
:这个任务用于处理重复文档的索引。当已存在的文档被标记为重复并需要重新索引时,这个任务会被触发。它接收两个参数:数据集的ID和需要重新索引的文档的ID列表。这个任务会对每个需要重新索引的文档进行索引处理。
这两个任务都是异步的,这意味着它们的执行不会阻塞主程序的运行。这对于处理大量文档或者处理需要大量计算的任务非常有用。
1.触发文档索引任务:document_indexing_task.delay()
源码位置:dify-0.6.9apitasksdocument_indexing_task.pydocument_indexing_task()

document_indexing_task
是一个异步任务,用于处理新上传的文档的索引。当新的文档被添加到数据集中时,这个任务会被触发。它接收两个参数:数据集的ID和新添加的文档的ID列表。这个任务会对每个新添加的文档进行索引处理。以下是该方法的主要步骤:
(1)它会获取指定ID的数据集。
(2)它会检查文档的数量是否超过了批量上传的限制或者文档上传的配额。如果超过了限制,它会抛出一个异常,并将索引状态设置为'error'。
(3)对于每个需要索引的文档,它会将索引状态设置为'parsing',并将处理开始的时间设置为当前时间。
(4)它会创建一个IndexingRunner
实例,并调用其run
方法来运行索引处理。如果在处理过程中发生了任何异常,它会捕获这个异常并记录日志。
2.触发重复文档索引任务:duplicate_document_indexing_task.delay()
源码位置:difyapitasksduplicate_document_indexing_task.pyduplicate_document_indexing_task()

duplicate_document_indexing_task
是一个异步任务,用于处理需要重新索引的文档。当已存在的文档被标记为重复并需要重新索引时,这个任务会被触发。它接收两个参数:数据集的ID和需要重新索引的文档的ID列表。以下是该方法的主要步骤:
(1)它会获取指定ID的数据集。
(2)它会检查文档的数量是否超过了批量上传的限制或者文档上传的配额。如果超过了限制,它会抛出一个异常,并将索引状态设置为'error'。
(3)对于每个需要重新索引的文档,它会清理旧的数据,包括从向量索引中删除旧的数据和删除数据库中的旧数据。然后,它会将索引状态设置为'parsing',并将处理开始的时间设置为当前时间。
(4)它会创建一个IndexingRunner
实例,并调用其run
方法来运行索引处理。如果在处理过程中发生了任何异常,它会捕获这个异常并记录日志。
三.IndexingRunner类
1.IndexingRunner
类概述
源码位置:difyapicoreindexing_runner.pyIndexingRunner类
IndexingRunner
类中每个函数的名称、功能和实现过程的概述。如下所示:
函数名称 | 功能描述 | 实现过程简述 |
---|---|---|
__init__ |
初始化索引运行器 | 初始化存储和模型管理器。 |
run |
运行索引过程 | 遍历数据集文档,执行提取、转换、保存段和加载步骤。 |
run_in_splitting_status |
当索引状态为拆分时运行索引过程 | 删除现有文档段,重新提取、转换、保存段和加载。 |
run_in_indexing_status |
当索引状态为索引时运行索引过程 | 转换未完成的文档段,然后加载它们。 |
indexing_estimate |
估计文档索引所需的资源 | 检查文档限制,提取文本文档,分割文档,计算令牌数和价格。 |
_extract |
提取文档内容 | 根据数据源类型加载文件或Notion信息,并提取文本。 |
filter_string |
过滤字符串,移除不需要的字符 | 使用正则表达式替换或删除文本中的特定字符。 |
_get_splitter |
获取分割器 | 根据处理规则获取文本分割器。 |
_step_split |
步骤拆分 | 将文本文档拆分成文档段并保存。 |
_split_to_documents |
拆分文档为多个节点 | 使用分割器将文本拆分成多个文档节点。 |
format_qa_document |
格式化问答模型文档 | 对于问答模型,生成问题和答案对。 |
_split_to_documents_for_estimate |
为估计过程拆分文档节点 | 拆分文档以估计索引过程所需的资源。 |
_document_clean |
根据处理规则清理文档内容 | 移除额外的空格、URL和电子邮件等。 |
format_split_text |
格式化拆分文本 | 从文本中提取问题和答案对。 |
_load |
加载索引并更新文档/段状态为完成 | 执行实际的索引加载,更新文档和段的状态。 |
_process_keyword_index |
处理关键字索引 | 创建关键字索引。 |
_process_chunk |
处理文档块的索引加载 | 对文档块进行索引处理,并更新数据库状态。 |
_check_document_paused_status |
检查文档是否已暂停索引 | 检查Redis缓存中是否有文档暂停的标记。 |
_update_document_index_status |
更新文档索引状态 | 在数据库中更新文档的索引状态。 |
_update_segments_by_document |
根据文档ID更新段的状态 | 更新与文档ID相关的所有段的状态。 |
batch_add_segments |
批量添加段索引处理 | 将多个段批量添加到索引中。 |
_transform |
转换文档 | 将提取的文本文档转换为可以索引的格式。 |
这个类中的函数大多数是私有函数,它们通常作为run
方法的一部分被调用,或者在估计索引过程时使用。每个函数都专注于索引过程的一个特定方面,从文档的提取和清理到分割、转换和最终的索引加载。
2.运行索引过程run()函数
遍历数据集文档,执行提取、转换、保存段和加载步骤。这个过程和Dify
中的经济索引模式实现过程几乎一样,参考 [1]。由于高质量索引模式需要消耗tokens
,因此需要计算消耗了多少tokens
,大致计算思路:
# 获取嵌入模型实例
embedding_model_instance = self.model_manager.get_model_instance(
tenant_id=dataset.tenant_id,
provider=dataset.embedding_model_provider,
model_type=ModelType.TEXT_EMBEDDING,
model=dataset.embedding_model
)
# 转换为TextEmbeddingModel对象
embedding_model_type_instance = cast(TextEmbeddingModel, embedding_model_type_instance)
# 对于给定消息计算tokens数量
embedding_model_type_instance.get_num_tokens(
model=embedding_model_instance.model,
credentials=embedding_model_instance.credentials,
texts=[text]
)
四.TextEmbeddingModel类
1.VolcengineMaaSTextEmbeddingModel类
源码位置:difyapicoremodel_runtimemodel_providers__basetext_embedding_model.py
在模型供应商下面,提供有各种模型的基类,比如llm
模型、embedding
模型、rerank
模型、语音转文本模型、文本转语音模型、文本转图像模型等。接下来以火山引擎为例介绍VolcengineMaaSTextEmbeddingModel
模型。

源码位置:difyapicoremodel_runtimemodel_providersvolcengine_maastext_embeddingtext_embedding.py

2.调用Embedding模型
源码位置:difyapicoreembeddingcached_embedding.py
该文件包括一个文档嵌入函数(embed_documents
),一个查询嵌入函数(embed_query
)。在这2个函数中都有调用Embedding
模型:
embedding_result = self._model_instance.invoke_text_embedding(
texts=batch_texts,
user=self._user
)
顺便看下在哪里调用了这2个方法:一个位置是difyapicoreragdatasourcevdbvector_factory.py
,另一个位置是difyapiserviceshit_testing_service.py
。关于Dify
中的weaviate
向量数据库操作参考 [2]。

源码位置:difyapicoremodel_manager.py
invoke_text_embedding()
函数具体实现过程:

源码位置:difyapicoremodel_runtimemodel_providers__basetext_embedding_model.py
invoke()
函数具体实现过程:

源码位置:difyapicoremodel_runtimemodel_providersvolcengine_maastext_embeddingtext_embedding.py
因为VolcengineMaaSTextEmbeddingModel
继承自TextEmbeddingModel
,实际上调用的就是VolcengineMaaSTextEmbeddingModel
的_invoke()
方法来实现具体的Embedding
编码:
