NextChat是一个流行的 ChatGPT 网页应用,除了小程序端外,基本上其它的平台都支持,并且对于Markdown、LaTex、mermaid、代码等有着很好的渲染能力。因为dify接口不是OpenAI兼容的,所以可通过插件方式来实现Dify应用的 OpenAI 兼容模式。OpenAI Compatible Dify App扩展可将 Dify 应用 API 转换为 OpenAI 兼容的 API。
本文使用OpenAI Compatible Dify App v0.0.6版本。目前支持 last_user_message
(默认)和all_messages
记忆模式,并且仅适用于 chat(聊天)、Agent(智能体) 和 chatflow(对话流) 类型的应用。
一.配置OpenAI Compatible Dify App插件
1.构建简单的Chatflow示例

2.安装OpenAI Compatible Dify App插件

3.配置OpenAI Compatible Dify App插件
添加一个API端点,如下所示:

说明:记忆模式包括使用最后一条用户消息和使用所有消息。
4.OpenAI客户端测试
from openai import OpenAI
client = OpenAI(api_key="app-vEtiEYF28li8ODpLCkO3cB7B", base_url="http://localhost:5002/e/ltjm77cyt7iheipo")
response = client.chat.completions.create(
model="dify",
messages=[{
"role": "user",
"content": "编写Python异步爬虫教程,包含代码示例和注意事项"
}],
temperature=0.7,
max_tokens=4096
)
print(response.choices[0].message.content)
二.源码运行NextChat
1.修改配置文件


2.安装依赖包和运行
yarn install
yarn dev

3.NextChat配置

需要说明接口地址的配置,OpenAI Compatible Dify App中的完整地址是http://localhost:5002/e/ltjm77cyt7iheipo/chat/completions
,NextChat接口地址填写http://localhost:3002/api/proxy/dify/e/ltjm77cyt7iheipo
,这样配置主要是解决CORS问题。
4.查看对话记录


三.OpenAI Compatible Dify App实现原理
OpenAI Compatible Dify App本质是一个Extension 类型的插件。


