后续我会对ragflow的源码进行深入的拆解,深入源码拆解之前我们先对ragflow的项目结构有一个大概的了解。
ragflow目录解说
E:aicoderagflow/
├── agent/
├── agentic_reasoning/
├── api/
├── conf/
├── deepdoc/
├── docker/
├── docs/
├── example/
├── graphrag/
├── helm/
├── intergrations/
├── nltk_data
├── rag/
├── sdk/
└── web/
└── uv.lock
-
• agent目录主要是 智能体相关的实现 -
• agentic_reasoning目录是0.17.2更新的推理能力的实现 -
• api目录主要是后端api -
• conf是源码启动的时候的配置文件 -
• deepdoc目录是 deepdoc的实现,解析文档(偷偷的告诉你们后续会用视觉模型处理文档) -
• docker目录 快速验证部署的docker配置 -
• docs目录是 ragflow的文档库 -
• example目录主要是接口和sdk操作dataset的示例 -
• graphrag目录是 知识图谱实现 -
• helmk8s配置 -
• intergrations目录中ragflow 与 ChatGPT-on-WeChat 对接 -
• nltk_data目录为下载的nltk数据,docker构建的时候创建 -
• rag目录是rag实现 -
• sdk目录是ragflow封装的python sdk -
• web主要是ragflow的前端项目 -
• uv.lockpython项目的依赖组件
接下来我详细介绍下使用的几个目录。
api 相关介绍
api可能是二开最常用的模块了。

