From 91eb73ae44b4da9266439b64eb177e432b83c139 Mon Sep 17 00:00:00 2001 From: 666ghj <670939375@qq.com> Date: Tue, 9 Dec 2025 00:37:12 +0800 Subject: [PATCH] Enhance signal handling and suppress warnings in simulation scripts - Added signal handling to gracefully manage shutdown events across simulation scripts, ensuring proper cleanup of resources. - Introduced a global shutdown event to allow simulations to respond to termination signals, improving robustness. - Suppressed warnings related to multiprocessing resource tracking to avoid unnecessary log clutter during execution. - Updated cleanup logic in `SimulationRunner` and `ZepGraphMemoryManager` to prevent redundant calls and ensure efficient resource management. - Enhanced logging to provide clearer feedback during shutdown processes, improving traceability. --- backend/app/__init__.py | 6 ++ backend/app/services/simulation_runner.py | 53 +++++++++++- .../app/services/zep_graph_memory_updater.py | 21 +++-- backend/run.py | 11 +++ backend/scripts/run_parallel_simulation.py | 86 ++++++++++++++++++- backend/scripts/run_reddit_simulation.py | 55 ++++++++++-- backend/scripts/run_twitter_simulation.py | 53 +++++++++++- 7 files changed, 262 insertions(+), 23 deletions(-) diff --git a/backend/app/__init__.py b/backend/app/__init__.py index 40836c8..5b7f1fc 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -3,6 +3,12 @@ MiroFish Backend - Flask应用工厂 """ import os +import warnings + +# 抑制 multiprocessing resource_tracker 的警告(来自第三方库如 transformers) +# 需要在所有其他导入之前设置 +warnings.filterwarnings("ignore", message=".*resource_tracker.*") + from flask import Flask, request from flask_cors import CORS diff --git a/backend/app/services/simulation_runner.py b/backend/app/services/simulation_runner.py index eda1d7a..56614a5 100644 --- a/backend/app/services/simulation_runner.py +++ b/backend/app/services/simulation_runner.py @@ -854,6 +854,9 @@ class SimulationRunner: return result + # 防止重复清理的标志 + _cleanup_done = False + @classmethod def cleanup_all_simulations(cls): """ @@ -861,12 +864,23 @@ class SimulationRunner: 在服务器关闭时调用,确保所有子进程被终止 """ + # 防止重复清理 + if cls._cleanup_done: + return + cls._cleanup_done = True + + # 检查是否有内容需要清理(避免空进程的进程打印无用日志) + has_processes = bool(cls._processes) + has_updaters = bool(cls._graph_memory_enabled) + + if not has_processes and not has_updaters: + return # 没有需要清理的内容,静默返回 + logger.info("正在清理所有模拟进程...") - # 首先停止所有图谱记忆更新器 + # 首先停止所有图谱记忆更新器(stop_all 内部会打印日志) try: ZepGraphMemoryManager.stop_all() - logger.info("已停止所有图谱记忆更新器") except Exception as e: logger.error(f"停止图谱记忆更新器失败: {e}") cls._graph_memory_enabled.clear() @@ -899,7 +913,7 @@ class SimulationRunner: except Exception: process.kill() - # 更新状态 + # 更新 run_state.json state = cls.get_run_state(simulation_id) if state: state.runner_status = RunnerStatus.STOPPED @@ -908,6 +922,24 @@ class SimulationRunner: state.completed_at = datetime.now().isoformat() state.error = "服务器关闭,模拟被终止" cls._save_run_state(state) + + # 同时更新 state.json,将状态设为 stopped + try: + sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) + state_file = os.path.join(sim_dir, "state.json") + logger.info(f"尝试更新 state.json: {state_file}") + if os.path.exists(state_file): + with open(state_file, 'r', encoding='utf-8') as f: + state_data = json.load(f) + state_data['status'] = 'stopped' + state_data['updated_at'] = datetime.now().isoformat() + with open(state_file, 'w', encoding='utf-8') as f: + json.dump(state_data, f, indent=2, ensure_ascii=False) + logger.info(f"已更新 state.json 状态为 stopped: {simulation_id}") + else: + logger.warning(f"state.json 不存在: {state_file}") + except Exception as state_err: + logger.warning(f"更新 state.json 失败: {simulation_id}, error={state_err}") except Exception as e: logger.error(f"清理进程失败: {simulation_id}, error={e}") @@ -947,13 +979,26 @@ class SimulationRunner: if _cleanup_registered: return + # Flask debug 模式下,只在 reloader 子进程中注册清理(实际运行应用的进程) + # WERKZEUG_RUN_MAIN=true 表示是 reloader 子进程 + # 如果不是 debug 模式,则没有这个环境变量,也需要注册 + is_reloader_process = os.environ.get('WERKZEUG_RUN_MAIN') == 'true' + is_debug_mode = os.environ.get('FLASK_DEBUG') == '1' or os.environ.get('WERKZEUG_RUN_MAIN') is not None + + # 在 debug 模式下,只在 reloader 子进程中注册;非 debug 模式下始终注册 + if is_debug_mode and not is_reloader_process: + _cleanup_registered = True # 标记已注册,防止子进程再次尝试 + return + # 保存原有的信号处理器 original_sigint = signal.getsignal(signal.SIGINT) original_sigterm = signal.getsignal(signal.SIGTERM) def cleanup_handler(signum=None, frame=None): """信号处理器:先清理模拟进程,再调用原处理器""" - logger.info(f"收到信号 {signum},开始清理...") + # 只有在有进程需要清理时才打印日志 + if cls._processes or cls._graph_memory_enabled: + logger.info(f"收到信号 {signum},开始清理...") cls.cleanup_all_simulations() # 调用原有的信号处理器,让 Flask 正常退出 diff --git a/backend/app/services/zep_graph_memory_updater.py b/backend/app/services/zep_graph_memory_updater.py index 05f535d..ca48c31 100644 --- a/backend/app/services/zep_graph_memory_updater.py +++ b/backend/app/services/zep_graph_memory_updater.py @@ -506,16 +506,25 @@ class ZepGraphMemoryManager: del cls._updaters[simulation_id] logger.info(f"已停止图谱记忆更新器: simulation_id={simulation_id}") + # 防止 stop_all 重复调用的标志 + _stop_all_done = False + @classmethod def stop_all(cls): """停止所有更新器""" + # 防止重复调用 + if cls._stop_all_done: + return + cls._stop_all_done = True + with cls._lock: - for simulation_id, updater in list(cls._updaters.items()): - try: - updater.stop() - except Exception as e: - logger.error(f"停止更新器失败: simulation_id={simulation_id}, error={e}") - cls._updaters.clear() + if cls._updaters: + for simulation_id, updater in list(cls._updaters.items()): + try: + updater.stop() + except Exception as e: + logger.error(f"停止更新器失败: simulation_id={simulation_id}, error={e}") + cls._updaters.clear() logger.info("已停止所有图谱记忆更新器") @classmethod diff --git a/backend/run.py b/backend/run.py index 7f22ba8..da4cdf3 100644 --- a/backend/run.py +++ b/backend/run.py @@ -4,6 +4,17 @@ MiroFish Backend 启动入口 import os import sys +import warnings + +# 抑制 multiprocessing resource_tracker 的警告(来自第三方库如 transformers) +# 这个警告是无害的,只是在进程被终止时资源没有被正确清理 +warnings.filterwarnings("ignore", message=".*resource_tracker.*") +warnings.filterwarnings("ignore", category=UserWarning, module=".*multiprocessing.*") + +# 额外:通过环境变量告诉 Python 不要跟踪 multiprocessing 资源 +# 这可以从根本上避免警告 +import os +os.environ.setdefault('PYTHONWARNINGS', 'ignore::UserWarning') # 添加项目根目录到路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) diff --git a/backend/scripts/run_parallel_simulation.py b/backend/scripts/run_parallel_simulation.py index 83f8506..b9f9737 100644 --- a/backend/scripts/run_parallel_simulation.py +++ b/backend/scripts/run_parallel_simulation.py @@ -29,13 +29,23 @@ import argparse import asyncio import json import logging +import multiprocessing import os import random +import signal import sqlite3 import sys +import warnings from datetime import datetime from typing import Dict, Any, List, Optional, Tuple +# 抑制 multiprocessing resource_tracker 的警告(来自第三方库如 transformers) +warnings.filterwarnings("ignore", message=".*resource_tracker.*") + +# 全局变量:用于信号处理 +_shutdown_event = None +_cleanup_done = False + # 添加 backend 目录到路径 # 脚本固定位于 backend/scripts/ 目录 _scripts_dir = os.path.dirname(os.path.abspath(__file__)) @@ -1181,6 +1191,12 @@ async def run_twitter_simulation( start_time = datetime.now() for round_num in range(total_rounds): + # 检查是否收到退出信号 + if _shutdown_event and _shutdown_event.is_set(): + if main_logger: + main_logger.info(f"收到退出信号,在第 {round_num + 1} 轮停止模拟") + break + simulated_minutes = round_num * minutes_per_round simulated_hour = (simulated_minutes // 60) % 24 simulated_day = simulated_minutes // (60 * 24) + 1 @@ -1374,6 +1390,12 @@ async def run_reddit_simulation( start_time = datetime.now() for round_num in range(total_rounds): + # 检查是否收到退出信号 + if _shutdown_event and _shutdown_event.is_set(): + if main_logger: + main_logger.info(f"收到退出信号,在第 {round_num + 1} 轮停止模拟") + break + simulated_minutes = round_num * minutes_per_round simulated_hour = (simulated_minutes // 60) % 24 simulated_day = simulated_minutes // (60 * 24) + 1 @@ -1465,6 +1487,10 @@ async def main(): args = parser.parse_args() + # 在 main 函数开始时创建 shutdown 事件,确保整个程序都能响应退出信号 + global _shutdown_event + _shutdown_event = asyncio.Event() + if not os.path.exists(args.config): print(f"错误: 配置文件不存在: {args.config}") sys.exit(1) @@ -1549,15 +1575,22 @@ async def main(): ) ipc_handler.update_status("alive") - # 等待命令循环 + # 等待命令循环(使用全局 _shutdown_event) try: - while True: + while not _shutdown_event.is_set(): should_continue = await ipc_handler.process_commands() if not should_continue: break - await asyncio.sleep(0.5) # 轮询间隔 + # 使用 wait_for 替代 sleep,这样可以响应 shutdown_event + try: + await asyncio.wait_for(_shutdown_event.wait(), timeout=0.5) + break # 收到退出信号 + except asyncio.TimeoutError: + pass # 超时继续循环 except KeyboardInterrupt: print("\n收到中断信号") + except asyncio.CancelledError: + print("\n任务被取消") except Exception as e: print(f"\n命令处理出错: {e}") @@ -1582,5 +1615,50 @@ async def main(): log_manager.info("=" * 60) +def setup_signal_handlers(loop=None): + """ + 设置信号处理器,确保收到 SIGTERM/SIGINT 时能够正确退出 + + 持久化模拟场景:模拟完成后不退出,等待 interview 命令 + 当收到终止信号时,需要: + 1. 通知 asyncio 循环退出等待 + 2. 让程序有机会正常清理资源(关闭数据库、环境等) + 3. 然后才退出 + """ + def signal_handler(signum, frame): + global _cleanup_done + sig_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT" + print(f"\n收到 {sig_name} 信号,正在退出...") + + if not _cleanup_done: + _cleanup_done = True + # 设置事件通知 asyncio 循环退出(让循环有机会清理资源) + if _shutdown_event: + _shutdown_event.set() + + # 不要直接 sys.exit(),让 asyncio 循环正常退出并清理资源 + # 如果是重复收到信号,才强制退出 + else: + print("强制退出...") + sys.exit(1) + + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + + if __name__ == "__main__": - asyncio.run(main()) + setup_signal_handlers() + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n程序被中断") + except SystemExit: + pass + finally: + # 清理 multiprocessing 资源跟踪器(防止退出时的警告) + try: + from multiprocessing import resource_tracker + resource_tracker._resource_tracker._stop() + except Exception: + pass + print("模拟进程已退出") diff --git a/backend/scripts/run_reddit_simulation.py b/backend/scripts/run_reddit_simulation.py index 2fa073e..14907cb 100644 --- a/backend/scripts/run_reddit_simulation.py +++ b/backend/scripts/run_reddit_simulation.py @@ -19,11 +19,16 @@ import json import logging import os import random +import signal import sys import sqlite3 from datetime import datetime from typing import Dict, Any, List, Optional +# 全局变量:用于信号处理 +_shutdown_event = None +_cleanup_done = False + # 添加项目路径 _scripts_dir = os.path.dirname(os.path.abspath(__file__)) _backend_dir = os.path.abspath(os.path.join(_scripts_dir, '..')) @@ -659,15 +664,21 @@ class RedditSimulationRunner: self.ipc_handler.update_status("alive") - # 等待命令循环 + # 等待命令循环(使用全局 _shutdown_event) try: - while True: + while not _shutdown_event.is_set(): should_continue = await self.ipc_handler.process_commands() if not should_continue: break - await asyncio.sleep(0.5) # 轮询间隔 + try: + await asyncio.wait_for(_shutdown_event.wait(), timeout=0.5) + break # 收到退出信号 + except asyncio.TimeoutError: + pass except KeyboardInterrupt: print("\n收到中断信号") + except asyncio.CancelledError: + print("\n任务被取消") except Exception as e: print(f"\n命令处理出错: {e}") @@ -704,6 +715,10 @@ async def main(): args = parser.parse_args() + # 在 main 函数开始时创建 shutdown 事件 + global _shutdown_event + _shutdown_event = asyncio.Event() + if not os.path.exists(args.config): print(f"错误: 配置文件不存在: {args.config}") sys.exit(1) @@ -719,6 +734,36 @@ async def main(): await runner.run(max_rounds=args.max_rounds) -if __name__ == "__main__": - asyncio.run(main()) +def setup_signal_handlers(): + """ + 设置信号处理器,确保收到 SIGTERM/SIGINT 时能够正确退出 + 让程序有机会正常清理资源(关闭数据库、环境等) + """ + def signal_handler(signum, frame): + global _cleanup_done + sig_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT" + print(f"\n收到 {sig_name} 信号,正在退出...") + if not _cleanup_done: + _cleanup_done = True + if _shutdown_event: + _shutdown_event.set() + else: + # 重复收到信号才强制退出 + print("强制退出...") + sys.exit(1) + + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + + +if __name__ == "__main__": + setup_signal_handlers() + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n程序被中断") + except SystemExit: + pass + finally: + print("模拟进程已退出") diff --git a/backend/scripts/run_twitter_simulation.py b/backend/scripts/run_twitter_simulation.py index c2a0f1f..caab9e9 100644 --- a/backend/scripts/run_twitter_simulation.py +++ b/backend/scripts/run_twitter_simulation.py @@ -19,11 +19,16 @@ import json import logging import os import random +import signal import sys import sqlite3 from datetime import datetime from typing import Dict, Any, List, Optional +# 全局变量:用于信号处理 +_shutdown_event = None +_cleanup_done = False + # 添加项目路径 _scripts_dir = os.path.dirname(os.path.abspath(__file__)) _backend_dir = os.path.abspath(os.path.join(_scripts_dir, '..')) @@ -671,15 +676,21 @@ class TwitterSimulationRunner: self.ipc_handler.update_status("alive") - # 等待命令循环 + # 等待命令循环(使用全局 _shutdown_event) try: - while True: + while not _shutdown_event.is_set(): should_continue = await self.ipc_handler.process_commands() if not should_continue: break - await asyncio.sleep(0.5) # 轮询间隔 + try: + await asyncio.wait_for(_shutdown_event.wait(), timeout=0.5) + break # 收到退出信号 + except asyncio.TimeoutError: + pass except KeyboardInterrupt: print("\n收到中断信号") + except asyncio.CancelledError: + print("\n任务被取消") except Exception as e: print(f"\n命令处理出错: {e}") @@ -716,6 +727,10 @@ async def main(): args = parser.parse_args() + # 在 main 函数开始时创建 shutdown 事件 + global _shutdown_event + _shutdown_event = asyncio.Event() + if not os.path.exists(args.config): print(f"错误: 配置文件不存在: {args.config}") sys.exit(1) @@ -731,5 +746,35 @@ async def main(): await runner.run(max_rounds=args.max_rounds) +def setup_signal_handlers(): + """ + 设置信号处理器,确保收到 SIGTERM/SIGINT 时能够正确退出 + 让程序有机会正常清理资源(关闭数据库、环境等) + """ + def signal_handler(signum, frame): + global _cleanup_done + sig_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT" + print(f"\n收到 {sig_name} 信号,正在退出...") + if not _cleanup_done: + _cleanup_done = True + if _shutdown_event: + _shutdown_event.set() + else: + # 重复收到信号才强制退出 + print("强制退出...") + sys.exit(1) + + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + + if __name__ == "__main__": - asyncio.run(main()) + setup_signal_handlers() + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n程序被中断") + except SystemExit: + pass + finally: + print("模拟进程已退出")