diff --git a/backend/README.md b/backend/README.md index 72ece0c..44d437e 100644 --- a/backend/README.md +++ b/backend/README.md @@ -28,6 +28,8 @@ 4. **模拟配置智能生成**: 使用 LLM 根据需求自动生成模拟参数(时间、活跃度、事件等) 5. **双平台模拟**: 支持 Twitter 和 Reddit 双平台并行舆论模拟(基于 OASIS 框架) 6. **图谱记忆动态更新**: 可选功能,将模拟中Agent的活动实时更新到Zep图谱,让图谱"记住"模拟过程 +7. **智能报告生成**: 使用 LangChain + Zep 实现 ReACT 模式的模拟分析报告自动生成 +8. **Report Agent对话**: 报告生成后可与Report Agent对话,自主调用检索工具回答问题 --- @@ -42,10 +44,11 @@ │ │ API层 │ │ 服务层 │ │ 模型层 │ │ │ │ - graph.py │→ │ - 本体生成 │→ │ - Project │ │ │ │ - simulation │ │ - 图谱构建 │ │ - Task │ │ -│ └────────────────┘ │ - 实体读取 │ └─────────────────┘ │ -│ │ - 人设生成 │ │ +│ │ - report.py │ │ - 实体读取 │ │ - Report │ │ +│ └────────────────┘ │ - 人设生成 │ └─────────────────┘ │ │ │ - 配置生成 │ │ │ │ - 模拟运行 │ │ +│ │ - 报告生成 │ │ │ └──────────────┘ │ ├─────────────────────────────────────────────────────────────┤ │ 外部服务集成 │ @@ -78,6 +81,16 @@ 模拟完成 → 环境进入等待模式 → 发送Interview命令 → Agent回答 → 获取结果 → (可选)关闭环境 ``` +5. **报告生成流程**: + ``` + 模拟完成 → 调用Report API → ReACT规划大纲 → 逐章节生成(多次工具调用) → 生成Markdown报告 → 解锁Interview功能 + ``` + +6. 
**Report Agent对话流程**: + ``` + 用户提问 → Agent分析 → 调用Zep检索工具 → 整合信息 → 返回回答 + ``` + --- ## 技术栈 @@ -89,6 +102,7 @@ ### AI & 知识图谱 - **Zep Cloud SDK 2.0+**: 知识图谱构建与管理 - **OpenAI SDK 1.0+**: LLM 调用(支持 OpenAI 兼容接口) +- **LangChain 0.2+**: Report Agent框架(ReACT模式) - **OASIS-AI**: 社交媒体模拟框架 - **CAMEL-AI**: Agent 行为模拟 @@ -117,6 +131,10 @@ backend/ │ │ ├── project.json # 项目元数据 │ │ ├── files/ # 上传的文件 │ │ └── extracted_text.txt # 提取的文本 +│ ├── reports/ # 报告数据 +│ │ └── report_xxx/ +│ │ ├── report_xxx.json # 报告元数据 +│ │ └── report_xxx.md # Markdown报告 │ └── simulations/ # 模拟数据 │ └── sim_xxx/ │ ├── state.json # 模拟状态 @@ -142,7 +160,8 @@ backend/ ├── api/ # API路由 │ ├── __init__.py │ ├── graph.py # 图谱相关接口 - │ └── simulation.py # 模拟相关接口 + │ ├── simulation.py # 模拟相关接口 + │ └── report.py # 报告相关接口 ├── models/ # 数据模型 │ ├── __init__.py │ ├── project.py # 项目模型 @@ -153,12 +172,14 @@ backend/ │ ├── graph_builder.py # 图谱构建 │ ├── text_processor.py # 文本处理 │ ├── zep_entity_reader.py # 实体读取 + │ ├── zep_tools.py # Zep检索工具服务 │ ├── oasis_profile_generator.py # 人设生成 │ ├── simulation_config_generator.py # 配置生成 │ ├── simulation_manager.py # 模拟管理 │ ├── simulation_runner.py # 模拟运行 │ ├── simulation_ipc.py # 模拟IPC通信(Interview功能) - │ └── zep_graph_memory_updater.py # 图谱记忆动态更新 + │ ├── zep_graph_memory_updater.py # 图谱记忆动态更新 + │ └── report_agent.py # 报告生成Agent(ReACT模式) └── utils/ # 工具类 ├── __init__.py ├── file_parser.py # 文件解析 @@ -240,6 +261,55 @@ backend/ - `SimulationIPCClient`: IPC客户端(Flask端使用) - `SimulationIPCServer`: IPC服务器(模拟脚本端使用) +### 5. 
Report Agent模块(报告生成) + +**功能**: 模拟完成后自动生成分析报告,支持与用户对话 + +**特点**: +- **ReACT模式**: Reasoning + Acting,多轮思考与工具调用 +- **大纲规划**: LLM分析模拟需求,自动规划报告目录结构 +- **分段生成**: 逐章节生成,每章节可多次调用Zep检索工具 +- **Markdown输出**: 生成专业的Markdown格式报告 +- **对话功能**: 报告完成后可与Report Agent对话,自主调用工具回答问题 + +**工具(MCP封装)**: +- `search_graph`: 图谱语义搜索 +- `get_graph_statistics`: 获取图谱统计信息 +- `get_entity_summary`: 获取实体关系摘要 +- `get_simulation_context`: 获取模拟上下文 +- `get_entities_by_type`: 按类型获取实体 + +**核心服务**: +- `ZepToolsService`: Zep检索工具封装 +- `ReportAgent`: 报告生成Agent(ReACT模式) +- `ReportManager`: 报告持久化管理 + +**工作原理**: +``` +┌─────────────────────────────────────────────────────────────┐ +│ Report Agent (ReACT) │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ 1. 规划阶段 │ +│ ┌─────────────────────────────────────────────────┐ │ +│ │ LLM分析模拟需求 → 获取图谱上下文 → 生成报告大纲 │ │ +│ └─────────────────────────────────────────────────┘ │ +│ │ +│ 2. 生成阶段 (每章节) │ +│ ┌─────────────────────────────────────────────────┐ │ +│ │ Thought → Action → Observation → ... → Final │ │ +│ │ ↓ ↓ ↓ │ │ +│ │ 分析需求 调用工具 分析结果 生成内容 │ │ +│ └─────────────────────────────────────────────────┘ │ +│ │ +│ 3. 输出阶段 │ +│ ┌─────────────────────────────────────────────────┐ │ +│ │ 组装章节 → 生成Markdown → 保存JSON/MD文件 │ │ +│ └─────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────┘ +``` + **工作原理**: ``` Flask后端 模拟脚本 @@ -964,6 +1034,245 @@ Flask后端 模拟脚本 - `/stop`: 强制终止模拟进程 - `/close-env`: 优雅地关闭环境,让模拟进程正常退出 +--- + +### Report 报告接口 + +> **说明**: 报告生成完成后才能解锁Interview功能。Report Agent使用ReACT模式,可以在对话中自主调用Zep检索工具。 + +#### 1. 
生成报告 + +**接口**: `POST /api/report/generate` + +**请求参数**: +```json +{ + "simulation_id": "sim_xxxx", + "force_regenerate": false +} +``` + +| 参数 | 类型 | 必填 | 默认值 | 说明 | +|------|------|------|--------|------| +| simulation_id | String | 是 | - | 模拟ID | +| force_regenerate | Boolean | 否 | false | 强制重新生成 | + +**返回示例**: +```json +{ + "success": true, + "data": { + "simulation_id": "sim_xxxx", + "task_id": "task_xxxx", + "status": "generating", + "message": "报告生成任务已启动", + "already_generated": false + } +} +``` + +**如果报告已存在**: +```json +{ + "success": true, + "data": { + "simulation_id": "sim_xxxx", + "report_id": "report_xxxx", + "status": "completed", + "message": "报告已存在", + "already_generated": true + } +} +``` + +--- + +#### 2. 查询生成进度 + +**接口**: `POST /api/report/generate/status` + +**请求参数**: +```json +{ + "task_id": "task_xxxx", + "simulation_id": "sim_xxxx" +} +``` + +**返回示例**: +```json +{ + "success": true, + "data": { + "task_id": "task_xxxx", + "status": "processing", + "progress": 45, + "message": "[generating] 正在生成章节: 关键发现 (3/5)" + } +} +``` + +--- + +#### 3. 获取报告 + +**接口**: `GET /api/report/{report_id}` + +**返回示例**: +```json +{ + "success": true, + "data": { + "report_id": "report_xxxx", + "simulation_id": "sim_xxxx", + "graph_id": "mirofish_xxxx", + "simulation_requirement": "模拟武汉大学撤销处分后的舆情走向", + "status": "completed", + "outline": { + "title": "武汉大学撤销处分事件舆情分析报告", + "summary": "基于模拟结果的全面舆情分析", + "sections": [ + {"title": "执行摘要", "content": "..."}, + {"title": "模拟背景", "content": "..."}, + {"title": "关键发现", "content": "..."}, + {"title": "舆情分析", "content": "..."}, + {"title": "建议与展望", "content": "..."} + ] + }, + "markdown_content": "# 武汉大学撤销处分事件舆情分析报告\n\n...", + "created_at": "2025-12-09T10:00:00", + "completed_at": "2025-12-09T10:05:00" + } +} +``` + +--- + +#### 4. 根据模拟ID获取报告 + +**接口**: `GET /api/report/by-simulation/{simulation_id}` + +**返回示例**: +```json +{ + "success": true, + "data": {...}, + "has_report": true +} +``` + +--- + +#### 5. 
下载报告 + +**接口**: `GET /api/report/{report_id}/download` + +**返回**: Markdown文件下载 + +--- + +#### 6. 与Report Agent对话 + +**接口**: `POST /api/report/chat` + +**请求参数**: +```json +{ + "simulation_id": "sim_xxxx", + "message": "请详细解释一下舆情的主要趋势", + "chat_history": [ + {"role": "user", "content": "报告提到了哪些关键人物?"}, + {"role": "assistant", "content": "根据分析,关键人物包括..."} + ] +} +``` + +| 参数 | 类型 | 必填 | 默认值 | 说明 | +|------|------|------|--------|------| +| simulation_id | String | 是 | - | 模拟ID | +| message | String | 是 | - | 用户消息 | +| chat_history | Array | 否 | [] | 对话历史(用于上下文) | + +**返回示例**: +```json +{ + "success": true, + "data": { + "response": "根据模拟数据分析,舆情的主要趋势表现为...\n\n1. **初期阶段**:...\n2. **发酵阶段**:...\n3. **高峰阶段**:...", + "tool_calls": [ + {"name": "search_graph", "parameters": {"query": "舆情趋势"}}, + {"name": "get_graph_statistics", "parameters": {}} + ], + "sources": [] + } +} +``` + +--- + +#### 7. 检查报告状态 + +**接口**: `GET /api/report/check/{simulation_id}` + +**用途**: 判断是否解锁Interview功能 + +**返回示例**: +```json +{ + "success": true, + "data": { + "simulation_id": "sim_xxxx", + "has_report": true, + "report_status": "completed", + "report_id": "report_xxxx", + "interview_unlocked": true + } +} +``` + +--- + +#### 8. 列出所有报告 + +**接口**: `GET /api/report/list?simulation_id=sim_xxxx&limit=50` + +**返回示例**: +```json +{ + "success": true, + "data": [...], + "count": 5 +} +``` + +--- + +#### 9. 删除报告 + +**接口**: `DELETE /api/report/{report_id}` + +--- + +#### 10. 工具调试接口 + +**图谱搜索**: `POST /api/report/tools/search` +```json +{ + "graph_id": "mirofish_xxxx", + "query": "舆情走向", + "limit": 10 +} +``` + +**图谱统计**: `POST /api/report/tools/statistics` +```json +{ + "graph_id": "mirofish_xxxx" +} +``` + +--- + #### 6. 获取运行状态 **接口**: `GET /api/simulation/{simulation_id}/run-status` @@ -1192,7 +1501,50 @@ created_at: str # 创建时间 --- -### 6. SimulationParameters (模拟参数) +### 6. 
Report (报告模型) + +**文件**: `app/services/report_agent.py` + +**字段**: +```python +report_id: str # 报告ID (report_xxx) +simulation_id: str # 模拟ID +graph_id: str # 图谱ID +simulation_requirement: str # 模拟需求 +status: ReportStatus # 状态 +outline: ReportOutline # 报告大纲 +markdown_content: str # Markdown内容 +created_at: str # 创建时间 +completed_at: str # 完成时间 +error: str # 错误信息 +``` + +**状态枚举**: +```python +PENDING = "pending" # 等待中 +PLANNING = "planning" # 规划大纲中 +GENERATING = "generating" # 生成内容中 +COMPLETED = "completed" # 已完成 +FAILED = "failed" # 失败 +``` + +**ReportOutline字段**: +```python +title: str # 报告标题 +summary: str # 报告摘要 +sections: List[ReportSection] # 章节列表 +``` + +**ReportSection字段**: +```python +title: str # 章节标题 +content: str # 章节内容 +subsections: List[ReportSection] # 子章节 +``` + +--- + +### 7. SimulationParameters (模拟参数) **文件**: `app/services/simulation_config_generator.py` @@ -1815,6 +2167,270 @@ result = SimulationRunner.interview_all_agents( --- +### 10. ZepToolsService (Zep检索工具服务) + +**文件**: `app/services/zep_tools.py` + +**功能**: 封装多种Zep图谱检索工具,供Report Agent调用 + +**核心方法**: + +```python +def search_graph( + graph_id: str, + query: str, + limit: int = 10 +) -> SearchResult: + """ + 图谱语义搜索 + + 使用混合搜索(语义+BM25)查找相关信息 + 返回: facts列表、edges列表、nodes列表 + """ + +def get_all_nodes(graph_id: str) -> List[NodeInfo]: + """获取图谱所有节点""" + +def get_all_edges(graph_id: str) -> List[EdgeInfo]: + """获取图谱所有边""" + +def get_node_detail(node_uuid: str) -> Optional[NodeInfo]: + """获取单个节点详情""" + +def get_node_edges(node_uuid: str) -> List[EdgeInfo]: + """获取节点相关的边""" + +def get_entities_by_type( + graph_id: str, + entity_type: str +) -> List[NodeInfo]: + """按类型获取实体""" + +def get_entity_summary( + graph_id: str, + entity_name: str +) -> Dict[str, Any]: + """获取实体关系摘要""" + +def get_graph_statistics(graph_id: str) -> Dict[str, Any]: + """ + 获取图谱统计信息 + + 返回: + - total_nodes: 节点总数 + - total_edges: 边总数 + - entity_types: 实体类型分布 + - relation_types: 关系类型分布 + """ + +def get_simulation_context( + graph_id: 
str, + simulation_requirement: str, + limit: int = 30 +) -> Dict[str, Any]: + """ + 获取模拟相关上下文 + + 综合搜索与模拟需求相关的所有信息 + """ +``` + +**容错机制**: +- 所有API调用带3次重试 +- 指数退避策略 +- 搜索失败返回空结果而非抛出异常 + +--- + +### 11. ReportAgent (报告生成Agent) + +**文件**: `app/services/report_agent.py` + +**功能**: 使用ReACT模式生成模拟分析报告 + +**核心类**: + +```python +class ReportAgent: + """ + Report Agent - 模拟报告生成Agent + + 采用ReACT(Reasoning + Acting)模式: + 1. 规划阶段:分析模拟需求,规划报告目录结构 + 2. 生成阶段:逐章节生成内容,每章节可多次调用工具获取信息 + 3. 对话阶段:支持与用户对话,自主调用检索工具 + """ + + # 配置 + MAX_TOOL_CALLS_PER_SECTION = 5 # 每章节最大工具调用次数 + MAX_REFLECTION_ROUNDS = 2 # 最大反思轮数 +``` + +**核心方法**: + +```python +def plan_outline( + progress_callback: Optional[Callable] = None +) -> ReportOutline: + """ + 规划报告大纲 + + 步骤: + 1. 获取模拟上下文(图谱统计、相关事实) + 2. 使用LLM分析并生成大纲结构 + 3. 返回包含章节列表的大纲对象 + """ + +def _generate_section_react( + section: ReportSection, + outline: ReportOutline, + previous_sections: List[str], + progress_callback: Optional[Callable] = None +) -> str: + """ + 使用ReACT模式生成单个章节 + + ReACT循环: + 1. Thought(思考)- 分析需要什么信息 + 2. Action(行动)- 调用工具获取信息 + 3. Observation(观察)- 分析工具返回结果 + 4. 重复直到信息足够或达到最大次数 + 5. Final Answer(最终回答)- 生成章节内容 + """ + +def generate_report( + progress_callback: Optional[Callable] = None +) -> Report: + """ + 生成完整报告 + + 步骤: + 1. 规划大纲 + 2. 逐章节生成(ReACT模式) + 3. 组装Markdown报告 + 4. 
保存报告文件 + """ + +def chat( + message: str, + chat_history: List[Dict[str, str]] = None +) -> Dict[str, Any]: + """ + 与Report Agent对话 + + 在对话中Agent可以自主调用检索工具来回答问题 + + Returns: + { + "response": "Agent回复", + "tool_calls": [调用的工具列表], + "sources": [信息来源] + } + """ +``` + +**工具调用格式**: + +Agent使用以下格式调用工具: + +``` +<tool_call> +{"name": "search_graph", "parameters": {"query": "舆情走向", "limit": 10}} +</tool_call> +``` + +或者: + +``` +[TOOL_CALL] search_graph(query="舆情走向", limit="10") +``` + +**报告大纲结构示例**: + +```json +{ + "title": "武汉大学撤销处分事件舆情分析报告", + "summary": "基于模拟结果的全面舆情分析", + "sections": [ + { + "title": "执行摘要", + "description": "简要总结模拟结果和关键发现" + }, + { + "title": "模拟背景", + "description": "描述模拟的初始条件和场景设定" + }, + { + "title": "关键发现", + "description": "分析模拟中的重要发现和趋势" + }, + { + "title": "舆情分析", + "description": "分析舆论走向、情绪变化、关键意见领袖" + }, + { + "title": "影响评估", + "description": "评估事件的影响范围和程度" + }, + { + "title": "建议与展望", + "description": "基于分析结果提出建议" + } + ] +} +``` + +--- + +### 12. ReportManager (报告管理器) + +**文件**: `app/services/report_agent.py` + +**功能**: 报告的持久化存储和检索 + +**核心方法**: + +```python +@classmethod +def save_report(cls, report: Report) -> None: + """ + 保存报告 + + 同时保存: + - JSON文件(报告元数据) + - Markdown文件(报告内容) + """ + +@classmethod +def get_report(cls, report_id: str) -> Optional[Report]: + """获取报告""" + +@classmethod +def get_report_by_simulation(cls, simulation_id: str) -> Optional[Report]: + """根据模拟ID获取报告""" + +@classmethod +def list_reports( + cls, + simulation_id: Optional[str] = None, + limit: int = 50 +) -> List[Report]: + """列出报告""" + +@classmethod +def delete_report(cls, report_id: str) -> bool: + """删除报告""" +``` + +**存储结构**: +``` +uploads/reports/ +├── report_abc123.json # 报告元数据 +└── report_abc123.md # Markdown报告 +``` + +--- + ## 工具类 ### 1. 
FileParser (文件解析器) @@ -1962,6 +2578,11 @@ ZEP_API_KEY=z_xxx # OASIS模拟配置 OASIS_DEFAULT_MAX_ROUNDS=10 + +# Report Agent配置(可选) +REPORT_AGENT_MAX_TOOL_CALLS=5 +REPORT_AGENT_MAX_REFLECTION_ROUNDS=2 +REPORT_AGENT_TEMPERATURE=0.5 ``` ### 配置项说明 @@ -1977,6 +2598,9 @@ OASIS_DEFAULT_MAX_ROUNDS=10 | LLM_MODEL_NAME | String | gpt-4o-mini | LLM模型名称 | | ZEP_API_KEY | String | - | Zep API密钥(必填) | | OASIS_DEFAULT_MAX_ROUNDS | Integer | 10 | 默认模拟轮数 | +| REPORT_AGENT_MAX_TOOL_CALLS | Integer | 5 | 每章节最大工具调用次数 | +| REPORT_AGENT_MAX_REFLECTION_ROUNDS | Integer | 2 | 最大反思轮数 | +| REPORT_AGENT_TEMPERATURE | Float | 0.5 | 报告生成温度参数 | --- @@ -2102,7 +2726,40 @@ curl -X POST http://localhost:5001/api/simulation/{sim_xxx}/interview/all \ # Step 12: 获取Interview历史 curl http://localhost:5001/api/simulation/{sim_xxx}/interview/history -# Step 13: 关闭模拟环境(优雅退出) +# Step 13: 生成模拟分析报告 +curl -X POST http://localhost:5001/api/report/generate \ + -H "Content-Type: application/json" \ + -d '{ + "simulation_id": "sim_xxx" + }' + +# 返回: task_id + +# Step 14: 查询报告生成进度 +curl -X POST http://localhost:5001/api/report/generate/status \ + -H "Content-Type: application/json" \ + -d '{ + "task_id": "task_xxx", + "simulation_id": "sim_xxx" + }' + +# 等待status=completed + +# Step 15: 获取报告 +curl http://localhost:5001/api/report/by-simulation/sim_xxx + +# Step 16: 与Report Agent对话 +curl -X POST http://localhost:5001/api/report/chat \ + -H "Content-Type: application/json" \ + -d '{ + "simulation_id": "sim_xxx", + "message": "请解释一下舆情的主要趋势" + }' + +# Step 17: 下载Markdown报告 +curl -O http://localhost:5001/api/report/{report_id}/download + +# Step 18: 关闭模拟环境(优雅退出) curl -X POST http://localhost:5001/api/simulation/close-env \ -H "Content-Type: application/json" \ -d '{ @@ -2278,11 +2935,31 @@ MIT License --- -**最后更新**: 2025-12-08 -**版本**: v1.2.0 +**最后更新**: 2025-12-09 +**版本**: v1.3.0 ### 更新日志 +**v1.3.0 (2025-12-09)**: +- 新增 Report Agent 模拟报告生成功能 + - 使用 LangChain + Zep 实现 ReACT 模式 + - 自动规划报告大纲,分段生成内容 + - 每章节可多次调用Zep检索工具获取信息 + - 
生成专业的Markdown格式报告 +- 新增 Report Agent 对话功能 + - 报告完成后可与Agent对话 + - Agent自主调用检索工具回答问题 +- 新增 Zep 检索工具服务 + - 封装图谱搜索、节点读取、边查询等工具 + - 支持语义搜索、统计分析、上下文获取 +- 新增报告管理接口 + - 报告生成、查询、下载、删除 + - 报告状态检查(解锁Interview功能) +- 依赖更新 + - 新增 langchain>=0.2.0 + - 新增 langchain-core>=0.2.0 + - 新增 langchain-openai>=0.1.0 + **v1.2.0 (2025-12-08)**: - 新增 Interview 采访功能 - 支持单个Agent采访 diff --git a/backend/app/__init__.py b/backend/app/__init__.py index 5b7f1fc..aba624b 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -63,9 +63,10 @@ def create_app(config_class=Config): return response # 注册蓝图 - from .api import graph_bp, simulation_bp + from .api import graph_bp, simulation_bp, report_bp app.register_blueprint(graph_bp, url_prefix='/api/graph') app.register_blueprint(simulation_bp, url_prefix='/api/simulation') + app.register_blueprint(report_bp, url_prefix='/api/report') # 健康检查 @app.route('/health') diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py index ad7a722..ffda743 100644 --- a/backend/app/api/__init__.py +++ b/backend/app/api/__init__.py @@ -6,7 +6,9 @@ from flask import Blueprint graph_bp = Blueprint('graph', __name__) simulation_bp = Blueprint('simulation', __name__) +report_bp = Blueprint('report', __name__) from . import graph # noqa: E402, F401 from . import simulation # noqa: E402, F401 +from . import report # noqa: E402, F401 diff --git a/backend/app/api/report.py b/backend/app/api/report.py new file mode 100644 index 0000000..ce44db9 --- /dev/null +++ b/backend/app/api/report.py @@ -0,0 +1,829 @@ +""" +Report API路由 +提供模拟报告生成、获取、对话等接口 +""" + +import os +import traceback +import threading +from flask import request, jsonify, send_file + +from . 
import report_bp +from ..config import Config +from ..services.report_agent import ReportAgent, ReportManager, ReportStatus +from ..services.simulation_manager import SimulationManager +from ..models.project import ProjectManager +from ..models.task import TaskManager, TaskStatus +from ..utils.logger import get_logger + +logger = get_logger('mirofish.api.report') + + +# ============== 报告生成接口 ============== + +@report_bp.route('/generate', methods=['POST']) +def generate_report(): + """ + 生成模拟分析报告(异步任务) + + 这是一个耗时操作,接口会立即返回task_id, + 使用 POST /api/report/generate/status 查询进度 + + 请求(JSON): + { + "simulation_id": "sim_xxxx", // 必填,模拟ID + "force_regenerate": false // 可选,强制重新生成 + } + + 返回: + { + "success": true, + "data": { + "simulation_id": "sim_xxxx", + "task_id": "task_xxxx", + "status": "generating", + "message": "报告生成任务已启动" + } + } + """ + try: + data = request.get_json() or {} + + simulation_id = data.get('simulation_id') + if not simulation_id: + return jsonify({ + "success": False, + "error": "请提供 simulation_id" + }), 400 + + force_regenerate = data.get('force_regenerate', False) + + # 获取模拟信息 + manager = SimulationManager() + state = manager.get_simulation(simulation_id) + + if not state: + return jsonify({ + "success": False, + "error": f"模拟不存在: {simulation_id}" + }), 404 + + # 检查是否已有报告 + if not force_regenerate: + existing_report = ReportManager.get_report_by_simulation(simulation_id) + if existing_report and existing_report.status == ReportStatus.COMPLETED: + return jsonify({ + "success": True, + "data": { + "simulation_id": simulation_id, + "report_id": existing_report.report_id, + "status": "completed", + "message": "报告已存在", + "already_generated": True + } + }) + + # 获取项目信息 + project = ProjectManager.get_project(state.project_id) + if not project: + return jsonify({ + "success": False, + "error": f"项目不存在: {state.project_id}" + }), 404 + + graph_id = state.graph_id or project.graph_id + if not graph_id: + return jsonify({ + "success": False, + "error": 
"缺少图谱ID,请确保已构建图谱" + }), 400 + + simulation_requirement = project.simulation_requirement + if not simulation_requirement: + return jsonify({ + "success": False, + "error": "缺少模拟需求描述" + }), 400 + + # 创建异步任务 + task_manager = TaskManager() + task_id = task_manager.create_task( + task_type="report_generate", + metadata={ + "simulation_id": simulation_id, + "graph_id": graph_id + } + ) + + # 定义后台任务 + def run_generate(): + try: + task_manager.update_task( + task_id, + status=TaskStatus.PROCESSING, + progress=0, + message="初始化Report Agent..." + ) + + # 创建Report Agent + agent = ReportAgent( + graph_id=graph_id, + simulation_id=simulation_id, + simulation_requirement=simulation_requirement + ) + + # 进度回调 + def progress_callback(stage, progress, message): + task_manager.update_task( + task_id, + progress=progress, + message=f"[{stage}] {message}" + ) + + # 生成报告 + report = agent.generate_report(progress_callback=progress_callback) + + # 保存报告 + ReportManager.save_report(report) + + if report.status == ReportStatus.COMPLETED: + task_manager.complete_task( + task_id, + result={ + "report_id": report.report_id, + "simulation_id": simulation_id, + "status": "completed" + } + ) + else: + task_manager.fail_task(task_id, report.error or "报告生成失败") + + except Exception as e: + logger.error(f"报告生成失败: {str(e)}") + task_manager.fail_task(task_id, str(e)) + + # 启动后台线程 + thread = threading.Thread(target=run_generate, daemon=True) + thread.start() + + return jsonify({ + "success": True, + "data": { + "simulation_id": simulation_id, + "task_id": task_id, + "status": "generating", + "message": "报告生成任务已启动,请通过 /api/report/generate/status 查询进度", + "already_generated": False + } + }) + + except Exception as e: + logger.error(f"启动报告生成任务失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +@report_bp.route('/generate/status', methods=['POST']) +def get_generate_status(): + """ + 查询报告生成任务进度 + + 请求(JSON): + { + "task_id": 
"task_xxxx", // 可选,generate返回的task_id + "simulation_id": "sim_xxxx" // 可选,模拟ID + } + + 返回: + { + "success": true, + "data": { + "task_id": "task_xxxx", + "status": "processing|completed|failed", + "progress": 45, + "message": "..." + } + } + """ + try: + data = request.get_json() or {} + + task_id = data.get('task_id') + simulation_id = data.get('simulation_id') + + # 如果提供了simulation_id,先检查是否已有完成的报告 + if simulation_id: + existing_report = ReportManager.get_report_by_simulation(simulation_id) + if existing_report and existing_report.status == ReportStatus.COMPLETED: + return jsonify({ + "success": True, + "data": { + "simulation_id": simulation_id, + "report_id": existing_report.report_id, + "status": "completed", + "progress": 100, + "message": "报告已生成", + "already_completed": True + } + }) + + if not task_id: + return jsonify({ + "success": False, + "error": "请提供 task_id 或 simulation_id" + }), 400 + + task_manager = TaskManager() + task = task_manager.get_task(task_id) + + if not task: + return jsonify({ + "success": False, + "error": f"任务不存在: {task_id}" + }), 404 + + return jsonify({ + "success": True, + "data": task.to_dict() + }) + + except Exception as e: + logger.error(f"查询任务状态失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e) + }), 500 + + +# ============== 报告获取接口 ============== + +@report_bp.route('/<report_id>', methods=['GET']) +def get_report(report_id: str): + """ + 获取报告详情 + + 返回: + { + "success": true, + "data": { + "report_id": "report_xxxx", + "simulation_id": "sim_xxxx", + "status": "completed", + "outline": {...}, + "markdown_content": "...", + "created_at": "...", + "completed_at": "..." 
+ } + } + """ + try: + report = ReportManager.get_report(report_id) + + if not report: + return jsonify({ + "success": False, + "error": f"报告不存在: {report_id}" + }), 404 + + return jsonify({ + "success": True, + "data": report.to_dict() + }) + + except Exception as e: + logger.error(f"获取报告失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +@report_bp.route('/by-simulation/<simulation_id>', methods=['GET']) +def get_report_by_simulation(simulation_id: str): + """ + 根据模拟ID获取报告 + + 返回: + { + "success": true, + "data": { + "report_id": "report_xxxx", + ... + } + } + """ + try: + report = ReportManager.get_report_by_simulation(simulation_id) + + if not report: + return jsonify({ + "success": False, + "error": f"该模拟暂无报告: {simulation_id}", + "has_report": False + }), 404 + + return jsonify({ + "success": True, + "data": report.to_dict(), + "has_report": True + }) + + except Exception as e: + logger.error(f"获取报告失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +@report_bp.route('/list', methods=['GET']) +def list_reports(): + """ + 列出所有报告 + + Query参数: + simulation_id: 按模拟ID过滤(可选) + limit: 返回数量限制(默认50) + + 返回: + { + "success": true, + "data": [...], + "count": 10 + } + """ + try: + simulation_id = request.args.get('simulation_id') + limit = request.args.get('limit', 50, type=int) + + reports = ReportManager.list_reports( + simulation_id=simulation_id, + limit=limit + ) + + return jsonify({ + "success": True, + "data": [r.to_dict() for r in reports], + "count": len(reports) + }) + + except Exception as e: + logger.error(f"列出报告失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +@report_bp.route('/<report_id>/download', methods=['GET']) +def download_report(report_id: str): + """ + 下载报告(Markdown格式) + + 返回Markdown文件 + """ + try: + report = ReportManager.get_report(report_id) + + if not 
report: + return jsonify({ + "success": False, + "error": f"报告不存在: {report_id}" + }), 404 + + md_path = ReportManager._get_report_markdown_path(report_id) + + if not os.path.exists(md_path): + # 如果MD文件不存在,生成一个临时文件 + import tempfile + with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: + f.write(report.markdown_content) + temp_path = f.name + + return send_file( + temp_path, + as_attachment=True, + download_name=f"{report_id}.md" + ) + + return send_file( + md_path, + as_attachment=True, + download_name=f"{report_id}.md" + ) + + except Exception as e: + logger.error(f"下载报告失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +@report_bp.route('/<report_id>', methods=['DELETE']) +def delete_report(report_id: str): + """删除报告""" + try: + success = ReportManager.delete_report(report_id) + + if not success: + return jsonify({ + "success": False, + "error": f"报告不存在: {report_id}" + }), 404 + + return jsonify({ + "success": True, + "message": f"报告已删除: {report_id}" + }) + + except Exception as e: + logger.error(f"删除报告失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ============== Report Agent对话接口 ============== + +@report_bp.route('/chat', methods=['POST']) +def chat_with_report_agent(): + """ + 与Report Agent对话 + + Report Agent可以在对话中自主调用检索工具来回答问题 + + 请求(JSON): + { + "simulation_id": "sim_xxxx", // 必填,模拟ID + "message": "请解释一下舆情走向", // 必填,用户消息 + "chat_history": [ // 可选,对话历史 + {"role": "user", "content": "..."}, + {"role": "assistant", "content": "..."} + ] + } + + 返回: + { + "success": true, + "data": { + "response": "Agent回复...", + "tool_calls": [调用的工具列表], + "sources": [信息来源] + } + } + """ + try: + data = request.get_json() or {} + + simulation_id = data.get('simulation_id') + message = data.get('message') + chat_history = data.get('chat_history', []) + + if not simulation_id: + return jsonify({ + "success": False, 
+ "error": "请提供 simulation_id" + }), 400 + + if not message: + return jsonify({ + "success": False, + "error": "请提供 message" + }), 400 + + # 获取模拟和项目信息 + manager = SimulationManager() + state = manager.get_simulation(simulation_id) + + if not state: + return jsonify({ + "success": False, + "error": f"模拟不存在: {simulation_id}" + }), 404 + + project = ProjectManager.get_project(state.project_id) + if not project: + return jsonify({ + "success": False, + "error": f"项目不存在: {state.project_id}" + }), 404 + + graph_id = state.graph_id or project.graph_id + if not graph_id: + return jsonify({ + "success": False, + "error": "缺少图谱ID" + }), 400 + + simulation_requirement = project.simulation_requirement or "" + + # 创建Agent并进行对话 + agent = ReportAgent( + graph_id=graph_id, + simulation_id=simulation_id, + simulation_requirement=simulation_requirement + ) + + result = agent.chat(message=message, chat_history=chat_history) + + return jsonify({ + "success": True, + "data": result + }) + + except Exception as e: + logger.error(f"对话失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ============== 报告进度与分章节接口 ============== + +@report_bp.route('/<report_id>/progress', methods=['GET']) +def get_report_progress(report_id: str): + """ + 获取报告生成进度(实时) + + 返回: + { + "success": true, + "data": { + "status": "generating", + "progress": 45, + "message": "正在生成章节: 关键发现", + "current_section": "关键发现", + "completed_sections": ["执行摘要", "模拟背景"], + "updated_at": "2025-12-09T..." 
+ } + } + """ + try: + progress = ReportManager.get_progress(report_id) + + if not progress: + return jsonify({ + "success": False, + "error": f"报告不存在或进度信息不可用: {report_id}" + }), 404 + + return jsonify({ + "success": True, + "data": progress + }) + + except Exception as e: + logger.error(f"获取报告进度失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +@report_bp.route('/<report_id>/sections', methods=['GET']) +def get_report_sections(report_id: str): + """ + 获取已生成的章节列表(分章节输出) + + 前端可以轮询此接口获取已生成的章节内容,无需等待整个报告完成 + + 返回: + { + "success": true, + "data": { + "report_id": "report_xxxx", + "sections": [ + { + "filename": "section_01.md", + "section_index": 1, + "content": "## 执行摘要\\n\\n..." + }, + ... + ], + "total_sections": 3, + "is_complete": false + } + } + """ + try: + sections = ReportManager.get_generated_sections(report_id) + + # 获取报告状态 + report = ReportManager.get_report(report_id) + is_complete = report is not None and report.status == ReportStatus.COMPLETED + + return jsonify({ + "success": True, + "data": { + "report_id": report_id, + "sections": sections, + "total_sections": len(sections), + "is_complete": is_complete + } + }) + + except Exception as e: + logger.error(f"获取章节列表失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +@report_bp.route('/<report_id>/section/<int:section_index>', methods=['GET']) +def get_single_section(report_id: str, section_index: int): + """ + 获取单个章节内容 + + 返回: + { + "success": true, + "data": { + "filename": "section_01.md", + "content": "## 执行摘要\\n\\n..." 
+ } + } + """ + try: + section_path = ReportManager._get_section_path(report_id, section_index) + + if not os.path.exists(section_path): + return jsonify({ + "success": False, + "error": f"章节不存在: section_{section_index:02d}.md" + }), 404 + + with open(section_path, 'r', encoding='utf-8') as f: + content = f.read() + + return jsonify({ + "success": True, + "data": { + "filename": f"section_{section_index:02d}.md", + "section_index": section_index, + "content": content + } + }) + + except Exception as e: + logger.error(f"获取章节内容失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ============== 报告状态检查接口 ============== + +@report_bp.route('/check/<simulation_id>', methods=['GET']) +def check_report_status(simulation_id: str): + """ + 检查模拟是否有报告,以及报告状态 + + 用于前端判断是否解锁Interview功能 + + 返回: + { + "success": true, + "data": { + "simulation_id": "sim_xxxx", + "has_report": true, + "report_status": "completed", + "report_id": "report_xxxx", + "interview_unlocked": true + } + } + """ + try: + report = ReportManager.get_report_by_simulation(simulation_id) + + has_report = report is not None + report_status = report.status.value if report else None + report_id = report.report_id if report else None + + # 只有报告完成后才解锁interview + interview_unlocked = has_report and report.status == ReportStatus.COMPLETED + + return jsonify({ + "success": True, + "data": { + "simulation_id": simulation_id, + "has_report": has_report, + "report_status": report_status, + "report_id": report_id, + "interview_unlocked": interview_unlocked + } + }) + + except Exception as e: + logger.error(f"检查报告状态失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ============== 工具调用接口(供调试使用)============== + +@report_bp.route('/tools/search', methods=['POST']) +def search_graph_tool(): + """ + 图谱搜索工具接口(供调试使用) + + 请求(JSON): + { + "graph_id": "mirofish_xxxx", + "query": "搜索查询", + "limit": 10 
+ } + """ + try: + data = request.get_json() or {} + + graph_id = data.get('graph_id') + query = data.get('query') + limit = data.get('limit', 10) + + if not graph_id or not query: + return jsonify({ + "success": False, + "error": "请提供 graph_id 和 query" + }), 400 + + from ..services.zep_tools import ZepToolsService + + tools = ZepToolsService() + result = tools.search_graph( + graph_id=graph_id, + query=query, + limit=limit + ) + + return jsonify({ + "success": True, + "data": result.to_dict() + }) + + except Exception as e: + logger.error(f"图谱搜索失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +@report_bp.route('/tools/statistics', methods=['POST']) +def get_graph_statistics_tool(): + """ + 图谱统计工具接口(供调试使用) + + 请求(JSON): + { + "graph_id": "mirofish_xxxx" + } + """ + try: + data = request.get_json() or {} + + graph_id = data.get('graph_id') + + if not graph_id: + return jsonify({ + "success": False, + "error": "请提供 graph_id" + }), 400 + + from ..services.zep_tools import ZepToolsService + + tools = ZepToolsService() + result = tools.get_graph_statistics(graph_id) + + return jsonify({ + "success": True, + "data": result + }) + + except Exception as e: + logger.error(f"获取图谱统计失败: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 diff --git a/backend/app/config.py b/backend/app/config.py index 10b18f7..3dd3f51 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -58,6 +58,11 @@ class Config: 'TREND', 'REFRESH', 'DO_NOTHING', 'FOLLOW', 'MUTE' ] + # Report Agent配置 + REPORT_AGENT_MAX_TOOL_CALLS = int(os.environ.get('REPORT_AGENT_MAX_TOOL_CALLS', '5')) + REPORT_AGENT_MAX_REFLECTION_ROUNDS = int(os.environ.get('REPORT_AGENT_MAX_REFLECTION_ROUNDS', '2')) + REPORT_AGENT_TEMPERATURE = float(os.environ.get('REPORT_AGENT_TEMPERATURE', '0.5')) + @classmethod def validate(cls): """验证必要配置""" diff --git 
@dataclass
class ReportSection:
    """One chapter of a report, optionally holding nested sub-chapters."""
    title: str
    content: str = ""
    subsections: List['ReportSection'] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the section, expanding children recursively."""
        children = [child.to_dict() for child in self.subsections]
        return {
            "title": self.title,
            "content": self.content,
            "subsections": children
        }

    def to_markdown(self, level: int = 2) -> str:
        """Render the section as Markdown.

        Args:
            level: Heading depth for this section; each nested level of
                sub-chapters is rendered one heading level deeper.
        """
        heading = '#' * level
        pieces = [f"{heading} {self.title}\n\n"]
        if self.content:
            pieces.append(f"{self.content}\n\n")
        pieces.extend(child.to_markdown(level + 1) for child in self.subsections)
        return "".join(pieces)


