一、整体架构
1.1 架构概述
dify 的 RAG(检索增强生成)架构是一个完整的文档处理、索引和检索系统,旨在提高大语言模型生成内容的准确性和相关性。该架构由三个主要模块组成:文档处理模块、向量化与索引模块、检索与重排模块。

整个系统的数据流如下:
-
用户上传文档或提供URL -
文档处理模块提取文本内容并进行清洗 -
文本被分割成适当大小的段落(chunks) -
向量化模块将这些段落转换为向量表示并存储在向量数据库中 -
同时创建关键词索引以支持混合搜索 -
用户查询时,检索模块根据配置的检索方法找到相关段落 -
重排模块对检索结果进行排序优化 -
最终将优化后的上下文提供给大语言模型生成回答

1.2 数据模型关系
Dify RAG 系统的核心数据模型包括:
-
Dataset:知识库,包含多个文档,是 RAG 的基本单位 -
Document:文档,包含元数据和处理规则 -
DocumentSegment:文档分段,是实际被索引和检索的最小单位 -
DatasetKeywordTable:关键词表,用于支持关键词搜索
这些实体之间的关系是:一个 Dataset 包含多个 Document,一个 Document 包含多个 DocumentSegment。Dataset 通过index_struct 字段存储向量数据库的配置信息。

二、文档处理模块
2.1 设计思路
文档处理模块负责将各种格式的文档转换为可被索引的文本段落。该模块采用 ETL(提取-转换-加载)模式设计,具有高度的可扩展性和灵活性。
主要功能包括:
-
支持多种文档格式的文本提取 -
文本清洗和预处理 -
文本分段(chunking) -
元数据提取和管理
2.1 文档处理架构

2.3 核心组件
ExtractProcessor
ExtractProcessor
是文档处理的入口,负责根据文档类型选择合适的提取器进行文本提取。它支持多种文档格式,包括:
-
文本文件(TXT) -
PDF文档 -
Word文档(DOCX、DOC) -
Excel表格(XLSX、XLS) -
PowerPoint演示文稿(PPTX、PPT) -
HTML网页 -
Markdown文档 -
电子邮件(EML、MSG) -
电子书(EPUB) -
XML文件 -
CSV数据 -
Notion导出文件
提取器采用策略模式设计,可以轻松扩展以支持新的文档格式。
ExtractProcessor
类实现
class ExtractProcessor:
@classmethod
def extract(cls, extract_setting: ExtractSetting, is_automatic: bool = False,
file_path: str = None) -> list[Document]:
# 根据数据源类型和文件类型选择合适的提取器
if extract_setting.datasource_type == DatasourceType.FILE.value:
# 根据文件扩展名选择不同的提取器
if file_extension == '.pdf':
extractor = PdfExtractor(file_path)
elif file_extension in ['.md', '.markdown']:
extractor = UnstructuredMarkdownExtractor(file_path)
if is_automatic else MarkdownExtractor(file_path)
# ... 其他文件类型的处理
return extractor.extract()
文档提取流程

TextSplitter
TextSplitter
负责将长文本分割成适当大小的段落,是 RAG 系统性能的关键组件。它提供以下功能:
-
基于不同分隔符的文本分割 -
支持设置最大 token 数量 -
支持段落重叠以保持上下文连贯性 -
支持不同的分词器(Tiktoken、HuggingFace)进行准确的 token 计算
分割策略可以通过处理规则进行配置,允许用户根据不同类型的文档调整分割参数。
TextSplitter
实现类
class TextSplitter(BaseDocument, ABC):
def __init__(
self,
chunk_size: int = 4000,
chunk_overlap: int = 200,
length_function: Callable[[str], int] = len,
keep_: bool = False,
add_start_index: bool = False,
) -> None:
# 初始化分割器参数
self._chunk_size = chunk_size
self._chunk_overlap = chunk_overlap
self._length_function = length_function
self._keep_separator = keep_separator
self._add_start_index = add_start_index
@abstractmethod
def split_text(self, text: str) -> list[str]:
"""Split text into multiple components."""
def create_documents(
self, texts: list[str], metadatas: Optional[list[dict]] = None
) -> list[Document]:
# 从分割后的文本创建文档对象
文档切块是RAG中的关键步骤,Dify 使用TextSplitter
类及其子类来实现不同的切块策略:
-
FixedRecursiveCharacterTextSplitter
:固定大小的递归字符分割 -
EnhanceRecursiveCharacterTextSplitter
:增强的递归字符分割
切块后的文档片段会保留原始文档的元数据,并添加唯一标识符(doc_id)和内容哈希值(doc_hash)。
文本分割流程

