From e3768e2707945ead3311e09bc1de3f40256c6da3 Mon Sep 17 00:00:00 2001 From: 666ghj <670939375@qq.com> Date: Fri, 5 Dec 2025 17:53:45 +0800 Subject: [PATCH] Implement dynamic graph memory update feature for simulations - Added a new optional parameter `enable_graph_memory_update` to the simulation API, allowing real-time updates of agent activities to the Zep knowledge graph. - Introduced `ZepGraphMemoryUpdater` and `ZepGraphMemoryManager` classes to handle the background processing of activity updates, ensuring efficient API calls and data management. - Updated the README.md to include detailed instructions on the new graph memory update functionality and its configuration. - Enhanced the simulation runner to manage the lifecycle of the graph memory updater, including starting and stopping the updater based on user configuration. - Improved logging to track the status of graph memory updates, providing better insights into the simulation process. --- backend/README.md | 164 ++++++- backend/app/api/simulation.py | 47 +- backend/app/services/__init__.py | 8 + backend/app/services/simulation_runner.py | 61 ++- .../app/services/zep_graph_memory_updater.py | 408 ++++++++++++++++++ 5 files changed, 670 insertions(+), 18 deletions(-) create mode 100644 backend/app/services/zep_graph_memory_updater.py diff --git a/backend/README.md b/backend/README.md index 2af0c18..e52ace4 100644 --- a/backend/README.md +++ b/backend/README.md @@ -27,6 +27,7 @@ 3. **Agent人设生成**: 基于图谱实体,使用 LLM 生成详细的社交媒体用户人设 4. **模拟配置智能生成**: 使用 LLM 根据需求自动生成模拟参数(时间、活跃度、事件等) 5. **双平台模拟**: 支持 Twitter 和 Reddit 双平台并行舆论模拟(基于 OASIS 框架) +6. **图谱记忆动态更新**: 可选功能,将模拟中Agent的活动实时更新到Zep图谱,让图谱"记住"模拟过程 --- @@ -69,7 +70,7 @@ 3. 
**模拟运行流程**: ``` - 启动模拟 → 运行OASIS脚本 → 实时监控 → 记录动作 → 状态查询 + 启动模拟 → 运行OASIS脚本 → 实时监控 → 记录动作 → (可选)更新Zep图谱记忆 → 状态查询 ``` --- @@ -150,7 +151,8 @@ backend/ │ ├── oasis_profile_generator.py # 人设生成 │ ├── simulation_config_generator.py # 配置生成 │ ├── simulation_manager.py # 模拟管理 - │ └── simulation_runner.py # 模拟运行 + │ ├── simulation_runner.py # 模拟运行 + │ └── zep_graph_memory_updater.py # 图谱记忆动态更新 └── utils/ # 工具类 ├── __init__.py ├── file_parser.py # 文件解析 @@ -207,11 +209,13 @@ backend/ 2. 启动 OASIS 模拟进程(subprocess) 3. 监控进程运行状态 4. 解析动作日志(actions.jsonl) -5. 实时更新运行状态 -6. 支持停止/暂停/恢复 +5. (可选)将Agent活动实时更新到Zep图谱 +6. 实时更新运行状态 +7. 支持停止/暂停/恢复 **核心服务**: - `SimulationRunner`: 模拟运行器 +- `ZepGraphMemoryUpdater`: 图谱记忆动态更新器 --- @@ -555,7 +559,8 @@ backend/ { "simulation_id": "sim_10b494550540", "platform": "parallel", - "max_rounds": 100 + "max_rounds": 100, + "enable_graph_memory_update": true } ``` @@ -563,7 +568,8 @@ backend/ |------|------|------|--------|------| | simulation_id | String | 是 | - | 模拟ID | | platform | String | 否 | parallel | 运行平台: twitter/reddit/parallel | -| max_rounds | Integer | 否 | - | 最大模拟轮数,用于截断过长的模拟。如果配置中的轮数超过此值,将被截断 | +| max_rounds | Integer | 否 | - | 最大模拟轮数,用于截断过长的模拟 | +| enable_graph_memory_update | Boolean | 否 | false | 是否将Agent活动动态更新到Zep图谱 | **返回示例**: ```json @@ -577,12 +583,25 @@ backend/ "reddit_running": true, "started_at": "2025-12-02T11:00:00", "total_rounds": 100, - "max_rounds_applied": 100 + "max_rounds_applied": 100, + "graph_memory_update_enabled": true, + "graph_id": "mirofish_abc123" } } ``` -> **说明**: `max_rounds_applied` 字段仅在指定了 `max_rounds` 参数时返回,表示实际应用的最大轮数限制。 +> **说明**: +> - `max_rounds_applied` 字段仅在指定了 `max_rounds` 参数时返回 +> - `graph_memory_update_enabled` 字段始终返回;`graph_id` 字段仅在启用图谱记忆更新时返回 + +**图谱记忆更新功能说明**: + +启用 `enable_graph_memory_update` 后: +- 模拟中所有Agent的活动(发帖、评论、点赞、转发等)会实时更新到Zep图谱 +- 活动会被转换为自然语言描述,例如:`[Twitter模拟 第15轮] 张三: 发布了一条帖子:「...」` +- 采用批量更新机制(默认10条或30秒),减少API调用次数 +- Zep会自动从文本中提取实体和关系,丰富图谱知识 +- 需要项目已构建有效的图谱(graph_id) --- @@ -1255,6 +1274,126
@@ def cleanup_all_simulations(cls): --- +### 8. ZepGraphMemoryUpdater (图谱记忆更新器) + +**文件**: `app/services/zep_graph_memory_updater.py` + +**功能**: 将模拟中的Agent活动动态更新到Zep图谱 + +**核心类**: + +```python +class AgentActivity: + """Agent活动记录""" + platform: str # twitter / reddit + agent_id: int + agent_name: str + action_type: str # CREATE_POST, LIKE_POST, etc. + action_args: Dict + round_num: int + timestamp: str + + def to_episode_text(self) -> str: + """ + 将活动转换为自然语言描述 + + 示例输出: + - "[Twitter模拟 第15轮] 张三: 发布了一条帖子:「官方声明:...」" + - "[Reddit模拟 第3轮] 李四: 在帖子#5下评论道:「我认为...」" + - "[Twitter模拟 第10轮] 王五: 引用帖子#3并评论:「同意!」" + """ +``` + +```python +class ZepGraphMemoryUpdater: + """ + 图谱记忆更新器 + + 特性: + - 批量更新(BATCH_SIZE=10条或MAX_WAIT_TIME=30秒) + - 后台线程异步处理,不阻塞主模拟流程 + - 带重试的API调用(MAX_RETRIES=3) + - 自动跳过DO_NOTHING类型的活动 + """ + + def start(self): + """启动后台工作线程""" + + def stop(self): + """停止并发送剩余活动""" + + def add_activity(self, activity: AgentActivity): + """添加活动到队列""" + + def add_activity_from_dict(self, data: Dict, platform: str): + """从动作日志字典添加活动""" + + def get_stats(self) -> Dict: + """获取统计信息(total_activities, total_sent, failed_count等)""" +``` + +```python +class ZepGraphMemoryManager: + """ + 管理多个模拟的更新器实例 + """ + + @classmethod + def create_updater(cls, simulation_id: str, graph_id: str) -> ZepGraphMemoryUpdater: + """为模拟创建并启动更新器""" + + @classmethod + def get_updater(cls, simulation_id: str) -> Optional[ZepGraphMemoryUpdater]: + """获取模拟的更新器""" + + @classmethod + def stop_updater(cls, simulation_id: str): + """停止并移除模拟的更新器""" + + @classmethod + def stop_all(cls): + """停止所有更新器(服务器关闭时调用)""" +``` + +**活动类型转换**: + +| action_type | 转换后的描述 | +|-------------|-------------| +| CREATE_POST | 发布了一条帖子:「{content}」 | +| LIKE_POST | 点赞了帖子#{post_id} | +| DISLIKE_POST | 踩了帖子#{post_id} | +| REPOST | 转发了帖子#{post_id} | +| QUOTE_POST | 引用帖子#{quoted_id}并评论:「{content}」 | +| FOLLOW | 关注了用户#{user_id} | +| CREATE_COMMENT | 在帖子#{post_id}下评论道:「{content}」 | +| LIKE_COMMENT | 点赞了评论#{comment_id} | +| SEARCH_POSTS | 
搜索了「{query}」 | +| MUTE | 屏蔽了用户#{user_id} | + +**使用示例**: + +```python +# 在启动模拟时启用图谱记忆更新 +POST /api/simulation/start +{ + "simulation_id": "sim_xxx", + "enable_graph_memory_update": true +} +``` + +启用后,模拟中的活动会被转换为类似以下格式的文本并发送到Zep: + +``` +[Twitter模拟 第0轮] 上级: 发布了一条帖子:「官方声明:经复核并结合司法判决,校方决定撤销对肖某某的处分。学校向当事人致以正式歉意...」 +[Twitter模拟 第0轮] 全国顶尖新闻传播学院的大学: 发布了一条帖子:「武汉大学官方发布:学校已决定撤销此前对当事人的处分...」 +[Twitter模拟 第15轮] 全国考生: 引用帖子#5并评论:「...」 +[Reddit模拟 第3轮] 教师代表: 在帖子#2下评论道:「此事暴露出高校在程序正义上的问题...」 +``` + +Zep会自动从这些文本中提取实体(如人名、机构名)和关系,丰富图谱知识。 + +--- + ## 工具类 ### 1. FileParser (文件解析器) @@ -1508,13 +1647,14 @@ curl -X POST http://localhost:5001/api/simulation/prepare/status \ # 等待status=completed -# Step 7: 启动模拟(可选指定max_rounds限制轮数) +# Step 7: 启动模拟(可选参数:max_rounds限制轮数,enable_graph_memory_update启用图谱记忆更新) curl -X POST http://localhost:5001/api/simulation/start \ -H "Content-Type: application/json" \ -d '{ "simulation_id": "sim_xxx", "platform": "parallel", - "max_rounds": 50 + "max_rounds": 50, + "enable_graph_memory_update": true }' # Step 8: 实时查询运行状态 @@ -1689,6 +1829,6 @@ MIT License --- -**最后更新**: 2025-12-02 -**版本**: v1.0.0 +**最后更新**: 2025-12-05 +**版本**: v1.1.0 diff --git a/backend/app/api/simulation.py b/backend/app/api/simulation.py index ca49971..06f5d40 100644 --- a/backend/app/api/simulation.py +++ b/backend/app/api/simulation.py @@ -1113,11 +1113,18 @@ def start_simulation(): 请求(JSON): { - "simulation_id": "sim_xxxx", // 必填,模拟ID - "platform": "parallel", // 可选: twitter / reddit / parallel (默认) - "max_rounds": 100 // 可选: 最大模拟轮数,用于截断过长的模拟 + "simulation_id": "sim_xxxx", // 必填,模拟ID + "platform": "parallel", // 可选: twitter / reddit / parallel (默认) + "max_rounds": 100, // 可选: 最大模拟轮数,用于截断过长的模拟 + "enable_graph_memory_update": false // 可选: 是否将Agent活动动态更新到Zep图谱记忆 } + 关于 enable_graph_memory_update: + - 启用后,模拟中所有Agent的活动(发帖、评论、点赞等)都会实时更新到Zep图谱 + - 这可以让图谱"记住"模拟过程,用于后续分析或AI对话 + - 需要模拟关联的项目有有效的 graph_id + - 采用批量更新机制,减少API调用次数 + 返回: { "success": true, @@ -1127,7 +1134,8 @@ def start_simulation():
"process_pid": 12345, "twitter_running": true, "reddit_running": true, - "started_at": "2025-12-01T10:00:00" + "started_at": "2025-12-01T10:00:00", + "graph_memory_update_enabled": true // 是否启用了图谱记忆更新 } } """ @@ -1143,6 +1151,7 @@ def start_simulation(): platform = data.get('platform', 'parallel') max_rounds = data.get('max_rounds') # 可选:最大模拟轮数 + enable_graph_memory_update = data.get('enable_graph_memory_update', False) # 可选:是否启用图谱记忆更新 # 验证 max_rounds 参数 if max_rounds is not None: @@ -1203,8 +1212,33 @@ def start_simulation(): "error": f"模拟未准备好,当前状态: {state.status.value},请先调用 /prepare 接口" }), 400 + # 获取图谱ID(用于图谱记忆更新) + graph_id = None + if enable_graph_memory_update: + # 从模拟状态或项目中获取 graph_id + graph_id = state.graph_id + if not graph_id: + # 尝试从项目中获取 + project = ProjectManager.get_project(state.project_id) + if project: + graph_id = project.graph_id + + if not graph_id: + return jsonify({ + "success": False, + "error": "启用图谱记忆更新需要有效的 graph_id,请确保项目已构建图谱" + }), 400 + + logger.info(f"启用图谱记忆更新: simulation_id={simulation_id}, graph_id={graph_id}") + # 启动模拟 - run_state = SimulationRunner.start_simulation(simulation_id, platform, max_rounds) + run_state = SimulationRunner.start_simulation( + simulation_id=simulation_id, + platform=platform, + max_rounds=max_rounds, + enable_graph_memory_update=enable_graph_memory_update, + graph_id=graph_id + ) # 更新模拟状态 state.status = SimulationStatus.RUNNING @@ -1213,6 +1247,9 @@ def start_simulation(): response_data = run_state.to_dict() if max_rounds: response_data['max_rounds_applied'] = max_rounds + response_data['graph_memory_update_enabled'] = enable_graph_memory_update + if enable_graph_memory_update: + response_data['graph_id'] = graph_id return jsonify({ "success": True, diff --git a/backend/app/services/__init__.py b/backend/app/services/__init__.py index b8f2e67..b4dda02 100644 --- a/backend/app/services/__init__.py +++ b/backend/app/services/__init__.py @@ -23,6 +23,11 @@ from .simulation_runner import ( AgentAction, 
RoundSummary ) +from .zep_graph_memory_updater import ( + ZepGraphMemoryUpdater, + ZepGraphMemoryManager, + AgentActivity +) __all__ = [ 'OntologyGenerator', @@ -47,5 +52,8 @@ __all__ = [ 'RunnerStatus', 'AgentAction', 'RoundSummary', + 'ZepGraphMemoryUpdater', + 'ZepGraphMemoryManager', + 'AgentActivity', ] diff --git a/backend/app/services/simulation_runner.py b/backend/app/services/simulation_runner.py index fa03f28..125eede 100644 --- a/backend/app/services/simulation_runner.py +++ b/backend/app/services/simulation_runner.py @@ -20,6 +20,7 @@ from queue import Queue from ..config import Config from ..utils.logger import get_logger +from .zep_graph_memory_updater import ZepGraphMemoryManager logger = get_logger('mirofish.simulation_runner') @@ -201,6 +202,9 @@ class SimulationRunner: _stdout_files: Dict[str, Any] = {} # 存储 stdout 文件句柄 _stderr_files: Dict[str, Any] = {} # 存储 stderr 文件句柄 + # 图谱记忆更新配置 + _graph_memory_enabled: Dict[str, bool] = {} # simulation_id -> enabled + @classmethod def get_run_state(cls, simulation_id: str) -> Optional[SimulationRunState]: """获取运行状态""" @@ -281,7 +285,9 @@ class SimulationRunner: cls, simulation_id: str, platform: str = "parallel", # twitter / reddit / parallel - max_rounds: int = None # 最大模拟轮数(可选,用于截断过长的模拟) + max_rounds: int = None, # 最大模拟轮数(可选,用于截断过长的模拟) + enable_graph_memory_update: bool = False, # 是否将活动更新到Zep图谱 + graph_id: str = None # Zep图谱ID(启用图谱更新时必需) ) -> SimulationRunState: """ 启动模拟 @@ -290,6 +296,8 @@ class SimulationRunner: simulation_id: 模拟ID platform: 运行平台 (twitter/reddit/parallel) max_rounds: 最大模拟轮数(可选,用于截断过长的模拟) + enable_graph_memory_update: 是否将Agent活动动态更新到Zep图谱 + graph_id: Zep图谱ID(启用图谱更新时必需) Returns: SimulationRunState @@ -332,6 +340,21 @@ class SimulationRunner: cls._save_run_state(state) + # 如果启用图谱记忆更新,创建更新器 + if enable_graph_memory_update: + if not graph_id: + raise ValueError("启用图谱记忆更新时必须提供 graph_id") + + try: + ZepGraphMemoryManager.create_updater(simulation_id, graph_id) + 
cls._graph_memory_enabled[simulation_id] = True + logger.info(f"已启用图谱记忆更新: simulation_id={simulation_id}, graph_id={graph_id}") + except Exception as e: + logger.error(f"创建图谱记忆更新器失败: {e}") + cls._graph_memory_enabled[simulation_id] = False + else: + cls._graph_memory_enabled[simulation_id] = False + # 确定运行哪个脚本(脚本位于 backend/scripts/ 目录) if platform == "twitter": script_name = "run_twitter_simulation.py" @@ -489,6 +512,15 @@ class SimulationRunner: cls._save_run_state(state) finally: + # 停止图谱记忆更新器 + if cls._graph_memory_enabled.get(simulation_id, False): + try: + ZepGraphMemoryManager.stop_updater(simulation_id) + logger.info(f"已停止图谱记忆更新: simulation_id={simulation_id}") + except Exception as e: + logger.error(f"停止图谱记忆更新器失败: {e}") + cls._graph_memory_enabled.pop(simulation_id, None) + # 清理进程资源 cls._processes.pop(simulation_id, None) cls._action_queues.pop(simulation_id, None) @@ -527,6 +559,12 @@ class SimulationRunner: Returns: 新的读取位置 """ + # 检查是否启用了图谱记忆更新 + graph_memory_enabled = cls._graph_memory_enabled.get(state.simulation_id, False) + graph_updater = None + if graph_memory_enabled: + graph_updater = ZepGraphMemoryManager.get_updater(state.simulation_id) + try: with open(log_path, 'r', encoding='utf-8') as f: f.seek(position) @@ -557,6 +595,10 @@ class SimulationRunner: if action.round_num and action.round_num > state.current_round: state.current_round = action.round_num + # 如果启用了图谱记忆更新,将活动发送到Zep + if graph_updater: + graph_updater.add_activity_from_dict(action_data, platform) + except json.JSONDecodeError: pass return f.tell() @@ -615,6 +657,15 @@ class SimulationRunner: state.completed_at = datetime.now().isoformat() cls._save_run_state(state) + # 停止图谱记忆更新器 + if cls._graph_memory_enabled.get(simulation_id, False): + try: + ZepGraphMemoryManager.stop_updater(simulation_id) + logger.info(f"已停止图谱记忆更新: simulation_id={simulation_id}") + except Exception as e: + logger.error(f"停止图谱记忆更新器失败: {e}") + cls._graph_memory_enabled.pop(simulation_id, None) + 
logger.info(f"模拟已停止: {simulation_id}") return state @@ -811,6 +862,14 @@ class SimulationRunner: """ logger.info("正在清理所有模拟进程...") + # 首先停止所有图谱记忆更新器 + try: + ZepGraphMemoryManager.stop_all() + logger.info("已停止所有图谱记忆更新器") + except Exception as e: + logger.error(f"停止图谱记忆更新器失败: {e}") + cls._graph_memory_enabled.clear() + # 复制字典以避免在迭代时修改 processes = list(cls._processes.items()) diff --git a/backend/app/services/zep_graph_memory_updater.py b/backend/app/services/zep_graph_memory_updater.py new file mode 100644 index 0000000..c9a1b5c --- /dev/null +++ b/backend/app/services/zep_graph_memory_updater.py @@ -0,0 +1,408 @@ +""" +Zep图谱记忆更新服务 +将模拟中的Agent活动动态更新到Zep图谱中 +""" + +import os +import time +import threading +import json +from typing import Dict, Any, List, Optional, Callable +from dataclasses import dataclass +from datetime import datetime +from queue import Queue, Empty + +from zep_cloud.client import Zep + +from ..config import Config +from ..utils.logger import get_logger + +logger = get_logger('mirofish.zep_graph_memory_updater') + + +@dataclass +class AgentActivity: + """Agent活动记录""" + platform: str # twitter / reddit + agent_id: int + agent_name: str + action_type: str # CREATE_POST, LIKE_POST, etc. 
+ action_args: Dict[str, Any] + round_num: int + timestamp: str + + def to_episode_text(self) -> str: + """ + 将活动转换为可以发送给Zep的文本描述 + + 采用自然语言描述格式,让Zep能够从中提取实体和关系 + """ + platform_name = "Twitter" if self.platform == "twitter" else "Reddit" + + # 根据不同的动作类型生成不同的描述 + action_descriptions = { + "CREATE_POST": self._describe_create_post, + "LIKE_POST": self._describe_like_post, + "DISLIKE_POST": self._describe_dislike_post, + "REPOST": self._describe_repost, + "QUOTE_POST": self._describe_quote_post, + "FOLLOW": self._describe_follow, + "CREATE_COMMENT": self._describe_create_comment, + "LIKE_COMMENT": self._describe_like_comment, + "DISLIKE_COMMENT": self._describe_dislike_comment, + "SEARCH_POSTS": self._describe_search, + "SEARCH_USER": self._describe_search_user, + "MUTE": self._describe_mute, + } + + describe_func = action_descriptions.get(self.action_type, self._describe_generic) + description = describe_func() + + # 添加时间和平台上下文 + return f"[{platform_name}模拟 第{self.round_num}轮] {self.agent_name}: {description}" + + def _describe_create_post(self) -> str: + content = self.action_args.get("content", "") + if content: + # 截取内容,避免过长 + if len(content) > 300: + content = content[:300] + "..." + return f"发布了一条帖子:「{content}」" + return "发布了一条帖子" + + def _describe_like_post(self) -> str: + post_id = self.action_args.get("post_id", "") + return f"点赞了帖子#{post_id}" if post_id else "点赞了一条帖子" + + def _describe_dislike_post(self) -> str: + post_id = self.action_args.get("post_id", "") + return f"踩了帖子#{post_id}" if post_id else "踩了一条帖子" + + def _describe_repost(self) -> str: + post_id = self.action_args.get("post_id", "") + return f"转发了帖子#{post_id}" if post_id else "转发了一条帖子" + + def _describe_quote_post(self) -> str: + quoted_id = self.action_args.get("quoted_id", "") + content = self.action_args.get("content", "") + if quoted_id: + if content: + return f"引用帖子#{quoted_id}并评论:「{content[:100]}{'...' 
if len(content) > 100 else ''}」" + return f"引用了帖子#{quoted_id}" + return "引用了一条帖子" + + def _describe_follow(self) -> str: + target_id = self.action_args.get("user_id", "") or self.action_args.get("target_id", "") + return f"关注了用户#{target_id}" if target_id else "关注了一个用户" + + def _describe_create_comment(self) -> str: + content = self.action_args.get("content", "") + post_id = self.action_args.get("post_id", "") + if content: + if len(content) > 200: + content = content[:200] + "..." + base = f"评论道:「{content}」" + if post_id: + base = f"在帖子#{post_id}下{base}" + return base + return f"在帖子#{post_id}下发表了评论" if post_id else "发表了评论" + + def _describe_like_comment(self) -> str: + comment_id = self.action_args.get("comment_id", "") + return f"点赞了评论#{comment_id}" if comment_id else "点赞了一条评论" + + def _describe_dislike_comment(self) -> str: + comment_id = self.action_args.get("comment_id", "") + return f"踩了评论#{comment_id}" if comment_id else "踩了一条评论" + + def _describe_search(self) -> str: + query = self.action_args.get("query", "") or self.action_args.get("keyword", "") + return f"搜索了「{query}」" if query else "进行了搜索" + + def _describe_search_user(self) -> str: + query = self.action_args.get("query", "") or self.action_args.get("username", "") + return f"搜索了用户「{query}」" if query else "搜索了用户" + + def _describe_mute(self) -> str: + target_id = self.action_args.get("user_id", "") or self.action_args.get("target_id", "") + return f"屏蔽了用户#{target_id}" if target_id else "屏蔽了一个用户" + + def _describe_generic(self) -> str: + # 对于未知的动作类型,生成通用描述 + return f"执行了{self.action_type}操作" + + +class ZepGraphMemoryUpdater: + """ + Zep图谱记忆更新器 + + 监控模拟的actions日志文件,将新的agent活动实时更新到Zep图谱中。 + 使用批量更新机制,将多个活动合并后一次性发送,减少API调用次数。 + """ + + # 活动缓冲区大小(达到此数量后批量发送) + BATCH_SIZE = 10 + + # 最大等待时间(秒),即使未达到BATCH_SIZE也会发送 + MAX_WAIT_TIME = 30 + + # 重试配置 + MAX_RETRIES = 3 + RETRY_DELAY = 2 # 秒 + + def __init__(self, graph_id: str, api_key: Optional[str] = None): + """ + 初始化更新器 + + Args: + graph_id: Zep图谱ID + api_key: Zep 
API Key(可选,默认从配置读取) + """ + self.graph_id = graph_id + self.api_key = api_key or Config.ZEP_API_KEY + + if not self.api_key: + raise ValueError("ZEP_API_KEY未配置") + + self.client = Zep(api_key=self.api_key) + + # 活动队列 + self._activity_queue: Queue = Queue() + + # 控制标志 + self._running = False + self._worker_thread: Optional[threading.Thread] = None + + # 统计 + self._total_activities = 0 + self._total_sent = 0 + self._failed_count = 0 + + logger.info(f"ZepGraphMemoryUpdater 初始化完成: graph_id={graph_id}") + + def start(self): + """启动后台工作线程""" + if self._running: + return + + self._running = True + self._worker_thread = threading.Thread( + target=self._worker_loop, + daemon=True, + name=f"ZepMemoryUpdater-{self.graph_id[:8]}" + ) + self._worker_thread.start() + logger.info(f"ZepGraphMemoryUpdater 已启动: graph_id={self.graph_id}") + + def stop(self): + """停止后台工作线程""" + self._running = False + + # 发送剩余的活动 + self._flush_remaining() + + if self._worker_thread and self._worker_thread.is_alive(): + self._worker_thread.join(timeout=10) + + logger.info(f"ZepGraphMemoryUpdater 已停止: graph_id={self.graph_id}, " + f"total_activities={self._total_activities}, " + f"total_sent={self._total_sent}, " + f"failed={self._failed_count}") + + def add_activity(self, activity: AgentActivity): + """ + 添加一个agent活动到队列 + + Args: + activity: Agent活动记录 + """ + # 跳过DO_NOTHING类型的活动 + if activity.action_type == "DO_NOTHING": + return + + self._activity_queue.put(activity) + self._total_activities += 1 + + def add_activity_from_dict(self, data: Dict[str, Any], platform: str): + """ + 从字典数据添加活动 + + Args: + data: 从actions.jsonl解析的字典数据 + platform: 平台名称 (twitter/reddit) + """ + # 跳过事件类型的条目 + if "event_type" in data: + return + + activity = AgentActivity( + platform=platform, + agent_id=data.get("agent_id", 0), + agent_name=data.get("agent_name", ""), + action_type=data.get("action_type", ""), + action_args=data.get("action_args", {}), + round_num=data.get("round", 0), + timestamp=data.get("timestamp", 
datetime.now().isoformat()), + ) + + self.add_activity(activity) + + def _worker_loop(self): + """后台工作循环""" + buffer: List[AgentActivity] = [] + last_send_time = time.time() + + while self._running or not self._activity_queue.empty(): + try: + # 尝试从队列获取活动(超时1秒) + try: + activity = self._activity_queue.get(timeout=1) + buffer.append(activity) + except Empty: + pass + + # 检查是否应该发送批次 + current_time = time.time() + should_send = ( + len(buffer) >= self.BATCH_SIZE or + (len(buffer) > 0 and current_time - last_send_time >= self.MAX_WAIT_TIME) + ) + + if should_send: + self._send_batch(buffer) + buffer = [] + last_send_time = current_time + + except Exception as e: + logger.error(f"工作循环异常: {e}") + time.sleep(1) + + # 发送剩余的活动 + if buffer: + self._send_batch(buffer) + + def _send_batch(self, activities: List[AgentActivity]): + """ + 批量发送活动到Zep图谱 + + Args: + activities: 活动列表 + """ + if not activities: + return + + # 将所有活动合并为一段文本 + episode_text = "\n".join([a.to_episode_text() for a in activities]) + + # 带重试的发送 + for attempt in range(self.MAX_RETRIES): + try: + self.client.graph.add( + graph_id=self.graph_id, + type="text", + data=episode_text + ) + + self._total_sent += len(activities) + logger.debug(f"成功发送 {len(activities)} 条活动到图谱 {self.graph_id}") + return + + except Exception as e: + if attempt < self.MAX_RETRIES - 1: + logger.warning(f"发送到Zep失败 (尝试 {attempt + 1}/{self.MAX_RETRIES}): {e}") + time.sleep(self.RETRY_DELAY * (attempt + 1)) + else: + logger.error(f"发送到Zep失败,已重试{self.MAX_RETRIES}次: {e}") + self._failed_count += len(activities) + + def _flush_remaining(self): + """发送队列中剩余的活动""" + remaining = [] + while not self._activity_queue.empty(): + try: + remaining.append(self._activity_queue.get_nowait()) + except Empty: + break + + if remaining: + self._send_batch(remaining) + + def get_stats(self) -> Dict[str, Any]: + """获取统计信息""" + return { + "graph_id": self.graph_id, + "total_activities": self._total_activities, + "total_sent": self._total_sent, + "failed_count": 
self._failed_count, + "queue_size": self._activity_queue.qsize(), + "running": self._running, + } + + +class ZepGraphMemoryManager: + """ + 管理多个模拟的Zep图谱记忆更新器 + + 每个模拟可以有自己的更新器实例 + """ + + _updaters: Dict[str, ZepGraphMemoryUpdater] = {} + _lock = threading.Lock() + + @classmethod + def create_updater(cls, simulation_id: str, graph_id: str) -> ZepGraphMemoryUpdater: + """ + 为模拟创建图谱记忆更新器 + + Args: + simulation_id: 模拟ID + graph_id: Zep图谱ID + + Returns: + ZepGraphMemoryUpdater实例 + """ + with cls._lock: + # 如果已存在,先停止旧的 + if simulation_id in cls._updaters: + cls._updaters[simulation_id].stop() + + updater = ZepGraphMemoryUpdater(graph_id) + updater.start() + cls._updaters[simulation_id] = updater + + logger.info(f"创建图谱记忆更新器: simulation_id={simulation_id}, graph_id={graph_id}") + return updater + + @classmethod + def get_updater(cls, simulation_id: str) -> Optional[ZepGraphMemoryUpdater]: + """获取模拟的更新器""" + return cls._updaters.get(simulation_id) + + @classmethod + def stop_updater(cls, simulation_id: str): + """停止并移除模拟的更新器""" + with cls._lock: + if simulation_id in cls._updaters: + cls._updaters[simulation_id].stop() + del cls._updaters[simulation_id] + logger.info(f"已停止图谱记忆更新器: simulation_id={simulation_id}") + + @classmethod + def stop_all(cls): + """停止所有更新器""" + with cls._lock: + for simulation_id, updater in list(cls._updaters.items()): + try: + updater.stop() + except Exception as e: + logger.error(f"停止更新器失败: simulation_id={simulation_id}, error={e}") + cls._updaters.clear() + logger.info("已停止所有图谱记忆更新器") + + @classmethod + def get_all_stats(cls) -> Dict[str, Dict[str, Any]]: + """获取所有更新器的统计信息""" + return { + sim_id: updater.get_stats() + for sim_id, updater in cls._updaters.items() + }