# Change notes (dual LLM configuration):
# - Updated .env.example to include new keys for dual LLM configuration, allowing for both general and boost settings.
# - Modified create_model function to support an optional use_boost parameter, enabling the selection of either general or boost LLM configurations based on availability.
# - Improved logging to indicate which LLM configuration is being used during model creation, enhancing clarity for users.
# Source listing metadata: 757 lines, 26 KiB, Python.
"""
OASIS dual-platform parallel simulation preset script.

Runs the Twitter and Reddit simulations at the same time; both read the same
configuration file.

Usage:
    python run_parallel_simulation.py --config simulation_config.json

Log layout:
    sim_xxx/
    ├── twitter/
    │   └── actions.jsonl    # Twitter platform action log
    ├── reddit/
    │   └── actions.jsonl    # Reddit platform action log
    ├── simulation.log       # main simulation process log
    └── run_state.json       # run state (used by API queries)
"""

import argparse
import asyncio
import json
import logging
import os
import random
import sqlite3
import sys
from contextlib import closing
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple

# Put the backend directory (and this scripts directory, for sibling modules
# such as action_logger) on sys.path. This script is expected to live in
# backend/scripts/, so the backend dir is its parent and the project root is
# the backend dir's parent.
_scripts_dir = os.path.dirname(os.path.abspath(__file__))
_backend_dir = os.path.dirname(_scripts_dir)
_project_root = os.path.dirname(_backend_dir)

# Inserted in this order, backend/ ends up ahead of scripts/ on sys.path.
for _import_root in (_scripts_dir, _backend_dir):
    sys.path.insert(0, _import_root)
del _import_root

# 加载项目根目录的 .env 文件(包含 LLM_API_KEY 等配置)
|
||
from dotenv import load_dotenv
|
||
_env_file = os.path.join(_project_root, '.env')
|
||
if os.path.exists(_env_file):
|
||
load_dotenv(_env_file)
|
||
print(f"已加载环境配置: {_env_file}")
|
||
else:
|
||
# 尝试加载 backend/.env
|
||
_backend_env = os.path.join(_backend_dir, '.env')
|
||
if os.path.exists(_backend_env):
|
||
load_dotenv(_backend_env)
|
||
print(f"已加载环境配置: {_backend_env}")
|
||
|
||
|
||
class MaxTokensWarningFilter(logging.Filter):
    """Suppress camel-ai's "Invalid or missing ... max_tokens" warning.

    max_tokens is deliberately left unset so the model decides its own output
    length, which makes that warning pure noise.
    """

    def filter(self, record):
        text = record.getMessage()
        is_max_tokens_warning = "max_tokens" in text and "Invalid or missing" in text
        # Returning False drops the record; everything else passes through.
        return not is_max_tokens_warning


# Installed at import time so it is active before any camel code runs.
# NOTE(review): a filter attached to a *logger* only sees records created on
# that logger (here: the root logger), not records propagated up from child
# loggers — confirm camel emits this warning via the root logger.
_max_tokens_filter = MaxTokensWarningFilter()
logging.getLogger().addFilter(_max_tokens_filter)

def disable_oasis_logging():
    """Silence the OASIS library's own loggers.

    OASIS logs every agent observation and action, which is far too verbose;
    this script records actions through its own action_logger instead. Each
    listed logger is raised to CRITICAL, stripped of its handlers and detached
    from propagation.
    """
    for name in ("social.agent", "social.twitter", "social.rec", "oasis.env", "table"):
        noisy = logging.getLogger(name)
        noisy.setLevel(logging.CRITICAL)  # only critical errors remain
        noisy.handlers.clear()
        noisy.propagate = False

def init_logging_for_simulation(simulation_dir: str):
    """Prepare logging for a simulation run.

    Silences OASIS's verbose loggers, then deletes an old ``log`` directory
    inside ``simulation_dir`` if one exists (deletion errors are ignored).

    Args:
        simulation_dir: Path of the simulation directory.
    """
    disable_oasis_logging()

    stale_log_dir = os.path.join(simulation_dir, "log")
    if not os.path.exists(stale_log_dir):
        return

    import shutil
    shutil.rmtree(stale_log_dir, ignore_errors=True)