@dataclass
class ReportOutline:
    """Report outline: title, one-line summary and the ordered chapters."""
    title: str
    summary: str
    sections: List[ReportSection]

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the outline, expanding every chapter."""
        chapters = [chapter.to_dict() for chapter in self.sections]
        return {
            "title": self.title,
            "summary": self.summary,
            "sections": chapters
        }

    def to_markdown(self) -> str:
        """Render the outline as Markdown: H1 title, quoted summary, then chapters."""
        header = f"# {self.title}\n\n" + f"> {self.summary}\n\n"
        body = "".join(chapter.to_markdown() for chapter in self.sections)
        return header + body
report_id: str + simulation_id: str + graph_id: str + simulation_requirement: str + status: ReportStatus + outline: Optional[ReportOutline] = None + markdown_content: str = "" + created_at: str = "" + completed_at: str = "" + error: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "report_id": self.report_id, + "simulation_id": self.simulation_id, + "graph_id": self.graph_id, + "simulation_requirement": self.simulation_requirement, + "status": self.status.value, + "outline": self.outline.to_dict() if self.outline else None, + "markdown_content": self.markdown_content, + "created_at": self.created_at, + "completed_at": self.completed_at, + "error": self.error + } + + +class ReportAgent: + """ + Report Agent - 模拟报告生成Agent + + 采用ReACT(Reasoning + Acting)模式: + 1. 规划阶段:分析模拟需求,规划报告目录结构 + 2. 生成阶段:逐章节生成内容,每章节可多次调用工具获取信息 + 3. 反思阶段:检查内容完整性和准确性 + + 工具(MCP封装): + - search_graph: 图谱语义搜索 + - get_graph_statistics: 获取图谱统计 + - get_entity_summary: 获取实体摘要 + - get_simulation_context: 获取模拟上下文 + """ + + # 最大工具调用次数(每个章节) + MAX_TOOL_CALLS_PER_SECTION = 5 + + # 最大反思轮数 + MAX_REFLECTION_ROUNDS = 2 + + def __init__( + self, + graph_id: str, + simulation_id: str, + simulation_requirement: str, + llm_client: Optional[LLMClient] = None, + zep_tools: Optional[ZepToolsService] = None + ): + """ + 初始化Report Agent + + Args: + graph_id: 图谱ID + simulation_id: 模拟ID + simulation_requirement: 模拟需求描述 + llm_client: LLM客户端(可选) + zep_tools: Zep工具服务(可选) + """ + self.graph_id = graph_id + self.simulation_id = simulation_id + self.simulation_requirement = simulation_requirement + + self.llm = llm_client or LLMClient() + self.zep_tools = zep_tools or ZepToolsService() + + # 工具定义 + self.tools = self._define_tools() + + logger.info(f"ReportAgent 初始化完成: graph_id={graph_id}, simulation_id={simulation_id}") + + def _define_tools(self) -> Dict[str, Dict[str, Any]]: + """定义可用工具""" + return { + "search_graph": { + "name": "search_graph", + "description": "在知识图谱中搜索相关信息。输入搜索查询,返回与查询相关的事实和关系。", + 
"parameters": { + "query": "搜索查询字符串", + "limit": "返回结果数量(可选,默认10)" + } + }, + "get_graph_statistics": { + "name": "get_graph_statistics", + "description": "获取知识图谱的统计信息,包括节点数量、边数量、实体类型分布等。", + "parameters": {} + }, + "get_entity_summary": { + "name": "get_entity_summary", + "description": "获取指定实体的详细信息和关系摘要。", + "parameters": { + "entity_name": "实体名称" + } + }, + "get_simulation_context": { + "name": "get_simulation_context", + "description": "获取与模拟需求相关的上下文信息,包括相关事实、实体列表等。", + "parameters": { + "query": "额外的查询条件(可选)" + } + }, + "get_entities_by_type": { + "name": "get_entities_by_type", + "description": "按类型获取实体列表,如获取所有Student类型或PublicFigure类型的实体。", + "parameters": { + "entity_type": "实体类型名称" + } + } + } + + def _execute_tool(self, tool_name: str, parameters: Dict[str, Any]) -> str: + """ + 执行工具调用 + + Args: + tool_name: 工具名称 + parameters: 工具参数 + + Returns: + 工具执行结果(文本格式) + """ + logger.info(f"执行工具: {tool_name}, 参数: {parameters}") + + try: + if tool_name == "search_graph": + query = parameters.get("query", "") + limit = parameters.get("limit", 10) + result = self.zep_tools.search_graph( + graph_id=self.graph_id, + query=query, + limit=limit + ) + return result.to_text() + + elif tool_name == "get_graph_statistics": + result = self.zep_tools.get_graph_statistics(self.graph_id) + return json.dumps(result, ensure_ascii=False, indent=2) + + elif tool_name == "get_entity_summary": + entity_name = parameters.get("entity_name", "") + result = self.zep_tools.get_entity_summary( + graph_id=self.graph_id, + entity_name=entity_name + ) + return json.dumps(result, ensure_ascii=False, indent=2) + + elif tool_name == "get_simulation_context": + query = parameters.get("query", self.simulation_requirement) + result = self.zep_tools.get_simulation_context( + graph_id=self.graph_id, + simulation_requirement=query + ) + return json.dumps(result, ensure_ascii=False, indent=2) + + elif tool_name == "get_entities_by_type": + entity_type = parameters.get("entity_type", "") + nodes = 
self.zep_tools.get_entities_by_type( + graph_id=self.graph_id, + entity_type=entity_type + ) + result = [n.to_dict() for n in nodes] + return json.dumps(result, ensure_ascii=False, indent=2) + + else: + return f"未知工具: {tool_name}" + + except Exception as e: + logger.error(f"工具执行失败: {tool_name}, 错误: {str(e)}") + return f"工具执行失败: {str(e)}" + + def _parse_tool_calls(self, response: str) -> List[Dict[str, Any]]: + """ + 从LLM响应中解析工具调用 + + 支持的格式: + + {"name": "tool_name", "parameters": {"param1": "value1"}} + + + 或者: + [TOOL_CALL] tool_name(param1="value1", param2="value2") + """ + tool_calls = [] + + # 格式1: XML风格 + xml_pattern = r'\s*(\{.*?\})\s*' + for match in re.finditer(xml_pattern, response, re.DOTALL): + try: + call_data = json.loads(match.group(1)) + tool_calls.append(call_data) + except json.JSONDecodeError: + pass + + # 格式2: 函数调用风格 + func_pattern = r'\[TOOL_CALL\]\s*(\w+)\s*\((.*?)\)' + for match in re.finditer(func_pattern, response, re.DOTALL): + tool_name = match.group(1) + params_str = match.group(2) + + # 解析参数 + params = {} + for param_match in re.finditer(r'(\w+)\s*=\s*["\']([^"\']*)["\']', params_str): + params[param_match.group(1)] = param_match.group(2) + + tool_calls.append({ + "name": tool_name, + "parameters": params + }) + + return tool_calls + + def _get_tools_description(self) -> str: + """生成工具描述文本""" + desc_parts = ["可用工具:"] + for name, tool in self.tools.items(): + params_desc = ", ".join([f"{k}: {v}" for k, v in tool["parameters"].items()]) + desc_parts.append(f"- {name}: {tool['description']}") + if params_desc: + desc_parts.append(f" 参数: {params_desc}") + return "\n".join(desc_parts) + + def plan_outline( + self, + progress_callback: Optional[Callable] = None + ) -> ReportOutline: + """ + 规划报告大纲 + + 使用LLM分析模拟需求,规划报告的目录结构 + + Args: + progress_callback: 进度回调函数 + + Returns: + ReportOutline: 报告大纲 + """ + logger.info("开始规划报告大纲...") + + if progress_callback: + progress_callback("planning", 0, "正在分析模拟需求...") + + # 首先获取模拟上下文 + context = 
self.zep_tools.get_simulation_context( + graph_id=self.graph_id, + simulation_requirement=self.simulation_requirement + ) + + if progress_callback: + progress_callback("planning", 30, "正在生成报告大纲...") + + # 构建规划prompt + system_prompt = """你是一个专业的舆情分析报告撰写专家。你需要根据用户的模拟需求和已有的知识图谱信息,规划一份精炼的模拟分析报告大纲。 + +【重要】报告章节数量限制: +- 报告最多包含5个主章节 +- 每个章节可以有0-2个子章节 +- 内容要精炼,避免冗余 + +报告应聚焦以下核心内容(选择最相关的3-5项): +1. 执行摘要 - 简要总结模拟结果和关键发现 +2. 模拟背景 - 描述模拟的初始条件和场景设定 +3. 关键发现 - 分析模拟中的重要发现和趋势 +4. 舆情分析 - 分析舆论走向、情绪变化、关键意见领袖等 +5. 建议与展望 - 基于分析结果提出建议 + +请输出JSON格式的报告大纲,格式如下: +{ + "title": "报告标题", + "summary": "报告摘要(一句话概括)", + "sections": [ + { + "title": "章节标题", + "description": "章节内容描述", + "subsections": [ + {"title": "子章节标题", "description": "子章节描述"} + ] + } + ] +} + +注意:sections数组最多包含5个元素!""" + + user_prompt = f"""模拟需求: +{self.simulation_requirement} + +已有的知识图谱信息: +- 总节点数: {context.get('graph_statistics', {}).get('total_nodes', 0)} +- 总边数: {context.get('graph_statistics', {}).get('total_edges', 0)} +- 实体类型: {list(context.get('graph_statistics', {}).get('entity_types', {}).keys())} +- 实体数量: {context.get('total_entities', 0)} + +相关事实: +{json.dumps(context.get('related_facts', [])[:10], ensure_ascii=False, indent=2)} + +请根据以上信息,生成一份针对此模拟场景的报告大纲。 + +【再次提醒】报告必须控制在最多5个章节以内,内容要精炼聚焦。""" + + try: + response = self.llm.chat_json( + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ], + temperature=0.3 + ) + + if progress_callback: + progress_callback("planning", 80, "正在解析大纲结构...") + + # 解析大纲 + sections = [] + for section_data in response.get("sections", []): + subsections = [] + for sub_data in section_data.get("subsections", []): + subsections.append(ReportSection( + title=sub_data.get("title", ""), + content="" + )) + + sections.append(ReportSection( + title=section_data.get("title", ""), + content="", + subsections=subsections + )) + + outline = ReportOutline( + title=response.get("title", "模拟分析报告"), + summary=response.get("summary", ""), + 
sections=sections + ) + + if progress_callback: + progress_callback("planning", 100, "大纲规划完成") + + logger.info(f"大纲规划完成: {len(sections)} 个章节") + return outline + + except Exception as e: + logger.error(f"大纲规划失败: {str(e)}") + # 返回默认大纲(5个章节) + return ReportOutline( + title="模拟分析报告", + summary="基于模拟结果的分析报告", + sections=[ + ReportSection(title="执行摘要"), + ReportSection(title="模拟背景与场景设定"), + ReportSection(title="关键发现与趋势分析"), + ReportSection(title="舆情走向与情绪演化"), + ReportSection(title="总结与建议") + ] + ) + + def _generate_section_react( + self, + section: ReportSection, + outline: ReportOutline, + previous_sections: List[str], + progress_callback: Optional[Callable] = None + ) -> str: + """ + 使用ReACT模式生成单个章节内容 + + ReACT循环: + 1. Thought(思考)- 分析需要什么信息 + 2. Action(行动)- 调用工具获取信息 + 3. Observation(观察)- 分析工具返回结果 + 4. 重复直到信息足够或达到最大次数 + 5. Final Answer(最终回答)- 生成章节内容 + + Args: + section: 要生成的章节 + outline: 完整大纲 + previous_sections: 之前章节的内容(用于保持连贯性) + progress_callback: 进度回调 + + Returns: + 章节内容(Markdown格式) + """ + logger.info(f"ReACT生成章节: {section.title}") + + # 构建系统prompt + system_prompt = f"""你是一个专业的舆情分析报告撰写专家,正在撰写报告的一个章节。 + +报告标题: {outline.title} +报告摘要: {outline.summary} +模拟需求: {self.simulation_requirement} + +当前要撰写的章节: {section.title} + +你可以使用以下工具来获取信息,每次最多调用{self.MAX_TOOL_CALLS_PER_SECTION}次: + +{self._get_tools_description()} + +请按照以下ReACT格式进行思考和行动: + +Thought: [分析当前需要什么信息来撰写这个章节] +Action: [如果需要信息,调用工具] + +{{"name": "工具名称", "parameters": {{"参数名": "参数值"}}}} + + +当收集到足够信息后,输出: +Final Answer: +[章节的完整Markdown内容] + +注意: +1. 内容要专业、客观、有深度 +2. 引用具体的数据和事实 +3. 保持与其他章节的逻辑连贯性 +4. 使用适当的Markdown格式(列表、强调等) +5. 
不要重复前面章节已经详细描述的内容""" + + # 构建用户prompt + previous_content = "\n\n".join(previous_sections) if previous_sections else "(这是第一个章节)" + user_prompt = f"""已完成的章节内容: +{previous_content[:2000]} + +现在请撰写章节: {section.title} + +首先思考需要什么信息,然后调用工具获取,最后生成内容。""" + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ] + + # ReACT循环 + tool_calls_count = 0 + max_iterations = self.MAX_TOOL_CALLS_PER_SECTION + 2 + + for iteration in range(max_iterations): + if progress_callback: + progress_callback( + "generating", + int((iteration / max_iterations) * 100), + f"思考与行动中 ({tool_calls_count}/{self.MAX_TOOL_CALLS_PER_SECTION})" + ) + + # 调用LLM + response = self.llm.chat( + messages=messages, + temperature=0.5, + max_tokens=4096 + ) + + logger.debug(f"LLM响应: {response[:200]}...") + + # 检查是否有最终答案 + if "Final Answer:" in response: + # 提取最终答案 + final_answer = response.split("Final Answer:")[-1].strip() + logger.info(f"章节 {section.title} 生成完成") + return final_answer + + # 解析工具调用 + tool_calls = self._parse_tool_calls(response) + + if not tool_calls: + # 没有工具调用也没有最终答案,提示生成最终答案 + messages.append({"role": "assistant", "content": response}) + messages.append({ + "role": "user", + "content": "请基于已有信息,输出 Final Answer: 并生成章节内容。" + }) + continue + + # 执行工具调用 + tool_results = [] + for call in tool_calls: + if tool_calls_count >= self.MAX_TOOL_CALLS_PER_SECTION: + break + + result = self._execute_tool(call["name"], call.get("parameters", {})) + tool_results.append(f"工具 {call['name']} 返回:\n{result}") + tool_calls_count += 1 + + # 将结果添加到消息 + messages.append({"role": "assistant", "content": response}) + messages.append({ + "role": "user", + "content": f"Observation:\n" + "\n\n".join(tool_results) + "\n\n请继续思考或输出 Final Answer:" + }) + + # 达到最大迭代次数,强制生成内容 + logger.warning(f"章节 {section.title} 达到最大迭代次数,强制生成") + messages.append({ + "role": "user", + "content": "已达到工具调用限制,请直接输出 Final Answer: 并生成章节内容。" + }) + + response = self.llm.chat( + messages=messages, + 
temperature=0.5, + max_tokens=4096 + ) + + if "Final Answer:" in response: + return response.split("Final Answer:")[-1].strip() + + return response + + def generate_report( + self, + progress_callback: Optional[Callable[[str, int, str], None]] = None + ) -> Report: + """ + 生成完整报告(分章节实时输出) + + 每个章节生成完成后立即保存到文件夹,不需要等待整个报告完成。 + 文件结构: + reports/{report_id}/ + meta.json - 报告元信息 + outline.json - 报告大纲 + progress.json - 生成进度 + section_01.md - 第1章节 + section_02.md - 第2章节 + ... + full_report.md - 完整报告 + + Args: + progress_callback: 进度回调函数 (stage, progress, message) + + Returns: + Report: 完整报告 + """ + import uuid + + report_id = f"report_{uuid.uuid4().hex[:12]}" + + report = Report( + report_id=report_id, + simulation_id=self.simulation_id, + graph_id=self.graph_id, + simulation_requirement=self.simulation_requirement, + status=ReportStatus.PENDING, + created_at=datetime.now().isoformat() + ) + + # 已完成的章节标题列表(用于进度追踪) + completed_section_titles = [] + + try: + # 初始化:创建报告文件夹并保存初始状态 + ReportManager._ensure_report_folder(report_id) + ReportManager.update_progress( + report_id, "pending", 0, "初始化报告...", + completed_sections=[] + ) + ReportManager.save_report(report) + + # 阶段1: 规划大纲 + report.status = ReportStatus.PLANNING + ReportManager.update_progress( + report_id, "planning", 5, "开始规划报告大纲...", + completed_sections=[] + ) + + if progress_callback: + progress_callback("planning", 0, "开始规划报告大纲...") + + outline = self.plan_outline( + progress_callback=lambda stage, prog, msg: + progress_callback(stage, prog // 5, msg) if progress_callback else None + ) + report.outline = outline + + # 保存大纲到文件 + ReportManager.save_outline(report_id, outline) + ReportManager.update_progress( + report_id, "planning", 15, f"大纲规划完成,共{len(outline.sections)}个章节", + completed_sections=[] + ) + ReportManager.save_report(report) + + logger.info(f"大纲已保存到文件: {report_id}/outline.json") + + # 阶段2: 逐章节生成(分章节保存) + report.status = ReportStatus.GENERATING + + total_sections = len(outline.sections) + 
generated_sections = [] # 保存内容用于上下文 + + for i, section in enumerate(outline.sections): + section_num = i + 1 + base_progress = 20 + int((i / total_sections) * 70) + + # 更新进度 + ReportManager.update_progress( + report_id, "generating", base_progress, + f"正在生成章节: {section.title} ({section_num}/{total_sections})", + current_section=section.title, + completed_sections=completed_section_titles + ) + + if progress_callback: + progress_callback( + "generating", + base_progress, + f"正在生成章节: {section.title} ({section_num}/{total_sections})" + ) + + # 生成章节内容 + section_content = self._generate_section_react( + section=section, + outline=outline, + previous_sections=generated_sections, + progress_callback=lambda stage, prog, msg: + progress_callback( + stage, + base_progress + int(prog * 0.7 / total_sections), + msg + ) if progress_callback else None + ) + + section.content = section_content + generated_sections.append(f"## {section.title}\n\n{section_content}") + + # 【关键】立即保存章节到文件 + ReportManager.save_section(report_id, section_num, section) + completed_section_titles.append(section.title) + + logger.info(f"章节已保存: {report_id}/section_{section_num:02d}.md") + + # 更新进度 + ReportManager.update_progress( + report_id, "generating", + base_progress + int(70 / total_sections), + f"章节 {section.title} 已完成", + current_section=None, + completed_sections=completed_section_titles + ) + + # 生成并保存子章节 + for j, subsection in enumerate(section.subsections): + subsection_num = j + 1 + + if progress_callback: + progress_callback( + "generating", + base_progress + int(((j + 1) / len(section.subsections)) * 5), + f"正在生成子章节: {subsection.title}" + ) + + ReportManager.update_progress( + report_id, "generating", + base_progress + int(((j + 1) / len(section.subsections)) * 5), + f"正在生成子章节: {subsection.title}", + current_section=subsection.title, + completed_sections=completed_section_titles + ) + + subsection_content = self._generate_section_react( + section=subsection, + outline=outline, + 
previous_sections=generated_sections, + progress_callback=None + ) + subsection.content = subsection_content + generated_sections.append(f"### {subsection.title}\n\n{subsection_content}") + + # 【关键】立即保存子章节到文件 + ReportManager.save_section( + report_id, subsection_num, subsection, + is_subsection=True, parent_index=section_num + ) + completed_section_titles.append(f" └─ {subsection.title}") + + logger.info(f"子章节已保存: {report_id}/section_{section_num:02d}_{subsection_num:02d}.md") + + # 阶段3: 组装完整报告 + if progress_callback: + progress_callback("generating", 95, "正在组装完整报告...") + + ReportManager.update_progress( + report_id, "generating", 95, "正在组装完整报告...", + completed_sections=completed_section_titles + ) + + # 使用ReportManager组装完整报告 + report.markdown_content = ReportManager.assemble_full_report(report_id, outline) + report.status = ReportStatus.COMPLETED + report.completed_at = datetime.now().isoformat() + + # 保存最终报告 + ReportManager.save_report(report) + ReportManager.update_progress( + report_id, "completed", 100, "报告生成完成", + completed_sections=completed_section_titles + ) + + if progress_callback: + progress_callback("completed", 100, "报告生成完成") + + logger.info(f"报告生成完成: {report_id}") + return report + + except Exception as e: + logger.error(f"报告生成失败: {str(e)}") + report.status = ReportStatus.FAILED + report.error = str(e) + + # 保存失败状态 + try: + ReportManager.save_report(report) + ReportManager.update_progress( + report_id, "failed", -1, f"报告生成失败: {str(e)}", + completed_sections=completed_section_titles + ) + except Exception: + pass # 忽略保存失败的错误 + + return report + + def chat( + self, + message: str, + chat_history: List[Dict[str, str]] = None + ) -> Dict[str, Any]: + """ + 与Report Agent对话 + + 在对话中Agent可以自主调用检索工具来回答问题 + + Args: + message: 用户消息 + chat_history: 对话历史 + + Returns: + { + "response": "Agent回复", + "tool_calls": [调用的工具列表], + "sources": [信息来源] + } + """ + logger.info(f"Report Agent对话: {message[:50]}...") + + chat_history = chat_history or [] + + system_prompt = 
f"""你是一个专业的舆情分析助手,负责回答关于模拟分析报告的问题。 + +模拟需求: {self.simulation_requirement} +图谱ID: {self.graph_id} + +你可以使用以下工具来获取信息: + +{self._get_tools_description()} + +工具调用格式: + +{{"name": "工具名称", "parameters": {{"参数名": "参数值"}}}} + + +回答要求: +1. 基于事实和数据回答 +2. 引用具体信息来源 +3. 如果不确定,说明信息限制 +4. 保持专业和客观""" + + # 构建消息 + messages = [{"role": "system", "content": system_prompt}] + + # 添加历史对话 + for h in chat_history[-10:]: # 限制历史长度 + messages.append(h) + + messages.append({"role": "user", "content": message}) + + # ReACT循环 + tool_calls_made = [] + max_iterations = 3 + + for iteration in range(max_iterations): + response = self.llm.chat( + messages=messages, + temperature=0.5, + max_tokens=2048 + ) + + # 解析工具调用 + tool_calls = self._parse_tool_calls(response) + + if not tool_calls: + # 没有工具调用,返回响应 + # 清理响应中的工具调用标记 + clean_response = re.sub(r'.*?', '', response, flags=re.DOTALL) + clean_response = re.sub(r'\[TOOL_CALL\].*?\)', '', clean_response) + + return { + "response": clean_response.strip(), + "tool_calls": tool_calls_made, + "sources": [] + } + + # 执行工具调用 + tool_results = [] + for call in tool_calls: + result = self._execute_tool(call["name"], call.get("parameters", {})) + tool_results.append({ + "tool": call["name"], + "result": result[:1000] # 限制长度 + }) + tool_calls_made.append(call) + + # 将结果添加到消息 + messages.append({"role": "assistant", "content": response}) + observation = "工具调用结果:\n" + "\n\n".join([ + f"[{r['tool']}]: {r['result']}" for r in tool_results + ]) + messages.append({"role": "user", "content": observation + "\n\n请基于以上信息回答问题。"}) + + # 达到最大迭代,获取最终响应 + final_response = self.llm.chat( + messages=messages, + temperature=0.5, + max_tokens=2048 + ) + + return { + "response": final_response, + "tool_calls": tool_calls_made, + "sources": [] + } + + +class ReportManager: + """ + 报告管理器 + + 负责报告的持久化存储和检索 + + 文件结构(分章节输出): + reports/ + {report_id}/ + meta.json - 报告元信息和状态 + outline.json - 报告大纲 + progress.json - 生成进度 + section_01.md - 第1章节 + section_02.md - 第2章节 + ... 
@classmethod
def save_outline(cls, report_id: str, outline: ReportOutline) -> None:
    """
    Persist the report outline as ``outline.json`` in the report folder.

    Intended to be called as soon as the planning stage finishes.

    Args:
        report_id: Identifier of the report whose folder receives the file.
        outline: Outline to serialise via its ``to_dict`` method.
    """
    cls._ensure_report_folder(report_id)

    outline_file = cls._get_outline_path(report_id)
    with open(outline_file, 'w', encoding='utf-8') as handle:
        handle.write(json.dumps(outline.to_dict(), ensure_ascii=False, indent=2))

    logger.info(f"大纲已保存: {report_id}")
