1. 引言
RAGFlow 是一个基于深度文档理解的开源 RAG(检索增强生成)引擎。接上文:关于 RAGFlow 项目中 RAG 技术的实现分析。本文将基于对 RAGFlow 源代码的分析,详细解析 RAGFlow 在检索召回(Retrieval)环节的实现机制。

2. RAGFlow 整体架构
根据 GitHub 仓库首页的介绍,RAGFlow 是一个"基于深度文档理解的开源 RAG 引擎",它提供了一个流线型的 RAG 工作流,结合大型语言模型(LLM)提供真实的问答能力,并通过各种复杂格式数据的引用支持回答。
RAGFlow 的核心特点包括:
-
• 基于深度文档理解的非结构化数据知识提取 -
• 基于模板的智能分块 -
• 通过可视化的文本分块减少幻觉 -
• 支持多种异构数据源(Word、幻灯片、Excel、txt、图像、扫描副本、结构化数据、网页等) -
• 自动化和无缝的 RAG 工作流
3. 检索找回机制的源码分析
通过分析 RAGFlow 的源代码 rag/nlp/search.py
、rag/nlp/query.py
等核心文件,来详细了解其检索召回机制的实现。
3.1 检索流程概述
RAGFlow 的检索流程主要包括以下几个步骤:
-
1. 查询处理与向量化 -
2. 混合检索(向量检索+全文检索) -
3. 重排序与结果融合 -
4. 引用插入与来源追踪
3.2 查询处理与向量化
在rag/nlp/query.py
中,FulltextQueryer
类负责处理用户查询,将其转换为可用于检索的形式:
def question(self, txt, tbl="qa", min_match: float = 0.6):
txt = FulltextQueryer.add_space_between_eng_zh(txt)
txt = re.sub(
r"[ :|\r\n\t,,。??/`!!&^%%()\[\]{}<>]+", " ",
rag_tokenizer.tradi2simp(rag_tokenizer.strQ2B(txt.lower())),
).strip()
otxt = txt
txt = FulltextQueryer.rmWWW(txt)
ifnotself.isChinese(txt):
txt = FulltextQueryer.rmWWW(txt)
tks = rag_tokenizer.tokenize(txt).split()
keywords = [t for t in tks if t]
tks_w = self.tw.weights(tks, preprocess=False)
# ...处理权重和同义词
return MatchTextExpr(self.query_fields, query, 100, {"minimum_should_match": min_match}), keywords
这段代码展示了 RAGFlow 如何处理用户查询:
-
1. 对查询文本进行预处理(添加英中间的空格、繁体转简体、全角转半角等) -
2. 移除无关词(如疑问词) -
3. 分词并提取关键词 -
4. 计算词权重 -
5. 添加同义词扩展 -
6. 构建匹配表达式
在rag/nlp/search.py
中,Dealer
类的get_vector
方法负责将查询文本转换为向量:
def get_vector(self, txt, emb_mdl, topk=10, similarity=0.1):
qv, _ = emb_mdl.encode_queries(txt)
shape = np.array(qv).shape
if len(shape) > 1:
raise Exception(
f"Dealer.get_vector returned array's shape {shape} doesn't match expectation(exact one dimension).")
embedding_data = [get_float(v) for v in qv]
vector_column_name = f"q_{len(embedding_data)}_vec"
return MatchDenseExpr(vector_column_name, embedding_data, 'float', 'cosine', topk, {"similarity": similarity})
这段代码展示了如何使用嵌入模型将查询文本转换为向量,并创建用于向量匹配的表达式。
3.3 混合检索机制
RAGFlow 采用了混合检索策略,结合了向量检索和全文检索。在rag/nlp/search.py
的search
方法中可以看到这一实现:
def search(self, req, idx_names: str | list[str], kb_ids: list[str], emb_mdl=None, highlight=False, rank_feature: dict | None = None ):
# ...前置处理
ifnot qst:
# 处理无查询情况
# ...
else:
highlightFields = ["content_ltks", "title_tks"] if highlight else []
matchText, keywords = self.qryr.question(qst, min_match=0.3)
if emb_mdl isNone:
# 仅使用全文检索
matchExprs = [matchText]
res = self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy, offset, limit, idx_names, kb_ids, rank_feature=rank_feature)
# ...
else:
# 混合检索:结合向量检索和全文检索
matchDense = self.get_vector(qst, emb_mdl, topk, req.get("similarity", 0.1))
q_vec = matchDense.embedding_data
src.append(f"q_{len(q_vec)}_vec")
fusionExpr = FusionExpr("weighted_sum", topk, {"weights": "0.05, 0.95"})
matchExprs = [matchText, matchDense, fusionExpr]
res = self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy, offset, limit, idx_names, kb_ids, rank_feature=rank_feature)
# ...
这段代码揭示了 RAGFlow 的混合检索策略:
-
1. 创建全文检索表达式 matchText
-
2. 如果提供了嵌入模型,则创建向量检索表达式 matchDense
-
3. 使用 FusionExpr
融合两种检索结果,权重分别为 0.05 和 0.95,表明向量检索在混合检索中占主导地位 -
4. 如果初次检索结果为空,会尝试降低匹配阈值进行二次检索
3.4 重排序机制
RAGFlow 的重排序机制是其检索找回流程中的关键环节,主要通过rerank
函数实现。在rag/nlp/search.py
中,rerank
函数负责对初步检索结果进行多维度重排序:
def rerank(self, sres, query, tkweight=0.3,
vtweight=0.7, rank_feature: dict | None = None):
"""
对检索结果进行重排序
参数:
- sres: 搜索结果
- query: 查询文本
- tkweight: 词元相似度权重
- vtweight: 向量相似度权重
- rank_feature: 排序特征
返回:
- 重排序后的结果
"""
_, keywords = self.qryr.question(query)
vector_size = len(sres.query_vector)
vector_column = f"q_{vector_size}_vec"
zero_vector = [0.0] * vector_size
ins_embd = []
# 提取文档向量
for chunk_id in sres.ids:
vector = sres.field[chunk_id].get(vector_column, zero_vector)
ifisinstance(vector, str):
vector = [get_float(v) for v in vector.split("t")]
ins_embd.append(vector)
ifnot ins_embd:
return [], [], []
# 处理重要关键词
for i in sres.ids:
ifisinstance(sres.field[i].get("important_kwd", []), str):
sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]
# 提取文本特征
ins_tw = []
for i in sres.ids:
content_ltks = list(OrderedDict.fromkeys(sres.field[i]["field"].split()))
title_tks = [t for t in sres.field[i].get("title_tks", "").split() if t]
question_tks = [t for t in sres.field[i].get("question_tks", "").split() if t]
important_kwd = sres.field[i].get("important_kwd", [])
# 计算加权分数
tks = content_ltks + title_tks * 2 + important_kwd * 5 + question_tks * 6
ins_tw.append(tks)
# 计算排序特征分数
rank_fea = self._rank_feature_scores(rank_feature, sres)
# 计算混合相似度
sim, tksim, vtsim = self.qryr.hybrid_similarity(
sres.query_vector,
ins_embd,
keywords,
ins_tw, tkweight, vtweight)
# 返回综合排序分数
return sim + rank_fea, tksim, vtsim
这个函数展示了 RAGFlow 的重排序机制如何工作:
-
1. 多维特征提取:
-
• 文档向量:从检索结果中提取每个文档的向量表示 -
• 文本特征:提取内容、标题、问题和重要关键词等文本特征 -
• 排序特征:通过 _rank_feature_scores
函数计算额外的排序特征分数
-
2. 特征加权: -
• 对不同类型的文本特征应用不同权重:标题 ×2,重要关键词 ×5,问题 ×6 -
• 通过 tkweight
和vtweight
参数控制词元相似度和向量相似度的权重比例 -
3. 混合相似度计算: -
• 调用 hybrid_similarity
函数计算查询与文档间的混合相似度 -
• 结合向量相似度和词元相似度,生成综合排序分数 -
4. 排序特征融合: -
• 将混合相似度与排序特征分数相加,得到最终排序分数
_rank_feature_scores
函数负责计算基于标签和 PageRank 的排序特征分数:
def _rank_feature_scores(self, query_rfea, search_res):
"""计算排序特征分数"""
## For rank feature(tag_fea) scores.
rank_fea = []
pageranks = []
# 提取PageRank分数
for chunk_id in search_res.ids:
pageranks.append(search_res.field[chunk_id].get(PAGERANK_FLD, 0))
pageranks = np.array(pageranks, dtype=float)
# 如果没有查询特征,直接返回PageRank分数
ifnot query_rfea:
return np.array([0for _ inrange(len(search_res.ids))]) + pageranks
# 计算查询特征与文档标签的相似度
q_denor = np.sqrt(np.sum([s*s for s,t in query_rfea.items() if t != PAGERANK_FLD]))
for i in search_res.ids:
nor, denor = 0, 0
# 如果文档没有标签,添加0分
ifnot search_res.field[i].get(TAG_FLD):
rank_fea.append(0)
continue
# 计算标签相似度分数
for t, sc ineval(search_res.field[i].get(TAG_FLD, "{}"))).items():
if t in query_rfea:
nor += query_rfea[t] * sc
denor += sc * sc
# 归一化分数
if denor == 0:
rank_fea.append(0)
else:
rank_fea.append((nor/np.sqrt(denor)/q_denor))
# 返回标签相似度分数与PageRank分数的加权和
return np.array(rank_fea)*10. + pageranks
此外,RAGFlow 还提供了基于模型的重排序功能,通过rerank_by_model
函数实现:
def rerank_by_model(self, rerank_mdl, sres, query, tkweight=0.3,
vtweight=0.7, cfield="content_ltks",
rank_feature: dict | None = None):
"""
使用外部模型进行重排序
参数:
- rerank_mdl: 重排序模型
- sres: 搜索结果
- query: 查询文本
- tkweight: 词元相似度权重
- vtweight: 向量相似度权重
- cfield: 内容字段名
- rank_feature: 排序特征
返回:
- 重排序后的结果
"""
# 提取文档内容
contents = []
for i in sres.ids:
contents.append(sres.field[i].get(cfield, ""))
# 使用模型计算相似度分数
scores = rerank_mdl.compute_score(query, contents)
# 计算基本重排序分数
sim, tksim, vtsim = self.rerank(sres, query, tkweight, vtweight, rank_feature)
# 返回模型分数与基本分数的加权和
return scores * 0.7 + sim * 0.3, tksim, vtsim
这个函数展示了 RAGFlow 如何结合外部重排序模型与内部重排序机制,进一步提高检索结果的相关性。
3.5 引用插入与来源追踪
RAGFlow 的一个重要特性是能够在生成的回答中插入引用,追踪信息来源。这在insert_citations
方法中实现:
def insert_citations(self, answer, chunks, chunk_v, embd_mdl, tkweight=0.1, vtweight=0.9):
assertlen(chunks) == len(chunk_v)
ifnot chunks:
return answer, set([])
# 将回答分割成片段
pieces = re.split(r"(```)", answer)
# ...处理代码块和句子分割
# 计算每个片段与知识库块的相似度
ans_v, _ = embd_mdl.encode(pieces_)
# 计算混合相似度并插入引用
for i, a inenumerate(pieces_):
sim, tksim, vtsim = self.qryr.hybrid_similarity(
ans_v[i], chunk_v,
rag_tokenizer.tokenize(self.qryr.rmWWW(pieces_[i])).split(),
chunks_tks, tkweight, vtweight
)
# ...根据相似度插入引用
这段代码展示了 RAGFlow 如何为生成的回答添加引用:
-
1. 将回答分割成多个片段 -
2. 计算每个片段与知识库块的向量相似度和词元相似度 -
3. 根据混合相似度确定最相关的知识库块 -
4. 在回答中插入对应的引用标记
4. 文档存储与检索接口
RAGFlow 使用DocStoreConnection
类作为文档存储和检索的接口,支持多种检索表达式:
from rag.utils.doc_store_conn import DocStoreConnection, MatchDenseExpr, FusionExpr, OrderByExpr
主要的检索表达式包括:
-
• MatchTextExpr
:用于全文检索 -
• MatchDenseExpr
:用于向量检索 -
• FusionExpr
:用于融合多种检索结果
这些表达式被传递给dataStore.search
方法执行实际的检索操作。
5. 检索结果处理
检索结果通过SearchResult
数据类进行封装:
@dataclass
class SearchResult:
total: int
ids: list[str]
query_vector: list[float] | None = None
field: dict | None = None
highlight: dict | None = None
aggregation: list | dict | None = None
keywords: list[str] | None = None
group_docs: list[list] | None = None
这个数据结构包含了检索结果的各种信息,包括总数、文档 ID、查询向量、高亮显示、聚合结果等。
6. 总结
RAGFlow 提供了"基于模板的分块"和"减少幻觉的引用"等特性。通过源码分析,我们可以看到这些特性在检索召回机制中的具体实现:
-
1. 模板化分块:虽然在分析的源码中没有直接看到分块模板的实现细节,但从检索代码可以看出,系统支持对不同字段(如标题、内容)设置不同的权重,这与模板化分块的理念一致。 -
2. 减少幻觉的引用:通过 insert_citations
方法,RAGFlow 能够为生成的回答添加引用,追踪信息来源,从而减少幻觉。 -
3. 混合检索策略:RAGFlow 结合了向量检索和全文检索,并通过可配置的权重进行融合,这与官方文档中提到的"多重召回配对融合重排序"相符。
通过对 RAGFlow 源码的分析,我们可以看到其检索找回机制的核心实现包括:
-
1. 查询处理:对用户查询进行预处理、分词、权重计算和同义词扩展 -
2. 混合检索:结合向量检索和全文检索,通过可配置的权重进行融合 -
3. 重排序:通过 rerank
函数实现多维度特征融合的重排序机制,包括:
-
• 文档向量与查询向量的相似度计算 -
• 文本特征(内容、标题、问题、关键词)的加权处理 -
• 标签特征与 PageRank 分数的融合 -
• 支持外部模型进行进一步重排序
RAGFlow 的检索找回机制设计全面而灵活,能够有效地从知识库中检索相关信息,并通过多维度重排序提高检索结果的相关性,通过引用机制减少幻觉,提高回答的可靠性。