from action_logger import SimulationLogManager, PlatformActionLogger

# camel-ai and oasis are required third-party packages: exit with an install
# hint instead of a bare traceback when either is missing.
try:
    from camel.models import ModelFactory
    from camel.types import ModelPlatformType
    import oasis
    from oasis import (
        ActionType,
        LLMAction,
        ManualAction,
        generate_twitter_agent_graph,
        generate_reddit_agent_graph,
    )
except ImportError as import_error:
    print(f"错误: 缺少依赖 {import_error}")
    print("请先安装: pip install oasis-ai camel-ai")
    sys.exit(1)

# Actions offered to agents on the Twitter platform (order preserved).
TWITTER_ACTIONS = [
    ActionType.CREATE_POST,  # write a new post
    ActionType.LIKE_POST,
    ActionType.REPOST,
    ActionType.FOLLOW,
    ActionType.DO_NOTHING,   # agent may choose to stay idle
    ActionType.QUOTE_POST,
]

# Actions offered to agents on the Reddit platform (order preserved).
REDDIT_ACTIONS = [
    # voting and posting
    ActionType.LIKE_POST,
    ActionType.DISLIKE_POST,
    ActionType.CREATE_POST,
    # comments
    ActionType.CREATE_COMMENT,
    ActionType.LIKE_COMMENT,
    ActionType.DISLIKE_COMMENT,
    # discovery
    ActionType.SEARCH_POSTS,
    ActionType.SEARCH_USER,
    ActionType.TREND,
    ActionType.REFRESH,
    # idle / social graph
    ActionType.DO_NOTHING,
    ActionType.FOLLOW,
    ActionType.MUTE,
]

def load_config(config_path: str) -> Dict[str, Any]:
    """Read the simulation configuration JSON file (UTF-8) and return it parsed."""
    with open(config_path, encoding='utf-8') as fp:
        parsed = json.load(fp)
    return parsed

# Action types dropped from the action logs (little value for analysis).
FILTERED_ACTIONS = {'refresh', 'sign_up'}

# Action name as stored in the database trace table -> normalized name used in
# the action logs. Every known name maps to its upper-cased form.
ACTION_TYPE_MAP = {
    db_name: db_name.upper()
    for db_name in (
        'create_post',
        'like_post',
        'dislike_post',
        'repost',
        'quote_post',
        'follow',
        'mute',
        'create_comment',
        'like_comment',
        'dislike_comment',
        'search_posts',
        'search_user',
        'trend',
        'do_nothing',
        'interview',
    )
}

# Keys copied from a trace row's ``info`` JSON into the logged action_args
# (in this order, after ``content``); every other key is dropped.
_KEPT_ACTION_ARG_KEYS = (
    'post_id',
    'comment_id',
    'quoted_id',
    'new_post_id',
    'follow_id',
    'query',
    'like_id',
    'dislike_id',
)
# A logged string ``content`` longer than this is truncated and suffixed "...".
_MAX_LOGGED_CONTENT_CHARS = 200


def _simplify_action_args(info_json: Any) -> Dict[str, Any]:
    """Parse a trace row's ``info`` JSON and keep only the fields worth logging."""
    try:
        parsed = json.loads(info_json) if info_json else {}
    except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
        parsed = {}
    if not isinstance(parsed, dict):
        # A JSON list/scalar has no named fields; indexing it by key would raise.
        return {}

    simplified: Dict[str, Any] = {}
    if 'content' in parsed:
        content = parsed['content']
        if isinstance(content, str) and len(content) > _MAX_LOGGED_CONTENT_CHARS:
            content = content[:_MAX_LOGGED_CONTENT_CHARS] + '...'
        simplified['content'] = content
    for key in _KEPT_ACTION_ARG_KEYS:
        if key in parsed:
            simplified[key] = parsed[key]
    return simplified