1._invoke()方法
整体流程实现了一个 OpenAI 兼容的聊天接口,支持流式和阻塞两种响应模式,并提供了完善的错误处理机制。
def_invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response:
"""
Invokes the endpoint with the given request.
"""
ifnot self.verify(r, settings):
return Response(
json.dumps({"message": "Unauthorized"}),
status=401,
content_type="application/json",
)
app_id: str = settings.get("app_id", {}).get("app_id", "")
ifnot app_id:
raise ValueError("App ID is required")
ifnot isinstance(app_id, str):
raise ValueError("App ID must be a string")
memory_mode: str = settings.get("memory_mode", "last_user_message")
try:
data = r.get_json()
messages = data.get("messages", [])
stream = data.get("stream", False)
conversation_id, query = self._get_memory(memory_mode, messages)
inputs = data.get("inputs", {})
inputs["messages"] = json.dumps(messages)
if stream:
defgenerator():
response = self.session.app.chat.invoke(
app_id=app_id,
inputs=inputs,
query=query,
response_mode="streaming",
conversation_id=conversation_id,
)
return self._handle_chat_stream_message(app_id, response)
return Response(
generator(),
status=200,
content_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Transfer-Encoding": "chunked",
},
)
else:
response = self.session.app.chat.invoke(
app_id=app_id,
inputs=inputs,
query=query,
response_mode="blocking",
conversation_id=conversation_id,
)
return Response(
self._handle_chat_blocking_message(app_id, response),
status=200,
content_type="text/html",
)
except ValueError as e:
return Response(f"Error: {e}", status=400, content_type="text/plain")
except Exception as e:
return Response(f"Error: {e}", status=500, content_type="text/plain")
该方法是 OpenAI 兼容接口的核心处理函数,完整流程:
(1)认证验证
-
通过
self.verify(r, settings)
检查请求是否已授权 -
如果验证失败,返回 401 未授权错误响应
(2)参数提取与验证
-
从设置中提取
app_id
,这是必需参数 -
验证
app_id
不为空且必须是字符串类型 -
获取
memory_mode
设置,默认值为 “last_user_message”
(3)请求处理
-
解析请求的 JSON 数据
-
提取消息列表
messages
和流式处理标志stream
-
调用
self._get_memory()
根据记忆模式和消息获取会话 ID 和查询内容 -
获取输入参数并将消息添加到
inputs
中
(4)分支处理
根据 stream
参数值走不同的处理流程:
-
流式响应 (stream=True)
-
定义生成器函数用于流式处理
-
调用
self.session.app.chat.invoke
发起流式聊天请求 -
处理流式响应并返回
text/event-stream
类型的数据流 -
设置适当的 HTTP 头部:无缓存和分块传输编码
-
阻塞响应 (stream=False)
-
调用
self.session.app.chat.invoke
发起阻塞式聊天请求 -
处理响应并返回 HTML 内容类型的响应
(5)错误处理
-
捕获
ValueError
异常,返回 400 错误响应 -
捕获其他所有异常,返回 500 服务器错误响应
2.messages_to_text()方法
该方法用于将OpenAI格式的消息列表转换为格式化的文本块。messages
是OpenAI格式的消息列表,每个消息是包含role
和content
键的字典。
defmessages_to_text(self, messages: list[dict[str, Any]]) -> str:
"""
Convert a list of messages to a formatted text block.
:param messages: A list of dictionaries formatted as OpenAI messages.
:return: A string containing the formatted conversation.
"""
text_block = []
for message in messages:
role = message.get("role")
content = message.get("content")
if role and content:
text_block.append(f"{role.upper()}:n{content}")
return"n".join(text_block)
(1)初始化空列表text_block
,用于存储格式化后的消息文本
(2)遍历消息列表中的每条消息:
(3)最终使用换行符n
连接所有格式化后的消息,返回完整的文本块
这种格式便于将结构化的消息转换为可读性强的文本,适用于日志记录或在不支持结构化消息的场景中展示对话内容。输出格式示例,如下所示:
USER:
用户的消息内容
ASSISTANT:
助手的回复内容
SYSTEM:
系统提示内容
3.convert_to_openai_messages()方法
这个方法实现了从自定义文本格式到 OpenAI API 所需 JSON 格式的转换,使应用能够适配 OpenAI 的接口规范。
假设有以下原始文本字符串:
SYSTEM: 你是一个有用的AI助手。
USER: 你好,能帮我解释一下Python的列表推导式吗?
ASSISTANT: 当然!Python列表推导式是一种简洁创建列表的方法。基本语法是:[表达式 for 变量 in 可迭代对象 if 条件]。例如:[x*2 for x in range(5)] 会生成 [0, 2, 4, 6, 8]。
USER: 谢谢,那字典推导式呢?
方法处理后会生成以下格式的列表:
[
{
"role": "system",
"content": "你是一个有用的AI助手。"
},
{
"role": "user",
"content": "你好,能帮我解释一下Python的列表推导式吗?"
},
{
"role": "assistant",
"content": "当然!Python列表推导式是一种简洁创建列表的方法。基本语法是:[表达式 for 变量 in 可迭代对象 if 条件]。例如:[x*2 for x in range(5)] 会生成 [0, 2, 4, 6, 8]。"
},
{
"role": "user",
"content": "谢谢,那字典推导式呢?"
}
]
这个格式完全符合 OpenAI API 所需的消息格式,可以直接用于发送请求。
defconvert_to_openai_messages(self, raw_message: str) -> list[dict]:
"""
Convert a raw message string to a list of messages suitable for OpenAI API.
:param raw_message: A string containing the conversation messages.
:return: A list of dictionaries formatted as OpenAI messages.
"""
messages = []
lines = raw_message.strip().split('n')
for line in lines:
if line.startswith("SYSTEM:"):
messages.append({"role": "system", "content": line[len("SYSTEM:"):].strip()})
elif line.startswith("USER:"):
messages.append({"role": "user", "content": line[len("USER:"):].strip()})
elif line.startswith("ASSISTANT:"):
messages.append({"role": "assistant", "content": line[len("ASSISTANT:"):].strip()})
return messages
(1)参数接收
接收一个 raw_message
字符串参数,包含格式化的对话内容。
(2)初始化
-
创建空列表
messages
用于存储转换后的消息 -
使用
strip()
去除字符串首尾空白,并用split('n')
按行分割
(3)消息解析
逐行遍历文本,根据前缀识别不同角色的消息:
-
“SYSTEM:” 开头 → 系统角色消息
-
“USER:” 开头 → 用户角色消息
-
“ASSISTANT:” 开头 → 助手角色消息
(4)消息格式转换
-
对每行消息,使用
line[len("PREFIX:"):].strip()
提取实际内容 -
创建符合 OpenAI 格式的字典
{"role": xxx, "content": xxx}
-
将字典添加到
messages
列表
(5)返回结果
返回包含所有格式化消息的列表。
4._get_memory()方法
该方法在处理OpenAI兼容API请求时,根据配置的记忆模式提取适当的查询内容,以便传递给后续的聊天调用。不同的记忆模式决定了AI模型接收到的上下文信息量。
def_get_memory(
self, memory_mode: str, messages: list[dict[str, Any]]
) -> tuple[str, str]:
"""
Get the memory from the messages
returns:
- conversation_id: str
- query: str
"""
if memory_mode == "last_user_message":
user_message = ""
for message in reversed(messages):
if message.get("role") == "user":
user_message = message.get("content")
break
ifnot user_message:
raise ValueError("No user message found")
return"", user_message
elif memory_mode == "all_messages":
return"", self.messages_to_text(messages)
else:
raise ValueError(
f"Invalid memory mode: {memory_mode}, only support last_user_message for now"
)
(1)参数
-
memory_mode
: 字符串类型,指定记忆模式 -
messages
: 消息列表,每条消息是包含角色和内容的字典
(2)返回值
返回一个元组 (conversation_id, query)
,包含:
-
conversation_id
: 会话ID(目前实现中总是返回空字符串) -
query
: 提取出的查询内容
(3)memory_mode
为"last_user_message"
-
从后向前遍历消息列表
-
查找第一条角色为
"user"
的消息 -
提取其内容作为
user_message
-
如果未找到用户消息,抛出异常
-
返回
("", user_message)
(4)memory_mode
为"all_messages"
-
调用
messages_to_text()
方法将所有消息转换为格式化文本 -
返回
("", formatted_text)
(5)其它模式
抛出异常,提示只支持 "last_user_message"
模式。
四.流式响应_handle_chat_stream_message()方法
这个方法负责将Dify AI的流式响应转换为OpenAI兼容的格式,采用Server-Sent Events(SSE)方式向客户端推送数据。
注解:Server-Sent Events(SSE)简介
Server-Sent Events(SSE)是一种服务器推送技术,允许服务器通过HTTP连接向客户端实时发送消息流。SSE使用特定格式(
data: {json数据}nn
)进行数据传输,每条消息以”data:”开头并以两个换行符结束。这种技术特别适用于流式聊天应用,服务器可以逐步发送消息内容、结束标志和文件链接等不同类型的事件,而客户端只需建立一次连接即可持续接收数据,最后以”[DONE]”信号表示流结束。与WebSocket相比,SSE更轻量,但仅支持单向通信(服务器到客户端)。EventSource
是浏览器原生支持的SSE客户端实现,通过简单的JavaScript API创建持久连接,自动接收服务器推送的事件。
def_handle_chat_stream_message(
self, app_id: str, generator: Generator[dict[str, Any], None, None]
) -> Generator[str, None, None]:
"""
Handle the chat stream
"""
message_id = ""
for data in generator:
if data.get("event") == "agent_message"or data.get("event") == "message":
message = {
"id": "chatcmpl-" + data.get("message_id", "none"),
"object": "chat.completion.chunk",
"created": int(data.get("created", 0)),
"model": "gpt-3.5-turbo",
"system_fingerprint": "difyai",
"choices": [
{
"index": 0,
"delta": {
"role": "assistant",
"content": data.get("answer", ""),
},
"finish_reason": None,
}
],
}
message_id = message.get("id", "none")
yieldf"data: {json.dumps(message)}nn"
elif data.get("event") == "message_end":
message = {
"id": "chatcmpl-" + data.get("message_id", "none"),
"object": "chat.completion.chunk",
"created": int(data.get("created", 0)),
"model": "gpt-3.5-turbo",
"system_fingerprint": "difyai",
"choices": [
{
"index": 0,
"delta": {},
"finish_reason": "stop",
}
],
"usage": {
"completion_tokens": data.get("metadata", {})
.get("usage", {})
.get("completion_tokens", 0),
"prompt_tokens": data.get("metadata", {})
.get("usage", {})
.get("prompt_tokens", 0),
"total_tokens": data.get("metadata", {})
.get("usage", {})
.get("total_tokens", 0),
},
}
yieldf"data: {json.dumps(message)}nn"
elif data.get("event") == "message_file":
url = data.get("url", "")
message = {
"id": "chatcmpl-" + message_id,
"object": "chat.completion.chunk",
"created": int(data.get("created", 0)),
"model": "gpt-3.5-turbo",
"system_fingerprint": "difyai",
"choices": [
{
"index": 0,
"delta": {
"role": "assistant",
"content": f"[{data.get('id', 'none')}]({url})",
},
}
],
}
yieldf"data: {json.dumps(message)}nn"
yield"data: [DONE]nn"
1.方法签名
def_handle_chat_stream_message(
self, app_id: str, generator: Generator[dict[str, Any], None, None]
) -> Generator[str, None, None]:
-
参数:
-
app_id
: 应用的唯一标识符 -
generator
: 包含响应数据的生成器对象 -
返回值: 产生SSE格式字符串的生成器
2.初始化变量
初始化一个空的消息ID变量,用于后续跟踪消息。
message_id = ""
3.迭代处理响应数据
代码根据事件类型(event
字段)将响应分为三种情况处理:
(1)消息事件 – agent_message
或 message
当收到常规消息时:
message = {
"id": "chatcmpl-" + data.get("message_id", "none"),
"object": "chat.completion.chunk",
# 其它字段...
"choices": [
{
"index": 0,
"delta": {
"role": "assistant",
"content": data.get("answer", ""),
},
"finish_reason": None,
}
],
}
这部分构建了一个OpenAI兼容的响应格式,包含消息内容和元数据,并记录当前消息ID。
(2)消息结束事件 – message_end
当一条消息完成时:
message = {
# 消息头部信息...
"choices": [
{
"index": 0,
"delta": {}, # 空delta表示内容结束
"finish_reason": "stop",
}
],
"usage": {
# 包含token使用统计信息
"completion_tokens": data.get("metadata", {}).get("usage", {}).get("completion_tokens", 0),
# 其它使用统计...
},
}
这部分表示消息已结束,并提供令牌使用统计信息。
(3)文件消息事件 – message_file
当响应包含文件时:
url = data.get("url", "")
message = {
# 消息头部信息...
"choices": [
{
"index": 0,
"delta": {
"role": "assistant",
"content": f"[{data.get('id', 'none')}]({url})", # Markdown格式链接
},
}
],
}
这部分处理文件URL,并以Markdown格式[id](url)
呈现文件链接。
4.完成整个流式响应
yield"data: [DONE]nn"
所有数据处理完后,发送[DONE]
标记表示流式响应结束。
5.输出格式
每次产生的数据都按SSE格式输出:
data: {JSON格式的消息}nn
这个方法巧妙地将Dify AI的响应格式转换为与OpenAI API完全兼容的格式,使客户端能够无缝集成。
五.阻塞响应_handle_chat_blocking_message()方法
这个函数负责将Dify API的非流式(阻塞式)响应转换为OpenAI API兼容的格式。它是OpenAI兼容层的关键部分,确保Dify API的响应能够以OpenAI格式返回,使得原本为OpenAI设计的客户端可以无缝使用Dify的服务。
def_handle_chat_blocking_message(
self, app_id: str, response: dict[str, Any]
) -> str:
"""
Handle the chat blocking message
"""
message = {
"id": "chatcmpl-" + response.get("id", "none"),
"object": "chat.completion",
"created": int(response.get("created", 0)),
"model": "gpt-3.5-turbo",
"system_fingerprint": "difyai",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": response.get("answer", ""),
},
"finish_reason": "stop",
}
],
"usage": {
"completion_tokens": response.get("metadata", {})
.get("usage", {})
.get("completion_tokens", 0),
"prompt_tokens": response.get("metadata", {})
.get("usage", {})
.get("prompt_tokens", 0),
},
}
return json.dumps(message)
1.函数签名
-
接收两个参数:
app_id
(应用ID)和response
(Dify API的原始响应) -
返回转换后的JSON字符串
2.转换过程
-
创建一个符合OpenAI API格式的
message
字典 -
设置基本信息:
-
id
: 添加”chatcmpl-“前缀到原始响应ID -
object
: 设为”chat.completion”(OpenAI标准类型) -
created
: 转换时间戳为整数 -
model
: 设为”gpt-3.5-turbo” -
system_fingerprint
: 设为”difyai” -
设置响应内容:
-
在
choices
数组中添加标准结构 -
包含助手角色和回答内容
-
设置
finish_reason
为”stop” -
添加使用统计:
-
completion_tokens
: 完成部分的token数 -
prompt_tokens
: 提示部分的token数 -
从原始响应的
metadata.usage
中提取数据
3.结果返回
使用json.dumps
将字典转换为JSON字符串并返回。
参考文献
[0] 将Dify对接到NextChat中的具体操作和原理:https://z0yrmerhgi8.feishu.cn/wiki/QthjwpLNiiQ63DkD9UTccuJOnzh
[1] OpenAI Compatible Dify App:https://marketplace.dify.ai/plugins/langgenius/oaicompat_dify_app
[2] https://github.com/ChatGPTNextWeb/NextChat
[3] Extension 插件:https://docs.dify.ai/plugin-dev-zh/9231-extension-plugin
知识星球:Dify源码剖析及答疑,Dify扩展系统源码,AI书籍课程|AI报告论文,公众号付费资料。加微信buxingtianxia21
进NLP工程化资料群,以及Dify交流群。