Enhance signal handling and suppress warnings in simulation scripts

- Added signal handling to gracefully manage shutdown events across simulation scripts, ensuring proper cleanup of resources.
- Introduced a global shutdown event to allow simulations to respond to termination signals, improving robustness.
- Suppressed warnings related to multiprocessing resource tracking to avoid unnecessary log clutter during execution.
- Updated cleanup logic in `SimulationRunner` and `ZepGraphMemoryManager` to prevent redundant calls and ensure efficient resource management.
- Enhanced logging to provide clearer feedback during shutdown processes, improving traceability.
This commit is contained in:
666ghj 2025-12-09 00:37:12 +08:00
parent 3f750ffda2
commit 91eb73ae44
7 changed files with 262 additions and 23 deletions

View file

@ -3,6 +3,12 @@ MiroFish Backend - Flask应用工厂
""" """
import os import os
import warnings
# Suppress multiprocessing resource_tracker warnings (raised via third-party libraries such as transformers).
# This filter must be installed before any other imports.
warnings.filterwarnings("ignore", message=".*resource_tracker.*")
from flask import Flask, request from flask import Flask, request
from flask_cors import CORS from flask_cors import CORS

View file

@ -854,6 +854,9 @@ class SimulationRunner:
return result return result
# 防止重复清理的标志
_cleanup_done = False
@classmethod @classmethod
def cleanup_all_simulations(cls): def cleanup_all_simulations(cls):
""" """
@ -861,12 +864,23 @@ class SimulationRunner:
在服务器关闭时调用确保所有子进程被终止 在服务器关闭时调用确保所有子进程被终止
""" """
# 防止重复清理
if cls._cleanup_done:
return
cls._cleanup_done = True
# 检查是否有内容需要清理(避免空进程的进程打印无用日志)
has_processes = bool(cls._processes)
has_updaters = bool(cls._graph_memory_enabled)
if not has_processes and not has_updaters:
return # 没有需要清理的内容,静默返回
logger.info("正在清理所有模拟进程...") logger.info("正在清理所有模拟进程...")
# 首先停止所有图谱记忆更新器 # 首先停止所有图谱记忆更新器stop_all 内部会打印日志)
try: try:
ZepGraphMemoryManager.stop_all() ZepGraphMemoryManager.stop_all()
logger.info("已停止所有图谱记忆更新器")
except Exception as e: except Exception as e:
logger.error(f"停止图谱记忆更新器失败: {e}") logger.error(f"停止图谱记忆更新器失败: {e}")
cls._graph_memory_enabled.clear() cls._graph_memory_enabled.clear()
@ -899,7 +913,7 @@ class SimulationRunner:
except Exception: except Exception:
process.kill() process.kill()
# 更新状态 # 更新 run_state.json
state = cls.get_run_state(simulation_id) state = cls.get_run_state(simulation_id)
if state: if state:
state.runner_status = RunnerStatus.STOPPED state.runner_status = RunnerStatus.STOPPED
@ -909,6 +923,24 @@ class SimulationRunner:
state.error = "服务器关闭,模拟被终止" state.error = "服务器关闭,模拟被终止"
cls._save_run_state(state) cls._save_run_state(state)
# 同时更新 state.json将状态设为 stopped
try:
sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id)
state_file = os.path.join(sim_dir, "state.json")
logger.info(f"尝试更新 state.json: {state_file}")
if os.path.exists(state_file):
with open(state_file, 'r', encoding='utf-8') as f:
state_data = json.load(f)
state_data['status'] = 'stopped'
state_data['updated_at'] = datetime.now().isoformat()
with open(state_file, 'w', encoding='utf-8') as f:
json.dump(state_data, f, indent=2, ensure_ascii=False)
logger.info(f"已更新 state.json 状态为 stopped: {simulation_id}")
else:
logger.warning(f"state.json 不存在: {state_file}")
except Exception as state_err:
logger.warning(f"更新 state.json 失败: {simulation_id}, error={state_err}")
except Exception as e: except Exception as e:
logger.error(f"清理进程失败: {simulation_id}, error={e}") logger.error(f"清理进程失败: {simulation_id}, error={e}")
@ -947,12 +979,25 @@ class SimulationRunner:
if _cleanup_registered: if _cleanup_registered:
return return
# Flask debug 模式下,只在 reloader 子进程中注册清理(实际运行应用的进程)
# WERKZEUG_RUN_MAIN=true 表示是 reloader 子进程
# 如果不是 debug 模式,则没有这个环境变量,也需要注册
is_reloader_process = os.environ.get('WERKZEUG_RUN_MAIN') == 'true'
is_debug_mode = os.environ.get('FLASK_DEBUG') == '1' or os.environ.get('WERKZEUG_RUN_MAIN') is not None
# 在 debug 模式下,只在 reloader 子进程中注册;非 debug 模式下始终注册
if is_debug_mode and not is_reloader_process:
_cleanup_registered = True # 标记已注册,防止子进程再次尝试
return
# 保存原有的信号处理器 # 保存原有的信号处理器
original_sigint = signal.getsignal(signal.SIGINT) original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGTERM) original_sigterm = signal.getsignal(signal.SIGTERM)
def cleanup_handler(signum=None, frame=None): def cleanup_handler(signum=None, frame=None):
"""信号处理器:先清理模拟进程,再调用原处理器""" """信号处理器:先清理模拟进程,再调用原处理器"""
# 只有在有进程需要清理时才打印日志
if cls._processes or cls._graph_memory_enabled:
logger.info(f"收到信号 {signum},开始清理...") logger.info(f"收到信号 {signum},开始清理...")
cls.cleanup_all_simulations() cls.cleanup_all_simulations()

View file

@ -506,10 +506,19 @@ class ZepGraphMemoryManager:
del cls._updaters[simulation_id] del cls._updaters[simulation_id]
logger.info(f"已停止图谱记忆更新器: simulation_id={simulation_id}") logger.info(f"已停止图谱记忆更新器: simulation_id={simulation_id}")
# 防止 stop_all 重复调用的标志
_stop_all_done = False
@classmethod @classmethod
def stop_all(cls): def stop_all(cls):
"""停止所有更新器""" """停止所有更新器"""
# 防止重复调用
if cls._stop_all_done:
return
cls._stop_all_done = True
with cls._lock: with cls._lock:
if cls._updaters:
for simulation_id, updater in list(cls._updaters.items()): for simulation_id, updater in list(cls._updaters.items()):
try: try:
updater.stop() updater.stop()

View file

@ -4,6 +4,17 @@ MiroFish Backend 启动入口
import os import os
import sys import sys
import warnings
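# Suppress multiprocessing resource_tracker warnings (raised via third-party libraries such as transformers).
# The warning is harmless: it only means resources were not cleaned up properly when a process was terminated.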
warnings.filterwarnings("ignore", message=".*resource_tracker.*")
warnings.filterwarnings("ignore", category=UserWarning, module=".*multiprocessing.*")
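# Additionally, default PYTHONWARNINGS to ignore UserWarning. This does not disable multiprocessing resource tracking:
# the variable is read at interpreter start-up, so it only silences processes launched later that inherit this environment.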
import os
os.environ.setdefault('PYTHONWARNINGS', 'ignore::UserWarning')
# 添加项目根目录到路径 # 添加项目根目录到路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

View file

@ -29,13 +29,23 @@ import argparse
import asyncio import asyncio
import json import json
import logging import logging
import multiprocessing
import os import os
import random import random
import signal
import sqlite3 import sqlite3
import sys import sys
import warnings
from datetime import datetime from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple from typing import Dict, Any, List, Optional, Tuple
# Suppress multiprocessing resource_tracker warnings (raised via third-party libraries such as transformers).
warnings.filterwarnings("ignore", message=".*resource_tracker.*")
# 全局变量:用于信号处理
_shutdown_event = None
_cleanup_done = False
# 添加 backend 目录到路径 # 添加 backend 目录到路径
# 脚本固定位于 backend/scripts/ 目录 # 脚本固定位于 backend/scripts/ 目录
_scripts_dir = os.path.dirname(os.path.abspath(__file__)) _scripts_dir = os.path.dirname(os.path.abspath(__file__))
@ -1181,6 +1191,12 @@ async def run_twitter_simulation(
start_time = datetime.now() start_time = datetime.now()
for round_num in range(total_rounds): for round_num in range(total_rounds):
# 检查是否收到退出信号
if _shutdown_event and _shutdown_event.is_set():
if main_logger:
main_logger.info(f"收到退出信号,在第 {round_num + 1} 轮停止模拟")
break
simulated_minutes = round_num * minutes_per_round simulated_minutes = round_num * minutes_per_round
simulated_hour = (simulated_minutes // 60) % 24 simulated_hour = (simulated_minutes // 60) % 24
simulated_day = simulated_minutes // (60 * 24) + 1 simulated_day = simulated_minutes // (60 * 24) + 1
@ -1374,6 +1390,12 @@ async def run_reddit_simulation(
start_time = datetime.now() start_time = datetime.now()
for round_num in range(total_rounds): for round_num in range(total_rounds):
# 检查是否收到退出信号
if _shutdown_event and _shutdown_event.is_set():
if main_logger:
main_logger.info(f"收到退出信号,在第 {round_num + 1} 轮停止模拟")
break
simulated_minutes = round_num * minutes_per_round simulated_minutes = round_num * minutes_per_round
simulated_hour = (simulated_minutes // 60) % 24 simulated_hour = (simulated_minutes // 60) % 24
simulated_day = simulated_minutes // (60 * 24) + 1 simulated_day = simulated_minutes // (60 * 24) + 1
@ -1465,6 +1487,10 @@ async def main():
args = parser.parse_args() args = parser.parse_args()
# 在 main 函数开始时创建 shutdown 事件,确保整个程序都能响应退出信号
global _shutdown_event
_shutdown_event = asyncio.Event()
if not os.path.exists(args.config): if not os.path.exists(args.config):
print(f"错误: 配置文件不存在: {args.config}") print(f"错误: 配置文件不存在: {args.config}")
sys.exit(1) sys.exit(1)
@ -1549,15 +1575,22 @@ async def main():
) )
ipc_handler.update_status("alive") ipc_handler.update_status("alive")
# 等待命令循环 # 等待命令循环(使用全局 _shutdown_event
try: try:
while True: while not _shutdown_event.is_set():
should_continue = await ipc_handler.process_commands() should_continue = await ipc_handler.process_commands()
if not should_continue: if not should_continue:
break break
await asyncio.sleep(0.5) # 轮询间隔 # 使用 wait_for 替代 sleep这样可以响应 shutdown_event
try:
await asyncio.wait_for(_shutdown_event.wait(), timeout=0.5)
break # 收到退出信号
except asyncio.TimeoutError:
pass # 超时继续循环
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n收到中断信号") print("\n收到中断信号")
except asyncio.CancelledError:
print("\n任务被取消")
except Exception as e: except Exception as e:
print(f"\n命令处理出错: {e}") print(f"\n命令处理出错: {e}")
@ -1582,5 +1615,50 @@ async def main():
log_manager.info("=" * 60) log_manager.info("=" * 60)
def setup_signal_handlers(loop=None):
    """
    Install SIGTERM/SIGINT handlers that request a graceful shutdown.

    Persistent-simulation scenario: after the simulation completes the process
    does not exit but keeps waiting for interview commands. When a termination
    signal arrives it therefore has to:
    1. notify the asyncio loop so it stops waiting,
    2. give the program a chance to clean up resources (close the database,
       environments, etc.),
    3. and only then exit.

    Args:
        loop: Optional event loop on which the shutdown event should be set.
            When omitted, the loop running in the current thread is used if
            there is one; otherwise the event is set directly.
    """
    def request_shutdown():
        # Prefer the explicitly supplied loop, else whichever loop is
        # currently running in this thread.
        target_loop = loop
        if target_loop is None:
            try:
                target_loop = asyncio.get_running_loop()
            except RuntimeError:
                target_loop = None  # no loop is running yet
        if target_loop is not None and target_loop.is_running():
            # A handler installed with signal.signal() runs outside the loop's
            # own callbacks; call_soon_threadsafe() is the asyncio-provided
            # way to hand work to the loop from there.
            target_loop.call_soon_threadsafe(_shutdown_event.set)
        else:
            # Nothing is running that would execute a scheduled callback,
            # so flip the flag directly.
            _shutdown_event.set()

    def signal_handler(signum, frame):
        global _cleanup_done
        sig_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
        print(f"\n收到 {sig_name} 信号，正在退出...")
        if _cleanup_done:
            # A repeated signal means the graceful path has not finished:
            # force the process to exit.
            print("强制退出...")
            sys.exit(1)
        _cleanup_done = True
        # Do not sys.exit() on the first signal: let the asyncio loop leave
        # normally so resources get cleaned up.
        if _shutdown_event is not None:
            request_shutdown()

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
if __name__ == "__main__": if __name__ == "__main__":
setup_signal_handlers()
try:
asyncio.run(main()) asyncio.run(main())
except KeyboardInterrupt:
print("\n程序被中断")
except SystemExit:
pass
finally:
# 清理 multiprocessing 资源跟踪器(防止退出时的警告)
try:
from multiprocessing import resource_tracker
resource_tracker._resource_tracker._stop()
except Exception:
pass
print("模拟进程已退出")

View file

@ -19,11 +19,16 @@ import json
import logging import logging
import os import os
import random import random
import signal
import sys import sys
import sqlite3 import sqlite3
from datetime import datetime from datetime import datetime
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
# 全局变量:用于信号处理
_shutdown_event = None
_cleanup_done = False
# 添加项目路径 # 添加项目路径
_scripts_dir = os.path.dirname(os.path.abspath(__file__)) _scripts_dir = os.path.dirname(os.path.abspath(__file__))
_backend_dir = os.path.abspath(os.path.join(_scripts_dir, '..')) _backend_dir = os.path.abspath(os.path.join(_scripts_dir, '..'))
@ -659,15 +664,21 @@ class RedditSimulationRunner:
self.ipc_handler.update_status("alive") self.ipc_handler.update_status("alive")
# 等待命令循环 # 等待命令循环(使用全局 _shutdown_event
try: try:
while True: while not _shutdown_event.is_set():
should_continue = await self.ipc_handler.process_commands() should_continue = await self.ipc_handler.process_commands()
if not should_continue: if not should_continue:
break break
await asyncio.sleep(0.5) # 轮询间隔 try:
await asyncio.wait_for(_shutdown_event.wait(), timeout=0.5)
break # 收到退出信号
except asyncio.TimeoutError:
pass
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n收到中断信号") print("\n收到中断信号")
except asyncio.CancelledError:
print("\n任务被取消")
except Exception as e: except Exception as e:
print(f"\n命令处理出错: {e}") print(f"\n命令处理出错: {e}")
@ -704,6 +715,10 @@ async def main():
args = parser.parse_args() args = parser.parse_args()
# 在 main 函数开始时创建 shutdown 事件
global _shutdown_event
_shutdown_event = asyncio.Event()
if not os.path.exists(args.config): if not os.path.exists(args.config):
print(f"错误: 配置文件不存在: {args.config}") print(f"错误: 配置文件不存在: {args.config}")
sys.exit(1) sys.exit(1)
@ -719,6 +734,36 @@ async def main():
await runner.run(max_rounds=args.max_rounds) await runner.run(max_rounds=args.max_rounds)
if __name__ == "__main__": def setup_signal_handlers():
asyncio.run(main()) """
设置信号处理器确保收到 SIGTERM/SIGINT 时能够正确退出
让程序有机会正常清理资源关闭数据库环境等
"""
def signal_handler(signum, frame):
global _cleanup_done
sig_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
print(f"\n收到 {sig_name} 信号,正在退出...")
if not _cleanup_done:
_cleanup_done = True
if _shutdown_event:
_shutdown_event.set()
else:
# 重复收到信号才强制退出
print("强制退出...")
sys.exit(1)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
if __name__ == "__main__":
setup_signal_handlers()
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n程序被中断")
except SystemExit:
pass
finally:
print("模拟进程已退出")

View file

@ -19,11 +19,16 @@ import json
import logging import logging
import os import os
import random import random
import signal
import sys import sys
import sqlite3 import sqlite3
from datetime import datetime from datetime import datetime
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
# 全局变量:用于信号处理
_shutdown_event = None
_cleanup_done = False
# 添加项目路径 # 添加项目路径
_scripts_dir = os.path.dirname(os.path.abspath(__file__)) _scripts_dir = os.path.dirname(os.path.abspath(__file__))
_backend_dir = os.path.abspath(os.path.join(_scripts_dir, '..')) _backend_dir = os.path.abspath(os.path.join(_scripts_dir, '..'))
@ -671,15 +676,21 @@ class TwitterSimulationRunner:
self.ipc_handler.update_status("alive") self.ipc_handler.update_status("alive")
# 等待命令循环 # 等待命令循环(使用全局 _shutdown_event
try: try:
while True: while not _shutdown_event.is_set():
should_continue = await self.ipc_handler.process_commands() should_continue = await self.ipc_handler.process_commands()
if not should_continue: if not should_continue:
break break
await asyncio.sleep(0.5) # 轮询间隔 try:
await asyncio.wait_for(_shutdown_event.wait(), timeout=0.5)
break # 收到退出信号
except asyncio.TimeoutError:
pass
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n收到中断信号") print("\n收到中断信号")
except asyncio.CancelledError:
print("\n任务被取消")
except Exception as e: except Exception as e:
print(f"\n命令处理出错: {e}") print(f"\n命令处理出错: {e}")
@ -716,6 +727,10 @@ async def main():
args = parser.parse_args() args = parser.parse_args()
# 在 main 函数开始时创建 shutdown 事件
global _shutdown_event
_shutdown_event = asyncio.Event()
if not os.path.exists(args.config): if not os.path.exists(args.config):
print(f"错误: 配置文件不存在: {args.config}") print(f"错误: 配置文件不存在: {args.config}")
sys.exit(1) sys.exit(1)
@ -731,5 +746,35 @@ async def main():
await runner.run(max_rounds=args.max_rounds) await runner.run(max_rounds=args.max_rounds)
def setup_signal_handlers():
    """
    Install SIGTERM/SIGINT handlers so the process can exit properly.

    The first signal only requests shutdown, giving the program a chance to
    clean up resources (close the database, environments, etc.); a repeated
    signal forces the process to exit with status 1.
    """
    def request_shutdown():
        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            running_loop = None  # no loop is running in this thread yet
        if running_loop is not None:
            # A handler installed with signal.signal() runs outside the loop's
            # own callbacks; call_soon_threadsafe() is the asyncio-provided
            # way to hand work to the loop from there.
            running_loop.call_soon_threadsafe(_shutdown_event.set)
        else:
            # Nothing is running that would execute a scheduled callback,
            # so flip the flag directly.
            _shutdown_event.set()

    def signal_handler(signum, frame):
        global _cleanup_done
        sig_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
        print(f"\n收到 {sig_name} 信号，正在退出...")
        if _cleanup_done:
            # Only a repeated signal forces the exit.
            print("强制退出...")
            sys.exit(1)
        _cleanup_done = True
        if _shutdown_event is not None:
            request_shutdown()

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
if __name__ == "__main__": if __name__ == "__main__":
setup_signal_handlers()
try:
asyncio.run(main()) asyncio.run(main())
except KeyboardInterrupt:
print("\n程序被中断")
except SystemExit:
pass
finally:
print("模拟进程已退出")