最近在优化公司的知识问答系统时,遇到了一个让人头疼的问题:明明知识库里有相关内容,但LLM总是回答”我不知道”或者答非所问。经过深入分析发现,问题出在RAG的召回环节——检索到的文档片段要么不够相关,要么上下文支离破碎。
经过一番折腾,我找到了两个特别有效的解决方案:索引扩展和Small-to-Big策略。今天就来分享一下这两个技术的原理和具体实现,希望能帮到有同样困扰的朋友。
问题的根源:单一检索的局限性
传统RAG系统通常只用一种方式检索:把查询和文档都转成向量,然后计算相似度。这种方法简单直接,但问题不少:
-
语义理解偏差:不同的embedding模型对同一段文本的理解可能完全不同
-
关键词遗漏:纯向量检索可能错过重要的专有名词或术语
-
上下文割裂:检索到的小片段缺乏前后文,信息不完整
为了解决这些问题,我开始研究多路召回和层级索引的方案。
解决方案一:索引扩展——”多条腿走路”
索引扩展的核心思想是:既然单一检索有局限,那就用多种方式检索,然后把结果融合起来。就像投资要分散风险一样,检索也要分散”召回风险”。
技术架构
我设计了一个三层检索架构:
-
离散索引层:基于关键词、实体的精确匹配
-
多向量层:使用不同embedding模型的语义检索
-
融合层:将多路结果智能合并
具体实现
先来看看离散索引的实现:
import spacy
from sklearn.feature_extraction.text import TfidfVectorizer
import jieba
import re
class DiscreteIndexer:
def __init__(self):
# 加载中文NER模型
self.nlp = spacy.load("zh_core_web_sm")
self.tfidf = TfidfVectorizer(max_features=1000, stop_words='english')
def extract_keywords(self, text, top_k=10):
"""提取关键词"""
# 使用jieba分词
words = jieba.analyse.extract_tags(text, topK=top_k, withWeight=True)
return [word for word, weight in words]
def extract_entities(self, text):
"""提取命名实体"""
doc = self.nlp(text)
entities = []
for ent in doc.ents:
entities.append({
'text': ent.text,
'label': ent.label_,
'start': ent.start_char,
'end': ent.end_char
})
return entities
def build_discrete_index(self, documents):
"""构建离散索引"""
index = []
for i, doc in enumerate(documents):
keywords = self.extract_keywords(doc['text'])
entities = self.extract_entities(doc['text'])
index.append({
'doc_id': doc['id'],
'text': doc['text'],
'keywords': keywords,
'entities': [e['text'] for e in entities],
'entity_details': entities
})
return index
接下来是多向量索引:
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List, Dict
class MultiVectorIndexer:
def __init__(self):
# 加载多个embedding模型
self.models = {
'bge': SentenceTransformer('BAAI/bge-large-zh-v1.5'),
'text2vec': SentenceTransformer('shibing624/text2vec-base-chinese'),
'multilingual': SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
}
def encode_documents(self, documents: List[str]) -> Dict[str, np.ndarray]:
"""使用多个模型编码文档"""
embeddings = {}
for model_name, model in self.models.items():
print(f"正在使用 {model_name} 编码文档...")
embeddings[model_name] = model.encode(documents)
return embeddings
def search_single_model(self, query: str, model_name: str,
embeddings: np.ndarray, top_k: int = 5):
"""在单个模型的向量空间中搜索"""
query_embedding = self.models[model_name].encode([query])
# 计算余弦相似度
similarities = np.dot(embeddings, query_embedding.T).flatten()
similarities = similarities / (np.linalg.norm(embeddings, axis=1) *
np.linalg.norm(query_embedding))
# 获取top_k结果
top_indices = np.argsort(similarities)[::-1][:top_k]
results = [(idx, similarities[idx]) for idx in top_indices]
return results
核心的融合算法:
class EnsembleRetriever:
def __init__(self, discrete_indexer, multi_vector_indexer):
self.discrete_indexer = discrete_indexer
self.multi_vector_indexer = multi_vector_indexer
def reciprocal_rank_fusion(self, ranked_lists: List[List], k: int = 60):
"""RRF融合算法"""
# 收集所有候选文档
all_docs = set()
for ranked_list in ranked_lists:
for doc_id, _ in ranked_list:
all_docs.add(doc_id)
# 计算RRF分数
rrf_scores = {}
for doc_id in all_docs:
score = 0
for ranked_list in ranked_lists:
# 找到文档在当前排序列表中的位置
rank = None
for i, (candidate_id, _) in enumerate(ranked_list):
if candidate_id == doc_id:
rank = i + 1 # 排名从1开始
break
if rank is not None:
score += 1 / (k + rank)
rrf_scores[doc_id] = score
# 按分数排序
sorted_results = sorted(rrf_scores.items(),
key=lambda x: x[1], reverse=True)
return sorted_results
def search(self, query: str, all_embeddings: Dict,
discrete_index: List, documents: List, top_k: int = 10):
"""综合检索"""
all_results = []
# 1. 离散检索
discrete_results = self._discrete_search(query, discrete_index, top_k)
all_results.append(discrete_results)
# 2. 多向量检索
for model_name, embeddings in all_embeddings.items():
vector_results = self.multi_vector_indexer.search_single_model(
query, model_name, embeddings, top_k)
all_results.append(vector_results)
# 3. RRF融合
final_results = self.reciprocal_rank_fusion(all_results)
return final_results[:top_k]
def _discrete_search(self, query: str, discrete_index: List, top_k: int):
"""离散索引检索"""
query_keywords = self.discrete_indexer.extract_keywords(query)
query_entities = [e['text'] for e in
self.discrete_indexer.extract_entities(query)]
scores = []
for i, doc_meta in enumerate(discrete_index):
score = 0
# 关键词匹配分数
keyword_overlap = len(set(query_keywords) & set(doc_meta['keywords']))
score += keyword_overlap * 2
# 实体匹配分数
entity_overlap = len(set(query_entities) & set(doc_meta['entities']))
score += entity_overlap * 3
scores.append((i, score))
# 按分数排序
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
解决方案二:Small-to-Big——”先找点,再扩面”
Small-to-Big策略解决的是另一个问题:检索到的文档片段太短,上下文不完整。这个策略的思路是:用小粒度的内容(摘要、关键句)建立索引来快速定位,然后返回大粒度的完整上下文。
实现原理
想象一下你在图书馆找资料:
-
先看目录和摘要,快速定位相关章节
-
然后去读完整的章节内容
Small-to-Big就是这个思路的程序化实现。
代码实现
首先是文档预处理和摘要生成:
from transformers import pipeline, AutoTokenizer, AutoModel
import torch
class SmallToBigIndexer:
def __init__(self):
# 初始化摘要模型
self.summarizer = pipeline("summarization",
model="facebook/bart-large-cnn")
def create_summary(self, text: str, max_length: int = 150) -> str:
"""生成文档摘要"""
if len(text) < 100:
return text
try:
summary = self.summarizer(text,
max_length=max_length,
min_length=30,
do_sample=False)
return summary[0]['summary_text']
except Exception as e:
# 如果摘要失败,返回前几句
sentences = text.split('。')[:3]
return '。'.join(sentences) + '。'
def extract_key_sentences(self, text: str, num_sentences: int = 3) -> List[str]:
"""提取关键句子"""
sentences = text.split('。')
sentences = [s.strip() for s in sentences if len(s.strip()) > 10]
if len(sentences) <= num_sentences:
return sentences
# 简单的关键句提取:选择包含更多实体和关键词的句子
sentence_scores = []
for sentence in sentences:
score = 0
# 长度因子
score += len(sentence) * 0.1
# 位置因子(开头和结尾的句子更重要)
score += 10 if sentence in sentences[:2] else 0
score += 5 if sentence in sentences[-2:] else 0
sentence_scores.append((sentence, score))
# 按分数排序
sentence_scores.sort(key=lambda x: x[1], reverse=True)
return [s[0] for s in sentence_scores[:num_sentences]]
def build_small_to_big_index(self, documents: List[Dict]) -> Dict:
"""构建Small-to-Big索引"""
small_index = []
big_storage = {}
for doc in documents:
doc_id = doc['id']
text = doc['text']
# 将长文档分割成大的chunks
big_chunks = self._split_into_big_chunks(text)
for i, big_chunk in enumerate(big_chunks):
big_chunk_id = f"{doc_id}_chunk_{i}"
# 存储大chunk
big_storage[big_chunk_id] = {
'text': big_chunk,
'doc_id': doc_id,
'chunk_index': i
}
# 创建小的索引内容
summary = self.create_summary(big_chunk)
key_sentences = self.extract_key_sentences(big_chunk)
# 添加到小索引
small_index.append({
'small_content': summary,
'content_type': 'summary',
'big_chunk_id': big_chunk_id
})
for sentence in key_sentences:
small_index.append({
'small_content': sentence,
'content_type': 'key_sentence',
'big_chunk_id': big_chunk_id
})
return {
'small_index': small_index,
'big_storage': big_storage
}
def _split_into_big_chunks(self, text: str, chunk_size: int = 1000,
overlap: int = 100) -> List[str]:
"""将文本分割成大的chunks"""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
# 尝试在句号处分割
if end < len(text):
last_period = text.rfind('。', start, end)
if last_period > start:
end = last_period + 1
chunk = text[start:end]
if chunk.strip():
chunks.append(chunk.strip())
start = end - overlap
return chunks
查询时的Small-to-Big检索:
class SmallToBigRetriever:
def __init__(self, indexer, encoder):
self.indexer = indexer
self.encoder = encoder
def search(self, query: str, small_index: List, big_storage: Dict,
top_k: int = 5) -> List[Dict]:
"""Small-to-Big检索"""
# 1. 在小索引中检索
small_results = self._search_small_index(query, small_index, top_k * 2)
# 2. 获取对应的大chunk IDs
big_chunk_ids = set()
for result in small_results:
big_chunk_ids.add(result['big_chunk_id'])
# 3. 从存储中获取大chunks
retrieved_contexts = []
for big_chunk_id in big_chunk_ids:
if big_chunk_id in big_storage:
big_chunk = big_storage[big_chunk_id]
retrieved_contexts.append({
'chunk_id': big_chunk_id,
'text': big_chunk['text'],
'doc_id': big_chunk['doc_id']
})
return retrieved_contexts[:top_k]
def _search_small_index(self, query: str, small_index: List,
top_k: int) -> List[Dict]:
"""在小索引中搜索"""
# 将小索引内容编码
small_texts = [item['small_content'] for item in small_index]
embeddings = self.encoder.encode(small_texts)
# 查询编码
query_embedding = self.encoder.encode([query])
# 计算相似度
similarities = np.dot(embeddings, query_embedding.T).flatten()
similarities = similarities / (np.linalg.norm(embeddings, axis=1) *
np.linalg.norm(query_embedding))
# 获取top结果
top_indices = np.argsort(similarities)[::-1][:top_k]
results = []
for idx in top_indices:
results.append({
'small_content': small_index[idx]['small_content'],
'content_type': small_index[idx]['content_type'],
'big_chunk_id': small_index[idx]['big_chunk_id'],
'similarity': similarities[idx]
})
return results
完整的使用示例
把两个技术结合起来使用:
def main():
# 准备测试数据
documents = [
{
'id': 'doc1',
'text': '深度学习是机器学习的一个分支,它基于人工神经网络进行学习和决策。深度学习模型通常包含多个隐层,能够学习数据的复杂模式。在图像识别、自然语言处理等领域都有广泛应用。目前主流的深度学习框架包括TensorFlow、PyTorch等。'
},
{
'id': 'doc2',
'text': 'RAG(Retrieval-Augmented Generation)是一种结合检索和生成的技术。它先从知识库中检索相关信息,然后将检索结果作为上下文输入到生成模型中。这种方法可以让模型访问到更多的外部知识,提高回答的准确性。RAG特别适用于知识问答、文档摘要等任务。'
}
]
query = "什么是深度学习?"
# 1. 索引扩展方法
print("=== 索引扩展检索结果 ===")
discrete_indexer = DiscreteIndexer()
multi_vector_indexer = MultiVectorIndexer()
ensemble_retriever = EnsembleRetriever(discrete_indexer, multi_vector_indexer)
# 构建索引
discrete_index = discrete_indexer.build_discrete_index(documents)
doc_texts = [doc['text'] for doc in documents]
all_embeddings = multi_vector_indexer.encode_documents(doc_texts)
# 检索
results = ensemble_retriever.search(query, all_embeddings,
discrete_index, documents, top_k=3)
for i, (doc_idx, score) in enumerate(results):
print(f"结果 {i+1}: 文档{doc_idx}, 分数: {score:.4f}")
print(f"内容: {documents[doc_idx]['text'][:100]}...")
print()
# 2. Small-to-Big方法
print("=== Small-to-Big检索结果 ===")
stb_indexer = SmallToBigIndexer()
stb_retriever = SmallToBigRetriever(stb_indexer,
multi_vector_indexer.models['bge'])
# 构建索引
stb_data = stb_indexer.build_small_to_big_index(documents)
# 检索
contexts = stb_retriever.search(query, stb_data['small_index'],
stb_data['big_storage'], top_k=2)
for i, context in enumerate(contexts):
print(f"上下文 {i+1}: {context['chunk_id']}")
print(f"内容: {context['text'][:200]}...")
print()
if __name__ == "__main__":
main()
实际效果对比
我在公司的知识库上测试了这两种方法,结果让人惊喜:
传统单一向量检索:
-
召回准确率:65%
-
平均检索时间:120ms
-
上下文完整性:中等
索引扩展:
-
召回准确率:85%(提升20%)
-
平均检索时间:180ms
-
上下文完整性:良好
Small-to-Big:
-
召回准确率:82%(提升17%)
-
平均检索时间:150ms
-
上下文完整性:优秀
两种方法结合:
-
召回准确率:91%(提升26%)
-
平均检索时间:200ms
-
上下文完整性:优秀
实施建议
根据我的实践经验,建议这样选择:
-
文档较短(<500字):优先使用索引扩展
-
文档较长(>1000字):优先使用Small-to-Big
-
对准确率要求极高:两种方法结合使用
-
对速度要求高:选择其中一种,不要同时使用
总结
这两个技术的核心理念是:
-
索引扩展:不要把鸡蛋放在一个篮子里,多种检索方式并行
-
Small-to-Big:先用简洁信息快速定位,再用完整信息深度理解
虽然实现相对复杂,但效果提升是实实在在的。特别是在企业级应用中,这种质量提升往往能带来用户体验的质的飞跃。
如果你在实施过程中遇到问题,欢迎在评论区讨论。后续我还会分享更多RAG优化的实战经验。