def fetch_new_actions_from_db(
    db_path: str,
    last_rowid: int,
    agent_names: Dict[int, str],
    filtered_actions: Optional[set] = None,
    action_type_map: Optional[Dict[str, str]] = None,
) -> Tuple[List[Dict[str, Any]], int]:
    """Fetch the ``trace`` rows added after ``last_rowid`` from a simulation DB.

    Rows are tracked by SQLite ``rowid`` rather than ``created_at`` because the
    platforms store ``created_at`` in different formats (Twitter uses an
    integer, Reddit a datetime string).

    Args:
        db_path: Path of the simulation's SQLite database. A missing file
            yields no actions.
        last_rowid: Largest rowid already processed by a previous call.
        agent_names: agent_id -> agent name; unknown ids are logged as
            ``"Agent_<id>"``.
        filtered_actions: Action names to skip. Defaults to the module-level
            ``FILTERED_ACTIONS``.
        action_type_map: DB action name -> normalized name. Defaults to the
            module-level ``ACTION_TYPE_MAP``; unmapped names are upper-cased.

    Returns:
        ``(actions, new_last_rowid)``. Each action is a dict with ``agent_id``,
        ``agent_name``, ``action_type`` and a trimmed ``action_args``.
        ``new_last_rowid`` is the largest rowid read (filtered and malformed
        rows included), or ``last_rowid`` if nothing could be read.
    """
    actions: List[Dict[str, Any]] = []
    new_last_rowid = last_rowid

    if not os.path.exists(db_path):
        return actions, new_last_rowid

    if filtered_actions is None:
        filtered_actions = FILTERED_ACTIONS
    if action_type_map is None:
        action_type_map = ACTION_TYPE_MAP

    try:
        # closing() releases the connection even when the query fails,
        # e.g. because the trace table has not been created yet.
        with closing(sqlite3.connect(db_path)) as conn:
            rows = conn.execute(
                """
                SELECT rowid, user_id, action, info
                FROM trace
                WHERE rowid > ?
                ORDER BY rowid ASC
                """,
                (last_rowid,),
            ).fetchall()
    except Exception as e:
        # Best effort: a failed read must not abort the simulation loop.
        print(f"读取数据库动作失败: {e}")
        return actions, new_last_rowid

    for rowid, user_id, action, info_json in rows:
        # Advance past every row, including skipped ones, so no row is re-read.
        new_last_rowid = rowid

        if action in filtered_actions:
            continue

        actions.append({
            'agent_id': user_id,
            'agent_name': agent_names.get(user_id, f'Agent_{user_id}'),
            'action_type': action_type_map.get(action, str(action).upper()),
            'action_args': _simplify_action_args(info_json),
        })

    return actions, new_last_rowid