TextCleaner
TextCleaner
负责文本清洗,去除不必要的格式和噪声,提高索引和检索质量。清洗操作包括:
-
移除多余空格 -
去除URL和电子邮件(可选) -
标准化文本格式 -
处理特殊字符
三、向量化与索引模块
3.1 设计思路
向量化与索引模块负责将文本段落转换为向量表示并存储在向量数据库中,同时创建关键词索引以支持混合搜索。该模块采用工厂模式和适配器模式设计,支持多种向量数据库和嵌入模型。
主要功能包括:
-
文本向量化(嵌入) -
向量存储和索引 -
关键词提取和索引 -
缓存管理
3.2 向量化流程图

3.3 核心组件
嵌入缓存(CacheEmbedding)
Dify使用CacheEmbedding
类来管理文本嵌入过程,它具有缓存功能,可以避免重复计算嵌入向量:
-
对于每个文本块,首先计算其哈希值 -
检查数据库中是否已存在该哈希值对应的嵌入向量 -
如果存在,直接使用缓存的向量 -
如果不存在,调用嵌入模型生成向量,并将结果存入缓存
class CacheEmbedding(Embeddings):
def __init__(self, model_instance: ModelInstance, user: Optional[str] = None) -> None:
self._model_instance = model_instance
self._user = user
def embed_documents(self, texts: list[str]) -> list[list[]]:
# 使用文档嵌入缓存或存储(如果不存在)
text_embeddings = [Nonefor _ in range(len(texts))]
embedding_queue_indices = []
# 检查缓存中是否存在嵌入
for i, text in enumerate(texts):
hash = helper.generate_text_hash(text)
embedding = db.session.query(Embedding).filter_by(
model_name=self._model_instance.model,
hash=hash,
provider_name=self._model_instance.provider
).first()
if embedding:
text_embeddings[i] = embedding.get_embedding()
else:
embedding_queue_indices.append(i)
# 处理未缓存的嵌入
if embedding_queue_indices:
# 生成嵌入并缓存
嵌入缓存流程

向量工厂(VectorFactory)
Dify 支持多种向量数据库,通过工厂模式实现:
class Vector:
def __init__(self, dataset: Dataset, attributes: list = None):
if attributes isNone:
attributes = ['doc_id', 'dataset_id', 'document_id', 'doc_hash']
self._dataset = dataset
self._embeddings = self._get_embeddings()
self._attributes = attributes
self._vector_processor = self._init_vector()
def _init_vector(self) -> BaseVector:
vector_type = dify_config.VECTOR_STORE
if self._dataset.index_struct_dict:
vector_type = self._dataset.index_struct_dict['type']
vector_factory_cls = self.get_vector_factory(vector_type)
return vector_factory_cls().init_vector(self._dataset, self._attributes, self._embeddings)
@staticmethod
def get_vector_factory(vector_type: str) -> type[AbstractVectorFactory]:
match vector_type:
case VectorType.CHROMA:
core.rag.datasource.vdb.chroma.chroma_vector import ChromaVectorFactory
return ChromaVectorFactory
# ... 其他向量数据库的支持
向量工厂模式

支持的向量数据库包括:
-
Chroma -
Milvus -
MyScale -
PGVector -
Qdrant -
Relyt -
Elasticsearch -
TiDB Vector -
Weaviate -
Tencent -
Oracle -
OpenSearch -
AnalyticDB
索引处理器(IndexingRunner)
IndexingRunner
负责协调整个索引过程,包括文档提取、转换和加载。它实现了完整的 ETL 流程:
-
Extract:从原始文档中提取文本 -
Transform:清洗和分割文本 -
Load:将处理后的段落加载到向量数据库和关键词索引中
class IndexingRunner:
def __init__(self, dataset, document_id, document_model, document_content, document_metadata, tenant_id, user_id):
# 初始化
def run(self):
# 1. 提取文本
documents = self.extract()
# 2. 转换(分割和清洗)
= self.transform(documents)
# 3. 保存段落
self.save_segments()
# 4. 加载到索引
self.load()
索引处理流程