@classmethod
def get_generated_sections(cls, report_id: str) -> List[Dict[str, Any]]:
    """
    List the section files already written for a report.

    Only files named ``section_NN.md`` (chapter) or ``section_NN_MM.md``
    (sub-chapter) are considered. Any other file — including stray names
    such as ``section_notes.md`` that previously raised ``ValueError`` and
    aborted the whole listing — is ignored.

    Results are ordered numerically by chapter index, each chapter file
    preceding its own sub-chapters, so ordering stays correct even when an
    index outgrows the two-digit zero padding.

    Args:
        report_id: Identifier of the report whose folder is scanned.

    Returns:
        One dict per section file with the keys ``filename``,
        ``section_index``, ``subsection_index`` (``None`` for chapters),
        ``content`` and ``is_subsection``. Empty when the report folder
        does not exist.
    """
    folder = cls._get_report_folder(report_id)

    if not os.path.isdir(folder):
        return []

    name_pattern = re.compile(r'section_(\d+)(?:_(\d+))?\.md')

    found = []
    for filename in os.listdir(folder):
        match = name_pattern.fullmatch(filename)
        if match is None:
            continue
        section_index = int(match.group(1))
        subsection_index = int(match.group(2)) if match.group(2) is not None else None
        found.append((section_index, subsection_index, filename))

    # -1 sorts a chapter file ahead of its sub-chapters; the filename is a
    # final tie-breaker to keep the order deterministic.
    found.sort(key=lambda item: (item[0], -1 if item[1] is None else item[1], item[2]))

    sections = []
    for section_index, subsection_index, filename in found:
        with open(os.path.join(folder, filename), 'r', encoding='utf-8') as f:
            content = f.read()

        sections.append({
            "filename": filename,
            "section_index": section_index,
            "subsection_index": subsection_index,
            "content": content,
            "is_subsection": subsection_index is not None
        })

    return sections
