""" 动作日志记录器 用于记录OASIS模拟中每个Agent的动作,供后端监控使用 """ import json import os from datetime import datetime from typing import Dict, Any, Optional class ActionLogger: """动作日志记录器""" def __init__(self, log_path: str): """ 初始化日志记录器 Args: log_path: 日志文件路径(.jsonl格式) """ self.log_path = log_path self._ensure_dir() def _ensure_dir(self): """确保目录存在""" log_dir = os.path.dirname(self.log_path) if log_dir: os.makedirs(log_dir, exist_ok=True) def log_action( self, round_num: int, platform: str, agent_id: int, agent_name: str, action_type: str, action_args: Optional[Dict[str, Any]] = None, result: Optional[str] = None, success: bool = True ): """ 记录一个动作 Args: round_num: 轮次 platform: 平台 (twitter/reddit) agent_id: Agent ID agent_name: Agent名称 action_type: 动作类型 action_args: 动作参数 result: 执行结果 success: 是否成功 """ entry = { "round": round_num, "timestamp": datetime.now().isoformat(), "platform": platform, "agent_id": agent_id, "agent_name": agent_name, "action_type": action_type, "action_args": action_args or {}, "result": result, "success": success, } with open(self.log_path, 'a', encoding='utf-8') as f: f.write(json.dumps(entry, ensure_ascii=False) + '\n') def log_round_start(self, round_num: int, simulated_hour: int, platform: str): """记录轮次开始""" entry = { "round": round_num, "timestamp": datetime.now().isoformat(), "platform": platform, "event_type": "round_start", "simulated_hour": simulated_hour, } with open(self.log_path, 'a', encoding='utf-8') as f: f.write(json.dumps(entry, ensure_ascii=False) + '\n') def log_round_end(self, round_num: int, actions_count: int, platform: str): """记录轮次结束""" entry = { "round": round_num, "timestamp": datetime.now().isoformat(), "platform": platform, "event_type": "round_end", "actions_count": actions_count, } with open(self.log_path, 'a', encoding='utf-8') as f: f.write(json.dumps(entry, ensure_ascii=False) + '\n') def log_simulation_start(self, platform: str, config: Dict[str, Any]): """记录模拟开始""" entry = { "timestamp": datetime.now().isoformat(), "platform": platform, "event_type": "simulation_start", "total_rounds": config.get("time_config", {}).get("total_simulation_hours", 72) * 2, "agents_count": len(config.get("agent_configs", [])), } with open(self.log_path, 'a', encoding='utf-8') as f: f.write(json.dumps(entry, ensure_ascii=False) + '\n') def log_simulation_end(self, platform: str, total_rounds: int, total_actions: int): """记录模拟结束""" entry = { "timestamp": datetime.now().isoformat(), "platform": platform, "event_type": "simulation_end", "total_rounds": total_rounds, "total_actions": total_actions, } with open(self.log_path, 'a', encoding='utf-8') as f: f.write(json.dumps(entry, ensure_ascii=False) + '\n') # 全局日志实例(可选) _global_logger: Optional[ActionLogger] = None def get_logger(log_path: Optional[str] = None) -> ActionLogger: """获取全局日志实例""" global _global_logger if log_path: _global_logger = ActionLogger(log_path) if _global_logger is None: _global_logger = ActionLogger("actions.jsonl") return _global_logger