记忆类型
Cloud Native

快速使用
Cloud Native
▍步骤一:创建记忆存储
重要:如果进入 AgentRun 控制台后,右上角有“体验新版”字样,单击切换到新版控制台。

▍步骤二:创建 Agent 并体验记忆功能

-
输入“我叫小明,最近在学习向量数据库”,观察 Agent 回复(说明:函数计算遵循 Serverless(无服务器)架构,只有在请求到达时才创建实例,并能及时释放实例帮助用户节省成本。因此,首次提问时,会有较长的等待时间。)
-
在同一对话中继续提问“我叫什么?”,验证会话历史(Agent 能记住本轮对话内容) - 新建会话,提问“我叫什么?”,验证长期记忆(Agent 能跨会话记住用户信息)
代码集成
Cloud Native
▍环境准备
pip3 install alibabacloud_agentrun20250910 openai tablestore agentrun-sdk agentrun-mem0ai google-adk litellm langchain-openai
配置环境变量(请勿将密钥硬编码在代码中):
# 阿里云账号凭据export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id"export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"# Agent调用配置(在控制台Agent运行时 > 详情 > 概览 > 访问信息中获取)export AGENT_ENDPOINT="https://{ACCOUNT_ID}.agentrun-data.{REGION}.aliyuncs.com/agent-runtimes/{AGENT_NAME}/endpoints/Default/invocations"# Agent调用凭证(在控制台凭证管理中获取)export AGENT_API_KEY="your-agent-api-key"# 记忆存储名称(步骤一创建后获取)export MEMORY_COLLECTION_NAME="mem-xxxx"# DashScope API Key(LangChain 和 Google ADK 示例使用)export DASHSCOPE_API_KEY="your-dashscope-api-key"# Tablestore 配置(会话状态示例使用)export TABLESTORE_ENDPOINT="https://{INSTANCE}.{REGION}.ots.aliyuncs.com"export TABLESTORE_INSTANCE="your-instance-name"
▍会话历史
三轮对话测试:第 1 轮告知姓名和职业,第 2 轮直接问“我是谁”,验证 Agent 是否记住了上文;第 3 轮继续追问,验证完整上下文是否贯穿始终。每次运行生成新的 session_id,互不干扰。
import osimport reimport threadingimport timeimport uuidfrom openai import OpenAI# ── 配置 ──────────────────────────────────────────────────────────────────────AGENT_ENDPOINT = os.environ["AGENT_ENDPOINT"]AGENT_API_KEY = os.environ["AGENT_API_KEY"]MODEL = os.getenv("MODEL", "qwen3.5-plus")# 从 Endpoint URL 解析账号 ID 和 Agent 名称。# Endpoint 格式:https://{ACCOUNT_ID}.agentrun-data.{REGION}.aliyuncs.com/agent-runtimes/{AGENT_NAME}/endpoints/Default/invocations# 注意:这里使用账号 ID 仅适合单用户演示场景。在多用户应用中,# user_id 应代表终端用户(如应用内的用户 ID),而非开发者账号。_account_match = re.match(r"https://(d+).agentrun-data.", AGENT_ENDPOINT)USER_ID = _account_match.group(1) if _account_match else "default_user"_agent_match = re.search(r"/agent-runtimes/([^/]+)/endpoints/", AGENT_ENDPOINT)AGENT_ID = _agent_match.group(1) if _agent_match else "default_agent"client = OpenAI( base_url=f"{AGENT_ENDPOINT}/openai/v1", api_key=AGENT_API_KEY,)# ── 对话函数 ──────────────────────────────────────────────────────────────────def _spinner(stop_event: threading.Event): """在等待首个 Token 期间显示转圈动画。""" frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] i = 0 while not stop_event.is_set(): print(f'r助手: 正在思考... {frames[i % len(frames)]}', end='', flush=True) time.sleep(0.1) i += 1 print('r' + ' ' * 35 + 'r', end='', flush=True)def chat(session_id: str, history: list, message: str) -> str: """向 Agent 发送消息,通过 session_id 和消息历史维持会话上下文。 history 为本轮之前的对话历史,格式为 [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...] 只保留 user/assistant 的文本内容(不含 tool_calls),历史消息与本轮消息 一起发送,Agent 可从上下文中直接读取之前的对话内容。 session_id 通过请求头 X-AgentRun-Conversation-ID 传递,AgentRun 用其 关联 Tablestore 中的会话记录(用 extra_body 传递无效,服务端不读请求体)。 """ print(f"用户: {message}") # 启动转圈动画(覆盖冷启动和 MCP 工具调用的等待时间) stop_event = threading.Event() spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True) spinner.start() stream = client.chat.completions.create( model=MODEL, messages=history + [{"role": "user", "content": message}], stream=True, extra_headers={ "X-AgentRun-Conversation-ID": session_id, "X-AgentRun-User-ID": USER_ID, "X-AgentRun-Agent-ID": AGENT_ID, }, ) full_reply = [ ] first_token = True for chunk in stream: if chunk.choices and chunk.choices[0].delta.content: token = chunk.choices[0].delta.content if first_token: stop_event.set() spinner.join() print("助手: ", end="", flush=True) first_token = False print(token, end="", flush=True) full_reply.append(token) stop_event.set() spinner.join() print("n") return "".join(full_reply)# ── 主程序 ────────────────────────────────────────────────────────────────────def main(): # 每次运行生成新的 session_id,避免与上次运行的历史混淆。 # 在实际应用中,session_id 通常由应用层管理(如绑定用户的对话 ID)。 session_id = str(uuid.uuid4()) history = [ ] # 维护对话历史(只含 user/assistant 文本消息) print("═" * 50) print("示例:多轮对话(会话历史)") print("═" * 50) print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。") # 第一轮:自我介绍 print("n[第 1 轮]") q1 = "我叫小明,我是一名数据工程师,最近在研究 Tablestore" r1 = chat(session_id, history, q1) history.extend([ {"role": "user", "content": q1}, {"role": "assistant", "content": r1}, ]) # 第二轮:测试 Agent 是否记住了第一轮的内容 print("[第 2 轮]") q2 = "我叫什么名字?我从事什么工作?我最近在研究什么?" r2 = chat(session_id, history, q2) history.extend([ {"role": "user", "content": q2}, {"role": "assistant", "content": r2}, ]) # 第三轮:继续深入话题 print("[第 3 轮]") q3 = "针对我的工作,你有什么 Tablestore 使用建议?" chat(session_id, history, q3)if __name__ == "__main__": main()
说明:推荐使用流式模式(stream=True):Agent 在处理消息时会调用 MCP 记忆工具(读写 Tablestore),流式模式可避免客户端因等待 MCP 工具调用而超时。
将上述代码保存为 conversation_history.py 并运行:
python3 conversation_history.py
▍长期记忆
方式一:通过 MCP 集成
在记忆详情的长期记忆 > MCP 集成中启动服务配置,启动后获取 MCP 服务 URL(SSE),配置到支持 MCP 工具调用的 Agent 中,即可自动提取、更新、删除长期记忆。
说明:通过控制台使用 Agent 运行时,可在创建 Agent 配置记忆时开启 MCP(未提示表示已开启),Agent 会在每轮对话中自动调用 MCP 工具检索和更新长期记忆。对话中出现可提取的用户信息时(如:“我喜欢喝咖啡”),Agent 会自动存入向量库供后续跨 session 使用。
方式二:直接操作 mem0 向量存储
与方式一不同,方式二需要由代码显式调用 memory.add() 写入记忆—— mem0 不会自动监听对话,也不会自动判断“哪些内容值得记录”。这种方式适合在自定义 Agent 框架或数据管道中,由业务逻辑精确控制记忆的写入时机和内容。
本示例写入一段用户描述,mem0 内部调用 LLM 将其拆解为多条结构化记忆(如“名叫 Alice”、“喜欢喝咖啡”),然后用三个不同角度的语义查询验证检索效果。
import osimport timefrom agentrun.memory_collection import MemoryCollection# ── 配置 ──────────────────────────────────────────────────────────────────────MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]# agentrun-sdk 自动从以下环境变量读取凭证(无需在代码中显式传入):# ALIBABA_CLOUD_ACCESS_KEY_ID / ALIBABA_CLOUD_ACCESS_KEY_SECRET# AGENTRUN_REGION(默认 cn-hangzhou)# ── 初始化 mem0 客户端 ────────────────────────────────────────────────────────# to_mem0_memory() 自动完成两件事:# 1. 从 AgentRun 控制平面读取记忆存储配置(Tablestore 实例、集合、向量维度等)# 2. 使用读取到的配置初始化 agentrun-mem0ai Memory 客户端memory = MemoryCollection.to_mem0_memory(MEMORY_COLLECTION_NAME)# ── 核心操作 ──────────────────────────────────────────────────────────────────def add_memories(user_id: str): """向记忆存储中添加用户信息。 memory.add() 支持两种输入: - str:直接写入一段描述文本,mem0 内部会自动提炼为结构化记忆 - list[dict]:完整的对话消息列表(role/content),mem0 从中提取用户偏好 本示例使用文本输入演示基本用法。 """ print(f"n[添加记忆] user_id={user_id}") result = memory.add( "我叫 Alice,是一名 Python 工程师,平时喜欢喝咖啡,最近在研究向量数据库。", user_id=user_id, metadata={"source_app": "openmemory", "mcp_client": "python_sdk"}, ) results = result.get("results", [ ]) print(f" 写入 {len(results)} 条记忆:") for i, res in enumerate(results, 1): print(f" {i}. [{res.get('event')}] {res.get('memory', '')}")def search_memories(user_id: str, query: str): """从记忆存储中检索与查询语义最相关的记忆。""" print(f"n[检索记忆] 查询: {query!r}") results = memory.search(query, user_id=user_id) hits = results.get("results", [ ]) if not hits: print(" (未找到相关记忆)") return for i, hit in enumerate(hits, 1): score = hit.get("score", 0) content = hit.get("memory", "") print(f" {i}. [{score:.4f}] {content}")# ── 主程序 ────────────────────────────────────────────────────────────────────def main(): print("═" * 55) print("长期记忆示例 —— 直接操作 mem0 向量存储") print("═" * 55) print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。") user_id = "alice-demo" # 写入记忆 add_memories(user_id) # 等待 Tablestore 向量索引刷新(首次写入后搜索需要短暂延迟) print("n等待向量索引刷新...") time.sleep(3) # 语义检索:不同的查询角度 print() print("──" * 27) search_memories(user_id, "她喜欢什么饮料?") search_memories(user_id, "她是做什么工作的?") search_memories(user_id, "她最近在研究什么技术?")if __name__ == "__main__": main()
将上述代码保存为 long_term_memory.py 并运行:
python3 long_term_memory.py
方式三:LangChain 生态集成
以旅行规划助手为例,演示在 LangChain 对话循环中集成 mem0 长期记忆的完整模式:每轮对话开始时先检索与用户问题相关的历史记忆并注入 prompt,生成回复后再将本轮对话内容写回 mem0,使记忆在对话过程中持续积累。多轮后可观察到 Agent 能主动引用早先提到的旅行偏好。
import osimport threadingimport timefrom typing import Listfrom langchain_core.messages import SystemMessage, HumanMessage, BaseMessagefrom langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholderfrom langchain_openai import ChatOpenAIfrom agentrun.memory_collection import MemoryCollection# ── 配置 ──────────────────────────────────────────────────────────────────────MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]DASHSCOPE_API_KEY = os.environ["DASHSCOPE_API_KEY"]# ── 初始化 LLM 和 mem0 客户端 ─────────────────────────────────────────────────# 通过 DashScope OpenAI 兼容接口调用大模型llm = ChatOpenAI( base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key=DASHSCOPE_API_KEY, model="qwen-max",)# 初始化 mem0 客户端(自动从 AgentRun 控制平面读取 Tablestore 配置)mem0 = MemoryCollection.to_mem0_memory(MEMORY_COLLECTION_NAME)# ── Prompt 模板 ───────────────────────────────────────────────────────────────prompt = ChatPromptTemplate.from_messages([ SystemMessage(content="""你是一位专业的旅行规划助手,能够根据用户的偏好和历史对话提供个性化的旅行建议。如果有相关记忆上下文,请在回答中加以参考,让建议更符合用户需求。"""), MessagesPlaceholder(variable_name="context"), ("human", "{input}"),])# ── 核心函数 ──────────────────────────────────────────────────────────────────def _spinner(stop_event: threading.Event): """在等待 LLM 推理期间显示转圈动画。""" frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] i = 0 while not stop_event.is_set(): print(f'r助手: 正在思考... {frames[i % len(frames)]}', end='', flush=True) time.sleep(0.1) i += 1 print('r' + ' ' * 35 + 'r', end='', flush=True)def retrieve_context(query: str, user_id: str) -> List[BaseMessage]: """从 mem0 检索与用户问题相关的记忆,作为 LangChain 消息列表注入 prompt。""" try: memories = mem0.search(query, user_id=user_id) memory_list = memories.get("results", [ ]) if not memory_list: return [ ] serialized = " ".join(m["memory"] for m in memory_list) print(f" [mem0 检索到 {len(memory_list)} 条相关记忆]") return [SystemMessage(content=f"用户相关信息:{serialized}")] except Exception as e: print(f" [mem0 检索失败: {e}]") return [ ]def generate_response(user_input: str, context: List[BaseMessage]) -> str: """携带记忆上下文调用 LLM 生成回复。""" chain = prompt | llm response = chain.invoke({"context": context, "input": user_input}) return response.contentdef save_interaction(user_id: str, user_input: str, assistant_response: str) -> int: """将本轮对话写回 mem0,供后续对话检索使用。返回新增记忆条数。""" interaction = [ {"role": "user", "content": user_input}, {"role": "assistant", "content": assistant_response}, ] try: result = mem0.add(interaction, user_id=user_id, metadata={"source_app": "openmemory", "mcp_client": "python_sdk"}) return len(result.get("results", [ ])) except Exception as e: print(f" [mem0 保存失败: {e}]") return 0def chat_turn(user_input: str, user_id: str) -> str: """单轮对话:检索记忆 → 生成回复 → 保存记忆。""" context = retrieve_context(user_input, user_id) # 用一个 spinner 同时覆盖 LLM 推理和 mem0 保存,全部完成后再打印回复 stop_event = threading.Event() spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True) spinner.start() response = generate_response(user_input, context) count = save_interaction(user_id, user_input, response) stop_event.set() spinner.join() print(f"助手: {response}n") if count: print(f" [mem0 新增 {count} 条记忆]n") return response# ── 主程序 ────────────────────────────────────────────────────────────────────def main(): print("═" * 60) print("旅行规划助手(LangChain + mem0 长期记忆)") print("═" * 60) print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。") print("输入 /quit 退出n") user_id = "travel-user-demo" while True: try: user_input = input("你: ").strip() except (EOFError, KeyboardInterrupt): break if not user_input or user_input == "/quit": break response = chat_turn(user_input, user_id)if __name__ == "__main__": main()
将上述代码保存为 langchain_memory.py 并运行:
python3 langchain_memory.py
▍会话状态
方式一:通过 Tablestore SDK 查询会话数据
AgentRun 将每次对话的会话元数据写入 session 表,消息内容写入 message 表。本示例直接扫描这两张表,列出最近的会话记录,并读取其中一个会话的完整对话内容。适用于数据分析、会话审计等只读场景。
import jsonimport osfrom datetime import datetimeimport tablestore as ots# ── 配置 ──────────────────────────────────────────────────────────────────────ENDPOINT = os.environ["TABLESTORE_ENDPOINT"]INSTANCE = os.environ["TABLESTORE_INSTANCE"]ACCESS_KEY_ID = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]ACCESS_KEY_SECRET = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]client = ots.OTSClient(ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET, INSTANCE)# ── 查询函数 ──────────────────────────────────────────────────────────────────def list_sessions(user_id: str = None, limit: int = 10) -> list[dict]: """查询 session 表中的会话元数据列表。""" if user_id: min_pk = [("user_id", user_id), ("session_id", ots.INF_MIN)] max_pk = [("user_id", user_id), ("session_id", ots.INF_MAX)] else: min_pk = [("user_id", ots.INF_MIN), ("session_id", ots.INF_MIN)] max_pk = [("user_id", ots.INF_MAX), ("session_id", ots.INF_MAX)] _, _, rows, _ = client.get_range( "session", ots.Direction.FORWARD, min_pk, max_pk, columns_to_get=[ ], max_version=1, limit=limit, ) sessions = [ ] for row in rows: pks = dict(row.primary_key) attrs = {c[0]: c[1] for c in row.attribute_columns} # update_time 单位为微秒 update_time_us = int(attrs.get("update_time", 0)) sessions.append({ "user_id": pks["user_id"], "session_id": pks["session_id"], "agent_id": attrs.get("agent_id", ""), "update_time": datetime.fromtimestamp(update_time_us / 1e6).strftime("%Y-%m-%d %H:%M:%S"), }) return sessionsdef get_session_messages(session_id: str) -> list[dict]: """查询 message 表中指定会话的所有对话消息。""" min_pk = [("session_id", session_id), ("create_time", ots.INF_MIN), ("message_id", ots.INF_MIN)] max_pk = [("session_id", session_id), ("create_time", ots.INF_MAX), ("message_id", ots.INF_MAX)] _, _, rows, _ = client.get_range( "message", ots.Direction.FORWARD, min_pk, max_pk, columns_to_get=[ ], max_version=1, ) messages = [ ] for row in rows: attrs = {c[0]: c[1] for c in row.attribute_columns} if "content" in attrs: try: messages.extend(json.loads(attrs["content"])) except json.JSONDecodeError: pass return messages# ── 主程序 ────────────────────────────────────────────────────────────────────def main(): print("═" * 55) print("AgentRun 会话状态 —— Tablestore 数据查询示例") print("═" * 55) # ── 1. 列出所有会话 ─────────────────────────────────────────────────────── print("n[会话列表]") sessions = list_sessions(limit=5) if not sessions: print(" (暂无会话记录)") return for s in sessions: print(f" session_id : {s['session_id']}") print(f" agent_id : {s['agent_id']}") print(f" 更新时间 : {s['update_time']}") print() # ── 2. 读取最近一个会话的消息 ───────────────────────────────────────────── latest = sessions[-1] sid = latest["session_id"] print(f"n[会话详情] session_id: {sid}") messages = get_session_messages(sid) if not messages: print(" (该会话暂无消息)") else: for msg in messages: role = msg.get("role", "?") content = msg.get("content", "")[:100] ellipsis = "..." if len(msg.get("content", "")) > 100 else "" print(f" [{role}] {content}{ellipsis}")if __name__ == "__main__": main()
将上述代码保存为 session_state.py 并运行:
python3 session_state.py
方式二:Google ADK 集成(OTSSessionService)
演示如何将 Tablestore 作为 Google ADK Agent 的会话状态后端。通过 OTSSessionService,ADK Agent 的每次对话都会自动持久化到 Tablestore,进程重启后可继续之前的会话,无需重新建立上下文。示例包含一个能查询天气的工具,用于验证 Agent 的工具调用和会话记录是否正常持久化。
from __future__ import annotationsimport asyncioimport osimport sysimport threadingimport timefrom typing import Anyfrom google.adk.agents import Agentfrom google.adk.models.lite_llm import LiteLlmfrom google.adk.runners import Runnerfrom google.genai import typesfrom agentrun.conversation_service import SessionStorefrom agentrun.conversation_service.adapters import OTSSessionService# ── 配置 ──────────────────────────────────────────────────────────────────────MEMORY_COLLECTION_NAME = os.environ.get("MEMORY_COLLECTION_NAME", "")DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")if not MEMORY_COLLECTION_NAME: print("ERROR: 请设置环境变量 MEMORY_COLLECTION_NAME") sys.exit(1)APP_NAME = "adk-chat-demo"USER_ID = "demo_user"SESSION_FILE = ".adk_session_id" # 本地持久化 session ID,删除此文件可开始新会话# ── 工具定义 ──────────────────────────────────────────────────────────────────def _spinner(stop_event: threading.Event): """在等待 Agent 推理期间显示转圈动画。""" frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] i = 0 while not stop_event.is_set(): print(f'rAgent: 正在思考... {frames[i % len(frames)]}', end='', flush=True) time.sleep(0.1) i += 1 print('r' + ' ' * 35 + 'r', end='', flush=True)def get_weather(city: str) -> dict[str, Any]: """查询指定城市的天气信息。""" data = { "北京": {"weather": "晴", "temperature": "5~15°C"}, "上海": {"weather": "多云", "temperature": "12~20°C"}, "杭州": {"weather": "阴", "temperature": "10~18°C"}, } return data.get(city, {"error": "暂无该城市数据"})# ── Step 1: 初始化 SessionStore ───────────────────────────────────────────────# SessionStore.from_memory_collection() 从 AgentRun 控制平面读取记忆存储配置,# 自动获取 Tablestore 实例和 endpoint,无需手动配置 OTS 连接参数。store = SessionStore.from_memory_collection(MEMORY_COLLECTION_NAME)store.init_tables()store.init_search_index()# ── Step 2: 创建 OTSSessionService ───────────────────────────────────────────session_service = OTSSessionService(session_store=store)# ── Step 3: 创建 Agent + Runner ───────────────────────────────────────────────# 通过 LiteLLM 调用 DashScope OpenAI 兼容接口custom_model = LiteLlm( model="openai/qwen-max", api_key=DASHSCOPE_API_KEY, api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",)agent = Agent( name="smart_assistant", model=custom_model, instruction="你是一个友好的中文智能助手,用户问天气时调用 get_weather 工具。", tools=[get_weather],)runner = Runner( agent=agent, app_name=APP_NAME, session_service=session_service,)# ── Step 4: 对话(自动持久化到 Tablestore) ───────────────────────────────────async def chat(session_id: str, text: str) -> str: """发送消息并返回 Agent 回复。""" content = types.Content( role="user", parts=[types.Part(text=text)], ) reply_parts: list[str] = [ ] async for event in runner.run_async( user_id=USER_ID, session_id=session_id, new_message=content, ): if event.is_final_response() and event.content and event.content.parts: for part in event.content.parts: if part.text: reply_parts.append(part.text) return "n".join(reply_parts)# ── 主程序 ────────────────────────────────────────────────────────────────────async def main() -> None: # 从本地文件读取上次的 session ID,尝试恢复会话;找不到则创建新会话 session = None if os.path.exists(SESSION_FILE): saved_id = open(SESSION_FILE).read().strip() session = await session_service.get_session( app_name=APP_NAME, user_id=USER_ID, session_id=saved_id ) if session: print(f"继续之前的会话: {session.id}") if session is None: session = await session_service.create_session( app_name=APP_NAME, user_id=USER_ID, state={"user:language": "zh-CN"}, ) open(SESSION_FILE, "w").write(session.id) print(f"新会话已创建: {session.id}") print("输入 /quit 退出,/new 开始新会话n") while True: try: user_input = input("你: ").strip() except (EOFError, KeyboardInterrupt): break if not user_input: continue if user_input == "/quit": break if user_input == "/new": session = await session_service.create_session( app_name=APP_NAME, user_id=USER_ID, state={"user:language": "zh-CN"}, ) open(SESSION_FILE, "w").write(session.id) print(f"新会话已创建: {session.id}n") continue stop_event = threading.Event() spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True) spinner.start() reply = await chat(session.id, user_input) stop_event.set() spinner.join() print(f"Agent: {reply}n")if __name__ == "__main__": asyncio.run(main())
将上述代码保存为 adk_session.py 并运行:
python3 adk_session.py
管理记忆存储
Cloud Native
查询记忆存储的配置详情,包括关联的 Tablestore 实例名称、向量集合、向量维度、嵌入模型和长期记忆提取模型,以及当前账号下的所有记忆存储列表。适用于排查配置问题或了解记忆存储的底层资源信息。
import osfrom alibabacloud_agentrun20250910.client import Clientfrom alibabacloud_tea_openapi import models as open_api_models# ── 配置 ──────────────────────────────────────────────────────────────────────ACCESS_KEY_ID = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]ACCESS_KEY_SECRET = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]REGION = os.getenv("AGENTRUN_REGION", "cn-hangzhou")config = open_api_models.Config( access_key_id=ACCESS_KEY_ID, access_key_secret=ACCESS_KEY_SECRET, endpoint=f"agentrun.{REGION}.aliyuncs.com",)client = Client(config)# ── 工具函数 ──────────────────────────────────────────────────────────────────def get_memory_collection(name: str) -> dict: """查询记忆存储的详细配置。""" resp = client.get_memory_collection(name) return resp.body.data.to_map()def list_memory_collections() -> list: """列出当前账号下的所有记忆存储。""" from alibabacloud_agentrun20250910 import models as ar_models req = ar_models.ListMemoryCollectionsRequest() resp = client.list_memory_collections(req) return [item.to_map() for item in (resp.body.data.items or [ ])]# ── 主程序 ────────────────────────────────────────────────────────────────────def main(): # ── 查询单个记忆存储 ────────────────────────────────────────────────────── print("═" * 55) print(f"查询记忆存储:{MEMORY_COLLECTION_NAME}") print("═" * 55) info = get_memory_collection(MEMORY_COLLECTION_NAME) print(f"名称:{info.get('memoryCollectionName')}") print(f"ID:{info.get('memoryCollectionId')}") print(f"会话历史:{'已启用' if info.get('enableConversationHistory') else '未启用'}") print(f"会话状态:{'已启用' if info.get('enableConversationState') else '未启用'}") vector_cfg = info.get("vectorStoreConfig", {}) print(f"n向量数据库:{vector_cfg.get('provider')}") vector_detail = vector_cfg.get("config", {}) print(f" 实例:{vector_detail.get('instanceName')}") print(f" 集合:{vector_detail.get('collectionName')}") print(f" 向量维度:{vector_detail.get('vectorDimension')}") embedder_cfg = info.get("embedderConfig", {}) embedder_detail = embedder_cfg.get("config", {}) print(f"n嵌入模型:{embedder_detail.get('model')}") print(f"模型服务:{embedder_cfg.get('modelServiceName')}") llm_cfg = info.get("llmConfig", {}) llm_detail = llm_cfg.get("config", {}) print(f"n长期记忆提取模型:{llm_detail.get('model')}") print(f"模型服务:{llm_cfg.get('modelServiceName')}") # ── 列出所有记忆存储 ────────────────────────────────────────────────────── print("n" + "═" * 55) print("当前账号下的所有记忆存储") print("═" * 55) collections = list_memory_collections() if not collections: print("(无)") for c in collections: status = c.get("status", "") print(f" - {c.get('memoryCollectionName')} [{status}]")if __name__ == "__main__": main()
将上述代码保存为 manage_memory.py 并运行:
python3 manage_memory.py


