在上一篇dify项目结构说明与win11本地部署之后,本文将带你剖析Dify的后端架构设计思路。
Dify 的后端使用 Python 编写,使用 Flask 作为web框架。使用 SQLAlchemy 作为 ORM,使用 Celery 作为任务队列。授权逻辑通过 Flask-login 进行处理。
一、项目结构
第一次打开Dify的代码目录,随便翻了翻,就把dify的目录功能推测出来了.
api/
├── app.py # 主入口
├── app_factory.py # 初始化逻辑
├── controllers/ # API路由
├── core/ # 核心业务
├── extensions/ # 各种插件
└── services/ # 业务逻辑
├── migrations # 数据库迁移脚本
├── models # 数据库模型和表结构定义
这种布局让新上手项目的开发者能在不看文档的情况下快速找到需要修改的地方,大大降低了熟悉代码的难度。
接口入口
在controllers目录下,是api的视线
-
• console 主要是后端管理平台的api服务 -
• web 对应c端用户的api服务 -
• service_api 开放服务的api -
• inner_api 内部api服务
我们看下console内的__init__.py
文件
bp = Blueprint("console", __name__, url_prefix="/console/api")
api = ExternalApi(bp)
我们可以看到在这个模块下所有的接口都是console/api
开头,在任意一个功能文件的下面都有类似下面的路由注册。手动注册,很累。
api.add_resource(LoginApi, "/login")
api.add_resource(LogoutApi, "/logout")
api.add_resource(EmailCodeLoginSendEmailApi, "/email-code-login")
api.add_resource(EmailCodeLoginApi, "/email-code-login/validity")
api.add_resource(ResetPasswordSendEmailApi, "/reset-password")
api.add_resource(RefreshTokenApi, "/refresh-token")
login
接口,对应的是loginApi
,规范性很强。
我们在后台页面登录的时候,注意模块为
console
,直接在对应的目录下找对应的文件即可。flask 写起来是真麻烦。
二、整体架构:模块化分层的艺术
Dify的其核心设计理念可概括为:
1. 分层设计:严格遵循表现层-应用层-领域层-基础设施层的分层原则(这是为啥我看dify的代码比较亲切,和java比较像,哈哈)
2. 模块化解耦:功能模块高内聚低耦合,像积木一样可自由组合
3. 扩展优先:所有非核心功能都设计为可插拔的扩展
我把所有的模块归纳以后,就是下面的示意图。
基础设施数据库缓存消息队列Celery Worker领域层核心模型业务规则应用层业务服务流程编排表现层API接口参数校验
三、启动分析
api启动分析
我们通过启动命令来分析下
uv run flask run --host 0.0.0.0 --port=5001 --debug
-
• 在这里,我们看到 flask run
这就涉及到flask命令的机制了 -
• 查找环境变量 FLASK_APP
指定的 Python 应用模块(默认依次查找wsgi.py
、app.py
、application.py
) -
• 启动开发服务器(默认使用 werkzeug
) -
• 加载 .flaskenv
或.env
文件(如果存在)
是否非调试模式调试模式启动命令是否为数据库命令?创建迁移专用应用创建完整应用仅初始化数据库相关扩展返回迁移应用检查调试模式启用gevent补丁初始化gRPC gevent补丁psycopg2跳过gevent补丁创建基础应用create_app加载配置初始化24个扩展启动服务器
-
• 在启动的时候,先做了一些判断,如果只是迁移数据库,就直接终止 -
• 然后核心走到了 app_factory类中的create_app
-
• 加载配置那通过 DifyConfig
类把.env
文件加载到了内存,并创建了一个DifyApp
对象app -
• DifyApp
继承了Flask,可以认为app是一个Flask对象 -
• 初始化扩展,就是在 flask
上添加各种各样的功能
扩展功能
以下是整理的extensions
目录下的扩展功能
1. 核心功能扩展:
– ext_database: 数据库支持
– ext_redis: Redis缓存
– ext_celery: 异步任务
– ext_migrate: 数据库迁移
2. 安全相关扩展:
– ext_login: 用户认证
– ext_set_secretkey: 密钥管理
– ext_sentry: 错误监控
3. 性能相关扩展:
– ext_compress: 响应压缩
– ext_proxy_fix: 代理支持
– ext_app_metrics: 应用指标
4. 功能模块扩展:
– ext_blueprints: 路由注册
– ext_commands: CLI命令
– ext_storage: 文件存储
5. 运维相关扩展:
– ext_logging: 日志配置
– ext_otel: OpenTelemetry
– ext_request_logging: 请求日志
我拿路由注册来说
def init_app(app: DifyApp):
# register blueprint routers
from flask_cors import CORS # type: ignore
from controllers.console import bp as console_app_bp
from controllers.files import bp as files_bp
from controllers.inner_api import bp as inner_api_bp
from controllers.service_api import bp as service_api_bp
from controllers.web import bp as web_bp
CORS(
service_api_bp,
allow_headers=["Content-Type", "Authorization", "X-App-Code"],
methods=["GET", "PUT", "POST", "DELETE", "OPTIONS", "PATCH"],
)
app.register_blueprint(service_api_bp)
-
• 首先 通过把controllers目录下的包都导入了进来 -
• 然后通过 CORS,给对应的包的路径增加跨域设置 -
• 最后通过 app.register_blueprint
将这个目录对应的接口都注册到flask中
任务处理启动
uv run celery -A app.celery worker -P solo --without-gossip --without-mingle -Q dataset,generation,mail,ops_trace --loglevel INFO
我们来让ai分析下这个启动命令都是什么意思。
-
• celery
: Celery命令行工具 -
• -A app.celery
: 指定Celery应用模块,表示入口模块,具体指什么下面再说 -
• worker
: 启动worker进程 -
• -P solo
: 使用solo池(单进程) -
• --without-gossip
: 禁用worker之间的gossip通信 -
• --without-mingle
: 禁用worker启动时的mingle同步 -
• -Q dataset,generation,mail,ops_trace
: 只监听这4个队列 -
• --loglevel INFO
: 设置日志级别为INFO
ExtensionsFlask实例(app变量)app模块(app.py)Celery CLIExtensionsFlask实例(app变量)app模块(app.py)Celery CLI导入app模块访问celery属性查找extensions["celery"]返回Celery实例返回celery属性值提供Celery应用
这边就有点绕,我在想这个是app.py?还是app实例属性?通过上面的流程图可以看到
-
• app.celery
最终指向的是通过celery = app.extensions["celery"]
获取到的的celery
我们看下celery里是什么
def init_app(app: DifyApp) -> Celery:
classFlaskTask(Task):
def__call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
returnself.run(*args, **kwargs)
celery_app = Celery(
app.name,
task_cls=FlaskTask,
broker=dify_config.CELERY_BROKER_URL,
backend=dify_config.CELERY_BACKEND,
task_ignore_result=True,
)
celery_app.set_default()
app.extensions["celery"] = celery_app
在ext_celery.app
中知识创建了一个celery_app实例,然后绑定到了flask上。然后我们要好好的了解下celery。
celery
Celery 是一个强大的分布式任务队列系统,用于处理异步任务和定时任务。
基本组件
-
• 任务 (Task): 需要异步执行的函数 -
• Worker: 执行任务的进程 -
• Broker: 消息中间件,负责传递任务 (常用 Redis/RabbitMQ) -
• Backend: 存储任务结果 (可选,常用 Redis/RabbitMQ/数据库)
回到dify中,我们根据CELERY_BROKER_URL
可以找到.env
中找到
CELERY_BROKER_URL=redis://:difyai123456@localhost:${REDIS_PORT}/1
说明dify的中的任务传递是靠redis来的。
根据CELERY_BACKEND
没有找到配置,但是看代码,默认是database
class CeleryConfig(DatabaseConfig):
CELERY_BACKEND: str = Field(
description="Backend for Celery task results. Options: 'database', 'redis'.",
default="database",
)
CELERY_BROKER_URL: Optional[str] = Field(
description="URL of the message broker for Celery tasks.",
default=None,
)
然后我就在dify中找任务是怎么和celery关联上的,没找到,然后问了下大模型。celery在启动的时候自动加载tasks目录下的任务。
任务注册表tasks模块Celery AppCelery Worker启动任务注册表tasks模块Celery AppCelery Worker启动加载Celery应用自动发现任务模块(通过include/自动扫描)@shared_task装饰器执行时注册任务注册完成Worker开始监听队列