def create_model(config: Dict[str, Any], use_boost: bool = False):
    """Create the camel-ai OpenAI-compatible model backend for one platform.

    Two LLM configurations are supported so the parallel runs can be spread
    across API keys / providers:

    - general: ``LLM_API_KEY``, ``LLM_BASE_URL``, ``LLM_MODEL_NAME``
    - boost (optional): ``LLM_BOOST_API_KEY``, ``LLM_BOOST_BASE_URL``,
      ``LLM_BOOST_MODEL_NAME``

    The boost configuration is used only when ``use_boost`` is true and
    ``LLM_BOOST_API_KEY`` is set. A blank boost model name or base URL falls
    back to the general ``LLM_MODEL_NAME`` / ``LLM_BASE_URL``.

    Args:
        config: Simulation config; ``config["llm_model"]`` (default
            ``"gpt-4o-mini"``) is used when no model name is set in the
            environment.
        use_boost: Prefer the boost configuration when it is available.

    Returns:
        The model backend created by ``ModelFactory.create``.

    Raises:
        ValueError: If neither the selected configuration nor
            ``OPENAI_API_KEY`` provides an API key.
    """
    general_api_key = os.environ.get("LLM_API_KEY", "")
    general_base_url = os.environ.get("LLM_BASE_URL", "")
    general_model = os.environ.get("LLM_MODEL_NAME", "")

    # The boost configuration counts as available only when its key is set.
    boost_api_key = os.environ.get("LLM_BOOST_API_KEY", "")

    if use_boost and boost_api_key:
        llm_api_key = boost_api_key
        # Blank boost fields reuse the general settings (e.g. a second key on
        # the same provider). Without the base-URL fallback, a blank
        # LLM_BOOST_BASE_URL left OPENAI_API_BASE_URL at whatever an earlier
        # call had exported, so the endpoint depended on call order.
        llm_base_url = os.environ.get("LLM_BOOST_BASE_URL", "") or general_base_url
        llm_model = os.environ.get("LLM_BOOST_MODEL_NAME", "") or general_model
        config_label = "[加速LLM]"
    else:
        llm_api_key = general_api_key
        llm_base_url = general_base_url
        llm_model = general_model
        config_label = "[通用LLM]"

    # Fall back to the simulation config when the environment names no model.
    if not llm_model:
        llm_model = config.get("llm_model", "gpt-4o-mini")

    # Export the environment variables camel-ai expects.
    if llm_api_key:
        os.environ["OPENAI_API_KEY"] = llm_api_key

    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("缺少 API Key 配置,请在项目根目录 .env 文件中设置 LLM_API_KEY")

    if llm_base_url:
        os.environ["OPENAI_API_BASE_URL"] = llm_base_url

    print(f"{config_label} model={llm_model}, base_url={llm_base_url[:40] if llm_base_url else '默认'}...")

    # os.environ is process-wide and both platforms call this function, so
    # bind the resolved key/URL to this model explicitly rather than relying
    # on the environment still holding them when the backend is built.
    return ModelFactory.create(
        model_platform=ModelPlatformType.OPENAI,
        model_type=llm_model,
        api_key=api_key,
        url=os.environ.get("OPENAI_API_BASE_URL") or None,
    )

def get_active_agents_for_round(
    env,
    config: Dict[str, Any],
    current_hour: int,
    round_num: int
) -> List:
    """Choose which agents act this round, based on the simulated hour.

    The target count is ``int(uniform(agents_per_hour_min, agents_per_hour_max)
    * multiplier)``. The multiplier is ``peak_activity_multiplier`` during peak
    hours, ``off_peak_activity_multiplier`` during off-peak hours, and 1.0
    otherwise. An agent whose ``active_hours`` contain ``current_hour`` becomes
    a candidate with probability ``activity_level``. At most the target number
    of candidates are sampled. Ids the agent graph cannot resolve are skipped.
    ``round_num`` is currently unused.

    Returns:
        A list of ``(agent_id, agent)`` tuples.
    """
    tc = config.get("time_config", {})

    if current_hour in tc.get("peak_hours", [9, 10, 11, 14, 15, 20, 21, 22]):
        scale = tc.get("peak_activity_multiplier", 1.5)
    elif current_hour in tc.get("off_peak_hours", [0, 1, 2, 3, 4, 5]):
        scale = tc.get("off_peak_activity_multiplier", 0.3)
    else:
        scale = 1.0

    quota = int(
        random.uniform(tc.get("agents_per_hour_min", 5), tc.get("agents_per_hour_max", 20))
        * scale
    )

    # Short-circuit keeps the random draw limited to agents awake this hour.
    eligible = [
        cfg.get("agent_id", 0)
        for cfg in config.get("agent_configs", [])
        if current_hour in cfg.get("active_hours", list(range(8, 23)))
        and random.random() < cfg.get("activity_level", 0.5)
    ]
    if not eligible:
        return []

    picked = []
    for aid in random.sample(eligible, min(quota, len(eligible))):
        try:
            picked.append((aid, env.agent_graph.get_agent(aid)))
        except Exception:
            # Best effort: ids missing from the agent graph are skipped.
            pass
    return picked