@classmethod
def get_report_by_simulation(cls, simulation_id: str) -> Optional[Report]:
    """
    Return the most recently created report for a simulation.

    Every entry of the reports directory is inspected: folders (current
    layout) and bare ``*.json`` files (legacy layout). When several reports
    belong to the same simulation, the one with the greatest ``created_at``
    value wins, instead of whichever entry ``os.listdir`` happens to yield
    first. A report that cannot be read is logged and skipped so that one
    damaged entry does not break lookups for every simulation.

    Args:
        simulation_id: Identifier of the simulation to look up.

    Returns:
        The newest matching report, or ``None`` when there is none.
    """
    cls._ensure_reports_dir()

    latest = None
    for item in os.listdir(cls.REPORTS_DIR):
        item_path = os.path.join(cls.REPORTS_DIR, item)
        if os.path.isdir(item_path):
            # Current layout: one folder per report.
            report_id = item
        elif item.endswith('.json'):
            # Legacy layout: <report_id>.json directly in the reports dir.
            report_id = item[:-5]
        else:
            continue

        try:
            report = cls.get_report(report_id)
        except (OSError, ValueError, KeyError) as e:
            logger.warning(f"跳过无法读取的报告 {report_id}: {str(e)}")
            continue

        if report is None or report.simulation_id != simulation_id:
            continue

        # created_at is written with datetime.isoformat(), so plain string
        # comparison orders the values chronologically.
        if latest is None or report.created_at > latest.created_at:
            latest = report

    return latest
