From af5c23569554fad974fc0e2df1d1e068602ba715 Mon Sep 17 00:00:00 2001 From: 666ghj <670939375@qq.com> Date: Mon, 1 Dec 2025 19:40:07 +0800 Subject: [PATCH] Enhance OASIS simulation capabilities and profile generation - Updated README.md to include detailed descriptions of new features, including Zep mixed search functionality and detailed persona generation for individual and group entities. - Implemented a robust mechanism for checking simulation preparation status to avoid redundant profile generation. - Added support for parallel profile generation, improving efficiency in creating OASIS Agent Profiles. - Enhanced the simulation configuration generator to adopt a stepwise approach, ensuring better handling of complex configurations. - Introduced error handling and retry mechanisms for LLM calls, improving the reliability of profile generation. - Updated simulation management to support new API parameters for controlling profile generation behavior. --- backend/README.md | 287 ++++++- backend/app/api/simulation.py | 241 +++++- .../app/services/oasis_profile_generator.py | 706 ++++++++++++++-- .../services/simulation_config_generator.py | 763 ++++++++++++------ backend/app/services/simulation_manager.py | 13 +- 5 files changed, 1602 insertions(+), 408 deletions(-) diff --git a/backend/README.md b/backend/README.md index 2521693..e620370 100644 --- a/backend/README.md +++ b/backend/README.md @@ -1057,9 +1057,165 @@ for node in all_nodes: | 方法 | 说明 | |------|------| -| `generate_profile_from_entity(entity, user_id)` | 从实体生成单个Profile | -| `generate_profiles_from_entities(entities)` | 批量生成Profile | -| `save_profiles_to_json(profiles, path, platform)` | 保存到JSON文件 | +| `generate_profile_from_entity(entity, user_id)` | 从实体生成单个Profile(带详细人设) | +| `generate_profiles_from_entities(entities, graph_id)` | 批量生成Profile | +| `save_profiles(profiles, path, platform)` | 保存Profile文件 | +| `_search_zep_for_entity(entity_name)` | 调用Zep检索获取额外上下文 | + +### 优化特性(v2.0) + +1. **Zep混合搜索功能**:使用多种查询策略获取丰富的实体信息 +2. **区分实体类型**:个人实体 vs 群体/机构实体,使用不同的提示词 +3. **详细人设生成**:生成500字以上的详细人设描述 + +### Zep混合搜索策略 + +`_search_zep_for_entity()` 方法采用多种搜索策略获取丰富信息: + +**查询策略:** +```python +queries = [ + f"总结{entity_name}的全部活动、事件和行为", + f"{entity_name}与其他实体的关系和互动", + f"{entity_name}的背景、历史和重要信息", + f"关于{entity_name}的所有事实和描述", +] +``` + +**说明:** Zep没有内置的混合搜索接口,需要分别搜索edges和nodes。我们使用**并行请求**同时执行两个搜索: + +```python +# 并行执行edges和nodes搜索 +with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + edge_future = executor.submit(search_edges) # scope="edges" + node_future = executor.submit(search_nodes) # scope="nodes" + + edge_result = edge_future.result(timeout=30) + node_result = node_future.result(timeout=30) +``` + +**搜索参数:** + +| 搜索类型 | scope | limit | 说明 | +|----------|-------|-------|------| +| 边搜索 | edges | 30 | 获取事实/关系信息 | +| 节点搜索 | nodes | 20 | 获取相关实体摘要 | + +**关键参数:** +- 必须传递 `graph_id` 参数,否则Zep API会返回400错误 +- 使用 `rrf` (Reciprocal Rank Fusion) reranker,稳定可靠 +- 使用线程池并行执行,提高效率 + +**返回数据结构:** +```python +{ + "facts": [...], # 事实列表(来自edges) + "node_summaries": [...], # 相关节点摘要(来自nodes) + "context": "..." # 综合上下文文本 +} +``` + +### LLM生成与JSON修复 + +为了避免LLM生成的JSON解析失败,实现了以下优化: + +1. **不限制max_tokens**:让LLM自由发挥,充分利用模型的上下文能力 +2. **多次重试机制**:最多3次尝试,每次降低temperature +3. **截断检测与修复**:检测`finish_reason='length'`,自动闭合JSON +4. **完善JSON修复机制**: + - `_fix_truncated_json()`: 修复被截断的JSON(闭合括号和字符串) + - `_try_fix_json()`: 多级修复策略 + - 提取JSON部分 + - 替换字符串中的换行符 + - 移除控制字符 + - 从损坏JSON中提取部分信息 +5. **字段验证**:确保必需字段存在,缺失时使用entity_summary填充 + +**错误处理流程**: +``` +LLM调用 → 检查截断 → JSON解析 → 修复尝试 → 部分提取 → 规则生成 +``` + +### 并行生成与实时输出 + +支持并行生成Agent人设,提高生成效率: + +```python +profiles = generator.generate_profiles_from_entities( + entities=filtered.entities, + use_llm=True, + graph_id="mirofish_xxx", + parallel_count=5 # 并行生成数量,默认5 +) +``` + +**API参数**: +```json +POST /api/simulation/prepare +{ + "simulation_id": "sim_xxx", + "parallel_profile_count": 5, // 可选,并行生成人设数量,默认5 + "force_regenerate": false // 可选,强制重新生成,默认false +} +``` + +**实时输出**: +- 每生成一个人设,立即输出到控制台(完整内容不截断) +- 包含用户名、简介、详细人设、年龄、性别、MBTI等信息 +- 方便实时监控生成进度和质量 + +### 避免重复生成 + +系统会自动检测已完成的准备工作,避免重复生成: + +**检测条件**: +1. `state.json` 存在且 `config_generated=true` +2. 必要文件存在:`reddit_profiles.json`, `twitter_profiles.csv`, `simulation_config.json` + +**API响应**: +```json +// 已准备完成时 +{ + "success": true, + "data": { + "simulation_id": "sim_xxx", + "status": "ready", + "message": "已有完成的准备工作,无需重复生成", + "already_prepared": true, + "prepare_info": { + "entities_count": 93, + "profiles_count": 93, + "entity_types": ["Student", "Professor", ...], + "existing_files": [...] + } + } +} +``` + +**强制重新生成**: +```json +POST /api/simulation/prepare +{ + "simulation_id": "sim_xxx", + "force_regenerate": true // 忽略已有准备,强制重新生成 +} +``` + +### 实体类型分类 + +```python +# 个人类型实体 - 生成具体人物设定 +INDIVIDUAL_ENTITY_TYPES = [ + "student", "alumni", "professor", "person", "publicfigure", + "expert", "faculty", "official", "journalist", "activist" +] + +# 群体/机构类型实体 - 生成官方账号设定 +GROUP_ENTITY_TYPES = [ + "university", "governmentagency", "organization", "ngo", + "mediaoutlet", "company", "institution", "group", "community" +] +``` ### Profile数据结构 @@ -1071,7 +1227,7 @@ class OasisAgentProfile: user_name: str # 用户名 name: str # 显示名称 bio: str # 简介(max 150字符) - persona: str # 详细人设描述 + persona: str # 详细人设描述(500字以上) # Reddit字段 karma: int = 1000 @@ -1094,6 +1250,37 @@ class OasisAgentProfile: source_entity_type: Optional[str] = None ``` +### 详细人设生成示例 + +**个人实体人设结构:** +```markdown +## 一、基本信息 +- 姓名/称呼、年龄、职业/身份 +- 教育背景、所在地 + +## 二、人物背景 +- 过去的重要经历 +- 与事件的关联 +- 社会关系网络 + +## 三、性格特征 +- MBTI类型及表现 +- 核心性格特点 +- 情绪表达方式 + +## 四、社交媒体行为模式 +- 发帖频率和时间 +- 内容偏好类型 +- 语言风格特点 + +## 五、立场与观点 +- 对核心话题的态度 +- 可能被激怒/感动的内容 + +## 六、独特特征 +- 口头禅、个人爱好等 +``` + ### Profile生成策略 **1. LLM生成(默认)** @@ -1135,12 +1322,31 @@ Generate a social media user profile with: 使用LLM分析模拟需求、文档内容、图谱实体信息,自动生成最佳的模拟参数配置。 +**采用分步生成策略**(避免一次性生成过长内容导致失败): +1. 生成时间配置(轻量级) +2. 生成事件配置和热点话题 +3. 分批生成Agent配置(**每批5个**,保证生成质量) +4. 生成平台配置 + | 方法 | 说明 | |------|------| -| `generate_config(...)` | 智能生成完整模拟配置 | -| `_build_context(...)` | 构建LLM上下文(最大5万字) | -| `_generate_config_with_llm(...)` | 调用LLM生成配置 | -| `_generate_default_config(...)` | 默认配置(LLM失败时) | +| `generate_config(...)` | 智能生成完整模拟配置(分步) | +| `_generate_time_config(...)` | 生成时间配置 | +| `_generate_event_config(...)` | 生成事件配置 | +| `_generate_agent_configs_batch(...)` | 分批生成Agent配置 | +| `_generate_agent_config_by_rule(...)` | 规则生成(LLM失败时) | + +### 中国人作息时间配置 + +系统针对中国用户群体,采用符合北京时间的作息习惯: + +| 时段 | 时间范围 | 活跃度系数 | 说明 | +|------|----------|------------|------| +| 深夜 | 0:00-5:59 | 0.05 | 几乎无人活动 | +| 早间 | 6:00-8:59 | 0.4 | 逐渐醒来 | +| 工作 | 9:00-18:59 | 0.7 | 工作时段中等活跃 | +| 高峰 | 19:00-22:59 | 1.5 | 晚间最活跃 | +| 夜间 | 23:00-23:59 | 0.5 | 活跃度下降 | ### LLM智能生成的配置内容 @@ -1152,10 +1358,14 @@ class TimeSimulationConfig: minutes_per_round: int = 30 # 每轮代表的时间(分钟) agents_per_hour_min: int = 5 # 每小时激活Agent数量(最小) agents_per_hour_max: int = 20 # 每小时激活Agent数量(最大) - peak_hours: List[int] # 高峰时段 [9,10,11,14,15,20,21,22] - off_peak_hours: List[int] # 低谷时段 [0,1,2,3,4,5] + peak_hours: List[int] = [19,20,21,22] # 高峰时段(晚间) + off_peak_hours: List[int] = [0,1,2,3,4,5] # 低谷时段(凌晨) peak_activity_multiplier: float = 1.5 # 高峰活跃度乘数 - off_peak_activity_multiplier: float = 0.3 # 低谷活跃度乘数 + off_peak_activity_multiplier: float = 0.05 # 凌晨活跃度极低 + morning_hours: List[int] = [6,7,8] # 早间时段 + morning_activity_multiplier: float = 0.4 + work_hours: List[int] = [9-18] # 工作时段 + work_activity_multiplier: float = 0.7 ``` **2. AgentActivityConfig(每个Agent的活动配置)** @@ -1178,14 +1388,18 @@ class AgentActivityConfig: influence_weight: float = 1.0 # 影响力权重 ``` -**3. 不同实体类型的默认参数差异** +**3. 不同实体类型的默认参数差异(符合中国人作息)** -| 实体类型 | 活跃度 | 发帖频率 | 响应延迟 | 影响力 | -|----------|--------|----------|----------|--------| -| University/GovernmentAgency | 0.2 | 0.1/小时 | 60-240分钟 | 3.0 | -| MediaOutlet | 0.6 | 1.0/小时 | 5-30分钟 | 2.5 | -| PublicFigure/Expert | 0.5 | 0.3/小时 | 10-60分钟 | 2.0 | -| Student/Person | 0.7 | 0.5/小时 | 1-20分钟 | 1.0 | +| 实体类型 | 活跃度 | 发帖频率 | 活跃时段 | 响应延迟 | 影响力 | +|----------|--------|----------|----------|----------|--------| +| University/GovernmentAgency | 0.2 | 0.1/小时 | 9:00-17:59(工作时间) | 60-240分钟 | 3.0 | +| MediaOutlet | 0.5 | 0.8/小时 | 7:00-23:59(全天) | 5-30分钟 | 2.5 | +| Professor/Expert | 0.4 | 0.3/小时 | 8:00-21:59(工作+晚间) | 15-90分钟 | 2.0 | +| Student | 0.8 | 0.6/小时 | 8-13, 18-23(上午+晚间) | 1-15分钟 | 0.8 | +| Alumni | 0.6 | 0.4/小时 | 12-13, 19-23(午休+晚间) | 5-30分钟 | 1.0 | +| Person(普通人) | 0.7 | 0.5/小时 | 9-13, 18-23(白天+晚间) | 2-20分钟 | 1.0 | + +**注意**:凌晨0-5点所有实体类型都几乎不活动(符合中国人作息习惯) --- @@ -1228,8 +1442,41 @@ uploads/simulations/sim_xxxx/ ``` **重要:OASIS平台的Profile格式要求不同:** -- **Twitter**: 使用CSV格式,字段:`user_id,user_name,name,bio,friend_count,follower_count,statuses_count,created_at` -- **Reddit**: 使用JSON格式,支持详细人设字段:`realname,username,bio,persona,age,gender,mbti,country,profession,interested_topics` + +**Twitter CSV格式**(符合OASIS官方要求): +```csv +user_id,name,username,user_char,description +0,张教授,professor_zhang,"完整人设描述(LLM内部使用)","简短简介(外部显示)" +``` +- `user_id`: 从0开始的顺序ID +- `name`: 真实姓名 +- `username`: 系统用户名 +- `user_char`: 完整人设(bio + persona),注入LLM系统提示,指导Agent行为 +- `description`: 简短简介,显示在用户资料页面 + +**Reddit JSON格式**: +```json +[ + { + "realname": "张教授", + "username": "professor_zhang", + "bio": "简短简介", + "persona": "详细人设描述", + "age": 42, + "gender": "男", + "mbti": "INTJ", + "country": "中国", + "profession": "教授", + "interested_topics": ["高等教育", "学术诚信"] + } +] +``` + +**user_char vs description 区别**: +| 字段 | 用途 | 可见性 | +|------|------|--------| +| user_char | LLM系统提示,决定Agent如何思考和行动 | 内部使用 | +| description | 用户资料页面的简介 | 其他用户可见 | ### 配置文件示例 (simulation_config.json) diff --git a/backend/app/api/simulation.py b/backend/app/api/simulation.py index 6d8651b..df445d2 100644 --- a/backend/app/api/simulation.py +++ b/backend/app/api/simulation.py @@ -213,6 +213,112 @@ def create_simulation(): }), 500 +def _check_simulation_prepared(simulation_id: str) -> tuple: + """ + 检查模拟是否已经准备完成 + + 检查条件: + 1. state.json 存在且 status 为 "ready" + 2. 必要文件存在:reddit_profiles.json, twitter_profiles.csv, simulation_config.json + + Args: + simulation_id: 模拟ID + + Returns: + (is_prepared: bool, info: dict) + """ + import os + from ..config import Config + + simulation_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id) + + # 检查目录是否存在 + if not os.path.exists(simulation_dir): + return False, {"reason": "模拟目录不存在"} + + # 必要文件列表 + required_files = [ + "state.json", + "simulation_config.json", + "reddit_profiles.json", + "twitter_profiles.csv", + "run_reddit_simulation.py", + "run_twitter_simulation.py", + "run_parallel_simulation.py" + ] + + # 检查文件是否存在 + existing_files = [] + missing_files = [] + for f in required_files: + file_path = os.path.join(simulation_dir, f) + if os.path.exists(file_path): + existing_files.append(f) + else: + missing_files.append(f) + + if missing_files: + return False, { + "reason": "缺少必要文件", + "missing_files": missing_files, + "existing_files": existing_files + } + + # 检查state.json中的状态 + state_file = os.path.join(simulation_dir, "state.json") + try: + import json + with open(state_file, 'r', encoding='utf-8') as f: + state_data = json.load(f) + + status = state_data.get("status", "") + + # 如果状态是ready或preparing(已有文件),认为准备完成 + if status in ["ready", "preparing"] and state_data.get("config_generated"): + # 获取文件统计信息 + profiles_file = os.path.join(simulation_dir, "reddit_profiles.json") + config_file = os.path.join(simulation_dir, "simulation_config.json") + + profiles_count = 0 + if os.path.exists(profiles_file): + with open(profiles_file, 'r', encoding='utf-8') as f: + profiles_data = json.load(f) + profiles_count = len(profiles_data) if isinstance(profiles_data, list) else 0 + + # 如果状态是preparing但文件已完成,自动更新状态为ready + if status == "preparing": + try: + state_data["status"] = "ready" + from datetime import datetime + state_data["updated_at"] = datetime.now().isoformat() + with open(state_file, 'w', encoding='utf-8') as f: + json.dump(state_data, f, ensure_ascii=False, indent=2) + logger.info(f"自动更新模拟状态: {simulation_id} preparing -> ready") + status = "ready" + except Exception as e: + logger.warning(f"自动更新状态失败: {e}") + + return True, { + "status": status, + "entities_count": state_data.get("entities_count", 0), + "profiles_count": profiles_count, + "entity_types": state_data.get("entity_types", []), + "config_generated": state_data.get("config_generated", False), + "created_at": state_data.get("created_at"), + "updated_at": state_data.get("updated_at"), + "existing_files": existing_files + } + else: + return False, { + "reason": f"状态不是ready: {status}", + "status": status, + "config_generated": state_data.get("config_generated", False) + } + + except Exception as e: + return False, {"reason": f"读取状态文件失败: {str(e)}"} + + @simulation_bp.route('/prepare', methods=['POST']) def prepare_simulation(): """ @@ -221,17 +327,25 @@ def prepare_simulation(): 这是一个耗时操作,接口会立即返回task_id, 使用 GET /api/simulation/prepare/status 查询进度 + 特性: + - 自动检测已完成的准备工作,避免重复生成 + - 如果已准备完成,直接返回已有结果 + - 支持强制重新生成(force_regenerate=true) + 步骤: - 1. 从Zep图谱读取并过滤实体 - 2. 为每个实体生成OASIS Agent Profile(带重试机制) - 3. LLM智能生成模拟配置(带重试机制) - 4. 保存配置文件和预设脚本 + 1. 检查是否已有完成的准备工作 + 2. 从Zep图谱读取并过滤实体 + 3. 为每个实体生成OASIS Agent Profile(带重试机制) + 4. LLM智能生成模拟配置(带重试机制) + 5. 保存配置文件和预设脚本 请求(JSON): { "simulation_id": "sim_xxxx", // 必填,模拟ID "entity_types": ["Student", "PublicFigure"], // 可选,指定实体类型 - "use_llm_for_profiles": true // 可选,是否用LLM生成人设 + "use_llm_for_profiles": true, // 可选,是否用LLM生成人设 + "parallel_profile_count": 5, // 可选,并行生成人设数量,默认5 + "force_regenerate": false // 可选,强制重新生成,默认false } 返回: @@ -239,14 +353,17 @@ def prepare_simulation(): "success": true, "data": { "simulation_id": "sim_xxxx", - "task_id": "task_xxxx", - "status": "preparing", - "message": "准备任务已启动" + "task_id": "task_xxxx", // 新任务时返回 + "status": "preparing|ready", + "message": "准备任务已启动|已有完成的准备工作", + "already_prepared": true|false // 是否已准备完成 } } """ import threading + import os from ..models.task import TaskManager, TaskStatus + from ..config import Config try: data = request.get_json() or {} @@ -267,6 +384,25 @@ def prepare_simulation(): "error": f"模拟不存在: {simulation_id}" }), 404 + # 检查是否强制重新生成 + force_regenerate = data.get('force_regenerate', False) + + # 检查是否已经准备完成(避免重复生成) + if not force_regenerate: + is_prepared, prepare_info = _check_simulation_prepared(simulation_id) + if is_prepared: + logger.info(f"模拟 {simulation_id} 已准备完成,跳过重复生成") + return jsonify({ + "success": True, + "data": { + "simulation_id": simulation_id, + "status": "ready", + "message": "已有完成的准备工作,无需重复生成", + "already_prepared": True, + "prepare_info": prepare_info + } + }) + # 从项目获取必要信息 project = ProjectManager.get_project(state.project_id) if not project: @@ -288,6 +424,7 @@ def prepare_simulation(): entity_types_list = data.get('entity_types') use_llm_for_profiles = data.get('use_llm_for_profiles', True) + parallel_profile_count = data.get('parallel_profile_count', 5) # 创建异步任务 task_manager = TaskManager() @@ -384,7 +521,8 @@ def prepare_simulation(): document_text=document_text, defined_entity_types=entity_types_list, use_llm_for_profiles=use_llm_for_profiles, - progress_callback=progress_callback + progress_callback=progress_callback, + parallel_profile_count=parallel_profile_count ) # 任务完成 @@ -414,7 +552,8 @@ def prepare_simulation(): "simulation_id": simulation_id, "task_id": task_id, "status": "preparing", - "message": "准备任务已启动,请通过 /api/simulation/prepare/status 查询进度" + "message": "准备任务已启动,请通过 /api/simulation/prepare/status 查询进度", + "already_prepared": False } }) @@ -438,9 +577,14 @@ def get_prepare_status(): """ 查询准备任务进度 + 支持两种查询方式: + 1. 通过task_id查询正在进行的任务进度 + 2. 通过simulation_id检查是否已有完成的准备工作 + 请求(JSON): { - "task_id": "task_xxxx" // 必填,prepare返回的task_id + "task_id": "task_xxxx", // 可选,prepare返回的task_id + "simulation_id": "sim_xxxx" // 可选,模拟ID(用于检查已完成的准备) } 返回: @@ -448,21 +592,11 @@ def get_prepare_status(): "success": true, "data": { "task_id": "task_xxxx", - "status": "processing", // pending/processing/completed/failed - "progress": 45, // 0-100 总进度 - "message": "[2/4] 生成Agent人设: 35/93 - 生成 教授张三 的人设...", - "progress_detail": { // 详细进度信息 - "current_stage": "generating_profiles", - "current_stage_name": "生成Agent人设", - "stage_index": 2, // 当前阶段序号 - "total_stages": 4, // 总阶段数 - "stage_progress": 38, // 阶段内进度 0-100 - "current_item": 35, // 当前处理项目序号 - "total_items": 93, // 当前阶段总项目数 - "item_description": "生成 教授张三 的人设..." - }, - "result": null, // 完成后返回结果 - "error": null // 失败时返回错误信息 + "status": "processing|completed|ready", + "progress": 45, + "message": "...", + "already_prepared": true|false, // 是否已有完成的准备 + "prepare_info": {...} // 已准备完成时的详细信息 } } """ @@ -472,24 +606,75 @@ def get_prepare_status(): data = request.get_json() or {} task_id = data.get('task_id') + simulation_id = data.get('simulation_id') + + # 如果提供了simulation_id,先检查是否已准备完成 + if simulation_id: + is_prepared, prepare_info = _check_simulation_prepared(simulation_id) + if is_prepared: + return jsonify({ + "success": True, + "data": { + "simulation_id": simulation_id, + "status": "ready", + "progress": 100, + "message": "已有完成的准备工作", + "already_prepared": True, + "prepare_info": prepare_info + } + }) + + # 如果没有task_id,返回错误 if not task_id: + if simulation_id: + # 有simulation_id但未准备完成 + return jsonify({ + "success": True, + "data": { + "simulation_id": simulation_id, + "status": "not_started", + "progress": 0, + "message": "尚未开始准备,请调用 /api/simulation/prepare 开始", + "already_prepared": False + } + }) return jsonify({ "success": False, - "error": "请提供 task_id" + "error": "请提供 task_id 或 simulation_id" }), 400 task_manager = TaskManager() task = task_manager.get_task(task_id) if not task: + # 任务不存在,但如果有simulation_id,检查是否已准备完成 + if simulation_id: + is_prepared, prepare_info = _check_simulation_prepared(simulation_id) + if is_prepared: + return jsonify({ + "success": True, + "data": { + "simulation_id": simulation_id, + "task_id": task_id, + "status": "ready", + "progress": 100, + "message": "任务已完成(准备工作已存在)", + "already_prepared": True, + "prepare_info": prepare_info + } + }) + return jsonify({ "success": False, "error": f"任务不存在: {task_id}" }), 404 + task_dict = task.to_dict() + task_dict["already_prepared"] = False + return jsonify({ "success": True, - "data": task.to_dict() + "data": task_dict }) except Exception as e: diff --git a/backend/app/services/oasis_profile_generator.py b/backend/app/services/oasis_profile_generator.py index fe0144a..f0ff6ee 100644 --- a/backend/app/services/oasis_profile_generator.py +++ b/backend/app/services/oasis_profile_generator.py @@ -1,6 +1,11 @@ """ OASIS Agent Profile生成器 将Zep图谱中的实体转换为OASIS模拟平台所需的Agent Profile格式 + +优化改进: +1. 调用Zep检索功能二次丰富节点信息 +2. 优化提示词生成非常详细的人设 +3. 区分个人实体和抽象群体实体 """ import json @@ -10,6 +15,7 @@ from dataclasses import dataclass, field from datetime import datetime from openai import OpenAI +from zep_cloud.client import Zep from ..config import Config from ..utils.logger import get_logger @@ -137,6 +143,11 @@ class OasisProfileGenerator: OASIS Profile生成器 将Zep图谱中的实体转换为OASIS模拟所需的Agent Profile + + 优化特性: + 1. 调用Zep图谱检索功能获取更丰富的上下文 + 2. 生成非常详细的人设(包括基本信息、职业经历、性格特征、社交媒体行为等) + 3. 区分个人实体和抽象群体实体 """ # MBTI类型列表 @@ -153,11 +164,25 @@ class OasisProfileGenerator: "Canada", "Australia", "Brazil", "India", "South Korea" ] + # 个人类型实体(需要生成具体人设) + INDIVIDUAL_ENTITY_TYPES = [ + "student", "alumni", "professor", "person", "publicfigure", + "expert", "faculty", "official", "journalist", "activist" + ] + + # 群体/机构类型实体(需要生成群体代表人设) + GROUP_ENTITY_TYPES = [ + "university", "governmentagency", "organization", "ngo", + "mediaoutlet", "company", "institution", "group", "community" + ] + def __init__( self, api_key: Optional[str] = None, base_url: Optional[str] = None, - model_name: Optional[str] = None + model_name: Optional[str] = None, + zep_api_key: Optional[str] = None, + graph_id: Optional[str] = None ): self.api_key = api_key or Config.LLM_API_KEY self.base_url = base_url or Config.LLM_BASE_URL @@ -170,6 +195,17 @@ class OasisProfileGenerator: api_key=self.api_key, base_url=self.base_url ) + + # Zep客户端用于检索丰富上下文 + self.zep_api_key = zep_api_key or Config.ZEP_API_KEY + self.zep_client = None + self.graph_id = graph_id + + if self.zep_api_key: + try: + self.zep_client = Zep(api_key=self.zep_api_key) + except Exception as e: + logger.warning(f"Zep客户端初始化失败: {e}") def generate_profile_from_entity( self, @@ -245,28 +281,195 @@ class OasisProfileGenerator: suffix = random.randint(100, 999) return f"{username}_{suffix}" + def _search_zep_for_entity(self, entity: EntityNode) -> Dict[str, Any]: + """ + 使用Zep图谱混合搜索功能获取实体相关的丰富信息 + + Zep没有内置混合搜索接口,需要分别搜索edges和nodes然后合并结果。 + 使用并行请求同时搜索,提高效率。 + + Args: + entity: 实体节点对象 + + Returns: + 包含facts, node_summaries, context的字典 + """ + import concurrent.futures + + if not self.zep_client: + return {"facts": [], "node_summaries": [], "context": ""} + + entity_name = entity.name + + results = { + "facts": [], + "node_summaries": [], + "context": "" + } + + # 必须有graph_id才能进行搜索 + if not self.graph_id: + logger.debug(f"跳过Zep检索:未设置graph_id") + return results + + comprehensive_query = f"关于{entity_name}的所有信息、活动、事件、关系和背景" + + def search_edges(): + """搜索边(事实/关系)""" + try: + return self.zep_client.graph.search( + query=comprehensive_query, + graph_id=self.graph_id, + limit=30, + scope="edges", + reranker="rrf" + ) + except Exception as e: + logger.debug(f"Zep边搜索失败: {e}") + return None + + def search_nodes(): + """搜索节点(实体摘要)""" + try: + return self.zep_client.graph.search( + query=comprehensive_query, + graph_id=self.graph_id, + limit=20, + scope="nodes", + reranker="rrf" + ) + except Exception as e: + logger.debug(f"Zep节点搜索失败: {e}") + return None + + try: + # 并行执行edges和nodes搜索 + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + edge_future = executor.submit(search_edges) + node_future = executor.submit(search_nodes) + + # 获取结果 + edge_result = edge_future.result(timeout=30) + node_result = node_future.result(timeout=30) + + # 处理边搜索结果 + all_facts = set() + if edge_result and hasattr(edge_result, 'edges') and edge_result.edges: + for edge in edge_result.edges: + if hasattr(edge, 'fact') and edge.fact: + all_facts.add(edge.fact) + results["facts"] = list(all_facts) + + # 处理节点搜索结果 + all_summaries = set() + if node_result and hasattr(node_result, 'nodes') and node_result.nodes: + for node in node_result.nodes: + if hasattr(node, 'summary') and node.summary: + all_summaries.add(node.summary) + if hasattr(node, 'name') and node.name and node.name != entity_name: + all_summaries.add(f"相关实体: {node.name}") + results["node_summaries"] = list(all_summaries) + + # 构建综合上下文 + context_parts = [] + if results["facts"]: + context_parts.append("事实信息:\n" + "\n".join(f"- {f}" for f in results["facts"][:20])) + if results["node_summaries"]: + context_parts.append("相关实体:\n" + "\n".join(f"- {s}" for s in results["node_summaries"][:10])) + results["context"] = "\n\n".join(context_parts) + + logger.info(f"Zep混合检索完成: {entity_name}, 获取 {len(results['facts'])} 条事实, {len(results['node_summaries'])} 个相关节点") + + except concurrent.futures.TimeoutError: + logger.warning(f"Zep检索超时 ({entity_name})") + except Exception as e: + logger.warning(f"Zep检索失败 ({entity_name}): {e}") + + return results + def _build_entity_context(self, entity: EntityNode) -> str: - """构建实体的上下文信息""" + """ + 构建实体的完整上下文信息 + + 包括: + 1. 实体本身的边信息(事实) + 2. 关联节点的详细信息 + 3. Zep混合检索到的丰富信息 + """ context_parts = [] - # 添加相关边信息 + # 1. 添加实体属性信息 + if entity.attributes: + attrs = [] + for key, value in entity.attributes.items(): + if value and str(value).strip(): + attrs.append(f"- {key}: {value}") + if attrs: + context_parts.append("### 实体属性\n" + "\n".join(attrs)) + + # 2. 添加相关边信息(事实/关系) + existing_facts = set() if entity.related_edges: relationships = [] - for edge in entity.related_edges[:10]: # 最多取10条 - if edge.get("fact"): - relationships.append(edge["fact"]) + for edge in entity.related_edges: # 不限制数量 + fact = edge.get("fact", "") + edge_name = edge.get("edge_name", "") + direction = edge.get("direction", "") + + if fact: + relationships.append(f"- {fact}") + existing_facts.add(fact) + elif edge_name: + if direction == "outgoing": + relationships.append(f"- {entity.name} --[{edge_name}]--> (相关实体)") + else: + relationships.append(f"- (相关实体) --[{edge_name}]--> {entity.name}") if relationships: - context_parts.append("Related facts:\n" + "\n".join(f"- {r}" for r in relationships)) + context_parts.append("### 相关事实和关系\n" + "\n".join(relationships)) - # 添加关联节点信息 + # 3. 添加关联节点的详细信息 if entity.related_nodes: - related_names = [n["name"] for n in entity.related_nodes[:5]] - if related_names: - context_parts.append(f"Related to: {', '.join(related_names)}") + related_info = [] + for node in entity.related_nodes: # 不限制数量 + node_name = node.get("name", "") + node_labels = node.get("labels", []) + node_summary = node.get("summary", "") + + # 过滤掉默认标签 + custom_labels = [l for l in node_labels if l not in ["Entity", "Node"]] + label_str = f" ({', '.join(custom_labels)})" if custom_labels else "" + + if node_summary: + related_info.append(f"- **{node_name}**{label_str}: {node_summary}") + else: + related_info.append(f"- **{node_name}**{label_str}") + + if related_info: + context_parts.append("### 关联实体信息\n" + "\n".join(related_info)) + + # 4. 使用Zep混合检索获取更丰富的信息 + zep_results = self._search_zep_for_entity(entity) + + if zep_results.get("facts"): + # 去重:排除已存在的事实 + new_facts = [f for f in zep_results["facts"] if f not in existing_facts] + if new_facts: + context_parts.append("### Zep检索到的事实信息\n" + "\n".join(f"- {f}" for f in new_facts[:15])) + + if zep_results.get("node_summaries"): + context_parts.append("### Zep检索到的相关节点\n" + "\n".join(f"- {s}" for s in zep_results["node_summaries"][:10])) return "\n\n".join(context_parts) + def _is_individual_entity(self, entity_type: str) -> bool: + """判断是否是个人类型实体""" + return entity_type.lower() in self.INDIVIDUAL_ENTITY_TYPES + + def _is_group_entity(self, entity_type: str) -> bool: + """判断是否是群体/机构类型实体""" + return entity_type.lower() in self.GROUP_ENTITY_TYPES + def _generate_profile_with_llm( self, entity_name: str, @@ -275,63 +478,271 @@ class OasisProfileGenerator: entity_attributes: Dict[str, Any], context: str ) -> Dict[str, Any]: - """使用LLM生成详细人设""" + """ + 使用LLM生成非常详细的人设 - prompt = f"""Based on the following entity information, generate a detailed social media user profile for simulation purposes. + 根据实体类型区分: + - 个人实体:生成具体的人物设定 + - 群体/机构实体:生成代表性账号设定 + """ + + is_individual = self._is_individual_entity(entity_type) + + if is_individual: + prompt = self._build_individual_persona_prompt( + entity_name, entity_type, entity_summary, entity_attributes, context + ) + else: + prompt = self._build_group_persona_prompt( + entity_name, entity_type, entity_summary, entity_attributes, context + ) -Entity Information: -- Name: {entity_name} -- Type: {entity_type} -- Summary: {entity_summary} -- Attributes: {json.dumps(entity_attributes, ensure_ascii=False)} - -Context: -{context} - -Generate a JSON object with the following fields: -{{ - "bio": "A short bio (max 150 chars) suitable for social media", - "persona": "A detailed persona description (2-3 sentences) describing personality, interests, and behavior patterns", - "age": , - "gender": "", - "mbti": "", - "country": "", - "profession": "", - "interested_topics": ["topic1", "topic2", ...] -}} - -Important: -- The profile should be consistent with the entity type and context -- Make the persona feel realistic and suitable for social media simulation -- If the entity is an organization, institution, or non-person, adapt the profile accordingly (e.g., as an official account) -- Return ONLY the JSON object, no additional text""" - - try: - # 使用重试机制调用LLM API - from ..utils.retry import RetryableAPIClient - - retry_client = RetryableAPIClient(max_retries=3, initial_delay=1.0) - - def call_llm(): - return self.client.chat.completions.create( + # 尝试多次生成,直到成功或达到最大重试次数 + max_attempts = 3 + last_error = None + + for attempt in range(max_attempts): + try: + response = self.client.chat.completions.create( model=self.model_name, messages=[ - {"role": "system", "content": "You are a profile generator for social media simulation. Generate realistic user profiles based on entity information."}, + {"role": "system", "content": self._get_system_prompt(is_individual)}, {"role": "user", "content": prompt} ], response_format={"type": "json_object"}, - temperature=0.7 + temperature=0.7 - (attempt * 0.1) # 每次重试降低温度 + # 不设置max_tokens,让LLM自由发挥 ) + + content = response.choices[0].message.content + + # 检查是否被截断(finish_reason不是'stop') + finish_reason = response.choices[0].finish_reason + if finish_reason == 'length': + logger.warning(f"LLM输出被截断 (attempt {attempt+1}), 尝试修复...") + content = self._fix_truncated_json(content) + + # 尝试解析JSON + try: + result = json.loads(content) + + # 验证必需字段 + if "bio" not in result or not result["bio"]: + result["bio"] = entity_summary[:200] if entity_summary else f"{entity_type}: {entity_name}" + if "persona" not in result or not result["persona"]: + result["persona"] = entity_summary or f"{entity_name}是一个{entity_type}。" + + return result + + except json.JSONDecodeError as je: + logger.warning(f"JSON解析失败 (attempt {attempt+1}): {str(je)[:80]}") + + # 尝试修复JSON + result = self._try_fix_json(content, entity_name, entity_type, entity_summary) + if result.get("_fixed"): + del result["_fixed"] + return result + + last_error = je + + except Exception as e: + logger.warning(f"LLM调用失败 (attempt {attempt+1}): {str(e)[:80]}") + last_error = e + import time + time.sleep(1 * (attempt + 1)) # 指数退避 + + logger.warning(f"LLM生成人设失败({max_attempts}次尝试): {last_error}, 使用规则生成") + return self._generate_profile_rule_based( + entity_name, entity_type, entity_summary, entity_attributes + ) + + def _fix_truncated_json(self, content: str) -> str: + """修复被截断的JSON(输出被max_tokens限制截断)""" + import re + + # 如果JSON被截断,尝试闭合它 + content = content.strip() + + # 计算未闭合的括号 + open_braces = content.count('{') - content.count('}') + open_brackets = content.count('[') - content.count(']') + + # 检查是否有未闭合的字符串 + # 简单检查:如果最后一个引号后没有逗号或闭合括号,可能是字符串被截断 + if content and content[-1] not in '",}]': + # 尝试闭合字符串 + content += '"' + + # 闭合括号 + content += ']' * open_brackets + content += '}' * open_braces + + return content + + def _try_fix_json(self, content: str, entity_name: str, entity_type: str, entity_summary: str = "") -> Dict[str, Any]: + """尝试修复损坏的JSON""" + import re + + # 1. 首先尝试修复被截断的情况 + content = self._fix_truncated_json(content) + + # 2. 尝试提取JSON部分 + json_match = re.search(r'\{[\s\S]*\}', content) + if json_match: + json_str = json_match.group() - response = retry_client.call_with_retry(call_llm) - result = json.loads(response.choices[0].message.content) - return result + # 3. 处理字符串中的换行符问题 + # 找到所有字符串值并替换其中的换行符 + def fix_string_newlines(match): + s = match.group(0) + # 替换字符串内的实际换行符为空格 + s = s.replace('\n', ' ').replace('\r', ' ') + # 替换多余空格 + s = re.sub(r'\s+', ' ', s) + return s - except Exception as e: - logger.warning(f"LLM生成人设失败(已重试): {str(e)}, 使用规则生成") - return self._generate_profile_rule_based( - entity_name, entity_type, entity_summary, entity_attributes - ) + # 匹配JSON字符串值 + json_str = re.sub(r'"[^"\\]*(?:\\.[^"\\]*)*"', fix_string_newlines, json_str) + + # 4. 尝试解析 + try: + result = json.loads(json_str) + result["_fixed"] = True + return result + except json.JSONDecodeError as e: + # 5. 如果还是失败,尝试更激进的修复 + try: + # 移除所有控制字符 + json_str = re.sub(r'[\x00-\x1f\x7f-\x9f]', ' ', json_str) + # 替换所有连续空白 + json_str = re.sub(r'\s+', ' ', json_str) + result = json.loads(json_str) + result["_fixed"] = True + return result + except: + pass + + # 6. 尝试从内容中提取部分信息 + bio_match = re.search(r'"bio"\s*:\s*"([^"]*)"', content) + persona_match = re.search(r'"persona"\s*:\s*"([^"]*)', content) # 可能被截断 + + bio = bio_match.group(1) if bio_match else (entity_summary[:200] if entity_summary else f"{entity_type}: {entity_name}") + persona = persona_match.group(1) if persona_match else (entity_summary or f"{entity_name}是一个{entity_type}。") + + # 如果提取到了有意义的内容,标记为已修复 + if bio_match or persona_match: + logger.info(f"从损坏的JSON中提取了部分信息") + return { + "bio": bio, + "persona": persona, + "_fixed": True + } + + # 7. 完全失败,返回基础结构 + logger.warning(f"JSON修复失败,返回基础结构") + return { + "bio": entity_summary[:200] if entity_summary else f"{entity_type}: {entity_name}", + "persona": entity_summary or f"{entity_name}是一个{entity_type}。" + } + + def _get_system_prompt(self, is_individual: bool) -> str: + """获取系统提示词""" + base_prompt = "你是社交媒体用户画像生成专家。生成详细、真实的人设用于舆论模拟,最大程度还原已有现实情况。必须返回有效的JSON格式,所有字符串值不能包含未转义的换行符。使用中文。" + return base_prompt + + def _build_individual_persona_prompt( + self, + entity_name: str, + entity_type: str, + entity_summary: str, + entity_attributes: Dict[str, Any], + context: str + ) -> str: + """构建个人实体的详细人设提示词""" + + attrs_str = json.dumps(entity_attributes, ensure_ascii=False) if entity_attributes else "无" + context_str = context[:3000] if context else "无额外上下文" + + return f"""为实体生成详细的社交媒体用户人设,最大程度还原已有现实情况。 + +实体名称: {entity_name} +实体类型: {entity_type} +实体摘要: {entity_summary} +实体属性: {attrs_str} + +上下文信息: +{context_str} + +请生成JSON,包含以下字段: + +1. bio: 社交媒体简介,200字 +2. persona: 详细人设描述(2000字的纯文本),需包含: + - 基本信息(年龄、职业、教育背景、所在地) + - 人物背景(重要经历、与事件的关联、社会关系) + - 性格特征(MBTI类型、核心性格、情绪表达方式) + - 社交媒体行为(发帖频率、内容偏好、互动风格、语言特点) + - 立场观点(对话题的态度、可能被激怒/感动的内容) + - 独特特征(口头禅、特殊经历、个人爱好) + - 个人记忆(人设的重要部分,要介绍这个个体与事件的关联,以及这个个体在事件中的已有动作与反应) +3. age: 年龄数字 +4. gender: 性别(男/女) +5. mbti: MBTI类型 +6. country: 国家 +7. profession: 职业 +8. interested_topics: 感兴趣话题数组 + +重要: +- 所有字段值必须是字符串或数字,不要使用换行符 +- persona必须是一段连贯的文字描述 +- 使用中文 +- 内容要与实体信息保持一致""" + + def _build_group_persona_prompt( + self, + entity_name: str, + entity_type: str, + entity_summary: str, + entity_attributes: Dict[str, Any], + context: str + ) -> str: + """构建群体/机构实体的详细人设提示词""" + + attrs_str = json.dumps(entity_attributes, ensure_ascii=False) if entity_attributes else "无" + context_str = context[:3000] if context else "无额外上下文" + + return f"""为机构/群体实体生成详细的社交媒体账号设定,最大程度还原已有现实情况。 + +实体名称: {entity_name} +实体类型: {entity_type} +实体摘要: {entity_summary} +实体属性: {attrs_str} + +上下文信息: +{context_str} + +请生成JSON,包含以下字段: + +1. bio: 官方账号简介,200字,专业得体 +2. persona: 详细账号设定描述(2000字的纯文本),需包含: + - 机构基本信息(正式名称、机构性质、成立背景、主要职能) + - 账号定位(账号类型、目标受众、核心功能) + - 发言风格(语言特点、常用表达、禁忌话题) + - 发布内容特点(内容类型、发布频率、活跃时间段) + - 立场态度(对核心话题的官方立场、面对争议的处理方式) + - 特殊说明(代表的群体画像、运营习惯) + - 机构记忆(机构人设的重要部分,要介绍这个机构与事件的关联,以及这个机构在事件中的已有动作与反应) +3. age: null(机构不适用) +4. gender: null(机构不适用) +5. mbti: 可选,用于描述账号风格,如ISTJ代表严谨保守 +6. country: 国家 +7. profession: 机构职能描述 +8. interested_topics: 关注领域数组 + +重要: +- 所有字段值必须是字符串、数字或null +- persona必须是一段连贯的文字描述,不要使用换行符 +- 使用中文 +- 机构账号发言要符合其身份定位""" def _generate_profile_rule_based( self, @@ -398,29 +809,46 @@ Important: "interested_topics": ["General", "Social Issues"], } + def set_graph_id(self, graph_id: str): + """设置图谱ID用于Zep检索""" + self.graph_id = graph_id + def generate_profiles_from_entities( self, entities: List[EntityNode], use_llm: bool = True, - progress_callback: Optional[callable] = None + progress_callback: Optional[callable] = None, + graph_id: Optional[str] = None, + parallel_count: int = 5 ) -> List[OasisAgentProfile]: """ - 批量从实体生成Agent Profile + 批量从实体生成Agent Profile(支持并行生成) Args: entities: 实体列表 use_llm: 是否使用LLM生成详细人设 progress_callback: 进度回调函数 (current, total, message) + graph_id: 图谱ID,用于Zep检索获取更丰富上下文 + parallel_count: 并行生成数量,默认5 Returns: Agent Profile列表 """ - profiles = [] - total = len(entities) + import concurrent.futures + from threading import Lock - for idx, entity in enumerate(entities): - if progress_callback: - progress_callback(idx + 1, total, f"生成 {entity.name} 的人设...") + # 设置graph_id用于Zep检索 + if graph_id: + self.graph_id = graph_id + + total = len(entities) + profiles = [None] * total # 预分配列表保持顺序 + completed_count = [0] # 使用列表以便在闭包中修改 + lock = Lock() + + def generate_single_profile(idx: int, entity: EntityNode) -> tuple: + """生成单个profile的工作函数""" + entity_type = entity.get_entity_type() or "Entity" try: profile = self.generate_profile_from_entity( @@ -428,23 +856,115 @@ Important: user_id=idx, use_llm=use_llm ) - profiles.append(profile) + + # 实时输出生成的人设到控制台和日志 + self._print_generated_profile(entity.name, entity_type, profile) + + return idx, profile, None except Exception as e: logger.error(f"生成实体 {entity.name} 的人设失败: {str(e)}") # 创建一个基础profile - profiles.append(OasisAgentProfile( + fallback_profile = OasisAgentProfile( user_id=idx, user_name=self._generate_username(entity.name), name=entity.name, - bio=f"{entity.get_entity_type() or 'Entity'}: {entity.name}", + bio=f"{entity_type}: {entity.name}", persona=entity.summary or f"A participant in social discussions.", source_entity_uuid=entity.uuid, - source_entity_type=entity.get_entity_type(), - )) + source_entity_type=entity_type, + ) + return idx, fallback_profile, str(e) + + logger.info(f"开始并行生成 {total} 个Agent人设(并行数: {parallel_count})...") + print(f"\n{'='*60}") + print(f"开始生成Agent人设 - 共 {total} 个实体,并行数: {parallel_count}") + print(f"{'='*60}\n") + + # 使用线程池并行执行 + with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_count) as executor: + # 提交所有任务 + future_to_entity = { + executor.submit(generate_single_profile, idx, entity): (idx, entity) + for idx, entity in enumerate(entities) + } + + # 收集结果 + for future in concurrent.futures.as_completed(future_to_entity): + idx, entity = future_to_entity[future] + entity_type = entity.get_entity_type() or "Entity" + + try: + result_idx, profile, error = future.result() + profiles[result_idx] = profile + + with lock: + completed_count[0] += 1 + current = completed_count[0] + + if progress_callback: + progress_callback( + current, + total, + f"已完成 {current}/{total}: {entity.name}({entity_type})" + ) + + if error: + logger.warning(f"[{current}/{total}] {entity.name} 使用备用人设: {error}") + else: + logger.info(f"[{current}/{total}] 成功生成人设: {entity.name} ({entity_type})") + + except Exception as e: + logger.error(f"处理实体 {entity.name} 时发生异常: {str(e)}") + with lock: + completed_count[0] += 1 + profiles[idx] = OasisAgentProfile( + user_id=idx, + user_name=self._generate_username(entity.name), + name=entity.name, + bio=f"{entity_type}: {entity.name}", + persona=entity.summary or "A participant in social discussions.", + source_entity_uuid=entity.uuid, + source_entity_type=entity_type, + ) + + print(f"\n{'='*60}") + print(f"人设生成完成!共生成 {len([p for p in profiles if p])} 个Agent") + print(f"{'='*60}\n") return profiles + def _print_generated_profile(self, entity_name: str, entity_type: str, profile: OasisAgentProfile): + """实时输出生成的人设到控制台(完整内容,不截断)""" + separator = "-" * 70 + + # 构建完整输出内容(不截断) + topics_str = ', '.join(profile.interested_topics) if profile.interested_topics else '无' + + output_lines = [ + f"\n{separator}", + f"[已生成] {entity_name} ({entity_type})", + f"{separator}", + f"用户名: {profile.user_name}", + f"", + f"【简介】", + f"{profile.bio}", + f"", + f"【详细人设】", + f"{profile.persona}", + f"", + f"【基本属性】", + f"年龄: {profile.age} | 性别: {profile.gender} | MBTI: {profile.mbti}", + f"职业: {profile.profession} | 国家: {profile.country}", + f"兴趣话题: {topics_str}", + separator + ] + + output = "\n".join(output_lines) + + # 只输出到控制台(避免重复,logger不再输出完整内容) + print(output) + def save_profiles( self, profiles: List[OasisAgentProfile], @@ -470,10 +990,18 @@ Important: def _save_twitter_csv(self, profiles: List[OasisAgentProfile], file_path: str): """ - 保存Twitter Profile为CSV格式 + 保存Twitter Profile为CSV格式(符合OASIS官方要求) OASIS Twitter要求的CSV字段: - user_id, user_name, name, bio, friend_count, follower_count, statuses_count, created_at + - user_id: 用户ID(根据CSV顺序从0开始) + - name: 用户真实姓名 + - username: 系统中的用户名 + - user_char: 详细人设描述(注入到LLM系统提示中,指导Agent行为) + - description: 简短的公开简介(显示在用户资料页面) + + user_char vs description 区别: + - user_char: 内部使用,LLM系统提示,决定Agent如何思考和行动 + - description: 外部显示,其他用户可见的简介 """ import csv @@ -484,28 +1012,32 @@ Important: with open(file_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) - # 写入表头 - headers = ['user_id', 'user_name', 'name', 'bio', 'friend_count', - 'follower_count', 'statuses_count', 'created_at'] + # 写入OASIS要求的表头 + headers = ['user_id', 'name', 'username', 'user_char', 'description'] writer.writerow(headers) # 写入数据行 - for profile in profiles: - # bio需要处理换行符和逗号 - bio = profile.bio.replace('\n', ' ').replace('\r', ' ') + for idx, profile in enumerate(profiles): + # user_char: 完整人设(bio + persona),用于LLM系统提示 + user_char = profile.bio + if profile.persona and profile.persona != profile.bio: + user_char = f"{profile.bio} {profile.persona}" + # 处理换行符(CSV中用空格替代) + user_char = user_char.replace('\n', ' ').replace('\r', ' ') + + # description: 简短简介,用于外部显示 + description = profile.bio.replace('\n', ' ').replace('\r', ' ') + row = [ - profile.user_id, - profile.user_name, - profile.name, - bio, - profile.friend_count, - profile.follower_count, - profile.statuses_count, - profile.created_at + idx, # user_id: 从0开始的顺序ID + profile.name, # name: 真实姓名 + profile.user_name, # username: 用户名 + user_char, # user_char: 完整人设(内部LLM使用) + description # description: 简短简介(外部显示) ] writer.writerow(row) - logger.info(f"已保存 {len(profiles)} 个Twitter Profile到 {file_path} (CSV格式)") + logger.info(f"已保存 {len(profiles)} 个Twitter Profile到 {file_path} (OASIS CSV格式)") def _save_reddit_json(self, profiles: List[OasisAgentProfile], file_path: str): """ diff --git a/backend/app/services/simulation_config_generator.py b/backend/app/services/simulation_config_generator.py index b75ee24..a75f00e 100644 --- a/backend/app/services/simulation_config_generator.py +++ b/backend/app/services/simulation_config_generator.py @@ -2,10 +2,17 @@ 模拟配置智能生成器 使用LLM根据模拟需求、文档内容、图谱信息自动生成细致的模拟参数 实现全程自动化,无需人工设置参数 + +采用分步生成策略,避免一次性生成过长内容导致失败: +1. 生成时间配置 +2. 生成事件配置 +3. 分批生成Agent配置 +4. 生成平台配置 """ import json -from typing import Dict, Any, List, Optional +import math +from typing import Dict, Any, List, Optional, Callable from dataclasses import dataclass, field, asdict from datetime import datetime @@ -17,6 +24,28 @@ from .zep_entity_reader import EntityNode, ZepEntityReader logger = get_logger('mirofish.simulation_config') +# 中国作息时间配置(北京时间) +CHINA_TIMEZONE_CONFIG = { + # 深夜时段(几乎无人活动) + "dead_hours": [0, 1, 2, 3, 4, 5], + # 早间时段(逐渐醒来) + "morning_hours": [6, 7, 8], + # 工作时段 + "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18], + # 晚间高峰(最活跃) + "peak_hours": [19, 20, 21, 22], + # 夜间时段(活跃度下降) + "night_hours": [23], + # 活跃度系数 + "activity_multipliers": { + "dead": 0.05, # 凌晨几乎无人 + "morning": 0.4, # 早间逐渐活跃 + "work": 0.7, # 工作时段中等 + "peak": 1.5, # 晚间高峰 + "night": 0.5 # 深夜下降 + } +} + @dataclass class AgentActivityConfig: @@ -52,7 +81,7 @@ class AgentActivityConfig: @dataclass class TimeSimulationConfig: - """时间模拟配置""" + """时间模拟配置(基于中国人作息习惯)""" # 模拟总时长(模拟小时数) total_simulation_hours: int = 72 # 默认模拟72小时(3天) @@ -63,13 +92,21 @@ class TimeSimulationConfig: agents_per_hour_min: int = 5 agents_per_hour_max: int = 20 - # 高峰时段(活跃度提升) - peak_hours: List[int] = field(default_factory=lambda: [9, 10, 11, 14, 15, 20, 21, 22]) + # 高峰时段(晚间19-22点,中国人最活跃的时间) + peak_hours: List[int] = field(default_factory=lambda: [19, 20, 21, 22]) peak_activity_multiplier: float = 1.5 - # 低谷时段(活跃度降低) - off_peak_hours: List[int] = field(default_factory=lambda: [0, 1, 2, 3, 4, 5, 6]) - off_peak_activity_multiplier: float = 0.3 + # 低谷时段(凌晨0-5点,几乎无人活动) + off_peak_hours: List[int] = field(default_factory=lambda: [0, 1, 2, 3, 4, 5]) + off_peak_activity_multiplier: float = 0.05 # 凌晨活跃度极低 + + # 早间时段 + morning_hours: List[int] = field(default_factory=lambda: [6, 7, 8]) + morning_activity_multiplier: float = 0.4 + + # 工作时段 + work_hours: List[int] = field(default_factory=lambda: [9, 10, 11, 12, 13, 14, 15, 16, 17, 18]) + work_activity_multiplier: float = 0.7 @dataclass @@ -137,12 +174,13 @@ class SimulationParameters: def to_dict(self) -> Dict[str, Any]: """转换为字典""" + time_dict = asdict(self.time_config) return { "simulation_id": self.simulation_id, "project_id": self.project_id, "graph_id": self.graph_id, "simulation_requirement": self.simulation_requirement, - "time_config": asdict(self.time_config), + "time_config": time_dict, "agent_configs": [asdict(a) for a in self.agent_configs], "event_config": asdict(self.event_config), "twitter_config": asdict(self.twitter_config) if self.twitter_config else None, @@ -164,10 +202,17 @@ class SimulationConfigGenerator: 使用LLM分析模拟需求、文档内容、图谱实体信息, 自动生成最佳的模拟参数配置 + + 采用分步生成策略: + 1. 生成时间配置和事件配置(轻量级) + 2. 分批生成Agent配置(每批10-15个) + 3. 生成平台配置 """ # 上下文最大字符数 MAX_CONTEXT_LENGTH = 50000 + # 每批生成的Agent数量 + AGENTS_PER_BATCH = 15 def __init__( self, @@ -197,9 +242,10 @@ class SimulationConfigGenerator: entities: List[EntityNode], enable_twitter: bool = True, enable_reddit: bool = True, + progress_callback: Optional[Callable[[int, int, str], None]] = None, ) -> SimulationParameters: """ - 智能生成完整的模拟配置 + 智能生成完整的模拟配置(分步生成) Args: simulation_id: 模拟ID @@ -210,37 +256,107 @@ class SimulationConfigGenerator: entities: 过滤后的实体列表 enable_twitter: 是否启用Twitter enable_reddit: 是否启用Reddit + progress_callback: 进度回调函数(current_step, total_steps, message) Returns: SimulationParameters: 完整的模拟参数 """ - logger.info(f"开始智能生成模拟配置: simulation_id={simulation_id}") + logger.info(f"开始智能生成模拟配置: simulation_id={simulation_id}, 实体数={len(entities)}") - # 1. 构建上下文信息(截断到50000字符) + # 计算总步骤数 + num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH) + total_steps = 3 + num_batches # 时间配置 + 事件配置 + N批Agent + 平台配置 + current_step = 0 + + def report_progress(step: int, message: str): + nonlocal current_step + current_step = step + if progress_callback: + progress_callback(step, total_steps, message) + logger.info(f"[{step}/{total_steps}] {message}") + + # 1. 构建基础上下文信息 context = self._build_context( simulation_requirement=simulation_requirement, document_text=document_text, entities=entities ) - # 2. 调用LLM生成配置 - llm_result = self._generate_config_with_llm( - context=context, - entities=entities, - enable_twitter=enable_twitter, - enable_reddit=enable_reddit - ) + reasoning_parts = [] - # 3. 构建SimulationParameters对象 - params = self._build_parameters( + # ========== 步骤1: 生成时间配置 ========== + report_progress(1, "生成时间配置...") + time_config_result = self._generate_time_config(context, len(entities)) + time_config = self._parse_time_config(time_config_result) + reasoning_parts.append(f"时间配置: {time_config_result.get('reasoning', '成功')}") + + # ========== 步骤2: 生成事件配置 ========== + report_progress(2, "生成事件配置和热点话题...") + event_config_result = self._generate_event_config(context, simulation_requirement) + event_config = self._parse_event_config(event_config_result) + reasoning_parts.append(f"事件配置: {event_config_result.get('reasoning', '成功')}") + + # ========== 步骤3-N: 分批生成Agent配置 ========== + all_agent_configs = [] + for batch_idx in range(num_batches): + start_idx = batch_idx * self.AGENTS_PER_BATCH + end_idx = min(start_idx + self.AGENTS_PER_BATCH, len(entities)) + batch_entities = entities[start_idx:end_idx] + + report_progress( + 3 + batch_idx, + f"生成Agent配置 ({start_idx + 1}-{end_idx}/{len(entities)})..." + ) + + batch_configs = self._generate_agent_configs_batch( + context=context, + entities=batch_entities, + start_idx=start_idx, + simulation_requirement=simulation_requirement + ) + all_agent_configs.extend(batch_configs) + + reasoning_parts.append(f"Agent配置: 成功生成 {len(all_agent_configs)} 个") + + # ========== 最后一步: 生成平台配置 ========== + report_progress(total_steps, "生成平台配置...") + twitter_config = None + reddit_config = None + + if enable_twitter: + twitter_config = PlatformConfig( + platform="twitter", + recency_weight=0.4, + popularity_weight=0.3, + relevance_weight=0.3, + viral_threshold=10, + echo_chamber_strength=0.5 + ) + + if enable_reddit: + reddit_config = PlatformConfig( + platform="reddit", + recency_weight=0.3, + popularity_weight=0.4, + relevance_weight=0.3, + viral_threshold=15, + echo_chamber_strength=0.6 + ) + + # 构建最终参数 + params = SimulationParameters( simulation_id=simulation_id, project_id=project_id, graph_id=graph_id, simulation_requirement=simulation_requirement, - entities=entities, - llm_result=llm_result, - enable_twitter=enable_twitter, - enable_reddit=enable_reddit + time_config=time_config, + agent_configs=all_agent_configs, + event_config=event_config, + twitter_config=twitter_config, + reddit_config=reddit_config, + llm_model=self.model_name, + llm_base_url=self.base_url, + generation_reasoning=" | ".join(reasoning_parts) ) logger.info(f"模拟配置生成完成: {len(params.agent_configs)} 个Agent配置") @@ -297,288 +413,397 @@ class SimulationConfigGenerator: return "\n".join(lines) - def _generate_config_with_llm( + def _call_llm_with_retry(self, prompt: str, system_prompt: str) -> Dict[str, Any]: + """带重试的LLM调用,包含JSON修复逻辑""" + import re + + max_attempts = 3 + last_error = None + + for attempt in range(max_attempts): + try: + response = self.client.chat.completions.create( + model=self.model_name, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt} + ], + response_format={"type": "json_object"}, + temperature=0.7 - (attempt * 0.1) # 每次重试降低温度 + # 不设置max_tokens,让LLM自由发挥 + ) + + content = response.choices[0].message.content + finish_reason = response.choices[0].finish_reason + + # 检查是否被截断 + if finish_reason == 'length': + logger.warning(f"LLM输出被截断 (attempt {attempt+1})") + content = self._fix_truncated_json(content) + + # 尝试解析JSON + try: + return json.loads(content) + except json.JSONDecodeError as e: + logger.warning(f"JSON解析失败 (attempt {attempt+1}): {str(e)[:80]}") + + # 尝试修复JSON + fixed = self._try_fix_config_json(content) + if fixed: + return fixed + + last_error = e + + except Exception as e: + logger.warning(f"LLM调用失败 (attempt {attempt+1}): {str(e)[:80]}") + last_error = e + import time + time.sleep(2 * (attempt + 1)) + + raise last_error or Exception("LLM调用失败") + + def _fix_truncated_json(self, content: str) -> str: + """修复被截断的JSON""" + content = content.strip() + + # 计算未闭合的括号 + open_braces = content.count('{') - content.count('}') + open_brackets = content.count('[') - content.count(']') + + # 检查是否有未闭合的字符串 + if content and content[-1] not in '",}]': + content += '"' + + # 闭合括号 + content += ']' * open_brackets + content += '}' * open_braces + + return content + + def _try_fix_config_json(self, content: str) -> Optional[Dict[str, Any]]: + """尝试修复配置JSON""" + import re + + # 修复被截断的情况 + content = self._fix_truncated_json(content) + + # 提取JSON部分 + json_match = re.search(r'\{[\s\S]*\}', content) + if json_match: + json_str = json_match.group() + + # 移除字符串中的换行符 + def fix_string(match): + s = match.group(0) + s = s.replace('\n', ' ').replace('\r', ' ') + s = re.sub(r'\s+', ' ', s) + return s + + json_str = re.sub(r'"[^"\\]*(?:\\.[^"\\]*)*"', fix_string, json_str) + + try: + return json.loads(json_str) + except: + # 尝试移除所有控制字符 + json_str = re.sub(r'[\x00-\x1f\x7f-\x9f]', ' ', json_str) + json_str = re.sub(r'\s+', ' ', json_str) + try: + return json.loads(json_str) + except: + pass + + return None + + def _generate_time_config(self, context: str, num_entities: int) -> Dict[str, Any]: + """生成时间配置""" + prompt = f"""基于以下模拟需求,生成时间模拟配置。 + +{context[:5000]} + +## 任务 +请生成时间配置JSON,注意: +- 用户群体为中国人,需符合北京时间作息习惯 +- 凌晨0-5点几乎无人活动(活跃度系数0.05) +- 早上6-8点逐渐活跃(活跃度系数0.4) +- 工作时间9-18点中等活跃(活跃度系数0.7) +- 晚间19-22点是高峰期(活跃度系数1.5) +- 23点后活跃度下降(活跃度系数0.5) + +当前实体数量: {num_entities} + +返回JSON格式(不要markdown): +{{ + "total_simulation_hours": <72-168,根据事件性质决定>, + "minutes_per_round": <15-60>, + "agents_per_hour_min": <每小时最少激活Agent数>, + "agents_per_hour_max": <每小时最多激活Agent数>, + "peak_hours": [19, 20, 21, 22], + "off_peak_hours": [0, 1, 2, 3, 4, 5], + "morning_hours": [6, 7, 8], + "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18], + "reasoning": "<简要说明>" +}}""" + + system_prompt = "你是社交媒体模拟专家。返回纯JSON格式,时间配置需符合中国人作息习惯。" + + try: + return self._call_llm_with_retry(prompt, system_prompt) + except Exception as e: + logger.warning(f"时间配置LLM生成失败: {e}, 使用默认配置") + return self._get_default_time_config(num_entities) + + def _get_default_time_config(self, num_entities: int) -> Dict[str, Any]: + """获取默认时间配置(中国人作息)""" + return { + "total_simulation_hours": 72, + "minutes_per_round": 30, + "agents_per_hour_min": max(1, num_entities // 15), + "agents_per_hour_max": max(5, num_entities // 5), + "peak_hours": [19, 20, 21, 22], + "off_peak_hours": [0, 1, 2, 3, 4, 5], + "morning_hours": [6, 7, 8], + "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18], + "reasoning": "使用默认中国人作息配置" + } + + def _parse_time_config(self, result: Dict[str, Any]) -> TimeSimulationConfig: + """解析时间配置结果""" + return TimeSimulationConfig( + total_simulation_hours=result.get("total_simulation_hours", 72), + minutes_per_round=result.get("minutes_per_round", 30), + agents_per_hour_min=result.get("agents_per_hour_min", 5), + agents_per_hour_max=result.get("agents_per_hour_max", 20), + peak_hours=result.get("peak_hours", [19, 20, 21, 22]), + off_peak_hours=result.get("off_peak_hours", [0, 1, 2, 3, 4, 5]), + off_peak_activity_multiplier=0.05, # 凌晨几乎无人 + morning_hours=result.get("morning_hours", [6, 7, 8]), + morning_activity_multiplier=0.4, + work_hours=result.get("work_hours", list(range(9, 19))), + work_activity_multiplier=0.7, + peak_activity_multiplier=1.5 + ) + + def _generate_event_config(self, context: str, simulation_requirement: str) -> Dict[str, Any]: + """生成事件配置""" + prompt = f"""基于以下模拟需求,生成事件配置。 + +模拟需求: {simulation_requirement} + +{context[:3000]} + +## 任务 +请生成事件配置JSON: +- 提取热点话题关键词 +- 描述舆论发展方向 +- 设计初始帖子内容 + +返回JSON格式(不要markdown): +{{ + "hot_topics": ["关键词1", "关键词2", ...], + "narrative_direction": "<舆论发展方向描述>", + "initial_posts": [ + {{"content": "帖子内容", "poster_type": "MediaOutlet"}}, + ... + ], + "reasoning": "<简要说明>" +}}""" + + system_prompt = "你是舆论分析专家。返回纯JSON格式。" + + try: + return self._call_llm_with_retry(prompt, system_prompt) + except Exception as e: + logger.warning(f"事件配置LLM生成失败: {e}, 使用默认配置") + return { + "hot_topics": [], + "narrative_direction": "", + "initial_posts": [], + "reasoning": "使用默认配置" + } + + def _parse_event_config(self, result: Dict[str, Any]) -> EventConfig: + """解析事件配置结果""" + return EventConfig( + initial_posts=result.get("initial_posts", []), + scheduled_events=[], + hot_topics=result.get("hot_topics", []), + narrative_direction=result.get("narrative_direction", "") + ) + + def _generate_agent_configs_batch( self, context: str, entities: List[EntityNode], - enable_twitter: bool, - enable_reddit: bool - ) -> Dict[str, Any]: - """调用LLM生成配置""" + start_idx: int, + simulation_requirement: str + ) -> List[AgentActivityConfig]: + """分批生成Agent配置""" - # 构建实体列表用于Agent配置 + # 构建实体信息 entity_list = [] for i, e in enumerate(entities): entity_list.append({ - "agent_id": i, - "entity_uuid": e.uuid, + "agent_id": start_idx + i, "entity_name": e.name, "entity_type": e.get_entity_type() or "Unknown", - "summary": e.summary[:200] if e.summary else "" + "summary": e.summary[:150] if e.summary else "" }) - prompt = f"""你是一个社交媒体舆论模拟专家。请根据以下信息,生成详细的模拟参数配置。 + prompt = f"""基于以下信息,为每个实体生成社交媒体活动配置。 -{context} +模拟需求: {simulation_requirement} -## 实体列表(需要为每个实体生成活动配置) +## 实体列表 ```json {json.dumps(entity_list, ensure_ascii=False, indent=2)} ``` ## 任务 -请生成一个JSON配置,包含以下部分: +为每个实体生成活动配置,注意: +- **时间符合中国人作息**:凌晨0-5点几乎不活动,晚间19-22点最活跃 +- **官方机构**(University/GovernmentAgency):活跃度低(0.1-0.3),工作时间(9-17)活动,响应慢(60-240分钟),影响力高(2.5-3.0) +- **媒体**(MediaOutlet):活跃度中(0.4-0.6),全天活动(8-23),响应快(5-30分钟),影响力高(2.0-2.5) +- **个人**(Student/Person/Alumni):活跃度高(0.6-0.9),主要晚间活动(18-23),响应快(1-15分钟),影响力低(0.8-1.2) +- **公众人物/专家**:活跃度中(0.4-0.6),影响力中高(1.5-2.0) -1. **time_config** - 时间模拟配置 - - total_simulation_hours: 模拟总时长(小时),根据事件性质决定(短期热点24-72小时,长期舆论168-336小时) - - minutes_per_round: 每轮代表的时间(分钟),建议15-60 - - agents_per_hour_min/max: 每小时激活的Agent数量范围 - - peak_hours: 高峰时段列表(0-23) - - off_peak_hours: 低谷时段列表 - -2. **agent_configs** - 每个Agent的活动配置(必须为每个实体生成) - 对于每个agent_id,设置: - - activity_level: 活跃度(0.0-1.0),官方机构通常0.1-0.3,媒体0.3-0.5,个人0.5-0.9 - - posts_per_hour: 每小时发帖频率,官方机构0.05-0.2,媒体0.5-2,个人0.1-1 - - comments_per_hour: 每小时评论频率 - - active_hours: 活跃时间段列表,官方通常工作时间,个人更分散 - - response_delay_min/max: 响应延迟(模拟分钟),官方较慢(30-180),个人较快(1-30) - - sentiment_bias: 情感倾向(-1到1),根据实体立场设置 - - stance: 立场(supportive/opposing/neutral/observer) - - influence_weight: 影响力权重,知名人物和媒体较高 - -3. **event_config** - 事件配置 - - initial_posts: 初始帖子列表,包含content和poster_agent_id - - hot_topics: 热点话题关键词列表 - - narrative_direction: 舆论发展方向描述 - -4. **platform_configs** - 平台配置(如果启用) - - viral_threshold: 病毒传播阈值 - - echo_chamber_strength: 回声室效应强度(0-1) - -5. **reasoning** - 你的推理说明,解释为什么这样设置参数 - -## 重要原则 -- 官方机构(University、GovernmentAgency)发言频率低但影响力大 -- 媒体(MediaOutlet)发言频率中等,传播速度快 -- 个人(Student、PublicFigure)发言频率高但影响力分散 -- 根据模拟需求判断各实体的立场和情感倾向 -- 时间配置要符合真实社交媒体的使用规律 - -请返回JSON格式,不要包含markdown代码块标记。""" +返回JSON格式(不要markdown): +{{ + "agent_configs": [ + {{ + "agent_id": <必须与输入一致>, + "activity_level": <0.0-1.0>, + "posts_per_hour": <发帖频率>, + "comments_per_hour": <评论频率>, + "active_hours": [<活跃小时列表,考虑中国人作息>], + "response_delay_min": <最小响应延迟分钟>, + "response_delay_max": <最大响应延迟分钟>, + "sentiment_bias": <-1.0到1.0>, + "stance": "", + "influence_weight": <影响力权重> + }}, + ... + ] +}}""" + system_prompt = "你是社交媒体行为分析专家。返回纯JSON,配置需符合中国人作息习惯。" + try: - # 使用重试机制调用LLM API - from ..utils.retry import RetryableAPIClient - - retry_client = RetryableAPIClient(max_retries=3, initial_delay=2.0, max_delay=60.0) - - def call_llm(): - return self.client.chat.completions.create( - model=self.model_name, - messages=[ - { - "role": "system", - "content": "你是社交媒体舆论模拟专家,擅长设计真实的模拟参数。返回纯JSON格式,不要markdown。" - }, - {"role": "user", "content": prompt} - ], - response_format={"type": "json_object"}, - temperature=0.7, - max_tokens=8000 - ) - - response = retry_client.call_with_retry(call_llm) - result = json.loads(response.choices[0].message.content) - logger.info(f"LLM配置生成成功") - return result - + result = self._call_llm_with_retry(prompt, system_prompt) + llm_configs = {cfg["agent_id"]: cfg for cfg in result.get("agent_configs", [])} except Exception as e: - logger.error(f"LLM配置生成失败(已重试): {str(e)}") - # 返回默认配置 - return self._generate_default_config(entities) - - def _generate_default_config(self, entities: List[EntityNode]) -> Dict[str, Any]: - """生成默认配置(LLM失败时的fallback)""" - agent_configs = [] - - for i, e in enumerate(entities): - entity_type = (e.get_entity_type() or "Unknown").lower() - - # 根据实体类型设置默认参数 - if entity_type in ["university", "governmentagency", "ngo"]: - config = { - "agent_id": i, - "activity_level": 0.2, - "posts_per_hour": 0.1, - "comments_per_hour": 0.05, - "active_hours": list(range(9, 18)), - "response_delay_min": 60, - "response_delay_max": 240, - "sentiment_bias": 0.0, - "stance": "neutral", - "influence_weight": 3.0 - } - elif entity_type in ["mediaoutlet"]: - config = { - "agent_id": i, - "activity_level": 0.6, - "posts_per_hour": 1.0, - "comments_per_hour": 0.5, - "active_hours": list(range(6, 24)), - "response_delay_min": 5, - "response_delay_max": 30, - "sentiment_bias": 0.0, - "stance": "observer", - "influence_weight": 2.5 - } - elif entity_type in ["publicfigure", "expert"]: - config = { - "agent_id": i, - "activity_level": 0.5, - "posts_per_hour": 0.3, - "comments_per_hour": 0.5, - "active_hours": list(range(8, 23)), - "response_delay_min": 10, - "response_delay_max": 60, - "sentiment_bias": 0.0, - "stance": "neutral", - "influence_weight": 2.0 - } - else: # Student, Person, etc. - config = { - "agent_id": i, - "activity_level": 0.7, - "posts_per_hour": 0.5, - "comments_per_hour": 1.0, - "active_hours": list(range(7, 24)), - "response_delay_min": 1, - "response_delay_max": 20, - "sentiment_bias": 0.0, - "stance": "neutral", - "influence_weight": 1.0 - } - - agent_configs.append(config) - - return { - "time_config": { - "total_simulation_hours": 72, - "minutes_per_round": 30, - "agents_per_hour_min": max(1, len(entities) // 10), - "agents_per_hour_max": max(5, len(entities) // 3), - "peak_hours": [9, 10, 11, 14, 15, 20, 21, 22], - "off_peak_hours": [0, 1, 2, 3, 4, 5] - }, - "agent_configs": agent_configs, - "event_config": { - "initial_posts": [], - "hot_topics": [], - "narrative_direction": "" - }, - "reasoning": "使用默认配置(LLM生成失败)" - } - - def _build_parameters( - self, - simulation_id: str, - project_id: str, - graph_id: str, - simulation_requirement: str, - entities: List[EntityNode], - llm_result: Dict[str, Any], - enable_twitter: bool, - enable_reddit: bool - ) -> SimulationParameters: - """根据LLM结果构建SimulationParameters对象""" - - # 时间配置 - time_cfg = llm_result.get("time_config", {}) - time_config = TimeSimulationConfig( - total_simulation_hours=time_cfg.get("total_simulation_hours", 72), - minutes_per_round=time_cfg.get("minutes_per_round", 30), - agents_per_hour_min=time_cfg.get("agents_per_hour_min", 5), - agents_per_hour_max=time_cfg.get("agents_per_hour_max", 20), - peak_hours=time_cfg.get("peak_hours", [9, 10, 11, 14, 15, 20, 21, 22]), - off_peak_hours=time_cfg.get("off_peak_hours", [0, 1, 2, 3, 4, 5]), - peak_activity_multiplier=time_cfg.get("peak_activity_multiplier", 1.5), - off_peak_activity_multiplier=time_cfg.get("off_peak_activity_multiplier", 0.3) - ) - - # Agent配置 - agent_configs = [] - llm_agent_configs = {cfg["agent_id"]: cfg for cfg in llm_result.get("agent_configs", [])} + logger.warning(f"Agent配置批次LLM生成失败: {e}, 使用规则生成") + llm_configs = {} + # 构建AgentActivityConfig对象 + configs = [] for i, entity in enumerate(entities): - cfg = llm_agent_configs.get(i, {}) + agent_id = start_idx + i + cfg = llm_configs.get(agent_id, {}) - agent_config = AgentActivityConfig( - agent_id=i, + # 如果LLM没有生成,使用规则生成 + if not cfg: + cfg = self._generate_agent_config_by_rule(entity) + + config = AgentActivityConfig( + agent_id=agent_id, entity_uuid=entity.uuid, entity_name=entity.name, entity_type=entity.get_entity_type() or "Unknown", activity_level=cfg.get("activity_level", 0.5), posts_per_hour=cfg.get("posts_per_hour", 0.5), comments_per_hour=cfg.get("comments_per_hour", 1.0), - active_hours=cfg.get("active_hours", list(range(8, 23))), + active_hours=cfg.get("active_hours", list(range(9, 23))), response_delay_min=cfg.get("response_delay_min", 5), response_delay_max=cfg.get("response_delay_max", 60), sentiment_bias=cfg.get("sentiment_bias", 0.0), stance=cfg.get("stance", "neutral"), influence_weight=cfg.get("influence_weight", 1.0) ) - agent_configs.append(agent_config) + configs.append(config) - # 事件配置 - event_cfg = llm_result.get("event_config", {}) - event_config = EventConfig( - initial_posts=event_cfg.get("initial_posts", []), - scheduled_events=event_cfg.get("scheduled_events", []), - hot_topics=event_cfg.get("hot_topics", []), - narrative_direction=event_cfg.get("narrative_direction", "") - ) + return configs + + def _generate_agent_config_by_rule(self, entity: EntityNode) -> Dict[str, Any]: + """基于规则生成单个Agent配置(中国人作息)""" + entity_type = (entity.get_entity_type() or "Unknown").lower() - # 平台配置 - twitter_config = None - reddit_config = None - - platform_cfgs = llm_result.get("platform_configs", {}) - - if enable_twitter: - tw_cfg = platform_cfgs.get("twitter", {}) - twitter_config = PlatformConfig( - platform="twitter", - recency_weight=tw_cfg.get("recency_weight", 0.4), - popularity_weight=tw_cfg.get("popularity_weight", 0.3), - relevance_weight=tw_cfg.get("relevance_weight", 0.3), - viral_threshold=tw_cfg.get("viral_threshold", 10), - echo_chamber_strength=tw_cfg.get("echo_chamber_strength", 0.5) - ) - - if enable_reddit: - rd_cfg = platform_cfgs.get("reddit", {}) - reddit_config = PlatformConfig( - platform="reddit", - recency_weight=rd_cfg.get("recency_weight", 0.3), - popularity_weight=rd_cfg.get("popularity_weight", 0.4), - relevance_weight=rd_cfg.get("relevance_weight", 0.3), - viral_threshold=rd_cfg.get("viral_threshold", 15), - echo_chamber_strength=rd_cfg.get("echo_chamber_strength", 0.6) - ) - - return SimulationParameters( - simulation_id=simulation_id, - project_id=project_id, - graph_id=graph_id, - simulation_requirement=simulation_requirement, - time_config=time_config, - agent_configs=agent_configs, - event_config=event_config, - twitter_config=twitter_config, - reddit_config=reddit_config, - llm_model=self.model_name, - llm_base_url=self.base_url, - generation_reasoning=llm_result.get("reasoning", "") - ) - + if entity_type in ["university", "governmentagency", "ngo"]: + # 官方机构:工作时间活动,低频率,高影响力 + return { + "activity_level": 0.2, + "posts_per_hour": 0.1, + "comments_per_hour": 0.05, + "active_hours": list(range(9, 18)), # 9:00-17:59 + "response_delay_min": 60, + "response_delay_max": 240, + "sentiment_bias": 0.0, + "stance": "neutral", + "influence_weight": 3.0 + } + elif entity_type in ["mediaoutlet"]: + # 媒体:全天活动,中等频率,高影响力 + return { + "activity_level": 0.5, + "posts_per_hour": 0.8, + "comments_per_hour": 0.3, + "active_hours": list(range(7, 24)), # 7:00-23:59 + "response_delay_min": 5, + "response_delay_max": 30, + "sentiment_bias": 0.0, + "stance": "observer", + "influence_weight": 2.5 + } + elif entity_type in ["professor", "expert", "official"]: + # 专家/教授:工作+晚间活动,中等频率 + return { + "activity_level": 0.4, + "posts_per_hour": 0.3, + "comments_per_hour": 0.5, + "active_hours": list(range(8, 22)), # 8:00-21:59 + "response_delay_min": 15, + "response_delay_max": 90, + "sentiment_bias": 0.0, + "stance": "neutral", + "influence_weight": 2.0 + } + elif entity_type in ["student"]: + # 学生:晚间为主,高频率 + return { + "activity_level": 0.8, + "posts_per_hour": 0.6, + "comments_per_hour": 1.5, + "active_hours": [8, 9, 10, 11, 12, 13, 18, 19, 20, 21, 22, 23], # 上午+晚间 + "response_delay_min": 1, + "response_delay_max": 15, + "sentiment_bias": 0.0, + "stance": "neutral", + "influence_weight": 0.8 + } + elif entity_type in ["alumni"]: + # 校友:晚间为主 + return { + "activity_level": 0.6, + "posts_per_hour": 0.4, + "comments_per_hour": 0.8, + "active_hours": [12, 13, 19, 20, 21, 22, 23], # 午休+晚间 + "response_delay_min": 5, + "response_delay_max": 30, + "sentiment_bias": 0.0, + "stance": "neutral", + "influence_weight": 1.0 + } + else: + # 普通人:晚间高峰 + return { + "activity_level": 0.7, + "posts_per_hour": 0.5, + "comments_per_hour": 1.2, + "active_hours": [9, 10, 11, 12, 13, 18, 19, 20, 21, 22, 23], # 白天+晚间 + "response_delay_min": 2, + "response_delay_max": 20, + "sentiment_bias": 0.0, + "stance": "neutral", + "influence_weight": 1.0 + } + diff --git a/backend/app/services/simulation_manager.py b/backend/app/services/simulation_manager.py index fbb42a6..dcfaa96 100644 --- a/backend/app/services/simulation_manager.py +++ b/backend/app/services/simulation_manager.py @@ -238,14 +238,15 @@ class SimulationManager: document_text: str, defined_entity_types: Optional[List[str]] = None, use_llm_for_profiles: bool = True, - progress_callback: Optional[callable] = None + progress_callback: Optional[callable] = None, + parallel_profile_count: int = 3 ) -> SimulationState: """ 准备模拟环境(全程自动化) 步骤: 1. 从Zep图谱读取并过滤实体 - 2. 为每个实体生成OASIS Agent Profile(可选LLM增强) + 2. 为每个实体生成OASIS Agent Profile(可选LLM增强,支持并行) 3. 使用LLM智能生成模拟配置参数(时间、活跃度、发言频率等) 4. 保存配置文件和Profile文件 5. 复制预设脚本到模拟目录 @@ -257,6 +258,7 @@ class SimulationManager: defined_entity_types: 预定义的实体类型(可选) use_llm_for_profiles: 是否使用LLM生成详细人设 progress_callback: 进度回调函数 (stage, progress, message) + parallel_profile_count: 并行生成人设的数量,默认3 Returns: SimulationState @@ -314,7 +316,8 @@ class SimulationManager: total=total_entities ) - generator = OasisProfileGenerator() + # 传入graph_id以启用Zep检索功能,获取更丰富的上下文 + generator = OasisProfileGenerator(graph_id=state.graph_id) def profile_progress(current, total, msg): if progress_callback: @@ -330,7 +333,9 @@ class SimulationManager: profiles = generator.generate_profiles_from_entities( entities=filtered.entities, use_llm=use_llm_for_profiles, - progress_callback=profile_progress + progress_callback=profile_progress, + graph_id=state.graph_id, # 传入graph_id用于Zep检索 + parallel_count=parallel_profile_count # 并行生成数量 ) state.profiles_count = len(profiles)