-
• apps目录主要是 ragflow对外暴露的接口 -
• sdk目录是对外提供的api,主要是操作知识库和一些扩展 -
• *_app.py的文件都是对ragflow的web提供的接口 -
• db目录是操作数据库的封装 -
• utils是封装的一些工具
启动文件与apps包
ragflow_server.py是ragflow api服务的启动入口文件。核心代码如下:
settings.init_settings()
# init db
init_web_db()
init_web_data()
# start http server
try:
logging.info("RAGFlow HTTP server start...")
run_simple(
hostname=settings.HOST_IP,
port=settings.HOST_PORT,
application=app,
threaded=True,
use_reloader=RuntimeConfig.DEBUG,
use_debugger=RuntimeConfig.DEBUG,
)
except Exception:
-
• 基于Werkzeug:使用 run_simple启动Flask应用(app对象来自api.apps模块) -
• 支持多线程模式( threaded=True) -
• 开发模式下支持热重载( use_reloader)和调试器(use_debugger) -
• 信号处理:捕获 SIGINT和SIGTERM信号实现优雅关闭
在settings.py中做了很多初始化的工作
def init_settings():
global DOC_ENGINE, docStoreConn, retrievaler, kg_retrievaler
#文档引擎默认使用elasticsearch
DOC_ENGINE = os.environ.get('DOC_ENGINE', "elasticsearch")
lower_case_doc_engine = DOC_ENGINE.lower()
if lower_case_doc_engine == "elasticsearch":
docStoreConn = rag.utils.es_conn.ESConnection()
elif lower_case_doc_engine == "infinity":
docStoreConn = rag.utils.infinity_conn.InfinityConnection()
else:
raise Exception(f"Not supported doc engine: {DOC_ENGINE}")
retrievaler = search.Dealer(docStoreConn)
kg_retrievaler = kg_search.KGSearch(docStoreConn)
-
• 从代码可以看出文档引擎默认使用elasticsearch -
• kg_search 为知识图谱的检索
我们接着看下app目录下的__init__.py文件
def search_pages_path(pages_dir):
app_path_list = [
path for path in pages_dir.glob("*_app.py") ifnot path.name.startswith(".")
]
api_path_list = [
path for path in pages_dir.glob("*sdk/*.py") ifnot path.name.startswith(".")
]
app_path_list.extend(api_path_list)
return app_path_list
defregister_page(page_path):
path = f"{page_path}"
page_name = page_path.stem.rstrip("_app")
module_name = ".".join(
page_path.parts[page_path.parts.index("api"): -1] + (page_name,)
)
spec = spec_from_file_location(module_name, page_path)
page = module_from_spec(spec)
page.app = app
page.manager = Blueprint(page_name, module_name)
sys.modules[module_name] = page
spec.loader.exec_module(page)
page_name = getattr(page, "page_name", page_name)
sdk_path = "\sdk\"if sys.platform.startswith("win") else"/sdk/"
url_prefix = (
f"/api/{API_VERSION}"if sdk_path in path elsef"/{API_VERSION}/{page_name}"
)
app.register_blueprint(page.manager, url_prefix=url_prefix)
return url_prefix
pages_dir = [
Path(__file__).parent,
Path(__file__).parent.parent / "api" / "apps",
Path(__file__).parent.parent / "api" / "apps" / "sdk",
]
client_urls_prefix = [
register_page(path) fordirin pages_dir for path in search_pages_path(dir)
这里主要做的是通过扫描 api/apps/ 和 api/apps/sdk/ 包下的 *_app.py 和sdk包下文件,通过register_page()动态注册到_blueprint。
-
• 路由规则: -
• 普通路由: /{API_VERSION}/{page_name} -
• 这里的 page_name是document_app.py 移除_app.py后缀后的名称 -
• SDK 路由: /api/{API_VERSION}/...
比如ragflow中的上传文件接口http://localhost:8002/v1/document/upload ,拆解下,就是去document+_app.py 对应的文件,也就是document_app.py文件中找upload,这样我们从前端找接口就好找了。

ragflow的接口的代码逻辑基本上在*_app.py中。
常量文件

在常量文件中:
-
• API_VERSION就是上面介绍的api的版本 -
• SERVICE_CONF是加载配置文件 -
• REQUEST_WAIT_SEC和REQUEST_MAX_WAIT_SEC两个参数没啥用 -
• DATASET_NAME_LIMIT限制数据集的名称大小
db包
核心文件是db_models.py和service目录中的common_service.py。
-
• db_models.py封装了表模型, -
• common_service.py通过Peewee ORM封装了常用数据库的CURD。
基础CRUD基础CRUD基础CRUD基础CRUD高级功能高级功能CommonService查询模块写入模块更新模块删除模块批量操作条件过滤queryget/get_allinsert/saveupdate_by_iddelete_by_idinsert_manyupdate_many_by_idfilter_scope_listfilter_delete
这样在其他的service中能CommonService后,就具备了所有的基础功能
class DocumentService(CommonService):
model = Document
@classmethod
@DB.connection_context()
defget_list(cls, kb_id, page_number, items_per_page,
orderby, desc, keywords, id, name):
docs = cls.model.select().where(cls.model.kb_id == kb_id)
ifid:
docs = docs.where(
cls.model.id == id)
if name:
docs = docs.where(
cls.model.name == name
)
if keywords:
docs = docs.where(
fn.LOWER(cls.model.name).contains(keywords.lower())
)
if desc:
docs = docs.order_by(cls.model.getter_by(orderby).desc())
else:
docs = docs.order_by(cls.model.getter_by(orderby).asc())
count = docs.count()
docs = docs.paginate(page_number, items_per_page)
returnlist(docs.dicts()), count
比如DocumentService通过model=Document 指定操作哪个数据模型(也就是哪张表)能快速实现增删改查。通过自定义扩展其他方法达到自己想要的效果。
utils包
封装的一些工具,就是字面意思。
conf目录
在conf中,我们重点关注service_conf.yaml,基本上ragflow支持什么,在这个文件中都能体现出来。

大家根据字面意思去添加配置即可。这个文件知识源码启动的时候使用,大家如果使用docker修改docker目录中的文件。
graphrag
这个是知识图谱的视线。关注几个文件。
-
• general/index.py抽取入口 -
• entity_resolution.py -
• search.py
index.py
该方法主要是暴露给task_executor用于抽取知识图谱。主要流程如下
CommunityReportsExtractorEntityResolutionElasticSearchNetworkXExtractorGraphRAGClientCommunityReportsExtractorEntityResolutionElasticSearchNetworkXExtractorGraphRAGClientrun_graphrag(doc_data)生成子图(实体&关系)NetworkX子图合并到全局图消歧新增实体生成社区报告存储结果完成通知
async def run_graphrag(row: dict, ...):
# 生成子图后存储到ES
subgraph = await generate_subgraph(...)
# 合并到全局图谱
new_graph = await merge_subgraph(...)
# 实体消歧和社区发现后,最终存储社区报告到ES
await extract_community(...)
generate_subgraph代码中的核心片段
subgraph.graph["source_id"] = [doc_id]
chunk = {
"content_with_weight": json.dumps(nx.node_link_data(subgraph)), # 将NetworkX图序列化为JSON
"knowledge_graph_kwd": "subgraph", # ES索引标识
"kb_id": kb_id,
"source_id": [doc_id],
"available_int": 0,
"removed_kwd": "N",
}
# 写入ES的核心操作
await trio.to_thread.run_sync(
lambda: settings.docStoreConn.insert(
[{"id": chunk_id(chunk), **chunk}],
search.index_name(tenant_id),
kb_id
)
)
-
• knowledge_graph_kwd="subgraph": 标识该数据为子图类型 -
• docStoreConn: RAGFlow封装的文档客户端,这个在api模块中说了 -
• search.index_name(tenant_id): 动态生成索引名(格式如ragflow_{tenant_id}
entity_resolution.py
解决知识图谱中 同一实体多表述 的问题(如"苹果公司" vs "Apple Inc."),确保图谱中每个真实世界实体仅对应唯一节点。
GraphLLMERGraphLLMERalt[存在候选节点][无候选节点]loop[每个待消歧节点]获取新增子图节点(subgraph_nodes)查找相似候选节点(相似度>阈值)发送实体对比请求返回是否同一实体的判断合并节点或保留新节点保留为新节点更新全局图谱
该模块是构建高质量知识图谱的核心组件,通过语义理解与图结构分析的结合,显著提升图谱的准确性和一致性。
search.py
提供给内部的直接查询知识图谱的方法,核心概流程如下
ElasticSearchLLMKGSearchUserElasticSearchLLMKGSearchUserretrieval(question, kb_ids)query_rewrite(question)实体类型&关键词并行发起三类检索原始结果结果融合与评分结构化检索结果
web工程
-
• 通过 package.json可以快速了解前端使用了哪些组件,以及常用的脚本 -
• .env定义了前端端口 -
• .umirc.ts文件主要是源码启动,调用后端的路由配置 -
• public中的logo.svg 可以修改项目的logo -
• conf.json配置前端的中文名称 -
• routes.ts静态路由配置,所有的页面都在pages中,大家可以从路由中快速定位。
前端使用的是react写的,真不想再折腾了,学的太多,太杂了。能改改就行了。
后记
-
• 正常二开,建议只动api工程,然后前端自己套一层,权限做一个前置应用去处理。维护人员是少数,用的最多的还是流程的使用人员 -
• rag这个目录,我得好好的整理下,这个是它的和核心。