@classmethod
def delete_report(cls, report_id: str) -> bool:
    """
    Delete a report: its folder (current layout) or its legacy files.

    ``report_id`` must be a plain name. An empty value would resolve to the
    reports root itself, and a value containing ``..`` or a path separator
    could resolve outside it; since the folder is removed with
    ``shutil.rmtree``, such identifiers are rejected before any path is
    built.

    Args:
        report_id: Identifier of the report to delete.

    Returns:
        ``True`` when something was deleted, ``False`` when nothing matched
        or the identifier was rejected.
    """
    import shutil

    separators = [sep for sep in (os.sep, os.altsep) if sep]
    if (
        not report_id
        or report_id in ('.', '..')
        or any(sep in report_id for sep in separators)
    ):
        logger.warning(f"拒绝删除非法的报告ID: {report_id!r}")
        return False

    folder_path = cls._get_report_folder(report_id)

    # Current layout: remove the whole report folder.
    if os.path.exists(folder_path) and os.path.isdir(folder_path):
        shutil.rmtree(folder_path)
        logger.info(f"报告文件夹已删除: {report_id}")
        return True

    # Legacy layout: remove the standalone JSON / Markdown files.
    deleted = False
    old_json_path = os.path.join(cls.REPORTS_DIR, f"{report_id}.json")
    old_md_path = os.path.join(cls.REPORTS_DIR, f"{report_id}.md")

    for legacy_path in (old_json_path, old_md_path):
        if os.path.exists(legacy_path):
            os.remove(legacy_path)
            deleted = True

    return deleted
