diff --git a/backend/app/api/simulation.py b/backend/app/api/simulation.py index c7ef7f0..3e0174b 100644 --- a/backend/app/api/simulation.py +++ b/backend/app/api/simulation.py @@ -1274,21 +1274,28 @@ def generate_profiles(): def start_simulation(): """ 开始运行模拟 - + 请求(JSON): { "simulation_id": "sim_xxxx", // 必填,模拟ID "platform": "parallel", // 可选: twitter / reddit / parallel (默认) "max_rounds": 100, // 可选: 最大模拟轮数,用于截断过长的模拟 - "enable_graph_memory_update": false // 可选: 是否将Agent活动动态更新到Zep图谱记忆 + "enable_graph_memory_update": false, // 可选: 是否将Agent活动动态更新到Zep图谱记忆 + "force": false // 可选: 强制重新开始(会停止运行中的模拟并清理日志) } - + + 关于 force 参数: + - 启用后,如果模拟正在运行或已完成,会先停止并清理运行日志 + - 清理的内容包括:run_state.json, actions.jsonl, simulation.log 等 + - 不会清理配置文件(simulation_config.json)和 profile 文件 + - 适用于需要重新运行模拟的场景 + 关于 enable_graph_memory_update: - 启用后,模拟中所有Agent的活动(发帖、评论、点赞等)都会实时更新到Zep图谱 - 这可以让图谱"记住"模拟过程,用于后续分析或AI对话 - 需要模拟关联的项目有有效的 graph_id - 采用批量更新机制,减少API调用次数 - + 返回: { "success": true, @@ -1299,24 +1306,26 @@ def start_simulation(): "twitter_running": true, "reddit_running": true, "started_at": "2025-12-01T10:00:00", - "graph_memory_update_enabled": true // 是否启用了图谱记忆更新 + "graph_memory_update_enabled": true, // 是否启用了图谱记忆更新 + "force_restarted": true // 是否是强制重新开始 } } """ try: data = request.get_json() or {} - + simulation_id = data.get('simulation_id') if not simulation_id: return jsonify({ "success": False, "error": "请提供 simulation_id" }), 400 - + platform = data.get('platform', 'parallel') max_rounds = data.get('max_rounds') # 可选:最大模拟轮数 enable_graph_memory_update = data.get('enable_graph_memory_update', False) # 可选:是否启用图谱记忆更新 - + force = data.get('force', False) # 可选:强制重新开始 + # 验证 max_rounds 参数 if max_rounds is not None: try: @@ -1331,28 +1340,30 @@ def start_simulation(): "success": False, "error": "max_rounds 必须是有效的整数" }), 400 - + if platform not in ['twitter', 'reddit', 'parallel']: return jsonify({ "success": False, "error": f"无效的平台类型: {platform},可选: twitter/reddit/parallel" }), 400 - + # 检查模拟是否已准备好 manager = SimulationManager() state = manager.get_simulation(simulation_id) - + if not state: return jsonify({ "success": False, "error": f"模拟不存在: {simulation_id}" }), 404 + + force_restarted = False # 智能处理状态:如果准备工作已完成,允许重新启动 if state.status != SimulationStatus.READY: # 检查准备工作是否已完成 is_prepared, prepare_info = _check_simulation_prepared(simulation_id) - + if is_prepared: # 准备工作已完成,检查是否有正在运行的进程 if state.status == SimulationStatus.RUNNING: @@ -1360,11 +1371,27 @@ def start_simulation(): run_state = SimulationRunner.get_run_state(simulation_id) if run_state and run_state.runner_status.value == "running": # 进程确实在运行 - return jsonify({ - "success": False, - "error": f"模拟正在运行中,请先调用 /stop 接口停止" - }), 400 - + if force: + # 强制模式:停止运行中的模拟 + logger.info(f"强制模式:停止运行中的模拟 {simulation_id}") + try: + SimulationRunner.stop_simulation(simulation_id) + except Exception as e: + logger.warning(f"停止模拟时出现警告: {str(e)}") + else: + return jsonify({ + "success": False, + "error": f"模拟正在运行中,请先调用 /stop 接口停止,或使用 force=true 强制重新开始" + }), 400 + + # 如果是强制模式,清理运行日志 + if force: + logger.info(f"强制模式:清理模拟日志 {simulation_id}") + cleanup_result = SimulationRunner.cleanup_simulation_logs(simulation_id) + if not cleanup_result.get("success"): + logger.warning(f"清理日志时出现警告: {cleanup_result.get('errors')}") + force_restarted = True + # 进程不存在或已结束,重置状态为 ready logger.info(f"模拟 {simulation_id} 准备工作已完成,重置状态为 ready(原状态: {state.status.value})") state.status = SimulationStatus.READY @@ -1412,6 +1439,7 @@ def start_simulation(): if max_rounds: response_data['max_rounds_applied'] = max_rounds response_data['graph_memory_update_enabled'] = enable_graph_memory_update + response_data['force_restarted'] = force_restarted if enable_graph_memory_update: response_data['graph_id'] = graph_id @@ -1557,10 +1585,13 @@ def get_run_status(simulation_id: str): @simulation_bp.route('//run-status/detail', methods=['GET']) def get_run_status_detail(simulation_id: str): """ - 获取模拟运行详细状态(包含最近动作) + 获取模拟运行详细状态(包含所有动作) 用于前端展示实时动态 + Query参数: + platform: 过滤平台(twitter/reddit,可选) + 返回: { "success": true, @@ -1569,7 +1600,7 @@ def get_run_status_detail(simulation_id: str): "runner_status": "running", "current_round": 5, ... - "recent_actions": [ + "all_actions": [ { "round_num": 5, "timestamp": "2025-12-01T10:30:00", @@ -1582,12 +1613,15 @@ def get_run_status_detail(simulation_id: str): "success": true }, ... - ] + ], + "twitter_actions": [...], # Twitter 平台的所有动作 + "reddit_actions": [...] # Reddit 平台的所有动作 } } """ try: run_state = SimulationRunner.get_run_state(simulation_id) + platform_filter = request.args.get('platform') if not run_state: return jsonify({ @@ -1595,13 +1629,49 @@ def get_run_status_detail(simulation_id: str): "data": { "simulation_id": simulation_id, "runner_status": "idle", - "recent_actions": [] + "all_actions": [], + "twitter_actions": [], + "reddit_actions": [] } }) + # 获取完整的动作列表 + all_actions = SimulationRunner.get_all_actions( + simulation_id=simulation_id, + platform=platform_filter + ) + + # 分平台获取动作 + twitter_actions = SimulationRunner.get_all_actions( + simulation_id=simulation_id, + platform="twitter" + ) if not platform_filter or platform_filter == "twitter" else [] + + reddit_actions = SimulationRunner.get_all_actions( + simulation_id=simulation_id, + platform="reddit" + ) if not platform_filter or platform_filter == "reddit" else [] + + # 获取当前轮次的动作(recent_actions 只展示最新一轮) + current_round = run_state.current_round + recent_actions = SimulationRunner.get_all_actions( + simulation_id=simulation_id, + platform=platform_filter, + round_num=current_round + ) if current_round > 0 else [] + + # 获取基础状态信息 + result = run_state.to_dict() + result["all_actions"] = [a.to_dict() for a in all_actions] + result["twitter_actions"] = [a.to_dict() for a in twitter_actions] + result["reddit_actions"] = [a.to_dict() for a in reddit_actions] + result["rounds_count"] = len(run_state.rounds) + # recent_actions 只展示当前最新一轮两个平台的内容 + result["recent_actions"] = [a.to_dict() for a in recent_actions] + return jsonify({ "success": True, - "data": run_state.to_detail_dict() + "data": result }) except Exception as e: diff --git a/backend/app/services/oasis_profile_generator.py b/backend/app/services/oasis_profile_generator.py index 662ffdc..ac3b7e3 100644 --- a/backend/app/services/oasis_profile_generator.py +++ b/backend/app/services/oasis_profile_generator.py @@ -61,7 +61,7 @@ class OasisAgentProfile: """转换为Reddit平台格式""" profile = { "user_id": self.user_id, - "user_name": self.user_name, + "username": self.user_name, # OASIS 库要求字段名为 username(无下划线) "name": self.name, "bio": self.bio, "persona": self.persona, @@ -89,7 +89,7 @@ class OasisAgentProfile: """转换为Twitter平台格式""" profile = { "user_id": self.user_id, - "user_name": self.user_name, + "username": self.user_name, # OASIS 库要求字段名为 username(无下划线) "name": self.name, "bio": self.bio, "persona": self.persona, diff --git a/backend/app/services/simulation_runner.py b/backend/app/services/simulation_runner.py index 56614a5..2985a50 100644 --- a/backend/app/services/simulation_runner.py +++ b/backend/app/services/simulation_runner.py @@ -670,6 +670,136 @@ class SimulationRunner: logger.info(f"模拟已停止: {simulation_id}") return state + @classmethod + def _read_actions_from_file( + cls, + file_path: str, + default_platform: Optional[str] = None, + platform_filter: Optional[str] = None, + agent_id: Optional[int] = None, + round_num: Optional[int] = None + ) -> List[AgentAction]: + """ + 从单个动作文件中读取动作 + + Args: + file_path: 动作日志文件路径 + default_platform: 默认平台(当动作记录中没有 platform 字段时使用) + platform_filter: 过滤平台 + agent_id: 过滤 Agent ID + round_num: 过滤轮次 + """ + if not os.path.exists(file_path): + return [] + + actions = [] + + with open(file_path, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if not line: + continue + + try: + data = json.loads(line) + + # 跳过非动作记录(如 simulation_start, round_start, round_end 等事件) + if "event_type" in data: + continue + + # 跳过没有 agent_id 的记录(非 Agent 动作) + if "agent_id" not in data: + continue + + # 获取平台:优先使用记录中的 platform,否则使用默认平台 + record_platform = data.get("platform") or default_platform or "" + + # 过滤 + if platform_filter and record_platform != platform_filter: + continue + if agent_id is not None and data.get("agent_id") != agent_id: + continue + if round_num is not None and data.get("round") != round_num: + continue + + actions.append(AgentAction( + round_num=data.get("round", 0), + timestamp=data.get("timestamp", ""), + platform=record_platform, + agent_id=data.get("agent_id", 0), + agent_name=data.get("agent_name", ""), + action_type=data.get("action_type", ""), + action_args=data.get("action_args", {}), + result=data.get("result"), + success=data.get("success", True), + )) + + except json.JSONDecodeError: + continue + + return actions + + @classmethod + def get_all_actions( + cls, + simulation_id: str, + platform: Optional[str] = None, + agent_id: Optional[int] = None, + round_num: Optional[int] = None + ) -> List[AgentAction]: + """ + 获取所有平台的完整动作历史(无分页限制) + + Args: + simulation_id: 模拟ID + platform: 过滤平台(twitter/reddit) + agent_id: 过滤Agent + round_num: 过滤轮次 + + Returns: + 完整的动作列表(按时间戳排序,新的在前) + """ + sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) + actions = [] + + # 读取 Twitter 动作文件(根据文件路径自动设置 platform 为 twitter) + twitter_actions_log = os.path.join(sim_dir, "twitter", "actions.jsonl") + if not platform or platform == "twitter": + actions.extend(cls._read_actions_from_file( + twitter_actions_log, + default_platform="twitter", # 自动填充 platform 字段 + platform_filter=platform, + agent_id=agent_id, + round_num=round_num + )) + + # 读取 Reddit 动作文件(根据文件路径自动设置 platform 为 reddit) + reddit_actions_log = os.path.join(sim_dir, "reddit", "actions.jsonl") + if not platform or platform == "reddit": + actions.extend(cls._read_actions_from_file( + reddit_actions_log, + default_platform="reddit", # 自动填充 platform 字段 + platform_filter=platform, + agent_id=agent_id, + round_num=round_num + )) + + # 如果分平台文件不存在,尝试读取旧的单一文件格式 + if not actions: + actions_log = os.path.join(sim_dir, "actions.jsonl") + actions = cls._read_actions_from_file( + actions_log, + default_platform=None, # 旧格式文件中应该有 platform 字段 + platform_filter=platform, + agent_id=agent_id, + round_num=round_num + ) + + # 按时间戳排序(新的在前) + actions.sort(key=lambda x: x.timestamp, reverse=True) + + return actions + @classmethod def get_actions( cls, @@ -681,7 +811,7 @@ class SimulationRunner: round_num: Optional[int] = None ) -> List[AgentAction]: """ - 获取动作历史 + 获取动作历史(带分页) Args: simulation_id: 模拟ID @@ -694,48 +824,12 @@ class SimulationRunner: Returns: 动作列表 """ - sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) - actions_log = os.path.join(sim_dir, "actions.jsonl") - - if not os.path.exists(actions_log): - return [] - - actions = [] - - with open(actions_log, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - if not line: - continue - - try: - data = json.loads(line) - - # 过滤 - if platform and data.get("platform") != platform: - continue - if agent_id is not None and data.get("agent_id") != agent_id: - continue - if round_num is not None and data.get("round") != round_num: - continue - - actions.append(AgentAction( - round_num=data.get("round", 0), - timestamp=data.get("timestamp", ""), - platform=data.get("platform", ""), - agent_id=data.get("agent_id", 0), - agent_name=data.get("agent_name", ""), - action_type=data.get("action_type", ""), - action_args=data.get("action_args", {}), - result=data.get("result"), - success=data.get("success", True), - )) - - except json.JSONDecodeError: - continue - - # 按时间倒序排列 - actions.reverse() + actions = cls.get_all_actions( + simulation_id=simulation_id, + platform=platform, + agent_id=agent_id, + round_num=round_num + ) # 分页 return actions[offset:offset + limit] @@ -854,6 +948,81 @@ class SimulationRunner: return result + @classmethod + def cleanup_simulation_logs(cls, simulation_id: str) -> Dict[str, Any]: + """ + 清理模拟的运行日志(用于强制重新开始模拟) + + 会删除以下文件: + - run_state.json + - twitter/actions.jsonl + - reddit/actions.jsonl + - simulation.log + - stdout.log / stderr.log + + 注意:不会删除配置文件(simulation_config.json)和 profile 文件 + + Args: + simulation_id: 模拟ID + + Returns: + 清理结果信息 + """ + import shutil + + sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) + + if not os.path.exists(sim_dir): + return {"success": True, "message": "模拟目录不存在,无需清理"} + + cleaned_files = [] + errors = [] + + # 要删除的文件列表 + files_to_delete = [ + "run_state.json", + "simulation.log", + "stdout.log", + "stderr.log", + ] + + # 要删除的目录列表(包含动作日志) + dirs_to_clean = ["twitter", "reddit"] + + # 删除文件 + for filename in files_to_delete: + file_path = os.path.join(sim_dir, filename) + if os.path.exists(file_path): + try: + os.remove(file_path) + cleaned_files.append(filename) + except Exception as e: + errors.append(f"删除 {filename} 失败: {str(e)}") + + # 清理平台目录中的动作日志 + for dir_name in dirs_to_clean: + dir_path = os.path.join(sim_dir, dir_name) + if os.path.exists(dir_path): + actions_file = os.path.join(dir_path, "actions.jsonl") + if os.path.exists(actions_file): + try: + os.remove(actions_file) + cleaned_files.append(f"{dir_name}/actions.jsonl") + except Exception as e: + errors.append(f"删除 {dir_name}/actions.jsonl 失败: {str(e)}") + + # 清理内存中的运行状态 + if simulation_id in cls._run_states: + del cls._run_states[simulation_id] + + logger.info(f"清理模拟日志完成: {simulation_id}, 删除文件: {cleaned_files}") + + return { + "success": len(errors) == 0, + "cleaned_files": cleaned_files, + "errors": errors if errors else None + } + # 防止重复清理的标志 _cleanup_done = False diff --git a/frontend/src/api/simulation.js b/frontend/src/api/simulation.js index f3c85c1..907911a 100644 --- a/frontend/src/api/simulation.js +++ b/frontend/src/api/simulation.js @@ -108,3 +108,47 @@ export const getRunStatusDetail = (simulationId) => { return service.get(`/api/simulation/${simulationId}/run-status/detail`) } +/** + * 获取模拟中的帖子 + * @param {string} simulationId + * @param {string} platform - 'reddit' | 'twitter' + * @param {number} limit - 返回数量 + * @param {number} offset - 偏移量 + */ +export const getSimulationPosts = (simulationId, platform = 'reddit', limit = 50, offset = 0) => { + return service.get(`/api/simulation/${simulationId}/posts`, { + params: { platform, limit, offset } + }) +} + +/** + * 获取模拟时间线(按轮次汇总) + * @param {string} simulationId + * @param {number} startRound - 起始轮次 + * @param {number} endRound - 结束轮次 + */ +export const getSimulationTimeline = (simulationId, startRound = 0, endRound = null) => { + const params = { start_round: startRound } + if (endRound !== null) { + params.end_round = endRound + } + return service.get(`/api/simulation/${simulationId}/timeline`, { params }) +} + +/** + * 获取Agent统计信息 + * @param {string} simulationId + */ +export const getAgentStats = (simulationId) => { + return service.get(`/api/simulation/${simulationId}/agent-stats`) +} + +/** + * 获取模拟动作历史 + * @param {string} simulationId + * @param {Object} params - { limit, offset, platform, agent_id, round_num } + */ +export const getSimulationActions = (simulationId, params = {}) => { + return service.get(`/api/simulation/${simulationId}/actions`, { params }) +} + diff --git a/frontend/src/components/Step3Simulation.vue b/frontend/src/components/Step3Simulation.vue new file mode 100644 index 0000000..4020809 --- /dev/null +++ b/frontend/src/components/Step3Simulation.vue @@ -0,0 +1,1312 @@ + + + + + diff --git a/frontend/src/router/index.js b/frontend/src/router/index.js index f8a7a02..beabb64 100644 --- a/frontend/src/router/index.js +++ b/frontend/src/router/index.js @@ -2,6 +2,7 @@ import { createRouter, createWebHistory } from 'vue-router' import Home from '../views/Home.vue' import Process from '../views/MainView.vue' import SimulationView from '../views/SimulationView.vue' +import SimulationRunView from '../views/SimulationRunView.vue' const routes = [ { @@ -20,6 +21,12 @@ const routes = [ name: 'Simulation', component: SimulationView, props: true + }, + { + path: '/simulation/:simulationId/start', + name: 'SimulationRun', + component: SimulationRunView, + props: true } ] diff --git a/frontend/src/views/SimulationRunView.vue b/frontend/src/views/SimulationRunView.vue new file mode 100644 index 0000000..d22611d --- /dev/null +++ b/frontend/src/views/SimulationRunView.vue @@ -0,0 +1,343 @@ + + + + + diff --git a/frontend/src/views/SimulationView.vue b/frontend/src/views/SimulationView.vue index 12053e8..ad41e08 100644 --- a/frontend/src/views/SimulationView.vue +++ b/frontend/src/views/SimulationView.vue @@ -155,14 +155,19 @@ const handleNextStep = (params = {}) => { addLog('使用自动配置的模拟轮数') } - // TODO: 调用 startSimulation API 并跳转到 Step 3 - // 可以在这里调用 /api/simulation/start 接口 - // const startParams = { - // simulation_id: currentSimulationId.value, - // ...(params.maxRounds && { max_rounds: params.maxRounds }) - // } + // 构建路由参数 + const routeParams = { + name: 'SimulationRun', + params: { simulationId: currentSimulationId.value } + } - alert(`Step 3: 开始模拟 - Coming soon...\n${params.maxRounds ? `轮数: ${params.maxRounds}` : '使用自动配置轮数'}`) + // 如果有自定义轮数,通过 query 参数传递 + if (params.maxRounds) { + routeParams.query = { maxRounds: params.maxRounds } + } + + // 跳转到 Step 3 页面 + router.push(routeParams) } // --- Data Logic ---