MiroFish/backend/scripts/action_logger.py
666ghj d4fac63eb4 Enhance simulation management and logging features
- Registered a cleanup function for simulation processes to ensure proper termination on server shutdown.
- Improved logging during application startup to confirm the registration of the cleanup function.
- Updated simulation preparation checks to clarify the conditions for considering a simulation ready, enhancing error handling and user feedback.
- Added detailed logging for simulation status changes, improving traceability during the simulation lifecycle.
- Introduced new files for simulation configuration and profile data, supporting enhanced testing and visualization capabilities.
2025-12-02 17:11:47 +08:00

305 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
动作日志记录器
用于记录OASIS模拟中每个Agent的动作供后端监控使用
日志结构:
sim_xxx/
├── twitter/
│ └── actions.jsonl # Twitter 平台动作日志
├── reddit/
│ └── actions.jsonl # Reddit 平台动作日志
├── simulation.log # 主模拟进程日志
└── run_state.json # 运行状态API 查询用)
"""
import json
import os
import logging
from datetime import datetime
from typing import Dict, Any, Optional
class PlatformActionLogger:
    """Append-only JSON Lines action logger for a single platform.

    Every record is serialised as one JSON object per line and appended to
    ``<base_dir>/<platform>/actions.jsonl``.
    """

    def __init__(self, platform: str, base_dir: str):
        """Initialise the logger and create its directory.

        Args:
            platform: Platform name (twitter/reddit); also the name of the
                sub-directory that holds the log file.
            base_dir: Base path of the simulation directory.
        """
        self.platform = platform
        self.base_dir = base_dir
        self.log_dir = os.path.join(base_dir, platform)
        self.log_path = os.path.join(self.log_dir, "actions.jsonl")
        self._ensure_dir()

    def _ensure_dir(self) -> None:
        """Make sure the platform log directory exists."""
        os.makedirs(self.log_dir, exist_ok=True)

    def _append(self, entry: Dict[str, Any]) -> None:
        """Serialise ``entry`` and append it as one line to the log file.

        The record is serialised before the file is opened, so a value that
        cannot be encoded as JSON raises without leaving a partial line.
        """
        line = json.dumps(entry, ensure_ascii=False) + '\n'
        with open(self.log_path, 'a', encoding='utf-8') as f:
            f.write(line)

    def log_action(
        self,
        round_num: int,
        agent_id: int,
        agent_name: str,
        action_type: str,
        action_args: Optional[Dict[str, Any]] = None,
        result: Optional[str] = None,
        success: bool = True
    ) -> None:
        """Record a single agent action.

        Args:
            round_num: Round in which the action happened.
            agent_id: Identifier of the acting agent.
            agent_name: Display name of the acting agent.
            action_type: Name of the action that was performed.
            action_args: Arguments of the action; a missing or empty value is
                stored as ``{}``.
            result: Optional textual result of the action.
            success: Whether the action succeeded.
        """
        self._append({
            "round": round_num,
            "timestamp": datetime.now().isoformat(),
            "agent_id": agent_id,
            "agent_name": agent_name,
            "action_type": action_type,
            "action_args": action_args or {},
            "result": result,
            "success": success,
        })

    def log_round_start(self, round_num: int, simulated_hour: int) -> None:
        """Record the start of a round."""
        self._append({
            "round": round_num,
            "timestamp": datetime.now().isoformat(),
            "event_type": "round_start",
            "simulated_hour": simulated_hour,
        })

    def log_round_end(self, round_num: int, actions_count: int) -> None:
        """Record the end of a round together with its action count."""
        self._append({
            "round": round_num,
            "timestamp": datetime.now().isoformat(),
            "event_type": "round_end",
            "actions_count": actions_count,
        })

    def log_simulation_start(self, config: Dict[str, Any]) -> None:
        """Record the start of the simulation.

        Args:
            config: Simulation configuration. ``time_config`` ->
                ``total_simulation_hours`` (default 72) is doubled to obtain
                ``total_rounds``, i.e. two rounds per simulated hour are
                assumed; the length of ``agent_configs`` becomes
                ``agents_count``.
        """
        # Keys that are present but null (e.g. JSON ``null``) are treated
        # exactly like missing keys instead of raising.
        config = config or {}
        time_config = config.get("time_config") or {}
        hours = time_config.get("total_simulation_hours")
        if hours is None:
            hours = 72
        agent_configs = config.get("agent_configs") or []
        self._append({
            "timestamp": datetime.now().isoformat(),
            "event_type": "simulation_start",
            "platform": self.platform,
            "total_rounds": hours * 2,
            "agents_count": len(agent_configs),
        })

    def log_simulation_end(self, total_rounds: int, total_actions: int) -> None:
        """Record the end of the simulation with its final totals."""
        self._append({
            "timestamp": datetime.now().isoformat(),
            "event_type": "simulation_end",
            "platform": self.platform,
            "total_rounds": total_rounds,
            "total_actions": total_actions,
        })
class SimulationLogManager:
    """Central manager for all log files of one simulation.

    Owns the main ``simulation.log`` logger (file + console) and lazily
    creates one per-platform action logger for Twitter and Reddit.
    """

    # Level names accepted by log(), mapped to numeric logging levels.
    # A fixed table is used instead of getattr() on the logger so that an
    # arbitrary string can never resolve to a non-logging attribute.
    _LEVEL_BY_NAME = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "warn": logging.WARNING,
        "error": logging.ERROR,
        "exception": logging.ERROR,
        "critical": logging.CRITICAL,
        "fatal": logging.CRITICAL,
    }

    def __init__(self, simulation_dir: str):
        """Initialise the manager and set up the main logger.

        Args:
            simulation_dir: Path of the simulation directory; it is created
                if it does not exist yet.
        """
        self.simulation_dir = simulation_dir
        self.twitter_logger: Optional[PlatformActionLogger] = None
        self.reddit_logger: Optional[PlatformActionLogger] = None
        self._main_logger: Optional[logging.Logger] = None
        # Set up the main log
        self._setup_main_logger()

    def _setup_main_logger(self):
        """Configure the main simulation logger (file and console handlers)."""
        # FileHandler opens the file immediately, so the directory has to
        # exist first. An empty path means the current working directory.
        if self.simulation_dir:
            os.makedirs(self.simulation_dir, exist_ok=True)
        log_path = os.path.join(self.simulation_dir, "simulation.log")

        # normpath strips a trailing separator; without it basename() would
        # be "" and every such simulation would share the "simulation." logger.
        if self.simulation_dir:
            sim_name = os.path.basename(os.path.normpath(self.simulation_dir))
        else:
            sim_name = ""
        main_logger = logging.getLogger(f"simulation.{sim_name}")
        main_logger.setLevel(logging.INFO)

        # Detach AND close handlers left over from an earlier manager for the
        # same simulation; merely clearing the list leaks the open log file.
        for stale in list(main_logger.handlers):
            main_logger.removeHandler(stale)
            stale.close()

        # File handler (mode 'w' starts a fresh simulation.log each time)
        file_handler = logging.FileHandler(log_path, encoding='utf-8', mode='w')
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        ))
        main_logger.addHandler(file_handler)

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(logging.Formatter(
            '[%(asctime)s] %(message)s',
            datefmt='%H:%M:%S'
        ))
        main_logger.addHandler(console_handler)

        # Keep simulation output out of the root logger's handlers.
        main_logger.propagate = False
        self._main_logger = main_logger

    def get_twitter_logger(self) -> "PlatformActionLogger":
        """Return the Twitter action logger, creating it on first use."""
        if self.twitter_logger is None:
            self.twitter_logger = PlatformActionLogger("twitter", self.simulation_dir)
        return self.twitter_logger

    def get_reddit_logger(self) -> "PlatformActionLogger":
        """Return the Reddit action logger, creating it on first use."""
        if self.reddit_logger is None:
            self.reddit_logger = PlatformActionLogger("reddit", self.simulation_dir)
        return self.reddit_logger

    def log(self, message: str, level: str = "info"):
        """Write ``message`` to the main log.

        Args:
            message: Text to log.
            level: Case-insensitive level name (debug, info, warning, error,
                critical, ...). Unknown names fall back to INFO.
        """
        if self._main_logger is None:
            return
        level_name = str(level).lower()
        levelno = self._LEVEL_BY_NAME.get(level_name, logging.INFO)
        # "exception" keeps Logger.exception() semantics: ERROR + traceback.
        self._main_logger.log(levelno, message, exc_info=(level_name == "exception"))

    def info(self, message: str):
        """Log ``message`` at INFO level."""
        self.log(message, "info")

    def warning(self, message: str):
        """Log ``message`` at WARNING level."""
        self.log(message, "warning")

    def error(self, message: str):
        """Log ``message`` at ERROR level."""
        self.log(message, "error")

    def debug(self, message: str):
        """Log ``message`` at DEBUG level (filtered out by the INFO threshold)."""
        self.log(message, "debug")
# ============ Legacy-compatible interface ============
class ActionLogger:
    """Action logger kept for compatibility with the old interface.

    Writes the records of all platforms to a single JSON Lines file; each
    record carries an explicit ``platform`` field. Prefer
    SimulationLogManager for new code.
    """

    def __init__(self, log_path: str):
        """Store the log file path and create its parent directory.

        Args:
            log_path: Path of the JSON Lines file to append to.
        """
        self.log_path = log_path
        self._ensure_dir()

    def _ensure_dir(self) -> None:
        """Create the parent directory of the log file if it has one."""
        log_dir = os.path.dirname(self.log_path)
        # A bare file name has no directory part; nothing to create then.
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

    def _append(self, entry: Dict[str, Any]) -> None:
        """Serialise ``entry`` and append it as one line to the log file.

        The record is serialised before the file is opened, so a value that
        cannot be encoded as JSON raises without leaving a partial line.
        """
        line = json.dumps(entry, ensure_ascii=False) + '\n'
        with open(self.log_path, 'a', encoding='utf-8') as f:
            f.write(line)

    def log_action(
        self,
        round_num: int,
        platform: str,
        agent_id: int,
        agent_name: str,
        action_type: str,
        action_args: Optional[Dict[str, Any]] = None,
        result: Optional[str] = None,
        success: bool = True
    ) -> None:
        """Record a single agent action on ``platform``.

        A missing or empty ``action_args`` is stored as ``{}``.
        """
        self._append({
            "round": round_num,
            "timestamp": datetime.now().isoformat(),
            "platform": platform,
            "agent_id": agent_id,
            "agent_name": agent_name,
            "action_type": action_type,
            "action_args": action_args or {},
            "result": result,
            "success": success,
        })

    def log_round_start(self, round_num: int, simulated_hour: int, platform: str) -> None:
        """Record the start of a round on ``platform``."""
        self._append({
            "round": round_num,
            "timestamp": datetime.now().isoformat(),
            "platform": platform,
            "event_type": "round_start",
            "simulated_hour": simulated_hour,
        })

    def log_round_end(self, round_num: int, actions_count: int, platform: str) -> None:
        """Record the end of a round on ``platform`` with its action count."""
        self._append({
            "round": round_num,
            "timestamp": datetime.now().isoformat(),
            "platform": platform,
            "event_type": "round_end",
            "actions_count": actions_count,
        })

    def log_simulation_start(self, platform: str, config: Dict[str, Any]) -> None:
        """Record the start of the simulation on ``platform``.

        ``time_config`` -> ``total_simulation_hours`` (default 72) is doubled
        to obtain ``total_rounds``, i.e. two rounds per simulated hour are
        assumed; the length of ``agent_configs`` becomes ``agents_count``.
        """
        # Keys that are present but null (e.g. JSON ``null``) are treated
        # exactly like missing keys instead of raising.
        config = config or {}
        time_config = config.get("time_config") or {}
        hours = time_config.get("total_simulation_hours")
        if hours is None:
            hours = 72
        agent_configs = config.get("agent_configs") or []
        self._append({
            "timestamp": datetime.now().isoformat(),
            "platform": platform,
            "event_type": "simulation_start",
            "total_rounds": hours * 2,
            "agents_count": len(agent_configs),
        })

    def log_simulation_end(self, platform: str, total_rounds: int, total_actions: int) -> None:
        """Record the end of the simulation on ``platform`` with final totals."""
        self._append({
            "timestamp": datetime.now().isoformat(),
            "platform": platform,
            "event_type": "simulation_end",
            "total_rounds": total_rounds,
            "total_actions": total_actions,
        })
# Module-wide logger instance (legacy interface)
_global_logger: Optional[ActionLogger] = None


def get_logger(log_path: Optional[str] = None) -> ActionLogger:
    """Return the module-wide ActionLogger (legacy interface).

    A non-empty ``log_path`` always replaces the current instance with a new
    logger for that path. Without a path the existing instance is reused, or
    one writing to ``actions.jsonl`` is created on first use.
    """
    global _global_logger
    if log_path:
        # Explicit path: rebind the shared instance.
        _global_logger = ActionLogger(log_path)
    elif _global_logger is None:
        # First call without a path: fall back to the default file name.
        _global_logger = ActionLogger("actions.jsonl")
    return _global_logger