diff --git a/backend/README.md b/backend/README.md index e52ace4..5eaffbe 100644 --- a/backend/README.md +++ b/backend/README.md @@ -598,8 +598,8 @@ backend/ 启用 `enable_graph_memory_update` 后: - 模拟中所有Agent的活动(发帖、评论、点赞、转发等)会实时更新到Zep图谱 -- 活动会被转换为自然语言描述,例如:`[Twitter模拟 第15轮] 张三: 发布了一条帖子:「...」` -- 采用批量更新机制(默认10条或30秒),减少API调用次数 +- 每条活动单独发送,确保Zep能正确解析实体和关系 +- 活动会被转换为自然语言描述,例如:`张三: 发布了一条帖子:「...」` - Zep会自动从文本中提取实体和关系,丰富图谱知识 - 需要项目已构建有效的图谱(graph_id) @@ -1295,12 +1295,12 @@ class AgentActivity: def to_episode_text(self) -> str: """ - 将活动转换为自然语言描述 + 将活动转换为自然语言描述(不添加模拟前缀) 示例输出: - - "[Twitter模拟 第15轮] 张三: 发布了一条帖子:「官方声明:...」" - - "[Reddit模拟 第3轮] 李四: 在帖子#5下评论道:「我认为...」" - - "[Twitter模拟 第10轮] 王五: 引用帖子#3并评论:「同意!」" + - "张三: 发布了一条帖子:「官方声明:...」" + - "李四: 在帖子#5下评论道:「我认为...」" + - "王五: 引用帖子#3并评论:「同意!」" """ ``` @@ -1310,10 +1310,11 @@ class ZepGraphMemoryUpdater: 图谱记忆更新器 特性: - - 批量更新(BATCH_SIZE=10条或MAX_WAIT_TIME=30秒) + - 逐条发送活动到Zep,确保图谱正确解析 - 后台线程异步处理,不阻塞主模拟流程 - 带重试的API调用(MAX_RETRIES=3) - 自动跳过DO_NOTHING类型的活动 + - 发送间隔控制(SEND_INTERVAL=0.5秒) """ def start(self): @@ -1381,16 +1382,16 @@ POST /api/simulation/start } ``` -启用后,模拟中的活动会被转换为类似以下格式的文本并发送到Zep: +启用后,模拟中的活动会被逐条转换为自然语言描述并发送到Zep: ``` -[Twitter模拟 第0轮] 上级: 发布了一条帖子:「官方声明:经复核并结合司法判决,校方决定撤销对肖某某的处分。学校向当事人致以正式歉意...」 -[Twitter模拟 第0轮] 全国顶尖新闻传播学院的大学: 发布了一条帖子:「武汉大学官方发布:学校已决定撤销此前对当事人的处分...」 -[Twitter模拟 第15轮] 全国考生: 引用帖子#5并评论 -[Reddit模拟 第3轮] 教师代表: 在帖子#2下评论道:「此事暴露出高校在程序正义上的问题...」 +上级: 发布了一条帖子:「官方声明:经复核并结合司法判决,校方决定撤销对肖某某的处分。学校向当事人致以正式歉意...」 +全国顶尖新闻传播学院的大学: 发布了一条帖子:「武汉大学官方发布:学校已决定撤销此前对当事人的处分...」 +全国考生: 引用帖子#5并评论 +教师代表: 在帖子#2下评论道:「此事暴露出高校在程序正义上的问题...」 ``` -Zep会自动从这些文本中提取实体(如人名、机构名)和关系,丰富图谱知识。 +每条活动单独发送,确保Zep能正确从文本中提取实体(如人名、机构名)和关系,丰富图谱知识。 --- diff --git a/backend/app/services/zep_graph_memory_updater.py b/backend/app/services/zep_graph_memory_updater.py index c9a1b5c..b73b8f1 100644 --- a/backend/app/services/zep_graph_memory_updater.py +++ b/backend/app/services/zep_graph_memory_updater.py @@ -36,9 +36,8 @@ class AgentActivity: 将活动转换为可以发送给Zep的文本描述 采用自然语言描述格式,让Zep能够从中提取实体和关系 + 不添加模拟相关的前缀,避免误导图谱更新 """ - platform_name = "Twitter" if self.platform == "twitter" else "Reddit" - # 根据不同的动作类型生成不同的描述 action_descriptions = { "CREATE_POST": self._describe_create_post, @@ -58,8 +57,8 @@ class AgentActivity: describe_func = action_descriptions.get(self.action_type, self._describe_generic) description = describe_func() - # 添加时间和平台上下文 - return f"[{platform_name}模拟 第{self.round_num}轮] {self.agent_name}: {description}" + # 直接返回 "agent名称: 活动描述" 格式,不添加模拟前缀 + return f"{self.agent_name}: {description}" def _describe_create_post(self) -> str: content = self.action_args.get("content", "") @@ -137,14 +136,11 @@ class ZepGraphMemoryUpdater: Zep图谱记忆更新器 监控模拟的actions日志文件,将新的agent活动实时更新到Zep图谱中。 - 使用批量更新机制,将多个活动合并后一次性发送,减少API调用次数。 + 每条活动单独发送到Zep,确保图谱能正确解析实体和关系。 """ - # 活动缓冲区大小(达到此数量后批量发送) - BATCH_SIZE = 10 - - # 最大等待时间(秒),即使未达到BATCH_SIZE也会发送 - MAX_WAIT_TIME = 30 + # 发送间隔(秒),避免请求过快 + SEND_INTERVAL = 0.5 # 重试配置 MAX_RETRIES = 3 @@ -248,51 +244,31 @@ class ZepGraphMemoryUpdater: self.add_activity(activity) def _worker_loop(self): - """后台工作循环""" - buffer: List[AgentActivity] = [] - last_send_time = time.time() - + """后台工作循环 - 逐条发送活动到Zep""" while self._running or not self._activity_queue.empty(): try: # 尝试从队列获取活动(超时1秒) try: activity = self._activity_queue.get(timeout=1) - buffer.append(activity) + # 立即发送单条活动 + self._send_single_activity(activity) + # 发送间隔,避免请求过快 + time.sleep(self.SEND_INTERVAL) except Empty: pass - - # 检查是否应该发送批次 - current_time = time.time() - should_send = ( - len(buffer) >= self.BATCH_SIZE or - (len(buffer) > 0 and current_time - last_send_time >= self.MAX_WAIT_TIME) - ) - - if should_send: - self._send_batch(buffer) - buffer = [] - last_send_time = current_time except Exception as e: logger.error(f"工作循环异常: {e}") time.sleep(1) - - # 发送剩余的活动 - if buffer: - self._send_batch(buffer) - def _send_batch(self, activities: List[AgentActivity]): + def _send_single_activity(self, activity: AgentActivity): """ - 批量发送活动到Zep图谱 + 发送单条活动到Zep图谱 Args: - activities: 活动列表 + activity: 单条Agent活动 """ - if not activities: - return - - # 将所有活动合并为一段文本 - episode_text = "\n".join([a.to_episode_text() for a in activities]) + episode_text = activity.to_episode_text() # 带重试的发送 for attempt in range(self.MAX_RETRIES): @@ -303,8 +279,8 @@ class ZepGraphMemoryUpdater: data=episode_text ) - self._total_sent += len(activities) - logger.debug(f"成功发送 {len(activities)} 条活动到图谱 {self.graph_id}") + self._total_sent += 1 + logger.debug(f"成功发送活动到图谱 {self.graph_id}: {episode_text[:50]}...") return except Exception as e: @@ -313,19 +289,16 @@ class ZepGraphMemoryUpdater: time.sleep(self.RETRY_DELAY * (attempt + 1)) else: logger.error(f"发送到Zep失败,已重试{self.MAX_RETRIES}次: {e}") - self._failed_count += len(activities) + self._failed_count += 1 def _flush_remaining(self): - """发送队列中剩余的活动""" - remaining = [] + """发送队列中剩余的活动(逐条发送)""" while not self._activity_queue.empty(): try: - remaining.append(self._activity_queue.get_nowait()) + activity = self._activity_queue.get_nowait() + self._send_single_activity(activity) except Empty: break - - if remaining: - self._send_batch(remaining) def get_stats(self) -> Dict[str, Any]: """获取统计信息"""