1. Prerequisite Checks
- Version compatibility
    RAGFlow version   Milvus version   Python SDK version
    ≥ v1.2.0          2.4.x            pymilvus ≥ 2.4.0
    ≥ v1.3.5          2.5.x            pymilvus ≥ 2.5.1
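The compatibility table can be turned into a quick pre-flight check. The following is a minimal sketch, assuming version strings look like `2.5.3` or `v2.5.3`; the helper names (`parse_version`, `sdk_is_compatible`, `installed_pymilvus`) are hypothetical and not part of RAGFlow or pymilvus.

```python
import re
from importlib import metadata

# Minimum pymilvus version per Milvus release line, copied from the table.
MIN_PYMILVUS = {"2.4": "2.4.0", "2.5": "2.5.1"}


def parse_version(text):
    """Turn '2.5.1' or 'v2.5.1rc2' into a comparable tuple of ints."""
    parts = []
    for piece in text.lstrip("v").split(".")[:3]:
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    return tuple(parts)


def sdk_is_compatible(milvus_version, pymilvus_version):
    """Check a pymilvus version against the minimum for the Milvus release line."""
    release_line = ".".join(milvus_version.lstrip("v").split(".")[:2])
    minimum = MIN_PYMILVUS.get(release_line)
    if minimum is None:
        return False  # release line not covered by the table
    return parse_version(pymilvus_version) >= parse_version(minimum)


def installed_pymilvus():
    """Return the installed pymilvus version string, or None if it is absent."""
    try:
        return metadata.version("pymilvus")
    except metadata.PackageNotFoundError:
        return None
```

For example, `sdk_is_compatible("v2.5.3", installed_pymilvus() or "0")` could gate a deployment script before any configuration is changed.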
- Network connectivity test
    # Open a shell in the RAGFlow container, then run the checks from there
    docker exec -it ragflow_container bash
    curl -v telnet://milvus-host:19530
    nc -zv milvus-host 19530
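If neither `curl` nor `nc` is installed in the container, the same TCP check can be done from Python. This is a sketch using only the standard library; `port_is_open` is a hypothetical helper name, and it only confirms that the port accepts connections, not that Milvus is healthy.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False
```

Calling `port_is_open("milvus-host", 19530)` inside the RAGFlow container mirrors the `nc -zv` check, using the same placeholder host name.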
2. Configuration Changes, End to End
- Edit docker-compose.yaml
      services:
        ragflow:
          environment:
    +       VECTOR_DB_TYPE: "milvus"
    +       MILVUS_HOST: "milvus-prod"   # must resolve to the Milvus container
    +       MILVUS_PORT: "19530"
    +       MILVUS_USER: "admin"
    +       MILVUS_PASSWORD: "SecureP@ss123!"
          depends_on:
    +       - milvus
    +   milvus:
    +     image: milvusdb/milvus:v2.5.3
    +     container_name: milvus-prod    # keeps the name consistent with MILVUS_HOST
    +     ports:
    +       - "19530:19530"
    +     volumes:
    +       - milvus_data:/var/lib/milvus
    +       - milvus_conf:/etc/milvus
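On the application side, the environment variables added to the compose file have to be read and validated somewhere. How RAGFlow does this internally is not shown here, so the following is only a sketch: `MilvusSettings` and `from_env` are hypothetical names, and the default port is taken from the compose file.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class MilvusSettings:
    host: str
    port: int
    user: str
    password: str

    @classmethod
    def from_env(cls, env=None):
        """Build settings from MILVUS_* variables, failing early if any are missing."""
        env = os.environ if env is None else env
        if env.get("VECTOR_DB_TYPE", "").lower() != "milvus":
            raise ValueError("VECTOR_DB_TYPE is not set to 'milvus'")
        required = ("MILVUS_HOST", "MILVUS_USER", "MILVUS_PASSWORD")
        missing = [key for key in required if not env.get(key)]
        if missing:
            raise ValueError("missing environment variables: " + ", ".join(missing))
        return cls(
            host=env["MILVUS_HOST"],
            port=int(env.get("MILVUS_PORT", "19530")),
            user=env["MILVUS_USER"],
            password=env["MILVUS_PASSWORD"],
        )
```

Failing at startup with a clear message tends to be easier to debug than a connection error that surfaces on the first search request.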
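- Create a dedicated Milvus account
    docker exec -it milvus-prod bash
    milvus-cli --user root --password milvusroot
    # Milvus does not accept SQL. The two statements below only describe the intent
    # (create user "admin" and grant it full privileges); run the equivalent user and
    # role commands of your Milvus CLI or SDK version instead.
    CREATE USER 'admin' IDENTIFIED BY 'SecureP@ss123!';
    GRANT ALL ON *.* TO admin;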
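3. Vector Store Migration in Practice
- Export data from Chroma
    import json
    import chromadb
    client = chromadb.PersistentClient(path="/path/to/chroma")  # assumed storage path
    for col in client.list_collections():
        if isinstance(col, str):  # some chromadb releases return names only
            col = client.get_collection(col)
        data = col.get(include=["embeddings", "metadatas", "documents"])
        dump = {key: data[key] for key in ("ids", "documents", "metadatas")}
        # Embeddings may be NumPy arrays, which json cannot serialize directly.
        dump["embeddings"] = [[float(x) for x in vec] for vec in data["embeddings"]]
        with open(f"/backup/{col.name}.json", "w") as f:
            json.dump(dump, f)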
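- Import data into Milvus
    import hashlib, json, os
    from pymilvus import Collection, connections, utility
    # Host and credentials follow the docker-compose settings.
    connections.connect(host="milvus-prod", port="19530", user="admin", password="SecureP@ss123!")
    collection = Collection("rag_docs")  # the collection must already exist
    for name in os.listdir("/backup"):
        with open(os.path.join("/backup", name)) as f:
            data = json.load(f)
        # Chroma's get() output is columnar: parallel lists keyed by field name.
        docs = data.get("documents") or [None] * len(data["ids"])
        rows = zip(data["ids"], docs, data["embeddings"], data["metadatas"])
        entities = [
            {
                # hash() is salted per process, so derive a stable 63-bit id instead.
                "id": int.from_bytes(hashlib.sha256(cid.encode()).digest()[:8], "big") >> 1,
                "text": doc or "",
                "vector": vec,
                "source": (meta or {}).get("source", ""),
            }
            for cid, doc, vec, meta in rows
        ]
        collection.insert(entities)
    collection.flush()  # persist the inserted rows
    collection.load()   # loading must be requested before waiting on it
    utility.wait_for_loading_complete("rag_docs")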
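Before inserting, it can help to sanity-check each exported file, since a mismatched vector dimension or a duplicate id is easier to diagnose on the JSON than from an insert error. The sketch below assumes the dump keeps the columnar keys that Chroma's `get()` returns (`ids`, `embeddings`, `documents`, `metadatas`); `validate_dump` and `expected_dim` are hypothetical names, and the dimension must come from your own embedding model.

```python
def validate_dump(data, expected_dim):
    """Return a list of problems found in a columnar dump (empty list if none)."""
    problems = []
    ids = data.get("ids") or []
    embeddings = data.get("embeddings") or []

    if len(set(ids)) != len(ids):
        problems.append("duplicate ids")

    # Every column that is present must have one entry per id.
    for column in ("embeddings", "documents", "metadatas"):
        values = data.get(column)
        if values is not None and len(values) != len(ids):
            problems.append(
                f"{column}: expected {len(ids)} entries, found {len(values)}"
            )

    # Every vector must match the dimension of the target Milvus field.
    bad_dims = [i for i, vec in enumerate(embeddings) if len(vec) != expected_dim]
    if bad_dims:
        problems.append(
            f"{len(bad_dims)} vectors do not have dimension {expected_dim}"
        )
    return problems
```

A file is safe to import when `validate_dump(data, expected_dim)` returns an empty list.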
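4. Adapting the Retrieval Layer
- Rewrite the retriever
    # Original Chroma implementation
    from chromadb import QueryResult
    class ChromaRetriever:
        def search(self, query):
            return self.collection.query(query_texts=[query])
    # Milvus replacement
    from pymilvus import Collection
    class MilvusRetriever:
        def __init__(self):
            # Assumes connections.connect(...) has already been called.
            self.collection = Collection("rag_docs")
        def _encode(self, query):
            # Placeholder: plug in the embedding model used at ingestion time.
            raise NotImplementedError
        def search(self, query, top_k=5):
            # Collection.search takes the query vectors and search options directly.
            results = self.collection.search(
                data=[self._encode(query)],
                anns_field="vector",
                param={"metric_type": "IP", "params": {"nprobe": 32}},
                limit=top_k,
                output_fields=["text", "source"],
            )
            return results[0]  # hits for the single query vector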
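The search parameters use `metric_type: "IP"`. Inner product ranks results the same way cosine similarity does only when all vectors have unit length, so if the embedding model does not normalize its output, it is worth normalizing both stored and query vectors. A minimal sketch in pure Python (the function names are hypothetical; a NumPy version would be faster for large batches):

```python
import math


def l2_normalize(vector):
    """Scale a vector to unit L2 length; an all-zero vector is returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0.0:
        return list(vector)
    return [x / norm for x in vector]


def inner_product(a, b):
    """Plain dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))
```

After normalization, `inner_product(l2_normalize(a), l2_normalize(b))` equals the cosine similarity of `a` and `b`.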
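- Hybrid search enhancement
    def hybrid_search(query):
        # Vector search
        vector_results = milvus_retriever.search(query)
        # Keyword search (keyword_search stands in for the existing keyword retriever)
        keyword_results = keyword_search(query)
        # Result fusion (rank-weighted scores, in the spirit of Reciprocal Rank Fusion):
        # each hit contributes score / rank, summed per document id so that a document
        # found by both retrievers appears once. Scores must be on comparable scales.
        fused = {}
        for res in [vector_results, keyword_results]:
            for i, item in enumerate(res):
                fused[item.id] = fused.get(item.id, 0.0) + item.score * (1 / (i + 1))
        return sorted(fused.items(), key=lambda x: -x[1])[:10]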
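For comparison, standard Reciprocal Rank Fusion ignores raw scores entirely and uses only rank positions, which sidesteps the problem of vector and keyword scores living on different scales. The sketch below is a generic weighted RRF over lists of document ids, not the code used by RAGFlow; `weighted_rrf`, the weights, and the constant `k=60` (a commonly used default) are assumptions.

```python
from collections import defaultdict


def weighted_rrf(rankings, weights=None, k=60, top_n=10):
    """Fuse ranked lists of document ids with weighted Reciprocal Rank Fusion.

    rankings: list of lists, each ordered best-first.
    weights:  one weight per ranking (defaults to 1.0 each).
    k:        smoothing constant that damps the influence of top ranks.
    """
    if weights is None:
        weights = [1.0] * len(rankings)
    if len(weights) != len(rankings):
        raise ValueError("need exactly one weight per ranking")

    scores = defaultdict(float)
    for ranking, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += weight / (k + rank)

    # Highest fused score first; ties fall back to the id for determinism.
    return sorted(scores.items(), key=lambda pair: (-pair[1], str(pair[0])))[:top_n]
```

With hits from the two retrievers, a call could look like `weighted_rrf([[h.id for h in vector_results], [h.id for h in keyword_results]], weights=[2.0, 1.0])` to favor the vector side.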
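5. Performance Tuning
- Index parameter tuning
    index_params = {
        "index_type": "GPU_IVF_PQ",
        "params": {
            "nlist": 4096,  # number of clusters; trades query speed against memory
            "m": 32,        # number of PQ sub-quantizers (must divide the vector dimension)
            "nbits": 8      # bits used to encode each sub-vector
        },
        "metric_type": "IP"
    }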
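To see what these PQ parameters buy in memory terms, the size of one encoded vector can be estimated as `m * nbits` bits. The sketch below compares that with raw float32 storage; the 1024-dimension figure in the usage note is an assumption (the embedding dimension is not stated here), and the estimate ignores codebooks, cluster centroids, and other index overhead.

```python
def pq_bytes_per_vector(m, nbits):
    """Size of one PQ code: m sub-quantizer codes of nbits each, rounded up to bytes."""
    return (m * nbits + 7) // 8


def raw_bytes_per_vector(dim, bytes_per_component=4):
    """Size of one uncompressed vector (float32 components by default)."""
    return dim * bytes_per_component


def compression_ratio(dim, m, nbits):
    """How many times smaller the PQ code is than the raw vector."""
    if dim % m != 0:
        raise ValueError("m must divide the vector dimension")
    return raw_bytes_per_vector(dim) / pq_bytes_per_vector(m, nbits)
```

With `m=32` and `nbits=8`, each code takes 32 bytes; for a hypothetical 1024-dimensional float32 embedding (4096 bytes raw) that is a 128x reduction in vector storage, paid for with some loss of recall.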
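- Cache strategy
    # milvus.yaml settings
    # Verify these key names against the milvus.yaml shipped with your Milvus release;
    # the configuration layout differs between major versions.
    cache:
      cache_size: 16GB               # memory allocated to the cache
      insert_buffer_size: 2GB        # write buffer
      preload_collection: rag_docs   # preload at startup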
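6. Verification and Monitoring
- Integration check script
    # Check that vectors were inserted (run where the MILVUS_* variables are set)
    python -c "import os; from pymilvus import connections, Collection; connections.connect(host=os.environ['MILVUS_HOST'], port=os.environ['MILVUS_PORT'], user=os.environ['MILVUS_USER'], password=os.environ['MILVUS_PASSWORD']); c=Collection('rag_docs'); print(c.num_entities)"
    # Measure search latency (query.json holds the JSON POST body)
    ab -n 1000 -c 10 -p query.json -T application/json http://ragflow:9380/api/search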
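`ab` reports latency for the whole HTTP path. To time the retriever alone, for instance to compare `nprobe` settings, a small harness is enough. This is a sketch with hypothetical helper names; it uses the nearest-rank definition of a percentile and makes no assumption about what is being timed.

```python
import math
import time


def percentile(samples, pct):
    """Nearest-rank percentile of a non-empty list of numbers (pct in 0..100)."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def time_calls(fn, runs=100):
    """Call fn() `runs` times and return each call's latency in milliseconds."""
    latencies = []
    for _ in range(runs):
        start = time.perf_counter()
        fn()
        latencies.append((time.perf_counter() - start) * 1000)
    return latencies
```

For example, `latencies = time_calls(lambda: retriever.search("test query"))` followed by `percentile(latencies, 95)` gives a p95 figure in milliseconds, where `retriever` is whatever retriever instance you want to measure.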
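- Monitoring dashboard setup
    # Prometheus configuration
    - job_name: 'milvus'
      static_configs:
        - targets: ['milvus-prod:9091']   # Milvus serves metrics on port 9091 by default
    # Import the Grafana dashboard
    Dashboard ID: 13613   # official monitoring template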
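Troubleshooting guide:
- Connection failure → check firewall rules and SELinux status
- Search timeouts → lower search.params.nprobe (higher values raise recall but slow each query)
- Out of memory → switch to the GPU_IVF_PQ index to reduce memory usage
- Inconsistent data → call Collection('rag_docs').flush() to force pending writes to be persisted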