@dataclass
class NodeInfo:
    """Graph node: identifier, name, labels, summary and free-form attributes."""
    uuid: str
    name: str
    labels: List[str]
    summary: str
    attributes: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Return the node's fields as a plain dict."""
        field_names = ("uuid", "name", "labels", "summary", "attributes")
        return {key: getattr(self, key) for key in field_names}

    def to_text(self) -> str:
        """Render the node as two lines of text: name with type, then summary."""
        # The first label other than the generic "Entity"/"Node" is the type.
        entity_type = "未知类型"
        for label in self.labels:
            if label not in ["Entity", "Node"]:
                entity_type = label
                break
        return f"实体: {self.name} (类型: {entity_type})\n摘要: {self.summary}"


@dataclass
class EdgeInfo:
    """Graph edge: relation name, fact text and the two endpoint nodes."""
    uuid: str
    name: str
    fact: str
    source_node_uuid: str
    target_node_uuid: str
    source_node_name: Optional[str] = None
    target_node_name: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the edge's fields as a plain dict."""
        field_names = (
            "uuid",
            "name",
            "fact",
            "source_node_uuid",
            "target_node_uuid",
            "source_node_name",
            "target_node_name",
        )
        return {key: getattr(self, key) for key in field_names}

    def to_text(self) -> str:
        """Render the edge as a relation line followed by its fact."""
        # Use the first 8 characters of the UUID when an endpoint has no name.
        if self.source_node_name:
            source = self.source_node_name
        else:
            source = self.source_node_uuid[:8]
        if self.target_node_name:
            target = self.target_node_name
        else:
            target = self.target_node_uuid[:8]
        return f"关系: {source} --[{self.name}]--> {target}\n事实: {self.fact}"