看日志输出,确实是。然后随便找一个代码
@shared_task(queue="dataset")
def add_document_to_index_task(dataset_document_id: str):
-
• queue是 在@shared_task中定义的,整个就4个queue -
• 从启动日志上看 有个 concurrency: 28 (solo)
28个线程
然后我们通过add_document_to_index_task
反向查找下

我们可以看到在操作文档状态的时候,会添加到任务队列里。
我让DeepSeek给我画了下操作流程
Database BackendCelery WorkerRedis Broker客户端/生产者Database BackendCelery WorkerRedis Broker客户端/生产者1. 发布任务到队列 (LPUSH)2. 轮询队列 (BRPOP)3. 执行任务逻辑4. 存储任务结果 (INSERT/UPDATE)5. 可选: 发送完成事件
当看到这个流程,我想到的是一致性如何保障?
接着看到入队和消费的代码
数据库Celery WorkerRedis主流程数据库Celery WorkerRedis主流程alt[文档存在且状态为complete-d][异常情况]setex(indexing_cache_key, 600, 1)add_document_to_index_task.delay()查询DatasetDocument查询Segments更新Segment状态删除AutoDisableLogdel(indexing_cache_key)标记文档为error状态del(indexing_cache_key)
-
• 在客户端,任务入队,设置了10分钟的分布式锁 -
• 然后通过redis的队列推到了 dataset
队列中 -
• celery worker 从 dataset
队列中消费add_document_to_index_task -
• 在add_document_to_index_task中主要是业务逻辑的处理