async def run_twitter_simulation(
    config: Dict[str, Any],
    simulation_dir: str,
    action_logger: Optional[PlatformActionLogger] = None,
    main_logger: Optional[SimulationLogManager] = None
):
    """Run the Twitter simulation described by ``config`` in ``simulation_dir``.

    Loads ``twitter_profiles.csv`` from ``simulation_dir``, deletes any existing
    ``twitter_simulation.db`` there, and creates a fresh OASIS environment. It
    then publishes ``event_config.initial_posts`` and runs
    ``total_simulation_hours * 60 // minutes_per_round`` rounds. After each
    round, the actions actually executed are read back from the database and
    written to ``action_logger``.

    Args:
        config: Parsed simulation config (``time_config``, ``event_config``,
            ``agent_configs``, optional ``llm_model``).
        simulation_dir: Directory containing the profile file; the database is
            written here too.
        action_logger: Optional per-platform action logger.
        main_logger: Optional shared log manager; progress messages go to it
            and to stdout.
    """
    def log_info(msg):
        """Log ``msg`` with a ``[Twitter]`` prefix to main_logger (if any) and stdout."""
        if main_logger:
            main_logger.info(f"[Twitter] {msg}")
        print(f"[Twitter] {msg}")

    log_info("初始化...")

    # Twitter uses the general LLM configuration.
    model = create_model(config, use_boost=False)

    # OASIS loads Twitter profiles from a CSV file.
    profile_path = os.path.join(simulation_dir, "twitter_profiles.csv")
    if not os.path.exists(profile_path):
        log_info(f"错误: Profile文件不存在: {profile_path}")
        return

    agent_graph = await generate_twitter_agent_graph(
        profile_path=profile_path,
        model=model,
        available_actions=TWITTER_ACTIONS,
    )

    # agent_id -> display name, used when logging actions.
    agent_names = {}
    for agent_id, agent in agent_graph.get_agents():
        agent_names[agent_id] = getattr(agent, 'name', f'Agent_{agent_id}')

    # Always start from an empty database.
    db_path = os.path.join(simulation_dir, "twitter_simulation.db")
    if os.path.exists(db_path):
        os.remove(db_path)

    env = oasis.make(
        agent_graph=agent_graph,
        platform=oasis.DefaultPlatformType.TWITTER,
        database_path=db_path,
        semaphore=30,  # cap concurrent LLM requests so the API is not overloaded
    )

    await env.reset()
    log_info("环境已启动")

    try:
        if action_logger:
            action_logger.log_simulation_start(config)

        total_actions = 0
        # Highest trace rowid already logged (rowid avoids created_at format differences).
        last_rowid = 0

        # ---- Initial events ----
        event_config = config.get("event_config", {})
        initial_posts = event_config.get("initial_posts", [])

        initial_actions = {}
        initial_post_count = 0
        for post in initial_posts:
            agent_id = post.get("poster_agent_id", 0)
            content = post.get("content", "")
            try:
                agent = env.agent_graph.get_agent(agent_id)
                post_action = ManualAction(
                    action_type=ActionType.CREATE_POST,
                    action_args={"content": content}
                )
                # One agent may author several initial posts: collect them in a
                # list instead of letting a later post overwrite an earlier one.
                existing = initial_actions.get(agent)
                if existing is None:
                    initial_actions[agent] = post_action
                elif isinstance(existing, list):
                    existing.append(post_action)
                else:
                    initial_actions[agent] = [existing, post_action]
                initial_post_count += 1

                if action_logger:
                    action_logger.log_action(
                        round_num=0,
                        agent_id=agent_id,
                        agent_name=agent_names.get(agent_id, f"Agent_{agent_id}"),
                        action_type="CREATE_POST",
                        action_args={"content": content[:100] + "..." if len(content) > 100 else content}
                    )
                total_actions += 1
            except Exception as e:
                # Best effort: skip this post but make the failure visible.
                log_info(f"跳过初始帖子 (agent_id={agent_id}): {e}")

        if initial_actions:
            await env.step(initial_actions)
            log_info(f"已发布 {initial_post_count} 条初始帖子")
            # The initial posts were already logged above as round 0; move
            # last_rowid past their trace rows so round 1 does not log them again.
            _, last_rowid = fetch_new_actions_from_db(db_path, last_rowid, agent_names)

        # ---- Main simulation loop ----
        time_config = config.get("time_config", {})
        total_hours = time_config.get("total_simulation_hours", 72)
        minutes_per_round = time_config.get("minutes_per_round", 30)
        total_rounds = (total_hours * 60) // minutes_per_round

        start_time = datetime.now()

        for round_num in range(total_rounds):
            simulated_minutes = round_num * minutes_per_round
            simulated_hour = (simulated_minutes // 60) % 24
            simulated_day = simulated_minutes // (60 * 24) + 1

            active_agents = get_active_agents_for_round(
                env, config, simulated_hour, round_num
            )
            if not active_agents:
                continue

            if action_logger:
                action_logger.log_round_start(round_num + 1, simulated_hour)

            actions = {agent: LLMAction() for _, agent in active_agents}
            await env.step(actions)

            # Log the actions that were actually executed, as recorded in the DB.
            actual_actions, last_rowid = fetch_new_actions_from_db(
                db_path, last_rowid, agent_names
            )

            round_action_count = 0
            for action_data in actual_actions:
                if action_logger:
                    action_logger.log_action(
                        round_num=round_num + 1,
                        agent_id=action_data['agent_id'],
                        agent_name=action_data['agent_name'],
                        action_type=action_data['action_type'],
                        action_args=action_data['action_args']
                    )
                total_actions += 1
                round_action_count += 1

            if action_logger:
                action_logger.log_round_end(round_num + 1, round_action_count)

            if (round_num + 1) % 20 == 0:
                progress = (round_num + 1) / total_rounds * 100
                log_info(f"Day {simulated_day}, {simulated_hour:02d}:00 - Round {round_num + 1}/{total_rounds} ({progress:.1f}%)")
    finally:
        # Release the environment even if a round raises or the task is cancelled.
        await env.close()

    if action_logger:
        action_logger.log_simulation_end(total_rounds, total_actions)

    elapsed = (datetime.now() - start_time).total_seconds()
    log_info(f"模拟完成! 耗时: {elapsed:.1f}秒, 总动作: {total_actions}")

async def run_reddit_simulation(
    config: Dict[str, Any],
    simulation_dir: str,
    action_logger: Optional[PlatformActionLogger] = None,
    main_logger: Optional[SimulationLogManager] = None
):
    """Run the Reddit simulation described by ``config`` in ``simulation_dir``.

    Loads ``reddit_profiles.json`` from ``simulation_dir``, deletes any existing
    ``reddit_simulation.db`` there, and creates a fresh OASIS environment. It
    then publishes ``event_config.initial_posts`` and runs
    ``total_simulation_hours * 60 // minutes_per_round`` rounds. After each
    round, the actions actually executed are read back from the database and
    written to ``action_logger``.

    Args:
        config: Parsed simulation config (``time_config``, ``event_config``,
            ``agent_configs``, optional ``llm_model``).
        simulation_dir: Directory containing the profile file; the database is
            written here too.
        action_logger: Optional per-platform action logger.
        main_logger: Optional shared log manager; progress messages go to it
            and to stdout.
    """
    def log_info(msg):
        """Log ``msg`` with a ``[Reddit]`` prefix to main_logger (if any) and stdout."""
        if main_logger:
            main_logger.info(f"[Reddit] {msg}")
        print(f"[Reddit] {msg}")

    log_info("初始化...")

    # Reddit uses the boost LLM configuration when available, else the general one.
    model = create_model(config, use_boost=True)

    profile_path = os.path.join(simulation_dir, "reddit_profiles.json")
    if not os.path.exists(profile_path):
        log_info(f"错误: Profile文件不存在: {profile_path}")
        return

    agent_graph = await generate_reddit_agent_graph(
        profile_path=profile_path,
        model=model,
        available_actions=REDDIT_ACTIONS,
    )

    # agent_id -> display name, used when logging actions.
    agent_names = {}
    for agent_id, agent in agent_graph.get_agents():
        agent_names[agent_id] = getattr(agent, 'name', f'Agent_{agent_id}')

    # Always start from an empty database.
    db_path = os.path.join(simulation_dir, "reddit_simulation.db")
    if os.path.exists(db_path):
        os.remove(db_path)

    env = oasis.make(
        agent_graph=agent_graph,
        platform=oasis.DefaultPlatformType.REDDIT,
        database_path=db_path,
        semaphore=30,  # cap concurrent LLM requests so the API is not overloaded
    )

    await env.reset()
    log_info("环境已启动")

    try:
        if action_logger:
            action_logger.log_simulation_start(config)

        total_actions = 0
        # Highest trace rowid already logged (rowid avoids created_at format differences).
        last_rowid = 0

        # ---- Initial events ----
        event_config = config.get("event_config", {})
        initial_posts = event_config.get("initial_posts", [])

        initial_actions = {}
        initial_post_count = 0
        for post in initial_posts:
            agent_id = post.get("poster_agent_id", 0)
            content = post.get("content", "")
            try:
                agent = env.agent_graph.get_agent(agent_id)
                post_action = ManualAction(
                    action_type=ActionType.CREATE_POST,
                    action_args={"content": content}
                )
                # One agent may author several initial posts: collect them in a list.
                existing = initial_actions.get(agent)
                if existing is None:
                    initial_actions[agent] = post_action
                elif isinstance(existing, list):
                    existing.append(post_action)
                else:
                    initial_actions[agent] = [existing, post_action]
                initial_post_count += 1

                if action_logger:
                    action_logger.log_action(
                        round_num=0,
                        agent_id=agent_id,
                        agent_name=agent_names.get(agent_id, f"Agent_{agent_id}"),
                        action_type="CREATE_POST",
                        action_args={"content": content[:100] + "..." if len(content) > 100 else content}
                    )
                total_actions += 1
            except Exception as e:
                # Best effort: skip this post but make the failure visible.
                log_info(f"跳过初始帖子 (agent_id={agent_id}): {e}")

        if initial_actions:
            await env.step(initial_actions)
            # Count posts, not agents: one agent may have several initial posts.
            log_info(f"已发布 {initial_post_count} 条初始帖子")
            # The initial posts were already logged above as round 0; move
            # last_rowid past their trace rows so round 1 does not log them again.
            _, last_rowid = fetch_new_actions_from_db(db_path, last_rowid, agent_names)

        # ---- Main simulation loop ----
        time_config = config.get("time_config", {})
        total_hours = time_config.get("total_simulation_hours", 72)
        minutes_per_round = time_config.get("minutes_per_round", 30)
        total_rounds = (total_hours * 60) // minutes_per_round

        start_time = datetime.now()

        for round_num in range(total_rounds):
            simulated_minutes = round_num * minutes_per_round
            simulated_hour = (simulated_minutes // 60) % 24
            simulated_day = simulated_minutes // (60 * 24) + 1

            active_agents = get_active_agents_for_round(
                env, config, simulated_hour, round_num
            )
            if not active_agents:
                continue

            if action_logger:
                action_logger.log_round_start(round_num + 1, simulated_hour)

            actions = {agent: LLMAction() for _, agent in active_agents}
            await env.step(actions)

            # Log the actions that were actually executed, as recorded in the DB.
            actual_actions, last_rowid = fetch_new_actions_from_db(
                db_path, last_rowid, agent_names
            )

            round_action_count = 0
            for action_data in actual_actions:
                if action_logger:
                    action_logger.log_action(
                        round_num=round_num + 1,
                        agent_id=action_data['agent_id'],
                        agent_name=action_data['agent_name'],
                        action_type=action_data['action_type'],
                        action_args=action_data['action_args']
                    )
                total_actions += 1
                round_action_count += 1

            if action_logger:
                action_logger.log_round_end(round_num + 1, round_action_count)

            if (round_num + 1) % 20 == 0:
                progress = (round_num + 1) / total_rounds * 100
                log_info(f"Day {simulated_day}, {simulated_hour:02d}:00 - Round {round_num + 1}/{total_rounds} ({progress:.1f}%)")
    finally:
        # Release the environment even if a round raises or the task is cancelled.
        await env.close()

    if action_logger:
        action_logger.log_simulation_end(total_rounds, total_actions)

    elapsed = (datetime.now() - start_time).total_seconds()
    log_info(f"模拟完成! 耗时: {elapsed:.1f}秒, 总动作: {total_actions}")

async def main():
    """Command-line entry point: run the Twitter and/or Reddit simulations.

    Parses ``--config`` (required) and the ``--twitter-only`` /
    ``--reddit-only`` switches. It then prepares logging in the config file's
    directory, runs the selected platform(s) and logs a summary. When both
    platforms run concurrently, a failure on one is logged without cancelling
    the other; the first failure is re-raised once both have finished.
    """
    parser = argparse.ArgumentParser(description='OASIS双平台并行模拟')
    parser.add_argument(
        '--config',
        type=str,
        required=True,
        help='配置文件路径 (simulation_config.json)'
    )
    parser.add_argument(
        '--twitter-only',
        action='store_true',
        help='只运行Twitter模拟'
    )
    parser.add_argument(
        '--reddit-only',
        action='store_true',
        help='只运行Reddit模拟'
    )
    args = parser.parse_args()

    if not os.path.exists(args.config):
        print(f"错误: 配置文件不存在: {args.config}")
        sys.exit(1)

    config = load_config(args.config)
    # Everything the run produces is written next to the config file.
    simulation_dir = os.path.dirname(args.config) or "."

    # Silence OASIS's own logging and remove the old log directory.
    init_logging_for_simulation(simulation_dir)

    # One shared log manager, with a separate action logger per platform.
    log_manager = SimulationLogManager(simulation_dir)
    twitter_logger = log_manager.get_twitter_logger()
    reddit_logger = log_manager.get_reddit_logger()

    log_manager.info("=" * 60)
    log_manager.info("OASIS 双平台并行模拟")
    log_manager.info(f"配置文件: {args.config}")
    log_manager.info(f"模拟ID: {config.get('simulation_id', 'unknown')}")
    log_manager.info("=" * 60)

    time_config = config.get("time_config", {})
    log_manager.info("模拟参数:")
    log_manager.info(f" - 总模拟时长: {time_config.get('total_simulation_hours', 72)}小时")
    log_manager.info(f" - 每轮时间: {time_config.get('minutes_per_round', 30)}分钟")
    log_manager.info(f" - Agent数量: {len(config.get('agent_configs', []))}")

    log_manager.info("日志结构:")
    log_manager.info(" - 主日志: simulation.log")
    log_manager.info(" - Twitter动作: twitter/actions.jsonl")
    log_manager.info(" - Reddit动作: reddit/actions.jsonl")
    log_manager.info("=" * 60)

    start_time = datetime.now()

    if args.twitter_only:
        await run_twitter_simulation(config, simulation_dir, twitter_logger, log_manager)
    elif args.reddit_only:
        await run_reddit_simulation(config, simulation_dir, reddit_logger, log_manager)
    else:
        # Run both platforms concurrently. With a plain gather() the first error
        # propagates at once and asyncio.run() then cancels the sibling that is
        # still running; return_exceptions=True lets it finish first.
        results = await asyncio.gather(
            run_twitter_simulation(config, simulation_dir, twitter_logger, log_manager),
            run_reddit_simulation(config, simulation_dir, reddit_logger, log_manager),
            return_exceptions=True,
        )
        failures = []
        for platform, outcome in zip(("Twitter", "Reddit"), results):
            if isinstance(outcome, BaseException):
                log_manager.info(f"[{platform}] 模拟失败: {outcome!r}")
                failures.append(outcome)
        if failures:
            # Surface the failure (original traceback, non-zero exit) now that
            # both platforms have stopped.
            raise failures[0]

    total_elapsed = (datetime.now() - start_time).total_seconds()
    log_manager.info("=" * 60)
    log_manager.info(f"全部模拟完成! 总耗时: {total_elapsed:.1f}秒")
    log_manager.info("日志文件:")
    log_manager.info(f" - {os.path.join(simulation_dir, 'simulation.log')}")
    log_manager.info(f" - {os.path.join(simulation_dir, 'twitter', 'actions.jsonl')}")
    log_manager.info(f" - {os.path.join(simulation_dir, 'reddit', 'actions.jsonl')}")
    log_manager.info("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())