段落索引处理器(ParagraphIndexProcessor)
段落索引处理器实现了提取、转换、加载和检索方法:
class ParagraphIndexProcessor(IndexProcessorBase):
def extract(self, extract_setting: ExtractSetting) -> list[Document]:
# 提取文档
return ExtractProcessor.extract(extract_setting)
def transform(self, documents: list[Document]) -> list[Document]:
# 清洗和分割文本
cleaned_documents = self.clean(documents)
return self.split(cleaned_documents)
def load(self, dataset_id: str, : list[Document]) -> None:
# 创建向量索引和关键词索引
self.create_vector_index(dataset_id, segments)
self.create_keyword_index(dataset_id, segments)
def retrieve(self, dataset_id: str, query: str, **kwargs) -> list[Document]:
# 调用检索服务
returnService.retrieve(
retrival_method=kwargs.get('_method', 'semantic_search'),
dataset_id=dataset_id,
query=query,
top_k=kwargs.get('top_k', 2),
score_threshold=kwargs.get('score_threshold', 0.0),
reranking_model=kwargs.get('reranking_model'),
reranking_mode=kwargs.get('reranking_mode', 'reranking_model'),
weights=kwargs.get('weights')
)

Jieba关键词索引
Jieba
组件负责从文本中提取关键词并建立索引,支持关键词搜索和混合搜索。它提供:
-
关键词提取 -
TF-IDF权重计算 -
关键词索引创建和更新
四、检索与重排模块
4.1 设计思路
检索与重排模块负责根据用户查询找到最相关的文本段落,并对结果进行优化排序。该模块支持多种检索方法和重排策略,可以根据不同场景进行配置。
主要功能包括:
-
语义检索(向量相似度搜索) -
关键词检索(基于关键词匹配) -
全文检索(基于倒排索引) -
混合检索(结合多种检索方法) -
结果重排序(基于多种因素)
4.2 检索与重排流程图

4.3 核心组件
检索方法(RetrievalMethod)
RetrievalMethod
是一个枚举类,定义了系统支持的检索方法:
-
SEMANTIC_SEARCH:语义搜索,基于向量相似度 -
FULL_TEXT_SEARCH:全文搜索,基于倒排索引 -
HYBRID_SEARCH:混合搜索,结合语义和关键词
class Method(Enum):
SEMANTIC_SEARCH = 'semantic_search'
FULL_TEXT_SEARCH = 'full_text_search'
HYBRID_SEARCH = 'hybrid_search'
@staticmethod
def is_support_semantic_search(retrieval_method: str) -> bool:
return retrieval_method in {Method.SEMANTIC_SEARCH.value, RetrievalMethod.HYBRID_SEARCH.value}
@staticmethod
def is_support_fulltext_search(_method: str) -> bool:
return retrieval_method in {RetrievalMethod.FULL_TEXT_SEARCH.value, RetrievalMethod.HYBRID_SEARCH.value}
检索方法流程图

检索服务(RetrievalService)
Service
是检索功能的核心实现,负责根据配置的检索方法执行搜索并返回结果。它提供以下功能:
-
支持多种检索方法 -
多线程并行检索 -
结果合并和排序 -
支持重排序
主要方法包括:
-
retrieve
:主检索方法,根据检索方法执行搜索 -
keyword_search
:关键词搜索实现 -
embedding_search
:向量相似度搜索实现 -
full_text_index_search
:全文索引搜索实现
检索服务协调不同的检索方法并处理结果:
class Service:
@classmethod
def retrieve(cls, retrival_method: str, dataset_id: str, query: str,
top_k: int, score_threshold: Optional[] = .0,
reranking_model: Optional[dict] = None, reranking_mode: Optional[str] = 'reranking_model',
weights: Optional[dict] = None):
# 获取数据集
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
ifnot dataset or dataset._document_count == 0or dataset._segment_count == 0:
return []
all_documents = []
threads = []
exceptions = []
# 关键词搜索
if retrival_method == 'keyword_search':
keyword_thread = threading.Thread(target=Service.keyword_search, kwargs={...})
threads.append(keyword_thread)
keyword_thread.start()
# 语义搜索
ifMethod.is_support_semantic_search(retrival_method):
embedding_thread = threading.Thread(target=Service.embedding_search, kwargs={...})
threads.append(embedding_thread)
embedding_thread.start()
# 全文搜索
ifMethod.is_support_fulltext_search(retrival_method):
full_text_index_thread = threading.Thread(target=RetrievalService.full_text_index_search, kwargs={...})
threads.append(full_text_index_thread)
full_text_index_thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
# 混合搜索需要后处理
if retrival_method == Method.HYBRID_SEARCH.value:
data_post_processor = DataPostProcessor(str(dataset.tenant_id), reranking_mode,
reranking_model, weights, False)
all_documents = data_post_processor.invoke(
query=query,
documents=all_documents,
score_threshold=score_threshold,
top_n=top_k
)
return all_documents
检索服务时序图

