更多AI技术,关注我的github,欢迎点亮星星:
https://github.com/xinyuwei-david/david-share.git
本demo代码使用Chainlit框架。
Chainlit是一个开源的Python框架,用于构建生产级的对话式人工智能(Conversational AI)应用。它允许开发者快速地创建和部署对话式AI或代理应用,支持多平台和数据持久化。
Chainlit的一些关键特点包括:
-
快速构建:可以轻松集成到现有代码库,或者从头开始构建
-
多平台支持:一次编写逻辑,可以在多个平台上使用2
-
数据持久化:收集、监控和分析用户数据2
-
可视化多步推理:理解生成输出的中间步骤2
-
与流行库和框架的集成:支持OpenAI、Mistral AI、Llama Index、LangChain、Autogen、Haystack等。
我们分析代码的主程序:app.py。

代码中,Chainlit 框架封装了所有的接口,使得您需要直接处理底层的网络通信、事件分发或接口实现。
详细说明
1. Chainlit 提供的装饰器封装了事件处理
代码中使用了许多由 Chainlit 提供的装饰器,这些装饰器封装了与前端交互的接口,简化了编程过程。例如:
-
@cl.on_chat_start:当聊天会话开始时触发。 -
@cl.on_message:当收到用户文本消息时触发。 -
@cl.on_audio_start:当用户开始音频输入时触发。 -
@cl.on_audio_chunk:当收到音频数据块时触发。 -
@cl.on_audio_end、@cl.on_chat_end、@cl.on_stop:当音频输入结束、聊天结束或会话停止时触发。这些装饰器使您能够专注于业务逻辑的实现,而不必关心底层的事件监听和处理机制。
2. Chainlit 封装了前后端的通信
在传统的应用程序中,您可能需要自己编写代码来处理前端和后端之间的通信,例如:
-
监听前端的请求或事件。
-
解析前端发送的数据。
-
将处理结果返回给前端。
然而,Chainlit 为您处理了这些细节,它封装了前端与后端之间的通信,使得后端代码可以直接处理事件。
3. Chainlit 提供了上下文和会话管理
通过 cl.user_session,您可以在用户会话中存储和获取数据。这使您能够:
-
为每个用户维护独立的会话状态。
-
存储用户相关的信息,如
track_id、openai_realtime实例等。 -
在不同的事件处理函数之间共享数据。
4. Chainlit 集成了与 OpenAI API 的交互
在您的代码中,您使用了 AsyncAzureOpenAI 客户端与 Azure OpenAI 服务进行交互。Chainlit 可以与这些服务无缝集成,简化了与大型语言模型的交互过程。
5. Chainlit 封装了错误处理和日志记录
-
错误处理:通过提供
@cl.on_error等装饰器,Chainlit 允许您处理在应用程序中发生的异常和错误。 -
日志记录:通过
chainlit.logger模块,您可以方便地记录日志信息,帮助调试和监控应用程序。
这样设计的好处
-
简化开发:您不需要处理复杂的事件循环、网络通信或异步 I/O 操作,只需专注于实现业务逻辑。
-
提高效率:使用 Chainlit 的封装,可以大大减少样板代码(boilerplate code)的数量,使代码更加简洁明了。
-
代码可读性:通过装饰器和上下文管理,代码的结构更加清晰,逻辑更加直观。
-
可扩展性:Chainlit 提供了丰富的功能和插件,方便您在未来扩展应用程序的功能。
总结
-
Chainlit 封装了应用程序与前端的交互接口,包括事件监听、消息传递、音频数据处理等。
-
您的代码主要使用 Chainlit 提供的装饰器和上下文管理来处理各种事件和会话数据。
-
这种封装使得您可以专注于业务逻辑的实现,而不必关心底层的通信和接口细节。
导入模块
import os
import asyncio
from openai import AsyncAzureOpenAI
import chainlit as cl
from uuid import uuid4
from chainlit.logger import logger
from realtime import RealtimeClient
from realtime.tools import tools
解释:
-
os:用于与操作系统交互,主要用来获取环境变量。
-
asyncio:Python 标准库,用于编写异步代码,支持异步 I/O 操作。
-
AsyncAzureOpenAI:从 openai 包中导入,异步的 Azure OpenAI 客户端,用于与 Azure OpenAI 服务进行异步交互。
-
chainlit:一个用于创建聊天机器人的框架,简化了聊天机器人的开发流程。
-
uuid4:用于生成随机的 UUID(通用唯一标识符)。
-
logger:从 chainlit.logger 导入,用于日志记录,方便调试和监控。
-
RealtimeClient 和 tools:从自定义的 realtime 模块中导入,用于实时通信和处理。
配置 Azure OpenAI 客户端
client = AsyncAzureOpenAI(
api_key=os.environ["AZURE_OPENAI_API_KEY"],
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
api_version="2024-10-01-preview"
)
解释:
-
这里创建了一个异步的 Azure OpenAI 客户端
client,需要从环境变量中获取 API 密钥、终端、部署名称。这些信息通常在 Azure 上注册服务时获得。 -
api_version指定了使用的 API 版本。
设置 OpenAI 实时客户端
async def setup_openai_realtime(system_prompt: str):
"""Instantiate and configure the OpenAI Realtime Client"""
openai_realtime = RealtimeClient(system_prompt=system_prompt)
cl.user_session.set("track_id", str(uuid4()))
# 以下省略事件处理函数的定义和配置
解释:
-
定义了一个异步函数
setup_openai_realtime,用于实例化和配置 OpenAI 实时客户端。 -
system_prompt:系统提示,用于指导模型的行为(我们不需要详细分析这个部分)。 -
openai_realtime:创建了一个实时客户端实例,传入了系统提示。 -
cl.user_session.set("track_id", str(uuid4())):为用户会话设置一个唯一的track_id,用于跟踪音频流或其他会话相关的信息。
事件处理函数
在 setup_openai_realtime 函数内,定义了一系列异步的事件处理函数,用于处理不同的事件。这些事件处理函数会在特定的事件发生时被调用。
1. handle_conversation_updated
async def handle_conversation_updated(event):
item = event.get("item")
delta = event.get("delta")
"""Currently used to stream audio back to the client."""
if delta:
# Only one of the following will be populated for any given event
if 'audio' in delta:
audio = delta['audio'] # Int16Array, audio added
await cl.context.emitter.send_audio_chunk(cl.OutputAudioChunk(mimeType="pcm16", data=audio, track=cl.user_session.get("track_id")))
if 'transcript' in delta:
transcript = delta['transcript'] # string, transcript added
print(transcript)
if 'arguments' in delta:
arguments = delta['arguments'] # string, function arguments added
pass
解释:
-
当对话更新时调用此函数。
-
event:包含事件的数据。 -
delta:表示与上一次事件的差异,可能包含audio、transcript或arguments。处理逻辑:
-
如果
delta包含audio: -
获取音频数据
audio。 -
使用
cl.context.emitter.send_audio_chunk将音频数据发送回客户端,用于音频流的实时传输。 -
如果
delta包含transcript: -
获取转录文本并打印出来。这可能是语音识别后的文本。
-
如果
delta包含arguments: -
获取函数参数。这可能与功能调用或命令有关。
2. handle_item_completed
async def handle_item_completed(item):
"""Used to populate the chat context with transcription once an item is completed."""
# print(item) # TODO
pass
解释:
-
当某个项目(可能是一次对话或消息)完成时调用。
-
当前函数内容为空,仅包含一个
pass,表示未来可能会添加处理逻辑。
3. handle_conversation_interrupt
async def handle_conversation_interrupt(event):
"""Used to cancel the client previous audio playback."""
cl.user_session.set("track_id", str(uuid4()))
await cl.context.emitter.send_audio_interrupt()
解释:
-
当对话被中断时调用,例如用户停止了音频播放或有新的音频需要播放。
-
更新用户会话的
track_id,以标记新的音频流。 -
调用
send_audio_interrupt通知客户端停止当前的音频播放。
4. handle_error
async def handle_error(event): logger.error(event)
解释:
-
当发生错误时调用。
-
使用日志记录器
logger记录错误信息,方便调试。
注册事件处理函数
openai_realtime.on('conversation.updated', handle_conversation_updated)
openai_realtime.on('conversation.item.completed', handle_item_completed)
openai_realtime.on('conversation.interrupted', handle_conversation_interrupt)
openai_realtime.on('error', handle_error)
解释:
-
为
openai_realtime客户端注册事件处理函数,当特定事件发生时,调用相应的处理函数。
添加工具并启动
cl.user_session.set("openai_realtime", openai_realtime)
coros = [openai_realtime.add_tool(tool_def, tool_handler) for tool_def, tool_handler in tools]
await asyncio.gather(*coros)
解释:
-
将
openai_realtime客户端实例保存到用户会话中,方便在其他地方访问。 -
使用列表推导式,为每个工具定义和处理器添加到
openai_realtime中。 -
await asyncio.gather(*coros):异步地运行所有添加工具的协程。
Chainlit 事件处理装饰器
接下来,代码使用了 @cl.on_* 的装饰器,定义了各种事件处理函数,用于响应聊天和音频相关的事件。
1. @cl.on_chat_start
@cl.on_chat_start
async def start():
await cl.Message(
content="Hi, Welcome to Xinyu Company. How can I help you?. Press `P` to talk!"
).send()
await setup_openai_realtime(system_prompt=system_prompt + "nn Customer ID: 333")
解释:
-
当聊天会话开始时调用。
-
发送一条欢迎消息给用户,提示可以按
P说话。 -
调用
setup_openai_realtime函数,传入system_prompt(系统提示,加上了客户 ID)。
2. @cl.on_message
@cl.on_message
async def on_message(message: cl.Message):
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime and openai_realtime.is_connected():
await openai_realtime.send_user_message_content([{ "type": 'input_text', "text": message.content}])
else:
await cl.Message(content="Please activate voice mode before sending messages!").send()
解释:
-
当收到用户的文本消息时调用。
-
从用户会话中获取
openai_realtime客户端实例。 -
如果实时客户端存在且已连接:
-
调用
send_user_message_content将用户的文本消息发送给实时客户端,类型为input_text。 -
否则:
-
提示用户需要激活语音模式才能发送消息。
3. @cl.on_audio_start
@cl.on_audio_start
async def on_audio_start():
try:
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
# TODO: might want to recreate items to restore context
# openai_realtime.create_conversation_item(item)
await openai_realtime.connect()
logger.info("Connected to OpenAI realtime")
return True
except Exception as e:
await cl.ErrorMessage(content=f"Failed to connect to OpenAI realtime: {e}").send()
return False
解释:
-
当音频输入开始时调用,即用户开始说话或录音。
-
尝试连接到
openai_realtime客户端。 -
如果成功,记录连接成功的信息,并返回
True。 -
如果发生异常,发送错误消息给用户,并返回
False。
4. @cl.on_audio_chunk
@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.InputAudioChunk):
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime:
if openai_realtime.is_connected():
await openai_realtime.append_input_audio(chunk.data)
else:
logger.info("RealtimeClient is not connected")
解释:
-
当收到音频数据块时调用。
-
从用户会话中获取
openai_realtime客户端实例。 -
如果客户端存在且已连接:
-
将音频数据块添加到实时客户端的输入中,可能用于语音识别或处理。
-
否则:
-
记录客户端未连接的信息。
5. @cl.on_audio_end, @cl.on_chat_end, @cl.on_stop
@cl.on_audio_end
@cl.on_chat_end
@cl.on_stop
async def on_end():
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime and openai_realtime.is_connected():
await openai_realtime.disconnect()
解释:
-
当音频输入结束、聊天结束或会话停止时调用。
-
从用户会话中获取
openai_realtime客户端实例。 -
如果客户端存在且已连接:
-
调用
disconnect方法,与实时服务断开连接。
总体流程总结
-
会话开始:
-
用户进入聊天,会触发
@cl.on_chat_start,发送欢迎消息,并调用setup_openai_realtime初始化实时客户端。
用户发送消息:
-
如果用户发送文本消息,触发
@cl.on_message。 -
检查实时客户端是否连接,如果已连接,将消息发送给实时客户端处理。
语音交互:
-
用户按下
P开始语音输入,触发@cl.on_audio_start。 -
尝试连接实时客户端,准备接收音频数据。
-
用户的语音被分割成音频数据块,每个数据块触发
@cl.on_audio_chunk,将音频数据发送给实时客户端。 -
用户结束语音输入,触发
@cl.on_audio_end,断开与实时客户端的连接。
实时处理:
-
实时客户端将音频数据进行处理,可能是语音识别、生成回复等。
-
处理结果通过事件(如
conversation.updated)回调,调用相应的处理函数handle_conversation_updated。 -
将生成的音频或文本回复发送回用户。
会话结束:
-
用户退出或会话结束,触发
@cl.on_chat_end或@cl.on_stop,确保断开与实时客户端的连接,释放资源。
结论
这段代码实现了一个基于 Chainlit 框架的聊天机器人,支持文本和语音交互。它使用了 Azure OpenAI 的服务,通过 RealtimeClient 与 OpenAI 的实时接口交互,处理用户的输入,并将生成的回复(文本或音频)发送给用户。
主要的逻辑包括:
-
初始化和配置实时客户端,设置事件处理函数。
-
处理用户的文本消息和语音输入,将其发送给实时客户端。
-
接收并处理实时客户端的回调事件,将回复发送给用户。
-
管理会话的开始和结束,确保资源的正确分配和释放。
源码地址:
https://github.com/monuminu/AOAI_Samples/tree/main/realtime-assistant-support