get_all_edges - 获取图谱所有边 + 4. get_node_detail - 获取节点详细信息 + 5. get_node_edges - 获取节点相关的边 + 6. get_entities_by_type - 按类型获取实体 + 7. get_entity_summary - 获取实体的关系摘要 + """ + + # 重试配置 + MAX_RETRIES = 3 + RETRY_DELAY = 2.0 + + def __init__(self, api_key: Optional[str] = None): + self.api_key = api_key or Config.ZEP_API_KEY + if not self.api_key: + raise ValueError("ZEP_API_KEY 未配置") + + self.client = Zep(api_key=self.api_key) + logger.info("ZepToolsService 初始化完成") + + def _call_with_retry(self, func, operation_name: str, max_retries: int = None): + """带重试机制的API调用""" + max_retries = max_retries or self.MAX_RETRIES + last_exception = None + delay = self.RETRY_DELAY + + for attempt in range(max_retries): + try: + return func() + except Exception as e: + last_exception = e + if attempt < max_retries - 1: + logger.warning( + f"Zep {operation_name} 第 {attempt + 1} 次尝试失败: {str(e)[:100]}, " + f"{delay:.1f}秒后重试..." + ) + time.sleep(delay) + delay *= 2 + else: + logger.error(f"Zep {operation_name} 在 {max_retries} 次尝试后仍失败: {str(e)}") + + raise last_exception + + def search_graph( + self, + graph_id: str, + query: str, + limit: int = 10, + scope: str = "edges" + ) -> SearchResult: + """ + 图谱语义搜索 + + 使用混合搜索(语义+BM25)在图谱中搜索相关信息。 + 如果Zep Cloud的search API不可用,则降级为本地关键词匹配。 + + Args: + graph_id: 图谱ID (Standalone Graph) + query: 搜索查询 + limit: 返回结果数量 + scope: 搜索范围,"edges" 或 "nodes" + + Returns: + SearchResult: 搜索结果 + """ + logger.info(f"图谱搜索: graph_id={graph_id}, query={query[:50]}...") + + # 尝试使用Zep Cloud Search API + try: + search_results = self._call_with_retry( + func=lambda: self.client.graph.search( + graph_id=graph_id, + query=query, + limit=limit, + scope=scope, + reranker="cross_encoder" + ), + operation_name=f"图谱搜索(graph={graph_id})" + ) + + facts = [] + edges = [] + nodes = [] + + # 解析边搜索结果 + if hasattr(search_results, 'edges') and search_results.edges: + for edge in search_results.edges: + if hasattr(edge, 'fact') and edge.fact: + facts.append(edge.fact) + edges.append({ + "uuid": 
getattr(edge, 'uuid_', None) or getattr(edge, 'uuid', ''), + "name": getattr(edge, 'name', ''), + "fact": getattr(edge, 'fact', ''), + "source_node_uuid": getattr(edge, 'source_node_uuid', ''), + "target_node_uuid": getattr(edge, 'target_node_uuid', ''), + }) + + # 解析节点搜索结果 + if hasattr(search_results, 'nodes') and search_results.nodes: + for node in search_results.nodes: + nodes.append({ + "uuid": getattr(node, 'uuid_', None) or getattr(node, 'uuid', ''), + "name": getattr(node, 'name', ''), + "labels": getattr(node, 'labels', []), + "summary": getattr(node, 'summary', ''), + }) + # 节点摘要也算作事实 + if hasattr(node, 'summary') and node.summary: + facts.append(f"[{node.name}]: {node.summary}") + + logger.info(f"搜索完成: 找到 {len(facts)} 条相关事实") + + return SearchResult( + facts=facts, + edges=edges, + nodes=nodes, + query=query, + total_count=len(facts) + ) + + except Exception as e: + logger.warning(f"Zep Search API失败,降级为本地搜索: {str(e)}") + # 降级:使用本地关键词匹配搜索 + return self._local_search(graph_id, query, limit, scope) + + def _local_search( + self, + graph_id: str, + query: str, + limit: int = 10, + scope: str = "edges" + ) -> SearchResult: + """ + 本地关键词匹配搜索(作为Zep Search API的降级方案) + + 获取所有边/节点,然后在本地进行关键词匹配 + + Args: + graph_id: 图谱ID + query: 搜索查询 + limit: 返回结果数量 + scope: 搜索范围 + + Returns: + SearchResult: 搜索结果 + """ + logger.info(f"使用本地搜索: query={query[:30]}...") + + facts = [] + edges_result = [] + nodes_result = [] + + # 提取查询关键词(简单分词) + query_lower = query.lower() + keywords = [w.strip() for w in query_lower.replace(',', ' ').replace(',', ' ').split() if len(w.strip()) > 1] + + def match_score(text: str) -> int: + """计算文本与查询的匹配分数""" + if not text: + return 0 + text_lower = text.lower() + # 完全匹配查询 + if query_lower in text_lower: + return 100 + # 关键词匹配 + score = 0 + for keyword in keywords: + if keyword in text_lower: + score += 10 + return score + + try: + if scope in ["edges", "both"]: + # 获取所有边并匹配 + all_edges = self.get_all_edges(graph_id) + scored_edges = [] + for edge in 
all_edges: + score = match_score(edge.fact) + match_score(edge.name) + if score > 0: + scored_edges.append((score, edge)) + + # 按分数排序 + scored_edges.sort(key=lambda x: x[0], reverse=True) + + for score, edge in scored_edges[:limit]: + if edge.fact: + facts.append(edge.fact) + edges_result.append({ + "uuid": edge.uuid, + "name": edge.name, + "fact": edge.fact, + "source_node_uuid": edge.source_node_uuid, + "target_node_uuid": edge.target_node_uuid, + }) + + if scope in ["nodes", "both"]: + # 获取所有节点并匹配 + all_nodes = self.get_all_nodes(graph_id) + scored_nodes = [] + for node in all_nodes: + score = match_score(node.name) + match_score(node.summary) + if score > 0: + scored_nodes.append((score, node)) + + scored_nodes.sort(key=lambda x: x[0], reverse=True) + + for score, node in scored_nodes[:limit]: + nodes_result.append({ + "uuid": node.uuid, + "name": node.name, + "labels": node.labels, + "summary": node.summary, + }) + if node.summary: + facts.append(f"[{node.name}]: {node.summary}") + + logger.info(f"本地搜索完成: 找到 {len(facts)} 条相关事实") + + except Exception as e: + logger.error(f"本地搜索失败: {str(e)}") + + return SearchResult( + facts=facts, + edges=edges_result, + nodes=nodes_result, + query=query, + total_count=len(facts) + ) + + def get_all_nodes(self, graph_id: str) -> List[NodeInfo]: + """ + 获取图谱的所有节点 + + Args: + graph_id: 图谱ID + + Returns: + 节点列表 + """ + logger.info(f"获取图谱 {graph_id} 的所有节点...") + + nodes = self._call_with_retry( + func=lambda: self.client.graph.node.get_by_graph_id(graph_id=graph_id), + operation_name=f"获取节点(graph={graph_id})" + ) + + result = [] + for node in nodes: + result.append(NodeInfo( + uuid=getattr(node, 'uuid_', None) or getattr(node, 'uuid', ''), + name=node.name or "", + labels=node.labels or [], + summary=node.summary or "", + attributes=node.attributes or {} + )) + + logger.info(f"获取到 {len(result)} 个节点") + return result + + def get_all_edges(self, graph_id: str) -> List[EdgeInfo]: + """ + 获取图谱的所有边 + + Args: + graph_id: 图谱ID + + 
Returns: + 边列表 + """ + logger.info(f"获取图谱 {graph_id} 的所有边...") + + edges = self._call_with_retry( + func=lambda: self.client.graph.edge.get_by_graph_id(graph_id=graph_id), + operation_name=f"获取边(graph={graph_id})" + ) + + result = [] + for edge in edges: + result.append(EdgeInfo( + uuid=getattr(edge, 'uuid_', None) or getattr(edge, 'uuid', ''), + name=edge.name or "", + fact=edge.fact or "", + source_node_uuid=edge.source_node_uuid or "", + target_node_uuid=edge.target_node_uuid or "" + )) + + logger.info(f"获取到 {len(result)} 条边") + return result + + def get_node_detail(self, node_uuid: str) -> Optional[NodeInfo]: + """ + 获取单个节点的详细信息 + + Args: + node_uuid: 节点UUID + + Returns: + 节点信息或None + """ + logger.info(f"获取节点详情: {node_uuid[:8]}...") + + try: + node = self._call_with_retry( + func=lambda: self.client.graph.node.get(uuid_=node_uuid), + operation_name=f"获取节点详情(uuid={node_uuid[:8]}...)" + ) + + if not node: + return None + + return NodeInfo( + uuid=getattr(node, 'uuid_', None) or getattr(node, 'uuid', ''), + name=node.name or "", + labels=node.labels or [], + summary=node.summary or "", + attributes=node.attributes or {} + ) + except Exception as e: + logger.error(f"获取节点详情失败: {str(e)}") + return None + + def get_node_edges(self, graph_id: str, node_uuid: str) -> List[EdgeInfo]: + """ + 获取节点相关的所有边 + + 通过获取图谱所有边,然后过滤出与指定节点相关的边 + + Args: + graph_id: 图谱ID + node_uuid: 节点UUID + + Returns: + 边列表 + """ + logger.info(f"获取节点 {node_uuid[:8]}... 
的相关边") + + try: + # 获取图谱所有边,然后过滤 + all_edges = self.get_all_edges(graph_id) + + result = [] + for edge in all_edges: + # 检查边是否与指定节点相关(作为源或目标) + if edge.source_node_uuid == node_uuid or edge.target_node_uuid == node_uuid: + result.append(edge) + + logger.info(f"找到 {len(result)} 条与节点相关的边") + return result + + except Exception as e: + logger.warning(f"获取节点边失败: {str(e)}") + return [] + + def get_entities_by_type( + self, + graph_id: str, + entity_type: str + ) -> List[NodeInfo]: + """ + 按类型获取实体 + + Args: + graph_id: 图谱ID + entity_type: 实体类型(如 Student, PublicFigure 等) + + Returns: + 符合类型的实体列表 + """ + logger.info(f"获取类型为 {entity_type} 的实体...") + + all_nodes = self.get_all_nodes(graph_id) + + filtered = [] + for node in all_nodes: + # 检查labels是否包含指定类型 + if entity_type in node.labels: + filtered.append(node) + + logger.info(f"找到 {len(filtered)} 个 {entity_type} 类型的实体") + return filtered + + def get_entity_summary( + self, + graph_id: str, + entity_name: str + ) -> Dict[str, Any]: + """ + 获取指定实体的关系摘要 + + 搜索与该实体相关的所有信息,并生成摘要 + + Args: + graph_id: 图谱ID + entity_name: 实体名称 + + Returns: + 实体摘要信息 + """ + logger.info(f"获取实体 {entity_name} 的关系摘要...") + + # 先搜索该实体相关的信息 + search_result = self.search_graph( + graph_id=graph_id, + query=entity_name, + limit=20 + ) + + # 尝试在所有节点中找到该实体 + all_nodes = self.get_all_nodes(graph_id) + entity_node = None + for node in all_nodes: + if node.name.lower() == entity_name.lower(): + entity_node = node + break + + related_edges = [] + if entity_node: + # 传入graph_id参数 + related_edges = self.get_node_edges(graph_id, entity_node.uuid) + + return { + "entity_name": entity_name, + "entity_info": entity_node.to_dict() if entity_node else None, + "related_facts": search_result.facts, + "related_edges": [e.to_dict() for e in related_edges], + "total_relations": len(related_edges) + } + + def get_graph_statistics(self, graph_id: str) -> Dict[str, Any]: + """ + 获取图谱的统计信息 + + Args: + graph_id: 图谱ID + + Returns: + 统计信息 + """ + logger.info(f"获取图谱 {graph_id} 
的统计信息...") + + nodes = self.get_all_nodes(graph_id) + edges = self.get_all_edges(graph_id) + + # 统计实体类型分布 + entity_types = {} + for node in nodes: + for label in node.labels: + if label not in ["Entity", "Node"]: + entity_types[label] = entity_types.get(label, 0) + 1 + + # 统计关系类型分布 + relation_types = {} + for edge in edges: + relation_types[edge.name] = relation_types.get(edge.name, 0) + 1 + + return { + "graph_id": graph_id, + "total_nodes": len(nodes), + "total_edges": len(edges), + "entity_types": entity_types, + "relation_types": relation_types + } + + def get_simulation_context( + self, + graph_id: str, + simulation_requirement: str, + limit: int = 30 + ) -> Dict[str, Any]: + """ + 获取模拟相关的上下文信息 + + 综合搜索与模拟需求相关的所有信息 + + Args: + graph_id: 图谱ID + simulation_requirement: 模拟需求描述 + limit: 每类信息的数量限制 + + Returns: + 模拟上下文信息 + """ + logger.info(f"获取模拟上下文: {simulation_requirement[:50]}...") + + # 搜索与模拟需求相关的信息 + search_result = self.search_graph( + graph_id=graph_id, + query=simulation_requirement, + limit=limit + ) + + # 获取图谱统计 + stats = self.get_graph_statistics(graph_id) + + # 获取所有实体节点 + all_nodes = self.get_all_nodes(graph_id) + + # 筛选有实际类型的实体(非纯Entity节点) + entities = [] + for node in all_nodes: + custom_labels = [l for l in node.labels if l not in ["Entity", "Node"]] + if custom_labels: + entities.append({ + "name": node.name, + "type": custom_labels[0], + "summary": node.summary + }) + + return { + "simulation_requirement": simulation_requirement, + "related_facts": search_result.facts, + "graph_statistics": stats, + "entities": entities[:limit], # 限制数量 + "total_entities": len(entities) + } diff --git a/backend/requirements.txt b/backend/requirements.txt index 1a0d82d..80cc5fa 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -24,3 +24,8 @@ werkzeug>=3.0.0 oasis-ai>=0.1.0 camel-ai>=0.2.0 +# LangChain框架(用于Report Agent) +langchain>=0.2.0 +langchain-core>=0.2.0 +langchain-openai>=0.1.0 +