权重重排(WeightRerankRunner)
WeightRerankRunner
实现了基于权重的重排序策略,可以根据不同因素对检索结果进行优化排序。它考虑以下因素:
-
向量相似度得分 -
关键词匹配得分 -
自定义权重配置
通过调整不同因素的权重,可以优化检索结果的相关性和质量。
权重重排结合了关键词得分和向量相似度得分:
class WeightRerankRunner:
def __init__(self, tenant_id: str, weights: Weights) -> None:
self.tenant_id = tenant_id
self.weights = weights
def run(self, query: str, documents: list[Document], score_threshold: Optional[float] = None,
top_n: Optional[int] = None, user: Optional[str] = None) -> list[Document]:
# 去重
docs = []
doc_id = []
unique_documents = []
for document in documents:
if document.metadata['doc_id'] notin doc_id:
doc_id.append(document.metadata['doc_id'])
docs.append(document.page_content)
unique_documents.append(document)
documents = unique_documents
# 计算关键词得分和向量得分
rerank_documents = []
query_scores = self._calculate_keyword_score(query, documents)
query_vector_scores = self._calculate_cosine(self.tenant_id, query, documents, self.weights.vector_setting)
# 合并得分
for document, query_score, query_vector_score in zip(documents, query_scores, query_vector_scores):
score = self.weights.vector_setting.vector_weight * query_vector_score +
self.weights.keyword_setting.keyword_weight * query_score
if score_threshold and score < score_threshold:
continue
document.metadata['score'] = score
rerank_documents.append(document)
# 排序并返回结果
rerank_documents = sorted(rerank_documents, key=lambda x: x.metadata['score'], reverse=True)
return rerank_documents[:top_n] if top_n else rerank_documents
权重重排流程图

数据后处理器(DataPostProcessor)
DataPostProcessor
负责对检索结果进行后处理,包括重排序、去重和格式化。它支持多种重排序策略,可以根据不同场景进行配置。
数据后处理器负责重排序和结果处理:
class DataPostProcessor:
def __init__(self, tenant_id: str, reranking_mode: str, reranking_model: Optional[dict],
weights: Optional[dict], enable_reranking: bool = True):
# 初始化
def invoke(self, query: str, documents: list[Document], score_threshold: Optional[float] = None,
top_n: Optional[int] = None) -> list[Document]:
# 根据模式选择重排序方法
if self.enable_reranking and self.reranking_mode == RerankMode.RERANKING_MODEL.value:
# 使用模型重排序
return self.rerank_model_runner.run(query, documents, score_threshold, top_n)
elif self.enable_reranking and self.reranking_mode == RerankMode.WEIGHT.value:
# 使用权重重排序
return self.weight_rerank_runner.run(query, documents, score_threshold, top_n)
else:
# 不重排序,直接返回
return documents[:top_n] if top_n else documents
数据后处理流程图

五、总结
Dify 的 RAG 系统是一个功能完整、灵活可扩展的检索增强生成框架,具有以下特点:
-
多格式文档支持:支持 PDF、Word、Excel、Markdown、HTML、CSV 等多种文档格式。
-
灵活的文本分块:支持多种分块策略,包括固定大小、基于分隔符和基于语义的分块。
-
高效的向量化:实现了嵌入缓存机制,提高了向量化效率。
-
多样的存储选项:支持 Chroma、Milvus、PGVector、Qdrant 等多种向量数据库。
-
多种检索方法:支持语义搜索、全文搜索、关键词搜索和混合搜索。
-
高级重排序:支持基于模型的重排序和基于权重的重排序。
-
并行处理:使用多线程并行执行不同的检索方法,提高效率。
-
可扩展架构:采用工厂模式和抽象基类,便于扩展新的功能。
通过这些组件的协同工作,Dify 的 RAG 系统能够有效地处理和检索大量文档,为大语言模型提供准确的上下文信息,从而生成更加准确和相关的回答。