Implement dynamic graph memory update feature for simulations

- Added a new optional parameter `enable_graph_memory_update` to the simulation API, allowing real-time updates of agent activities to the Zep knowledge graph.
- Introduced `ZepGraphMemoryUpdater` and `ZepGraphMemoryManager` classes to handle the background processing of activity updates, ensuring efficient API calls and data management.
- Updated the README.md to include detailed instructions on the new graph memory update functionality and its configuration.
- Enhanced the simulation runner to manage the lifecycle of the graph memory updater, including starting and stopping the updater based on user configuration.
- Improved logging to track the status of graph memory updates, providing better insights into the simulation process.
666ghj 2025-12-05 17:53:45 +08:00
parent e4761dab06
commit e3768e2707
5 changed files with 670 additions and 18 deletions


@ -27,6 +27,7 @@
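3. **Agent persona generation**: Uses an LLM to generate detailed social media user personas from the graph entities
4. **Automatic simulation configuration**: Uses an LLM to generate simulation parameters (time, activity level, events, etc.) from the requirements
5. **Dual-platform simulation**: Supports parallel public-opinion simulation on Twitter and Reddit (based on the OASIS framework)
6. **Dynamic graph memory update**: Optional feature that writes Agent activity from the simulation to the Zep graph in real time, so the graph "remembers" the simulation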
---
@ -69,7 +70,7 @@
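3. **Simulation run flow**:
```
Start simulation → Run OASIS script → Monitor in real time → Record actions → Query status
Start simulation → Run OASIS script → Monitor in real time → Record actions → (Optional) Update Zep graph memory → Query status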
```
---
@ -150,7 +151,8 @@ backend/
│ ├── oasis_profile_generator.py # Persona generation
│ ├── simulation_config_generator.py # Configuration generation
│ ├── simulation_manager.py # Simulation management
│ └── simulation_runner.py # Simulation execution
│ ├── simulation_runner.py # Simulation execution
│ └── zep_graph_memory_updater.py # Dynamic graph memory update
└── utils/ # Utilities
├── __init__.py
├── file_parser.py # File parsing
@ -207,11 +209,13 @@ backend/
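2. Start the OASIS simulation process (subprocess)
3. Monitor the process status
4. Parse the action log (actions.jsonl)
5. Update the run state in real time
6. Support stop/pause/resume
5. (Optional) Write Agent activity to the Zep graph in real time
6. Update the run state in real time
7. Support stop/pause/resume
**Core services**:
- `SimulationRunner`: simulation runner
- `ZepGraphMemoryUpdater`: dynamic graph memory updater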
---
@ -555,7 +559,8 @@ backend/
{
"simulation_id": "sim_10b494550540",
"platform": "parallel",
"max_rounds": 100
"max_rounds": 100,
"enable_graph_memory_update": false
}
```
@ -563,7 +568,8 @@ backend/
|------|------|------|--------|------|
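| simulation_id | String | Yes | - | Simulation ID |
| platform | String | No | parallel | Platform to run: twitter/reddit/parallel |
| max_rounds | Integer | No | - | Maximum number of simulation rounds, used to truncate overly long simulations. If the configured round count exceeds this value, it is truncated |
| max_rounds | Integer | No | - | Maximum number of simulation rounds, used to truncate overly long simulations |
| enable_graph_memory_update | Boolean | No | false | Whether to dynamically write Agent activity to the Zep graph |
**Response example**: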
```json
@ -577,12 +583,25 @@ backend/
"reddit_running": true,
"started_at": "2025-12-02T11:00:00",
"total_rounds": 100,
"max_rounds_applied": 100
"max_rounds_applied": 100,
"graph_memory_update_enabled": true,
"graph_id": "mirofish_abc123"
}
}
```
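> **Note**: The `max_rounds_applied` field is returned only when the `max_rounds` parameter is specified, and indicates the maximum round limit actually applied.
> **Note**:
> - The `max_rounds_applied` field is returned only when the `max_rounds` parameter is specified
> - The `graph_memory_update_enabled` and `graph_id` fields are returned when graph memory update is enabled
**About the graph memory update feature**:
With `enable_graph_memory_update` enabled:
- All Agent activity in the simulation (posts, comments, likes, reposts, etc.) is written to the Zep graph in real time
- Each activity is converted to a natural-language description, for example: `[Twitter模拟 第15轮] 张三: 发布了一条帖子:「...」`
- Updates are batched (by default every 10 items or 30 seconds) to reduce the number of API calls
- Zep automatically extracts entities and relations from the text, enriching the graph
- The project must already have a valid graph (graph_id)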
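
The batching rule above (send once 10 items are buffered, or once 30 seconds have passed with a non-empty buffer) can be sketched as a small pure function. This is a minimal illustration under those stated defaults, not the committed implementation; `should_flush` and its parameter names are hypothetical.

```python
def should_flush(buffered: int, seconds_since_last_send: float,
                 batch_size: int = 10, max_wait: float = 30.0) -> bool:
    """Hypothetical helper: decide whether a buffered batch should be sent.

    Flush when the buffer is full, or when it is non-empty and the
    maximum wait time has elapsed since the last send.
    """
    if buffered >= batch_size:
        return True
    return buffered > 0 and seconds_since_last_send >= max_wait
```

An empty buffer never triggers a send, so an idle simulation makes no API calls.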
---
@ -1255,6 +1274,126 @@ def cleanup_all_simulations(cls):
---
### 8. ZepGraphMemoryUpdater (graph memory updater)
**File**: `app/services/zep_graph_memory_updater.py`
**Purpose**: Dynamically writes Agent activity from the simulation to the Zep graph
**Core classes**:
```python
class AgentActivity:
    """Agent activity record"""
    platform: str  # twitter / reddit
    agent_id: int
    agent_name: str
    action_type: str  # CREATE_POST, LIKE_POST, etc.
    action_args: Dict
    round_num: int
    timestamp: str

    def to_episode_text(self) -> str:
        """
        Convert the activity to a natural-language description.

        Example output:
        - "[Twitter模拟 第15轮] 张三: 发布了一条帖子:「官方声明:...」"
        - "[Reddit模拟 第3轮] 李四: 在帖子#5下评论道「我认为...」"
        - "[Twitter模拟 第10轮] 王五: 引用帖子#3并评论「同意」"
        """
```
```python
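class ZepGraphMemoryUpdater:
    """
    Graph memory updater

    Features:
    - Batched updates (BATCH_SIZE=10 items or MAX_WAIT_TIME=30 seconds)
    - Asynchronous processing on a background thread; does not block the main simulation flow
    - API calls with retry (MAX_RETRIES=3)
    - Automatically skips DO_NOTHING activities
    """

    def start(self):
        """Start the background worker thread"""

    def stop(self):
        """Stop and send the remaining activities"""

    def add_activity(self, activity: AgentActivity):
        """Add an activity to the queue"""

    def add_activity_from_dict(self, data: Dict, platform: str):
        """Add an activity from an action-log dictionary"""

    def get_stats(self) -> Dict:
        """Get statistics (total_activities, total_sent, failed_count, etc.)"""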
```
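
The retry behaviour listed above can be sketched as follows. This is a minimal sketch, assuming linear backoff with the constants that appear in this commit (`MAX_RETRIES = 3`, `RETRY_DELAY = 2`); the helper name `send_with_retry` and the injectable `sleep` parameter are hypothetical and exist only to make the example testable.

```python
import time
from typing import Callable


def send_with_retry(send: Callable[[], None], max_retries: int = 3,
                    retry_delay: float = 2.0,
                    sleep: Callable[[float], None] = time.sleep) -> bool:
    """Hypothetical helper: call send() up to max_retries times.

    Returns True on the first success, False if every attempt raised.
    """
    for attempt in range(max_retries):
        try:
            send()
            return True
        except Exception:
            if attempt < max_retries - 1:
                # Linear backoff: wait retry_delay * 1, then * 2, ...
                sleep(retry_delay * (attempt + 1))
    return False
```

No delay follows the final failed attempt, so a batch that cannot be delivered is given up after two waits.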
```python
class ZepGraphMemoryManager:
    """
    Manages the updater instances of multiple simulations
    """

    @classmethod
    def create_updater(cls, simulation_id: str, graph_id: str) -> ZepGraphMemoryUpdater:
        """Create and start an updater for a simulation"""

    @classmethod
    def get_updater(cls, simulation_id: str) -> Optional[ZepGraphMemoryUpdater]:
        """Get the updater of a simulation"""

    @classmethod
    def stop_updater(cls, simulation_id: str):
        """Stop and remove the updater of a simulation"""

    @classmethod
    def stop_all(cls):
        """Stop all updaters (called on server shutdown)"""
```
**Activity type conversion**:
| action_type | Resulting description |
|-------------|-------------|
| CREATE_POST | 发布了一条帖子:「{content}」 |
| LIKE_POST | 点赞了帖子#{post_id} |
| DISLIKE_POST | 踩了帖子#{post_id} |
| REPOST | 转发了帖子#{post_id} |
| QUOTE_POST | 引用帖子#{quoted_id}并评论:「{content}」 |
| FOLLOW | 关注了用户#{user_id} |
| CREATE_COMMENT | 在帖子#{post_id}下评论道:「{content}」 |
| LIKE_COMMENT | 点赞了评论#{comment_id} |
| SEARCH_POSTS | 搜索了「{query}」 |
| MUTE | 屏蔽了用户#{user_id} |
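
The table above can be read as a lookup from `action_type` to a text template. The sketch below is a simplified, table-driven illustration of that idea, not the committed implementation (which uses one method per action type and handles missing arguments). The names `TEMPLATES`, `describe` and `to_episode_text` as free functions are hypothetical, only a subset of the rows is included, and the sketch assumes every placeholder is present in `action_args`.

```python
from typing import Any, Dict

# Templates copied from the table above; only a subset is shown.
TEMPLATES: Dict[str, str] = {
    "CREATE_POST": "发布了一条帖子:「{content}」",
    "LIKE_POST": "点赞了帖子#{post_id}",
    "FOLLOW": "关注了用户#{user_id}",
}


def describe(action_type: str, action_args: Dict[str, Any]) -> str:
    """Render the description for one action (hypothetical helper)."""
    template = TEMPLATES.get(action_type)
    if template is None:
        # Generic fallback for action types without a template.
        return f"执行了{action_type}操作"
    return template.format(**action_args)


def to_episode_text(platform: str, round_num: int, agent_name: str,
                    action_type: str, action_args: Dict[str, Any]) -> str:
    """Prefix the description with platform, round and agent name."""
    platform_name = "Twitter" if platform == "twitter" else "Reddit"
    description = describe(action_type, action_args)
    return f"[{platform_name}模拟 第{round_num}轮] {agent_name}: {description}"
```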
**Usage example**:
```python
# Enable graph memory update when starting the simulation
POST /api/simulation/start
{
"simulation_id": "sim_xxx",
"enable_graph_memory_update": true
}
```
Once enabled, activity in the simulation is converted to text in a format like the following and sent to Zep:
```
[Twitter模拟 第0轮] 上级: 发布了一条帖子:「官方声明:经复核并结合司法判决,校方决定撤销对肖某某的处分。学校向当事人致以正式歉意...」
[Twitter模拟 第0轮] 全国顶尖新闻传播学院的大学: 发布了一条帖子:「武汉大学官方发布:学校已决定撤销此前对当事人的处分...」
[Twitter模拟 第15轮] 全国考生: 引用帖子#5并评论
[Reddit模拟 第3轮] 教师代表: 在帖子#2下评论道「此事暴露出高校在程序正义上的问题...」
```
Zep automatically extracts entities (such as person and organization names) and relations from this text, enriching the graph.
---
## Utilities
### 1. FileParser (file parser)
@ -1508,13 +1647,14 @@ curl -X POST http://localhost:5001/api/simulation/prepare/status \
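# Wait until status=completed
# Step 7: Start the simulation (optionally specify max_rounds to limit the number of rounds)
# Step 7: Start the simulation (optional parameters: max_rounds limits the number of rounds, enable_graph_memory_update enables graph memory update)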
curl -X POST http://localhost:5001/api/simulation/start \
-H "Content-Type: application/json" \
-d '{
"simulation_id": "sim_xxx",
"platform": "parallel",
"max_rounds": 50
"max_rounds": 50,
"enable_graph_memory_update": true
}'
# Step 8: Query the run status in real time
@ -1689,6 +1829,6 @@ MIT License
---
**Last updated**: 2025-12-02
**Version**: v1.0.0
**Last updated**: 2025-12-05
**Version**: v1.1.0


@ -1113,11 +1113,18 @@ def start_simulation():
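Request (JSON):
{
"simulation_id": "sim_xxxx", // Required: simulation ID
"platform": "parallel", // Optional: twitter / reddit / parallel (default)
"max_rounds": 100 // Optional: maximum number of simulation rounds, used to truncate overly long simulations
"simulation_id": "sim_xxxx", // Required: simulation ID
"platform": "parallel", // Optional: twitter / reddit / parallel (default)
"max_rounds": 100, // Optional: maximum number of simulation rounds, used to truncate overly long simulations
"enable_graph_memory_update": false // Optional: whether to dynamically write Agent activity to the Zep graph memory
}
About enable_graph_memory_update:
- When enabled, all Agent activity in the simulation (posts, comments, likes, etc.) is written to the Zep graph in real time
- This lets the graph "remember" the simulation for later analysis or AI conversations
- The project associated with the simulation must have a valid graph_id
- Updates are batched to reduce the number of API calls
Returns:
{
"success": true,
@ -1127,7 +1134,8 @@ def start_simulation():
"process_pid": 12345,
"twitter_running": true,
"reddit_running": true,
"started_at": "2025-12-01T10:00:00"
"started_at": "2025-12-01T10:00:00",
"graph_memory_update_enabled": true // Whether graph memory update is enabled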
}
}
"""
@ -1143,6 +1151,7 @@ def start_simulation():
platform = data.get('platform', 'parallel')
max_rounds = data.get('max_rounds')  # Optional: maximum number of simulation rounds
enable_graph_memory_update = data.get('enable_graph_memory_update', False)  # Optional: whether to enable graph memory update
# Validate the max_rounds parameter
if max_rounds is not None:
@ -1203,8 +1212,33 @@ def start_simulation():
"error": f"模拟未准备好,当前状态: {state.status.value},请先调用 /prepare 接口"
}), 400
# Get the graph ID (used for graph memory update)
graph_id = None
if enable_graph_memory_update:
    # Get graph_id from the simulation state or from the project
    graph_id = state.graph_id
    if not graph_id:
        # Fall back to the project
        project = ProjectManager.get_project(state.project_id)
        if project:
            graph_id = project.graph_id
    if not graph_id:
        return jsonify({
            "success": False,
            "error": "启用图谱记忆更新需要有效的 graph_id,请确保项目已构建图谱"
        }), 400
    logger.info(f"启用图谱记忆更新: simulation_id={simulation_id}, graph_id={graph_id}")
# Start the simulation
run_state = SimulationRunner.start_simulation(simulation_id, platform, max_rounds)
run_state = SimulationRunner.start_simulation(
    simulation_id=simulation_id,
    platform=platform,
    max_rounds=max_rounds,
    enable_graph_memory_update=enable_graph_memory_update,
    graph_id=graph_id
)
# Update the simulation state
state.status = SimulationStatus.RUNNING
@ -1213,6 +1247,9 @@ def start_simulation():
response_data = run_state.to_dict()
if max_rounds:
response_data['max_rounds_applied'] = max_rounds
response_data['graph_memory_update_enabled'] = enable_graph_memory_update
if enable_graph_memory_update:
response_data['graph_id'] = graph_id
return jsonify({
"success": True,


@ -23,6 +23,11 @@ from .simulation_runner import (
AgentAction,
RoundSummary
)
from .zep_graph_memory_updater import (
ZepGraphMemoryUpdater,
ZepGraphMemoryManager,
AgentActivity
)
__all__ = [
'OntologyGenerator',
@ -47,5 +52,8 @@ __all__ = [
'RunnerStatus',
'AgentAction',
'RoundSummary',
'ZepGraphMemoryUpdater',
'ZepGraphMemoryManager',
'AgentActivity',
]


@ -20,6 +20,7 @@ from queue import Queue
from ..config import Config
from ..utils.logger import get_logger
from .zep_graph_memory_updater import ZepGraphMemoryManager
logger = get_logger('mirofish.simulation_runner')
@ -201,6 +202,9 @@ class SimulationRunner:
_stdout_files: Dict[str, Any] = {}  # Holds stdout file handles
_stderr_files: Dict[str, Any] = {}  # Holds stderr file handles
# Graph memory update configuration
_graph_memory_enabled: Dict[str, bool] = {}  # simulation_id -> enabled
@classmethod
def get_run_state(cls, simulation_id: str) -> Optional[SimulationRunState]:
    """Get the run state"""
@ -281,7 +285,9 @@ class SimulationRunner:
cls,
simulation_id: str,
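platform: str = "parallel",  # twitter / reddit / parallel
max_rounds: int = None  # Maximum number of simulation rounds (optional; truncates overly long simulations)
max_rounds: int = None,  # Maximum number of simulation rounds (optional; truncates overly long simulations)
enable_graph_memory_update: bool = False,  # Whether to write activity to the Zep graph
graph_id: str = None  # Zep graph ID (required when graph update is enabled)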
) -> SimulationRunState:
"""
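Start the simulation
@ -290,6 +296,8 @@ class SimulationRunner:
simulation_id: simulation ID
platform: platform to run (twitter/reddit/parallel)
max_rounds: maximum number of simulation rounds (optional; truncates overly long simulations)
enable_graph_memory_update: whether to dynamically write Agent activity to the Zep graph
graph_id: Zep graph ID (required when graph update is enabled)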
Returns:
SimulationRunState
@ -332,6 +340,21 @@ class SimulationRunner:
cls._save_run_state(state)
# If graph memory update is enabled, create the updater
if enable_graph_memory_update:
    if not graph_id:
        raise ValueError("启用图谱记忆更新时必须提供 graph_id")
    try:
        ZepGraphMemoryManager.create_updater(simulation_id, graph_id)
        cls._graph_memory_enabled[simulation_id] = True
        logger.info(f"已启用图谱记忆更新: simulation_id={simulation_id}, graph_id={graph_id}")
    except Exception as e:
        logger.error(f"创建图谱记忆更新器失败: {e}")
        cls._graph_memory_enabled[simulation_id] = False
else:
    cls._graph_memory_enabled[simulation_id] = False
# Decide which script to run (scripts live in the backend/scripts/ directory)
if platform == "twitter":
script_name = "run_twitter_simulation.py"
@ -489,6 +512,15 @@ class SimulationRunner:
cls._save_run_state(state)
finally:
# Stop the graph memory updater
if cls._graph_memory_enabled.get(simulation_id, False):
    try:
        ZepGraphMemoryManager.stop_updater(simulation_id)
        logger.info(f"已停止图谱记忆更新: simulation_id={simulation_id}")
    except Exception as e:
        logger.error(f"停止图谱记忆更新器失败: {e}")
    cls._graph_memory_enabled.pop(simulation_id, None)
# Clean up process resources
cls._processes.pop(simulation_id, None)
cls._action_queues.pop(simulation_id, None)
@ -527,6 +559,12 @@ class SimulationRunner:
Returns:
The new read position
"""
# Check whether graph memory update is enabled
graph_memory_enabled = cls._graph_memory_enabled.get(state.simulation_id, False)
graph_updater = None
if graph_memory_enabled:
    graph_updater = ZepGraphMemoryManager.get_updater(state.simulation_id)
try:
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
@ -557,6 +595,10 @@ class SimulationRunner:
if action.round_num and action.round_num > state.current_round:
state.current_round = action.round_num
# If graph memory update is enabled, send the activity to Zep
if graph_updater:
    graph_updater.add_activity_from_dict(action_data, platform)
except json.JSONDecodeError:
pass
return f.tell()
@ -615,6 +657,15 @@ class SimulationRunner:
state.completed_at = datetime.now().isoformat()
cls._save_run_state(state)
# Stop the graph memory updater
if cls._graph_memory_enabled.get(simulation_id, False):
    try:
        ZepGraphMemoryManager.stop_updater(simulation_id)
        logger.info(f"已停止图谱记忆更新: simulation_id={simulation_id}")
    except Exception as e:
        logger.error(f"停止图谱记忆更新器失败: {e}")
    cls._graph_memory_enabled.pop(simulation_id, None)
logger.info(f"模拟已停止: {simulation_id}")
return state
@ -811,6 +862,14 @@ class SimulationRunner:
"""
logger.info("正在清理所有模拟进程...")
# First stop all graph memory updaters
try:
    ZepGraphMemoryManager.stop_all()
    logger.info("已停止所有图谱记忆更新器")
except Exception as e:
    logger.error(f"停止图谱记忆更新器失败: {e}")
cls._graph_memory_enabled.clear()
# Copy the dict to avoid modifying it while iterating
processes = list(cls._processes.items())


@ -0,0 +1,408 @@
"""
Zep graph memory update service
Dynamically writes Agent activity from the simulation to the Zep graph
"""

import os
import time
import threading
import json
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass
from datetime import datetime
from queue import Queue, Empty

from zep_cloud.client import Zep

from ..config import Config
from ..utils.logger import get_logger

logger = get_logger('mirofish.zep_graph_memory_updater')

@dataclass
class AgentActivity:
    """Agent activity record"""
    platform: str  # twitter / reddit
    agent_id: int
    agent_name: str
    action_type: str  # CREATE_POST, LIKE_POST, etc.
    action_args: Dict[str, Any]
    round_num: int
    timestamp: str

    def to_episode_text(self) -> str:
        """
        Convert the activity to a text description that can be sent to Zep.

        Uses a natural-language format so that Zep can extract entities and
        relations from it.
        """
        platform_name = "Twitter" if self.platform == "twitter" else "Reddit"

        # Each action type has its own description builder
        action_descriptions = {
            "CREATE_POST": self._describe_create_post,
            "LIKE_POST": self._describe_like_post,
            "DISLIKE_POST": self._describe_dislike_post,
            "REPOST": self._describe_repost,
            "QUOTE_POST": self._describe_quote_post,
            "FOLLOW": self._describe_follow,
            "CREATE_COMMENT": self._describe_create_comment,
            "LIKE_COMMENT": self._describe_like_comment,
            "DISLIKE_COMMENT": self._describe_dislike_comment,
            "SEARCH_POSTS": self._describe_search,
            "SEARCH_USER": self._describe_search_user,
            "MUTE": self._describe_mute,
        }

        describe_func = action_descriptions.get(self.action_type, self._describe_generic)
        description = describe_func()

        # Add round and platform context
        return f"[{platform_name}模拟 第{self.round_num}轮] {self.agent_name}: {description}"

    def _describe_create_post(self) -> str:
        content = self.action_args.get("content", "")
        if content:
            # Truncate the content so it does not get too long
            if len(content) > 300:
                content = content[:300] + "..."
            return f"发布了一条帖子:「{content}」"
        return "发布了一条帖子"

    def _describe_like_post(self) -> str:
        post_id = self.action_args.get("post_id", "")
        return f"点赞了帖子#{post_id}" if post_id else "点赞了一条帖子"

    def _describe_dislike_post(self) -> str:
        post_id = self.action_args.get("post_id", "")
        return f"踩了帖子#{post_id}" if post_id else "踩了一条帖子"

    def _describe_repost(self) -> str:
        post_id = self.action_args.get("post_id", "")
        return f"转发了帖子#{post_id}" if post_id else "转发了一条帖子"

    def _describe_quote_post(self) -> str:
        quoted_id = self.action_args.get("quoted_id", "")
        content = self.action_args.get("content", "")
        if quoted_id:
            if content:
                return f"引用帖子#{quoted_id}并评论:「{content[:100]}{'...' if len(content) > 100 else ''}」"
            return f"引用了帖子#{quoted_id}"
        return "引用了一条帖子"

    def _describe_follow(self) -> str:
        target_id = self.action_args.get("user_id", "") or self.action_args.get("target_id", "")
        return f"关注了用户#{target_id}" if target_id else "关注了一个用户"

    def _describe_create_comment(self) -> str:
        content = self.action_args.get("content", "")
        post_id = self.action_args.get("post_id", "")
        if content:
            if len(content) > 200:
                content = content[:200] + "..."
            base = f"评论道:「{content}」"
            if post_id:
                base = f"在帖子#{post_id}下{base}"
            return base
        return f"在帖子#{post_id}下发表了评论" if post_id else "发表了评论"

    def _describe_like_comment(self) -> str:
        comment_id = self.action_args.get("comment_id", "")
        return f"点赞了评论#{comment_id}" if comment_id else "点赞了一条评论"

    def _describe_dislike_comment(self) -> str:
        comment_id = self.action_args.get("comment_id", "")
        return f"踩了评论#{comment_id}" if comment_id else "踩了一条评论"

    def _describe_search(self) -> str:
        query = self.action_args.get("query", "") or self.action_args.get("keyword", "")
        return f"搜索了「{query}」" if query else "进行了搜索"

    def _describe_search_user(self) -> str:
        query = self.action_args.get("query", "") or self.action_args.get("username", "")
        return f"搜索了用户「{query}」" if query else "搜索了用户"

    def _describe_mute(self) -> str:
        target_id = self.action_args.get("user_id", "") or self.action_args.get("target_id", "")
        return f"屏蔽了用户#{target_id}" if target_id else "屏蔽了一个用户"

    def _describe_generic(self) -> str:
        # Generic description for unknown action types
        return f"执行了{self.action_type}操作"

class ZepGraphMemoryUpdater:
    """
    Zep graph memory updater

    Monitors the simulation's actions log file and writes new agent activity
    to the Zep graph in real time.
    Uses batched updates: multiple activities are merged and sent in one
    request to reduce the number of API calls.
    """

    # Activity buffer size (a batch is sent once this many items are buffered)
    BATCH_SIZE = 10
    # Maximum wait time in seconds (a batch is sent even if BATCH_SIZE is not reached)
    MAX_WAIT_TIME = 30
    # Retry configuration
    MAX_RETRIES = 3
    RETRY_DELAY = 2  # seconds

    def __init__(self, graph_id: str, api_key: Optional[str] = None):
        """
        Initialize the updater

        Args:
            graph_id: Zep graph ID
            api_key: Zep API key (optional; read from the configuration by default)
        """
        self.graph_id = graph_id
        self.api_key = api_key or Config.ZEP_API_KEY
        if not self.api_key:
            raise ValueError("ZEP_API_KEY未配置")
        self.client = Zep(api_key=self.api_key)

        # Activity queue
        self._activity_queue: Queue = Queue()

        # Control flags
        self._running = False
        self._worker_thread: Optional[threading.Thread] = None

        # Statistics
        self._total_activities = 0
        self._total_sent = 0
        self._failed_count = 0

        logger.info(f"ZepGraphMemoryUpdater 初始化完成: graph_id={graph_id}")

    def start(self):
        """Start the background worker thread"""
        if self._running:
            return
        self._running = True
        self._worker_thread = threading.Thread(
            target=self._worker_loop,
            daemon=True,
            name=f"ZepMemoryUpdater-{self.graph_id[:8]}"
        )
        self._worker_thread.start()
        logger.info(f"ZepGraphMemoryUpdater 已启动: graph_id={self.graph_id}")

    def stop(self):
        """Stop the background worker thread"""
        self._running = False
        # Send the remaining activities
        self._flush_remaining()
        if self._worker_thread and self._worker_thread.is_alive():
            self._worker_thread.join(timeout=10)
        logger.info(f"ZepGraphMemoryUpdater 已停止: graph_id={self.graph_id}, "
                    f"total_activities={self._total_activities}, "
                    f"total_sent={self._total_sent}, "
                    f"failed={self._failed_count}")

    def add_activity(self, activity: AgentActivity):
        """
        Add an agent activity to the queue

        Args:
            activity: Agent activity record
        """
        # Skip DO_NOTHING activities
        if activity.action_type == "DO_NOTHING":
            return
        self._activity_queue.put(activity)
        self._total_activities += 1

    def add_activity_from_dict(self, data: Dict[str, Any], platform: str):
        """
        Add an activity from dictionary data

        Args:
            data: dictionary parsed from actions.jsonl
            platform: platform name (twitter/reddit)
        """
        # Skip event-type entries
        if "event_type" in data:
            return
        activity = AgentActivity(
            platform=platform,
            agent_id=data.get("agent_id", 0),
            agent_name=data.get("agent_name", ""),
            action_type=data.get("action_type", ""),
            action_args=data.get("action_args", {}),
            round_num=data.get("round", 0),
            timestamp=data.get("timestamp", datetime.now().isoformat()),
        )
        self.add_activity(activity)

    def _worker_loop(self):
        """Background worker loop"""
        buffer: List[AgentActivity] = []
        last_send_time = time.time()

        while self._running or not self._activity_queue.empty():
            try:
                # Try to get an activity from the queue (1-second timeout)
                try:
                    activity = self._activity_queue.get(timeout=1)
                    buffer.append(activity)
                except Empty:
                    pass

                # Check whether the batch should be sent
                current_time = time.time()
                should_send = (
                    len(buffer) >= self.BATCH_SIZE or
                    (len(buffer) > 0 and current_time - last_send_time >= self.MAX_WAIT_TIME)
                )
                if should_send:
                    self._send_batch(buffer)
                    buffer = []
                    last_send_time = current_time
            except Exception as e:
                logger.error(f"工作循环异常: {e}")
                time.sleep(1)

        # Send the remaining activities
        if buffer:
            self._send_batch(buffer)

    def _send_batch(self, activities: List[AgentActivity]):
        """
        Send a batch of activities to the Zep graph

        Args:
            activities: list of activities
        """
        if not activities:
            return

        # Merge all activities into a single block of text
        episode_text = "\n".join([a.to_episode_text() for a in activities])

        # Send with retry
        for attempt in range(self.MAX_RETRIES):
            try:
                self.client.graph.add(
                    graph_id=self.graph_id,
                    type="text",
                    data=episode_text
                )
                self._total_sent += len(activities)
                logger.debug(f"成功发送 {len(activities)} 条活动到图谱 {self.graph_id}")
                return
            except Exception as e:
                if attempt < self.MAX_RETRIES - 1:
                    logger.warning(f"发送到Zep失败 (尝试 {attempt + 1}/{self.MAX_RETRIES}): {e}")
                    time.sleep(self.RETRY_DELAY * (attempt + 1))
                else:
                    logger.error(f"发送到Zep失败,已重试{self.MAX_RETRIES}次: {e}")
                    self._failed_count += len(activities)

    def _flush_remaining(self):
        """Send the activities remaining in the queue"""
        remaining = []
        while not self._activity_queue.empty():
            try:
                remaining.append(self._activity_queue.get_nowait())
            except Empty:
                break
        if remaining:
            self._send_batch(remaining)

    def get_stats(self) -> Dict[str, Any]:
        """Get statistics"""
        return {
            "graph_id": self.graph_id,
            "total_activities": self._total_activities,
            "total_sent": self._total_sent,
            "failed_count": self._failed_count,
            "queue_size": self._activity_queue.qsize(),
            "running": self._running,
        }

class ZepGraphMemoryManager:
    """
    Manages the Zep graph memory updaters of multiple simulations

    Each simulation can have its own updater instance.
    """

    _updaters: Dict[str, ZepGraphMemoryUpdater] = {}
    _lock = threading.Lock()

    @classmethod
    def create_updater(cls, simulation_id: str, graph_id: str) -> ZepGraphMemoryUpdater:
        """
        Create a graph memory updater for a simulation

        Args:
            simulation_id: simulation ID
            graph_id: Zep graph ID

        Returns:
            A ZepGraphMemoryUpdater instance
        """
        with cls._lock:
            # If one already exists, stop the old one first
            if simulation_id in cls._updaters:
                cls._updaters[simulation_id].stop()
            updater = ZepGraphMemoryUpdater(graph_id)
            updater.start()
            cls._updaters[simulation_id] = updater
            logger.info(f"创建图谱记忆更新器: simulation_id={simulation_id}, graph_id={graph_id}")
            return updater

    @classmethod
    def get_updater(cls, simulation_id: str) -> Optional[ZepGraphMemoryUpdater]:
        """Get the updater of a simulation"""
        return cls._updaters.get(simulation_id)

    @classmethod
    def stop_updater(cls, simulation_id: str):
        """Stop and remove the updater of a simulation"""
        with cls._lock:
            if simulation_id in cls._updaters:
                cls._updaters[simulation_id].stop()
                del cls._updaters[simulation_id]
                logger.info(f"已停止图谱记忆更新器: simulation_id={simulation_id}")

    @classmethod
    def stop_all(cls):
        """Stop all updaters"""
        with cls._lock:
            for simulation_id, updater in list(cls._updaters.items()):
                try:
                    updater.stop()
                except Exception as e:
                    logger.error(f"停止更新器失败: simulation_id={simulation_id}, error={e}")
            cls._updaters.clear()
            logger.info("已停止所有图谱记忆更新器")

    @classmethod
    def get_all_stats(cls) -> Dict[str, Dict[str, Any]]:
        """Get the statistics of all updaters"""
        return {
            sim_id: updater.get_stats()
            for sim_id, updater in cls._updaters.items()
        }