From 5ece3f670bef791884f21677fe8d69e81abe1b3c Mon Sep 17 00:00:00 2001
From: 666ghj <670939375@qq.com>
Date: Tue, 9 Dec 2025 15:10:55 +0800
Subject: [PATCH] Implement Report Agent for automated report generation and
interaction
- Introduced the Report Agent module to facilitate the automatic generation of simulation analysis reports using LangChain and Zep, following the ReACT model.
- Added functionality for report outline planning, segmented content generation, and user interaction through a dialogue interface.
- Implemented new API endpoints for report generation, status checking, and retrieval, enhancing the overall reporting capabilities.
- Updated README.md to include detailed instructions on the new report generation features and API usage.
- Enhanced the project structure to accommodate the new report management functionalities, including report storage and retrieval mechanisms.
---
backend/README.md | 693 +++++++++++++-
backend/app/__init__.py | 3 +-
backend/app/api/__init__.py | 2 +
backend/app/api/report.py | 829 ++++++++++++++++
backend/app/config.py | 5 +
backend/app/services/report_agent.py | 1296 ++++++++++++++++++++++++++
backend/app/services/zep_tools.py | 621 ++++++++++++
backend/requirements.txt | 5 +
8 files changed, 3445 insertions(+), 9 deletions(-)
create mode 100644 backend/app/api/report.py
create mode 100644 backend/app/services/report_agent.py
create mode 100644 backend/app/services/zep_tools.py
diff --git a/backend/README.md b/backend/README.md
index 72ece0c..44d437e 100644
--- a/backend/README.md
+++ b/backend/README.md
@@ -28,6 +28,8 @@
4. **模拟配置智能生成**: 使用 LLM 根据需求自动生成模拟参数(时间、活跃度、事件等)
5. **双平台模拟**: 支持 Twitter 和 Reddit 双平台并行舆论模拟(基于 OASIS 框架)
6. **图谱记忆动态更新**: 可选功能,将模拟中Agent的活动实时更新到Zep图谱,让图谱"记住"模拟过程
+7. **智能报告生成**: 使用 LangChain + Zep 实现 ReACT 模式的模拟分析报告自动生成
+8. **Report Agent对话**: 报告生成后可与Report Agent对话,自主调用检索工具回答问题
---
@@ -42,10 +44,11 @@
│ │ API层 │ │ 服务层 │ │ 模型层 │ │
│ │ - graph.py │→ │ - 本体生成 │→ │ - Project │ │
│ │ - simulation │ │ - 图谱构建 │ │ - Task │ │
-│ └────────────────┘ │ - 实体读取 │ └─────────────────┘ │
-│ │ - 人设生成 │ │
+│ │ - report.py │ │ - 实体读取 │ │ - Report │ │
+│ └────────────────┘ │ - 人设生成 │ └─────────────────┘ │
│ │ - 配置生成 │ │
│ │ - 模拟运行 │ │
+│ │ - 报告生成 │ │
│ └──────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ 外部服务集成 │
@@ -78,6 +81,16 @@
模拟完成 → 环境进入等待模式 → 发送Interview命令 → Agent回答 → 获取结果 → (可选)关闭环境
```
+5. **报告生成流程**:
+ ```
+ 模拟完成 → 调用Report API → ReACT规划大纲 → 逐章节生成(多次工具调用) → 生成Markdown报告 → 解锁Interview功能
+ ```
+
+6. **Report Agent对话流程**:
+ ```
+ 用户提问 → Agent分析 → 调用Zep检索工具 → 整合信息 → 返回回答
+ ```
+
---
## 技术栈
@@ -89,6 +102,7 @@
### AI & 知识图谱
- **Zep Cloud SDK 2.0+**: 知识图谱构建与管理
- **OpenAI SDK 1.0+**: LLM 调用(支持 OpenAI 兼容接口)
+- **LangChain 0.2+**: Report Agent框架(ReACT模式)
- **OASIS-AI**: 社交媒体模拟框架
- **CAMEL-AI**: Agent 行为模拟
@@ -117,6 +131,10 @@ backend/
│ │ ├── project.json # 项目元数据
│ │ ├── files/ # 上传的文件
│ │ └── extracted_text.txt # 提取的文本
+│ ├── reports/ # 报告数据
+│ │ └── report_xxx/
+│ │ ├── report_xxx.json # 报告元数据
+│ │ └── report_xxx.md # Markdown报告
│ └── simulations/ # 模拟数据
│ └── sim_xxx/
│ ├── state.json # 模拟状态
@@ -142,7 +160,8 @@ backend/
├── api/ # API路由
│ ├── __init__.py
│ ├── graph.py # 图谱相关接口
- │ └── simulation.py # 模拟相关接口
+ │ ├── simulation.py # 模拟相关接口
+ │ └── report.py # 报告相关接口
├── models/ # 数据模型
│ ├── __init__.py
│ ├── project.py # 项目模型
@@ -153,12 +172,14 @@ backend/
│ ├── graph_builder.py # 图谱构建
│ ├── text_processor.py # 文本处理
│ ├── zep_entity_reader.py # 实体读取
+ │ ├── zep_tools.py # Zep检索工具服务
│ ├── oasis_profile_generator.py # 人设生成
│ ├── simulation_config_generator.py # 配置生成
│ ├── simulation_manager.py # 模拟管理
│ ├── simulation_runner.py # 模拟运行
│ ├── simulation_ipc.py # 模拟IPC通信(Interview功能)
- │ └── zep_graph_memory_updater.py # 图谱记忆动态更新
+ │ ├── zep_graph_memory_updater.py # 图谱记忆动态更新
+ │ └── report_agent.py # 报告生成Agent(ReACT模式)
└── utils/ # 工具类
├── __init__.py
├── file_parser.py # 文件解析
@@ -240,6 +261,55 @@ backend/
- `SimulationIPCClient`: IPC客户端(Flask端使用)
- `SimulationIPCServer`: IPC服务器(模拟脚本端使用)
+### 5. Report Agent模块(报告生成)
+
+**功能**: 模拟完成后自动生成分析报告,支持与用户对话
+
+**特点**:
+- **ReACT模式**: Reasoning + Acting,多轮思考与工具调用
+- **大纲规划**: LLM分析模拟需求,自动规划报告目录结构
+- **分段生成**: 逐章节生成,每章节可多次调用Zep检索工具
+- **Markdown输出**: 生成专业的Markdown格式报告
+- **对话功能**: 报告完成后可与Report Agent对话,自主调用工具回答问题
+
+**工具(MCP封装)**:
+- `search_graph`: 图谱语义搜索
+- `get_graph_statistics`: 获取图谱统计信息
+- `get_entity_summary`: 获取实体关系摘要
+- `get_simulation_context`: 获取模拟上下文
+- `get_entities_by_type`: 按类型获取实体
+
+**核心服务**:
+- `ZepToolsService`: Zep检索工具封装
+- `ReportAgent`: 报告生成Agent(ReACT模式)
+- `ReportManager`: 报告持久化管理
+
+**工作原理**:
+```
+┌─────────────────────────────────────────────────────────────┐
+│ Report Agent (ReACT) │
+├─────────────────────────────────────────────────────────────┤
+│ │
+│ 1. 规划阶段 │
+│ ┌─────────────────────────────────────────────────┐ │
+│ │ LLM分析模拟需求 → 获取图谱上下文 → 生成报告大纲 │ │
+│ └─────────────────────────────────────────────────┘ │
+│ │
+│ 2. 生成阶段 (每章节) │
+│ ┌─────────────────────────────────────────────────┐ │
+│ │ Thought → Action → Observation → ... → Final │ │
+│ │ ↓ ↓ ↓ │ │
+│ │ 分析需求 调用工具 分析结果 生成内容 │ │
+│ └─────────────────────────────────────────────────┘ │
+│ │
+│ 3. 输出阶段 │
+│ ┌─────────────────────────────────────────────────┐ │
+│ │ 组装章节 → 生成Markdown → 保存JSON/MD文件 │ │
+│ └─────────────────────────────────────────────────┘ │
+│ │
+└─────────────────────────────────────────────────────────────┘
+```
+
**工作原理**:
```
Flask后端 模拟脚本
@@ -964,6 +1034,245 @@ Flask后端 模拟脚本
- `/stop`: 强制终止模拟进程
- `/close-env`: 优雅地关闭环境,让模拟进程正常退出
+---
+
+### Report 报告接口
+
+> **说明**: 报告生成完成后才能解锁Interview功能。Report Agent使用ReACT模式,可以在对话中自主调用Zep检索工具。
+
+#### 1. 生成报告
+
+**接口**: `POST /api/report/generate`
+
+**请求参数**:
+```json
+{
+ "simulation_id": "sim_xxxx",
+ "force_regenerate": false
+}
+```
+
+| 参数 | 类型 | 必填 | 默认值 | 说明 |
+|------|------|------|--------|------|
+| simulation_id | String | 是 | - | 模拟ID |
+| force_regenerate | Boolean | 否 | false | 强制重新生成 |
+
+**返回示例**:
+```json
+{
+ "success": true,
+ "data": {
+ "simulation_id": "sim_xxxx",
+ "task_id": "task_xxxx",
+ "status": "generating",
+ "message": "报告生成任务已启动",
+ "already_generated": false
+ }
+}
+```
+
+**如果报告已存在**:
+```json
+{
+ "success": true,
+ "data": {
+ "simulation_id": "sim_xxxx",
+ "report_id": "report_xxxx",
+ "status": "completed",
+ "message": "报告已存在",
+ "already_generated": true
+ }
+}
+```
+
+---
+
+#### 2. 查询生成进度
+
+**接口**: `POST /api/report/generate/status`
+
+**请求参数**:
+```json
+{
+ "task_id": "task_xxxx",
+ "simulation_id": "sim_xxxx"
+}
+```
+
+**返回示例**:
+```json
+{
+ "success": true,
+ "data": {
+ "task_id": "task_xxxx",
+ "status": "processing",
+ "progress": 45,
+ "message": "[generating] 正在生成章节: 关键发现 (3/5)"
+ }
+}
+```
+
+---
+
+#### 3. 获取报告
+
+**接口**: `GET /api/report/{report_id}`
+
+**返回示例**:
+```json
+{
+ "success": true,
+ "data": {
+ "report_id": "report_xxxx",
+ "simulation_id": "sim_xxxx",
+ "graph_id": "mirofish_xxxx",
+ "simulation_requirement": "模拟武汉大学撤销处分后的舆情走向",
+ "status": "completed",
+ "outline": {
+ "title": "武汉大学撤销处分事件舆情分析报告",
+ "summary": "基于模拟结果的全面舆情分析",
+ "sections": [
+ {"title": "执行摘要", "content": "..."},
+ {"title": "模拟背景", "content": "..."},
+ {"title": "关键发现", "content": "..."},
+ {"title": "舆情分析", "content": "..."},
+ {"title": "建议与展望", "content": "..."}
+ ]
+ },
+ "markdown_content": "# 武汉大学撤销处分事件舆情分析报告\n\n...",
+ "created_at": "2025-12-09T10:00:00",
+ "completed_at": "2025-12-09T10:05:00"
+ }
+}
+```
+
+---
+
+#### 4. 根据模拟ID获取报告
+
+**接口**: `GET /api/report/by-simulation/{simulation_id}`
+
+**返回示例**:
+```json
+{
+ "success": true,
+ "data": {...},
+ "has_report": true
+}
+```
+
+---
+
+#### 5. 下载报告
+
+**接口**: `GET /api/report/{report_id}/download`
+
+**返回**: Markdown文件下载
+
+---
+
+#### 6. 与Report Agent对话
+
+**接口**: `POST /api/report/chat`
+
+**请求参数**:
+```json
+{
+ "simulation_id": "sim_xxxx",
+ "message": "请详细解释一下舆情的主要趋势",
+ "chat_history": [
+ {"role": "user", "content": "报告提到了哪些关键人物?"},
+ {"role": "assistant", "content": "根据分析,关键人物包括..."}
+ ]
+}
+```
+
+| 参数 | 类型 | 必填 | 默认值 | 说明 |
+|------|------|------|--------|------|
+| simulation_id | String | 是 | - | 模拟ID |
+| message | String | 是 | - | 用户消息 |
+| chat_history | Array | 否 | [] | 对话历史(用于上下文) |
+
+**返回示例**:
+```json
+{
+ "success": true,
+ "data": {
+ "response": "根据模拟数据分析,舆情的主要趋势表现为...\n\n1. **初期阶段**:...\n2. **发酵阶段**:...\n3. **高峰阶段**:...",
+ "tool_calls": [
+ {"name": "search_graph", "parameters": {"query": "舆情趋势"}},
+ {"name": "get_graph_statistics", "parameters": {}}
+ ],
+ "sources": []
+ }
+}
+```
+
+---
+
+#### 7. 检查报告状态
+
+**接口**: `GET /api/report/check/{simulation_id}`
+
+**用途**: 判断是否解锁Interview功能
+
+**返回示例**:
+```json
+{
+ "success": true,
+ "data": {
+ "simulation_id": "sim_xxxx",
+ "has_report": true,
+ "report_status": "completed",
+ "report_id": "report_xxxx",
+ "interview_unlocked": true
+ }
+}
+```
+
+---
+
+#### 8. 列出所有报告
+
+**接口**: `GET /api/report/list?simulation_id=sim_xxxx&limit=50`
+
+**返回示例**:
+```json
+{
+ "success": true,
+ "data": [...],
+ "count": 5
+}
+```
+
+---
+
+#### 9. 删除报告
+
+**接口**: `DELETE /api/report/{report_id}`
+
+---
+
+#### 10. 工具调试接口
+
+**图谱搜索**: `POST /api/report/tools/search`
+```json
+{
+ "graph_id": "mirofish_xxxx",
+ "query": "舆情走向",
+ "limit": 10
+}
+```
+
+**图谱统计**: `POST /api/report/tools/statistics`
+```json
+{
+ "graph_id": "mirofish_xxxx"
+}
+```
+
+---
+
#### 6. 获取运行状态
**接口**: `GET /api/simulation/{simulation_id}/run-status`
@@ -1192,7 +1501,50 @@ created_at: str # 创建时间
---
-### 6. SimulationParameters (模拟参数)
+### 6. Report (报告模型)
+
+**文件**: `app/services/report_agent.py`
+
+**字段**:
+```python
+report_id: str # 报告ID (report_xxx)
+simulation_id: str # 模拟ID
+graph_id: str # 图谱ID
+simulation_requirement: str # 模拟需求
+status: ReportStatus # 状态
+outline: ReportOutline # 报告大纲
+markdown_content: str # Markdown内容
+created_at: str # 创建时间
+completed_at: str # 完成时间
+error: str # 错误信息
+```
+
+**状态枚举**:
+```python
+PENDING = "pending" # 等待中
+PLANNING = "planning" # 规划大纲中
+GENERATING = "generating" # 生成内容中
+COMPLETED = "completed" # 已完成
+FAILED = "failed" # 失败
+```
+
+**ReportOutline字段**:
+```python
+title: str # 报告标题
+summary: str # 报告摘要
+sections: List[ReportSection] # 章节列表
+```
+
+**ReportSection字段**:
+```python
+title: str # 章节标题
+content: str # 章节内容
+subsections: List[ReportSection] # 子章节
+```
+
+---
+
+### 7. SimulationParameters (模拟参数)
**文件**: `app/services/simulation_config_generator.py`
@@ -1815,6 +2167,270 @@ result = SimulationRunner.interview_all_agents(
---
+### 10. ZepToolsService (Zep检索工具服务)
+
+**文件**: `app/services/zep_tools.py`
+
+**功能**: 封装多种Zep图谱检索工具,供Report Agent调用
+
+**核心方法**:
+
+```python
+def search_graph(
+ graph_id: str,
+ query: str,
+ limit: int = 10
+) -> SearchResult:
+ """
+ 图谱语义搜索
+
+ 使用混合搜索(语义+BM25)查找相关信息
+ 返回: facts列表、edges列表、nodes列表
+ """
+
+def get_all_nodes(graph_id: str) -> List[NodeInfo]:
+ """获取图谱所有节点"""
+
+def get_all_edges(graph_id: str) -> List[EdgeInfo]:
+ """获取图谱所有边"""
+
+def get_node_detail(node_uuid: str) -> Optional[NodeInfo]:
+ """获取单个节点详情"""
+
+def get_node_edges(node_uuid: str) -> List[EdgeInfo]:
+ """获取节点相关的边"""
+
+def get_entities_by_type(
+ graph_id: str,
+ entity_type: str
+) -> List[NodeInfo]:
+ """按类型获取实体"""
+
+def get_entity_summary(
+ graph_id: str,
+ entity_name: str
+) -> Dict[str, Any]:
+ """获取实体关系摘要"""
+
+def get_graph_statistics(graph_id: str) -> Dict[str, Any]:
+ """
+ 获取图谱统计信息
+
+ 返回:
+ - total_nodes: 节点总数
+ - total_edges: 边总数
+ - entity_types: 实体类型分布
+ - relation_types: 关系类型分布
+ """
+
+def get_simulation_context(
+ graph_id: str,
+ simulation_requirement: str,
+ limit: int = 30
+) -> Dict[str, Any]:
+ """
+ 获取模拟相关上下文
+
+ 综合搜索与模拟需求相关的所有信息
+ """
+```
+
+**容错机制**:
+- 所有API调用带3次重试
+- 指数退避策略
+- 搜索失败返回空结果而非抛出异常
+
+---
+
+### 11. ReportAgent (报告生成Agent)
+
+**文件**: `app/services/report_agent.py`
+
+**功能**: 使用ReACT模式生成模拟分析报告
+
+**核心类**:
+
+```python
+class ReportAgent:
+ """
+ Report Agent - 模拟报告生成Agent
+
+ 采用ReACT(Reasoning + Acting)模式:
+ 1. 规划阶段:分析模拟需求,规划报告目录结构
+ 2. 生成阶段:逐章节生成内容,每章节可多次调用工具获取信息
+ 3. 对话阶段:支持与用户对话,自主调用检索工具
+ """
+
+ # 配置
+ MAX_TOOL_CALLS_PER_SECTION = 5 # 每章节最大工具调用次数
+ MAX_REFLECTION_ROUNDS = 2 # 最大反思轮数
+```
+
+**核心方法**:
+
+```python
+def plan_outline(
+ progress_callback: Optional[Callable] = None
+) -> ReportOutline:
+ """
+ 规划报告大纲
+
+ 步骤:
+ 1. 获取模拟上下文(图谱统计、相关事实)
+ 2. 使用LLM分析并生成大纲结构
+ 3. 返回包含章节列表的大纲对象
+ """
+
+def _generate_section_react(
+ section: ReportSection,
+ outline: ReportOutline,
+ previous_sections: List[str],
+ progress_callback: Optional[Callable] = None
+) -> str:
+ """
+ 使用ReACT模式生成单个章节
+
+ ReACT循环:
+ 1. Thought(思考)- 分析需要什么信息
+ 2. Action(行动)- 调用工具获取信息
+ 3. Observation(观察)- 分析工具返回结果
+ 4. 重复直到信息足够或达到最大次数
+ 5. Final Answer(最终回答)- 生成章节内容
+ """
+
+def generate_report(
+ progress_callback: Optional[Callable] = None
+) -> Report:
+ """
+ 生成完整报告
+
+ 步骤:
+ 1. 规划大纲
+ 2. 逐章节生成(ReACT模式)
+ 3. 组装Markdown报告
+ 4. 保存报告文件
+ """
+
+def chat(
+ message: str,
+ chat_history: List[Dict[str, str]] = None
+) -> Dict[str, Any]:
+ """
+ 与Report Agent对话
+
+ 在对话中Agent可以自主调用检索工具来回答问题
+
+ Returns:
+ {
+ "response": "Agent回复",
+ "tool_calls": [调用的工具列表],
+ "sources": [信息来源]
+ }
+ """
+```
+
+**工具调用格式**:
+
+Agent使用以下格式调用工具:
+
+```
+<tool_call>
+{"name": "search_graph", "parameters": {"query": "舆情走向", "limit": 10}}
+</tool_call>
+```
+
+或者:
+
+```
+[TOOL_CALL] search_graph(query="舆情走向", limit="10")
+```
+
+**报告大纲结构示例**:
+
+```json
+{
+ "title": "武汉大学撤销处分事件舆情分析报告",
+ "summary": "基于模拟结果的全面舆情分析",
+ "sections": [
+ {
+ "title": "执行摘要",
+ "description": "简要总结模拟结果和关键发现"
+ },
+ {
+ "title": "模拟背景",
+ "description": "描述模拟的初始条件和场景设定"
+ },
+ {
+ "title": "关键发现",
+ "description": "分析模拟中的重要发现和趋势"
+ },
+ {
+ "title": "舆情分析",
+ "description": "分析舆论走向、情绪变化、关键意见领袖"
+ },
+ {
+ "title": "影响评估",
+ "description": "评估事件的影响范围和程度"
+ },
+ {
+ "title": "建议与展望",
+ "description": "基于分析结果提出建议"
+ }
+ ]
+}
+```
+
+---
+
+### 12. ReportManager (报告管理器)
+
+**文件**: `app/services/report_agent.py`
+
+**功能**: 报告的持久化存储和检索
+
+**核心方法**:
+
+```python
+@classmethod
+def save_report(cls, report: Report) -> None:
+ """
+ 保存报告
+
+ 同时保存:
+ - JSON文件(报告元数据)
+ - Markdown文件(报告内容)
+ """
+
+@classmethod
+def get_report(cls, report_id: str) -> Optional[Report]:
+ """获取报告"""
+
+@classmethod
+def get_report_by_simulation(cls, simulation_id: str) -> Optional[Report]:
+ """根据模拟ID获取报告"""
+
+@classmethod
+def list_reports(
+ cls,
+ simulation_id: Optional[str] = None,
+ limit: int = 50
+) -> List[Report]:
+ """列出报告"""
+
+@classmethod
+def delete_report(cls, report_id: str) -> bool:
+ """删除报告"""
+```
+
+**存储结构**:
+```
+uploads/reports/
+├── report_abc123.json # 报告元数据
+└── report_abc123.md # Markdown报告
+```
+
+---
+
## 工具类
### 1. FileParser (文件解析器)
@@ -1962,6 +2578,11 @@ ZEP_API_KEY=z_xxx
# OASIS模拟配置
OASIS_DEFAULT_MAX_ROUNDS=10
+
+# Report Agent配置(可选)
+REPORT_AGENT_MAX_TOOL_CALLS=5
+REPORT_AGENT_MAX_REFLECTION_ROUNDS=2
+REPORT_AGENT_TEMPERATURE=0.5
```
### 配置项说明
@@ -1977,6 +2598,9 @@ OASIS_DEFAULT_MAX_ROUNDS=10
| LLM_MODEL_NAME | String | gpt-4o-mini | LLM模型名称 |
| ZEP_API_KEY | String | - | Zep API密钥(必填) |
| OASIS_DEFAULT_MAX_ROUNDS | Integer | 10 | 默认模拟轮数 |
+| REPORT_AGENT_MAX_TOOL_CALLS | Integer | 5 | 每章节最大工具调用次数 |
+| REPORT_AGENT_MAX_REFLECTION_ROUNDS | Integer | 2 | 最大反思轮数 |
+| REPORT_AGENT_TEMPERATURE | Float | 0.5 | 报告生成温度参数 |
---
@@ -2102,7 +2726,40 @@ curl -X POST http://localhost:5001/api/simulation/{sim_xxx}/interview/all \
# Step 12: 获取Interview历史
curl http://localhost:5001/api/simulation/{sim_xxx}/interview/history
-# Step 13: 关闭模拟环境(优雅退出)
+# Step 13: 生成模拟分析报告
+curl -X POST http://localhost:5001/api/report/generate \
+ -H "Content-Type: application/json" \
+ -d '{
+ "simulation_id": "sim_xxx"
+ }'
+
+# 返回: task_id
+
+# Step 14: 查询报告生成进度
+curl -X POST http://localhost:5001/api/report/generate/status \
+ -H "Content-Type: application/json" \
+ -d '{
+ "task_id": "task_xxx",
+ "simulation_id": "sim_xxx"
+ }'
+
+# 等待status=completed
+
+# Step 15: 获取报告
+curl http://localhost:5001/api/report/by-simulation/sim_xxx
+
+# Step 16: 与Report Agent对话
+curl -X POST http://localhost:5001/api/report/chat \
+ -H "Content-Type: application/json" \
+ -d '{
+ "simulation_id": "sim_xxx",
+ "message": "请解释一下舆情的主要趋势"
+ }'
+
+# Step 17: 下载Markdown报告
+curl -OJ http://localhost:5001/api/report/{report_id}/download
+
+# Step 18: 关闭模拟环境(优雅退出)
curl -X POST http://localhost:5001/api/simulation/close-env \
-H "Content-Type: application/json" \
-d '{
@@ -2278,11 +2935,31 @@ MIT License
---
-**最后更新**: 2025-12-08
-**版本**: v1.2.0
+**最后更新**: 2025-12-09
+**版本**: v1.3.0
### 更新日志
+**v1.3.0 (2025-12-09)**:
+- 新增 Report Agent 模拟报告生成功能
+ - 使用 LangChain + Zep 实现 ReACT 模式
+ - 自动规划报告大纲,分段生成内容
+ - 每章节可多次调用Zep检索工具获取信息
+ - 生成专业的Markdown格式报告
+- 新增 Report Agent 对话功能
+ - 报告完成后可与Agent对话
+ - Agent自主调用检索工具回答问题
+- 新增 Zep 检索工具服务
+ - 封装图谱搜索、节点读取、边查询等工具
+ - 支持语义搜索、统计分析、上下文获取
+- 新增报告管理接口
+ - 报告生成、查询、下载、删除
+ - 报告状态检查(解锁Interview功能)
+- 依赖更新
+ - 新增 langchain>=0.2.0
+ - 新增 langchain-core>=0.2.0
+ - 新增 langchain-openai>=0.1.0
+
**v1.2.0 (2025-12-08)**:
- 新增 Interview 采访功能
- 支持单个Agent采访
diff --git a/backend/app/__init__.py b/backend/app/__init__.py
index 5b7f1fc..aba624b 100644
--- a/backend/app/__init__.py
+++ b/backend/app/__init__.py
@@ -63,9 +63,10 @@ def create_app(config_class=Config):
return response
# 注册蓝图
- from .api import graph_bp, simulation_bp
+ from .api import graph_bp, simulation_bp, report_bp
app.register_blueprint(graph_bp, url_prefix='/api/graph')
app.register_blueprint(simulation_bp, url_prefix='/api/simulation')
+ app.register_blueprint(report_bp, url_prefix='/api/report')
# 健康检查
@app.route('/health')
diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py
index ad7a722..ffda743 100644
--- a/backend/app/api/__init__.py
+++ b/backend/app/api/__init__.py
@@ -6,7 +6,9 @@ from flask import Blueprint
graph_bp = Blueprint('graph', __name__)
simulation_bp = Blueprint('simulation', __name__)
+report_bp = Blueprint('report', __name__)
from . import graph # noqa: E402, F401
from . import simulation # noqa: E402, F401
+from . import report # noqa: E402, F401
diff --git a/backend/app/api/report.py b/backend/app/api/report.py
new file mode 100644
index 0000000..ce44db9
--- /dev/null
+++ b/backend/app/api/report.py
@@ -0,0 +1,829 @@
+"""
+Report API路由
+提供模拟报告生成、获取、对话等接口
+"""
+
+import os
+import traceback
+import threading
+from flask import request, jsonify, send_file
+
+from . import report_bp
+from ..config import Config
+from ..services.report_agent import ReportAgent, ReportManager, ReportStatus
+from ..services.simulation_manager import SimulationManager
+from ..models.project import ProjectManager
+from ..models.task import TaskManager, TaskStatus
+from ..utils.logger import get_logger
+
+logger = get_logger('mirofish.api.report')
+
+
+# ============== 报告生成接口 ==============
+
+@report_bp.route('/generate', methods=['POST'])
+def generate_report():
+    """
+    Start generating the simulation analysis report as an asynchronous task.
+
+    Generation is slow, so the endpoint returns a task_id immediately;
+    poll POST /api/report/generate/status to follow the progress.
+
+    Request (JSON):
+        {
+            "simulation_id": "sim_xxxx",    // required, simulation ID
+            "force_regenerate": false       // optional, regenerate even if a completed report exists
+        }
+
+    Response:
+        {
+            "success": true,
+            "data": {
+                "simulation_id": "sim_xxxx",
+                "task_id": "task_xxxx",
+                "status": "generating",
+                "message": "<human-readable status text>", "already_generated": false
+            }
+        }
+    """
+    try:
+        data = request.get_json() or {}
+
+        simulation_id = data.get('simulation_id')
+        if not simulation_id:
+            return jsonify({
+                "success": False,
+                "error": "请提供 simulation_id"
+            }), 400
+
+        force_regenerate = data.get('force_regenerate', False)
+
+        # Look up the simulation
+        manager = SimulationManager()
+        state = manager.get_simulation(simulation_id)
+
+        if not state:
+            return jsonify({
+                "success": False,
+                "error": f"模拟不存在: {simulation_id}"
+            }), 404
+
+        # Reuse an existing completed report unless regeneration is forced
+        if not force_regenerate:
+            existing_report = ReportManager.get_report_by_simulation(simulation_id)
+            if existing_report and existing_report.status == ReportStatus.COMPLETED:
+                return jsonify({
+                    "success": True,
+                    "data": {
+                        "simulation_id": simulation_id,
+                        "report_id": existing_report.report_id,
+                        "status": "completed",
+                        "message": "报告已存在",
+                        "already_generated": True
+                    }
+                })
+
+        # Load the project the simulation belongs to
+        project = ProjectManager.get_project(state.project_id)
+        if not project:
+            return jsonify({
+                "success": False,
+                "error": f"项目不存在: {state.project_id}"
+            }), 404
+
+        graph_id = state.graph_id or project.graph_id
+        if not graph_id:
+            return jsonify({
+                "success": False,
+                "error": "缺少图谱ID,请确保已构建图谱"
+            }), 400
+
+        simulation_requirement = project.simulation_requirement
+        if not simulation_requirement:
+            return jsonify({
+                "success": False,
+                "error": "缺少模拟需求描述"
+            }), 400
+
+        # Register the asynchronous task
+        task_manager = TaskManager()
+        task_id = task_manager.create_task(
+            task_type="report_generate",
+            metadata={
+                "simulation_id": simulation_id,
+                "graph_id": graph_id
+            }
+        )
+
+        # Background worker; closes over task_id and the validated inputs above
+        def run_generate():
+            try:
+                task_manager.update_task(
+                    task_id,
+                    status=TaskStatus.PROCESSING,
+                    progress=0,
+                    message="初始化Report Agent..."
+                )
+
+                # Build the Report Agent
+                agent = ReportAgent(
+                    graph_id=graph_id,
+                    simulation_id=simulation_id,
+                    simulation_requirement=simulation_requirement
+                )
+
+                # Progress callback: forwards agent progress to the task record
+                def progress_callback(stage, progress, message):
+                    task_manager.update_task(
+                        task_id,
+                        progress=progress,
+                        message=f"[{stage}] {message}"
+                    )
+
+                # Generate the report
+                report = agent.generate_report(progress_callback=progress_callback)
+
+                # Persist the report
+                ReportManager.save_report(report)
+
+                if report.status == ReportStatus.COMPLETED:
+                    task_manager.complete_task(
+                        task_id,
+                        result={
+                            "report_id": report.report_id,
+                            "simulation_id": simulation_id,
+                            "status": "completed"
+                        }
+                    )
+                else:
+                    task_manager.fail_task(task_id, report.error or "报告生成失败")
+
+            except Exception as e:
+                logger.error(f"报告生成失败: {str(e)}")
+                task_manager.fail_task(task_id, str(e))
+
+        # Run the worker in a daemon thread so this request returns immediately
+        thread = threading.Thread(target=run_generate, daemon=True)
+        thread.start()
+
+        return jsonify({
+            "success": True,
+            "data": {
+                "simulation_id": simulation_id,
+                "task_id": task_id,
+                "status": "generating",
+                "message": "报告生成任务已启动,请通过 /api/report/generate/status 查询进度",
+                "already_generated": False
+            }
+        })
+
+    except Exception as e:
+        logger.error(f"启动报告生成任务失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+@report_bp.route('/generate/status', methods=['POST'])
+def get_generate_status():
+    """
+    Report the progress of a report-generation task.
+
+    Request (JSON):
+        {
+            "task_id": "task_xxxx",         // task_id returned by /generate; required unless a completed report is found
+            "simulation_id": "sim_xxxx"     // optional; answers "completed" directly if this simulation has a completed report
+        }
+
+    Response:
+        {
+            "success": true,
+            "data": {
+                "task_id": "task_xxxx",
+                "status": "processing|completed|failed",
+                "progress": 45,
+                "message": "..."
+            }
+        }
+    """
+    try:
+        data = request.get_json() or {}
+
+        task_id = data.get('task_id')
+        simulation_id = data.get('simulation_id')
+
+        # If a simulation_id is given, first check for an already completed report
+        if simulation_id:
+            existing_report = ReportManager.get_report_by_simulation(simulation_id)
+            if existing_report and existing_report.status == ReportStatus.COMPLETED:
+                return jsonify({
+                    "success": True,
+                    "data": {
+                        "simulation_id": simulation_id,
+                        "report_id": existing_report.report_id,
+                        "status": "completed",
+                        "progress": 100,
+                        "message": "报告已生成",
+                        "already_completed": True
+                    }
+                })
+
+        if not task_id:
+            return jsonify({
+                "success": False,
+                "error": "请提供 task_id 或 simulation_id"
+            }), 400
+
+        task_manager = TaskManager()
+        task = task_manager.get_task(task_id)
+
+        if not task:
+            return jsonify({
+                "success": False,
+                "error": f"任务不存在: {task_id}"
+            }), 404
+
+        return jsonify({
+            "success": True,
+            "data": task.to_dict()
+        })
+
+    except Exception as e:
+        logger.error(f"查询任务状态失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e)
+        }), 500
+
+
+# ============== 报告获取接口 ==============
+
+@report_bp.route('/<report_id>', methods=['GET'])
+def get_report(report_id: str):
+    """
+    Return the full record of one report.
+
+    Response (404 if the report is unknown):
+        {
+            "success": true,
+            "data": {
+                "report_id": "report_xxxx",
+                "simulation_id": "sim_xxxx",
+                "status": "completed",
+                "outline": {...},
+                "markdown_content": "...",
+                "created_at": "...",
+                "completed_at": "..."
+            }
+        }
+    """
+    try:
+        report = ReportManager.get_report(report_id)
+
+        if not report:
+            return jsonify({
+                "success": False,
+                "error": f"报告不存在: {report_id}"
+            }), 404
+
+        return jsonify({
+            "success": True,
+            "data": report.to_dict()
+        })
+
+    except Exception as e:
+        logger.error(f"获取报告失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+@report_bp.route('/by-simulation/<simulation_id>', methods=['GET'])
+def get_report_by_simulation(simulation_id: str):
+    """
+    Return the report that belongs to a simulation.
+
+    Response (404 with "has_report": false if the simulation has no report):
+        {
+            "success": true,
+            "data": {
+                "report_id": "report_xxxx",
+                ...
+            }
+        }
+    """
+    try:
+        report = ReportManager.get_report_by_simulation(simulation_id)
+
+        if not report:
+            return jsonify({
+                "success": False,
+                "error": f"该模拟暂无报告: {simulation_id}",
+                "has_report": False
+            }), 404
+
+        return jsonify({
+            "success": True,
+            "data": report.to_dict(),
+            "has_report": True
+        })
+
+    except Exception as e:
+        logger.error(f"获取报告失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+@report_bp.route('/list', methods=['GET'])
+def list_reports():
+    """
+    List reports.
+
+    Query parameters:
+        simulation_id: filter by simulation ID (optional)
+        limit: maximum number of reports to return (default 50)
+
+    Response:
+        {
+            "success": true,
+            "data": [...],
+            "count": 10
+        }
+    """
+    try:
+        simulation_id = request.args.get('simulation_id')
+        limit = request.args.get('limit', 50, type=int)
+
+        reports = ReportManager.list_reports(
+            simulation_id=simulation_id,
+            limit=limit
+        )
+
+        return jsonify({
+            "success": True,
+            "data": [r.to_dict() for r in reports],
+            "count": len(reports)
+        })
+
+    except Exception as e:
+        logger.error(f"列出报告失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+@report_bp.route('/<report_id>/download', methods=['GET'])
+def download_report(report_id: str):
+    """
+    Download a report as a Markdown attachment named "<report_id>.md".
+
+    Responds 404 (JSON) if the report is unknown, 500 (JSON) on errors.
+    """
+    try:
+        report = ReportManager.get_report(report_id)
+
+        if not report:
+            return jsonify({
+                "success": False,
+                "error": f"报告不存在: {report_id}"
+            }), 404
+
+        md_path = ReportManager._get_report_markdown_path(report_id)
+
+        if not os.path.exists(md_path):
+            # No .md on disk: stream the stored content from memory as UTF-8.
+            # A delete=False temp file would never be removed and would be
+            # written with the platform default encoding.
+            import io
+            md_buffer = io.BytesIO((report.markdown_content or "").encode('utf-8'))
+
+            return send_file(
+                md_buffer,
+                as_attachment=True,
+                download_name=f"{report_id}.md"
+            )
+
+        return send_file(
+            md_path,
+            as_attachment=True,
+            download_name=f"{report_id}.md"
+        )
+
+    except Exception as e:
+        logger.error(f"下载报告失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+@report_bp.route('/<report_id>', methods=['DELETE'])
+def delete_report(report_id: str):
+    """Delete a report; responds 404 if ReportManager reports nothing was deleted."""
+    try:
+        success = ReportManager.delete_report(report_id)
+
+        if not success:
+            return jsonify({
+                "success": False,
+                "error": f"报告不存在: {report_id}"
+            }), 404
+
+        return jsonify({
+            "success": True,
+            "message": f"报告已删除: {report_id}"
+        })
+
+    except Exception as e:
+        logger.error(f"删除报告失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+# ============== Report Agent对话接口 ==============
+
+@report_bp.route('/chat', methods=['POST'])
+def chat_with_report_agent():
+    """
+    Chat with the Report Agent.
+
+    The message and history are handed to ReportAgent.chat and its result is returned as "data".
+
+    Request (JSON):
+        {
+            "simulation_id": "sim_xxxx",        // required, simulation ID
+            "message": "Explain the opinion trend",  // required, user message
+            "chat_history": [                   // optional, earlier turns
+                {"role": "user", "content": "..."},
+                {"role": "assistant", "content": "..."}
+            ]
+        }
+
+    Response:
+        {
+            "success": true,
+            "data": {
+                "response": "Agent reply...",
+                "tool_calls": [tools that were called],
+                "sources": [information sources]
+            }
+        }
+    """
+    try:
+        data = request.get_json() or {}
+
+        simulation_id = data.get('simulation_id')
+        message = data.get('message')
+        chat_history = data.get('chat_history', [])
+
+        if not simulation_id:
+            return jsonify({
+                "success": False,
+                "error": "请提供 simulation_id"
+            }), 400
+
+        if not message:
+            return jsonify({
+                "success": False,
+                "error": "请提供 message"
+            }), 400
+
+        # Look up the simulation and its project
+        manager = SimulationManager()
+        state = manager.get_simulation(simulation_id)
+
+        if not state:
+            return jsonify({
+                "success": False,
+                "error": f"模拟不存在: {simulation_id}"
+            }), 404
+
+        project = ProjectManager.get_project(state.project_id)
+        if not project:
+            return jsonify({
+                "success": False,
+                "error": f"项目不存在: {state.project_id}"
+            }), 404
+
+        graph_id = state.graph_id or project.graph_id
+        if not graph_id:
+            return jsonify({
+                "success": False,
+                "error": "缺少图谱ID"
+            }), 400
+
+        simulation_requirement = project.simulation_requirement or ""
+
+        # Build a fresh agent for this request and run the chat turn
+        agent = ReportAgent(
+            graph_id=graph_id,
+            simulation_id=simulation_id,
+            simulation_requirement=simulation_requirement
+        )
+
+        result = agent.chat(message=message, chat_history=chat_history)
+
+        return jsonify({
+            "success": True,
+            "data": result
+        })
+
+    except Exception as e:
+        logger.error(f"对话失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+# ============== 报告进度与分章节接口 ==============
+
+@report_bp.route('/<report_id>/progress', methods=['GET'])
+def get_report_progress(report_id: str):
+    """
+    Return the current generation progress of a report.
+
+    Response ("data" is whatever ReportManager.get_progress returns; example):
+        {
+            "success": true,
+            "data": {
+                "status": "generating",
+                "progress": 45,
+                "message": "<progress text>",
+                "current_section": "<section title>",
+                "completed_sections": ["<section title>", "<section title>"],
+                "updated_at": "2025-12-09T..."
+            }
+        }
+    """
+    try:
+        progress = ReportManager.get_progress(report_id)
+
+        if not progress:
+            return jsonify({
+                "success": False,
+                "error": f"报告不存在或进度信息不可用: {report_id}"
+            }), 404
+
+        return jsonify({
+            "success": True,
+            "data": progress
+        })
+
+    except Exception as e:
+        logger.error(f"获取报告进度失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+@report_bp.route('/<report_id>/sections', methods=['GET'])
+def get_report_sections(report_id: str):
+    """
+    Return the sections generated so far for a report.
+
+    The frontend can poll this endpoint to show sections before the whole report is done.
+
+    Response ("sections" is whatever ReportManager.get_generated_sections returns; example):
+        {
+            "success": true,
+            "data": {
+                "report_id": "report_xxxx",
+                "sections": [
+                    {
+                        "filename": "section_01.md",
+                        "section_index": 1,
+                        "content": "## <section title>\\n\\n..."
+                    },
+                    ...
+                ],
+                "total_sections": 3,
+                "is_complete": false
+            }
+        }
+    """
+    try:
+        sections = ReportManager.get_generated_sections(report_id)
+
+        # is_complete is true only when the report exists and is COMPLETED
+        report = ReportManager.get_report(report_id)
+        is_complete = report is not None and report.status == ReportStatus.COMPLETED
+
+        return jsonify({
+            "success": True,
+            "data": {
+                "report_id": report_id,
+                "sections": sections,
+                "total_sections": len(sections),
+                "is_complete": is_complete
+            }
+        })
+
+    except Exception as e:
+        logger.error(f"获取章节列表失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+@report_bp.route('/<report_id>/section/<int:section_index>', methods=['GET'])
+def get_single_section(report_id: str, section_index: int):
+    """
+    Return one generated section; section_index must be an int (used with ":02d" below).
+
+    Response (404 if the section file does not exist):
+        {
+            "success": true,
+            "data": {
+                "filename": "section_01.md", "section_index": 1,
+                "content": "## <section title>\\n\\n..."
+            }
+        }
+    """
+    try:
+        section_path = ReportManager._get_section_path(report_id, section_index)
+
+        if not os.path.exists(section_path):
+            return jsonify({
+                "success": False,
+                "error": f"章节不存在: section_{section_index:02d}.md"
+            }), 404
+
+        with open(section_path, 'r', encoding='utf-8') as f:
+            content = f.read()
+
+        return jsonify({
+            "success": True,
+            "data": {
+                "filename": f"section_{section_index:02d}.md",
+                "section_index": section_index,
+                "content": content
+            }
+        })
+
+    except Exception as e:
+        logger.error(f"获取章节内容失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+# ============== 报告状态检查接口 ==============
+
+@report_bp.route('/check/<simulation_id>', methods=['GET'])
+def check_report_status(simulation_id: str):
+    """
+    Tell whether a simulation has a report and what state it is in.
+
+    The frontend uses "interview_unlocked" to decide whether to enable the Interview feature.
+
+    Response:
+        {
+            "success": true,
+            "data": {
+                "simulation_id": "sim_xxxx",
+                "has_report": true,
+                "report_status": "completed",
+                "report_id": "report_xxxx",
+                "interview_unlocked": true
+            }
+        }
+    """
+    try:
+        report = ReportManager.get_report_by_simulation(simulation_id)
+
+        has_report = report is not None
+        report_status = report.status.value if report else None
+        report_id = report.report_id if report else None
+
+        # Interview is unlocked only once the report is COMPLETED
+        interview_unlocked = has_report and report.status == ReportStatus.COMPLETED
+
+        return jsonify({
+            "success": True,
+            "data": {
+                "simulation_id": simulation_id,
+                "has_report": has_report,
+                "report_status": report_status,
+                "report_id": report_id,
+                "interview_unlocked": interview_unlocked
+            }
+        })
+
+    except Exception as e:
+        logger.error(f"检查报告状态失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+# ============== 工具调用接口(供调试使用)==============
+
+@report_bp.route('/tools/search', methods=['POST'])
+def search_graph_tool():
+    """
+    Graph search tool endpoint (for debugging).
+
+    Request (JSON):
+        {
+            "graph_id": "mirofish_xxxx",
+            "query": "search query",
+            "limit": 10
+        }
+    """
+    try:
+        data = request.get_json() or {}
+
+        graph_id = data.get('graph_id')
+        query = data.get('query')
+        limit = data.get('limit', 10)
+
+        if not graph_id or not query:
+            return jsonify({
+                "success": False,
+                "error": "请提供 graph_id 和 query"
+            }), 400
+
+        from ..services.zep_tools import ZepToolsService
+
+        tools = ZepToolsService()
+        result = tools.search_graph(
+            graph_id=graph_id,
+            query=query,
+            limit=limit
+        )
+
+        return jsonify({
+            "success": True,
+            "data": result.to_dict()
+        })
+
+    except Exception as e:
+        logger.error(f"图谱搜索失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
+
+
+@report_bp.route('/tools/statistics', methods=['POST'])
+def get_graph_statistics_tool():
+    """
+    Graph statistics tool endpoint (for debugging).
+
+    Request (JSON):
+        {
+            "graph_id": "mirofish_xxxx"
+        }
+    """
+    try:
+        data = request.get_json() or {}
+
+        graph_id = data.get('graph_id')
+
+        if not graph_id:
+            return jsonify({
+                "success": False,
+                "error": "请提供 graph_id"
+            }), 400
+
+        from ..services.zep_tools import ZepToolsService
+
+        tools = ZepToolsService()
+        result = tools.get_graph_statistics(graph_id)
+
+        return jsonify({
+            "success": True,
+            "data": result
+        })
+
+    except Exception as e:
+        logger.error(f"获取图谱统计失败: {str(e)}")
+        return jsonify({
+            "success": False,
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }), 500
diff --git a/backend/app/config.py b/backend/app/config.py
index 10b18f7..3dd3f51 100644
--- a/backend/app/config.py
+++ b/backend/app/config.py
@@ -58,6 +58,11 @@ class Config:
'TREND', 'REFRESH', 'DO_NOTHING', 'FOLLOW', 'MUTE'
]
+ # Report Agent配置
+ REPORT_AGENT_MAX_TOOL_CALLS = int(os.environ.get('REPORT_AGENT_MAX_TOOL_CALLS', '5'))
+ REPORT_AGENT_MAX_REFLECTION_ROUNDS = int(os.environ.get('REPORT_AGENT_MAX_REFLECTION_ROUNDS', '2'))
+ REPORT_AGENT_TEMPERATURE = float(os.environ.get('REPORT_AGENT_TEMPERATURE', '0.5'))
+
@classmethod
def validate(cls):
"""验证必要配置"""
diff --git a/backend/app/services/report_agent.py b/backend/app/services/report_agent.py
new file mode 100644
index 0000000..3ec4be3
--- /dev/null
+++ b/backend/app/services/report_agent.py
@@ -0,0 +1,1296 @@
+"""
+Report Agent服务
+使用LangChain + Zep实现ReACT模式的模拟报告生成
+
+功能:
+1. 根据模拟需求和Zep图谱信息生成报告
+2. 先规划目录结构,然后分段生成
+3. 每段采用ReACT多轮思考与反思模式
+4. 支持与用户对话,在对话中自主调用检索工具
+"""
+
+import os
+import json
+import time
+import re
+from typing import Dict, Any, List, Optional, Callable
+from dataclasses import dataclass, field
+from datetime import datetime
+from enum import Enum
+
+from ..config import Config
+from ..utils.llm_client import LLMClient
+from ..utils.logger import get_logger
+from .zep_tools import ZepToolsService, SearchResult
+
+logger = get_logger('mirofish.report_agent')
+
+
+class ReportStatus(str, Enum):
+    """Report status values (a str-backed Enum)."""
+    PENDING = "pending"
+    PLANNING = "planning"
+    GENERATING = "generating"
+    COMPLETED = "completed"
+    FAILED = "failed"
+
+
+@dataclass
+class ReportSection:
+    """A report section: a title, its content, and nested subsections."""
+    title: str
+    content: str = ""
+    subsections: List['ReportSection'] = field(default_factory=list)
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "title": self.title,
+            "content": self.content,
+            "subsections": [s.to_dict() for s in self.subsections]
+        }
+
+    def to_markdown(self, level: int = 2) -> str:
+        """Render as Markdown; subsections are rendered one heading level deeper."""
+        md = f"{'#' * level} {self.title}\n\n"
+        if self.content:
+            md += f"{self.content}\n\n"
+        for sub in self.subsections:
+            md += sub.to_markdown(level + 1)
+        return md
+
+
+@dataclass
+class ReportOutline:
+    """Report outline: title, summary, and the ordered top-level sections."""
+    title: str
+    summary: str
+    sections: List[ReportSection]
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "title": self.title,
+            "summary": self.summary,
+            "sections": [s.to_dict() for s in self.sections]
+        }
+
+    def to_markdown(self) -> str:
+        """Render as Markdown: H1 title, summary as a blockquote, then each section."""
+        md = f"# {self.title}\n\n"
+        md += f"> {self.summary}\n\n"
+        for section in self.sections:
+            md += section.to_markdown()
+        return md
+
+
+@dataclass
+class Report:
+    """A complete report: identifiers, status, outline, and rendered Markdown."""
+    report_id: str
+    simulation_id: str
+    graph_id: str
+    simulation_requirement: str
+    status: ReportStatus
+    outline: Optional[ReportOutline] = None
+    markdown_content: str = ""
+    created_at: str = ""
+    completed_at: str = ""
+    error: Optional[str] = None
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "report_id": self.report_id,
+            "simulation_id": self.simulation_id,
+            "graph_id": self.graph_id,
+            "simulation_requirement": self.simulation_requirement,
+            "status": self.status.value,  # emit the enum's string value
+            "outline": self.outline.to_dict() if self.outline else None,
+            "markdown_content": self.markdown_content,
+            "created_at": self.created_at,
+            "completed_at": self.completed_at,
+            "error": self.error
+        }
+
+
+class ReportAgent:
+ """
+ Report Agent - 模拟报告生成Agent
+
+ 采用ReACT(Reasoning + Acting)模式:
+ 1. 规划阶段:分析模拟需求,规划报告目录结构
+ 2. 生成阶段:逐章节生成内容,每章节可多次调用工具获取信息
+ 3. 反思阶段:检查内容完整性和准确性
+
+ 工具(MCP封装):
+ - search_graph: 图谱语义搜索
+ - get_graph_statistics: 获取图谱统计
+ - get_entity_summary: 获取实体摘要
+ - get_simulation_context: 获取模拟上下文
+ """
+
+ # 最大工具调用次数(每个章节)
+ MAX_TOOL_CALLS_PER_SECTION = 5
+
+ # 最大反思轮数
+ MAX_REFLECTION_ROUNDS = 2
+
+    def __init__(
+        self,
+        graph_id: str,
+        simulation_id: str,
+        simulation_requirement: str,
+        llm_client: Optional[LLMClient] = None,
+        zep_tools: Optional[ZepToolsService] = None
+    ):
+        """
+        Initialize the Report Agent.
+
+        Args:
+            graph_id: Graph ID
+            simulation_id: Simulation ID
+            simulation_requirement: Description of the simulation requirement
+            llm_client: LLM client (optional; LLMClient() is created when not given)
+            zep_tools: Zep tools service (optional; ZepToolsService() is created when not given)
+        """
+        self.graph_id = graph_id
+        self.simulation_id = simulation_id
+        self.simulation_requirement = simulation_requirement
+
+        self.llm = llm_client or LLMClient()
+        self.zep_tools = zep_tools or ZepToolsService()
+
+        # Tool definitions
+        self.tools = self._define_tools()
+
+        logger.info(f"ReportAgent 初始化完成: graph_id={graph_id}, simulation_id={simulation_id}")
+
+ def _define_tools(self) -> Dict[str, Dict[str, Any]]:
+ """定义可用工具"""
+ return {
+ "search_graph": {
+ "name": "search_graph",
+ "description": "在知识图谱中搜索相关信息。输入搜索查询,返回与查询相关的事实和关系。",
+ "parameters": {
+ "query": "搜索查询字符串",
+ "limit": "返回结果数量(可选,默认10)"
+ }
+ },
+ "get_graph_statistics": {
+ "name": "get_graph_statistics",
+ "description": "获取知识图谱的统计信息,包括节点数量、边数量、实体类型分布等。",
+ "parameters": {}
+ },
+ "get_entity_summary": {
+ "name": "get_entity_summary",
+ "description": "获取指定实体的详细信息和关系摘要。",
+ "parameters": {
+ "entity_name": "实体名称"
+ }
+ },
+ "get_simulation_context": {
+ "name": "get_simulation_context",
+ "description": "获取与模拟需求相关的上下文信息,包括相关事实、实体列表等。",
+ "parameters": {
+ "query": "额外的查询条件(可选)"
+ }
+ },
+ "get_entities_by_type": {
+ "name": "get_entities_by_type",
+ "description": "按类型获取实体列表,如获取所有Student类型或PublicFigure类型的实体。",
+ "parameters": {
+ "entity_type": "实体类型名称"
+ }
+ }
+ }
+
+    def _execute_tool(self, tool_name: str, parameters: Dict[str, Any]) -> str:
+        """
+        Execute one tool call against the Zep tools service.
+
+        Args:
+            tool_name: Tool name (one of the keys returned by _define_tools)
+            parameters: Tool arguments; a non-dict value is treated as empty
+
+        Returns:
+            Tool result as text. Unknown tools and failures return a message string.
+        """
+        logger.info(f"执行工具: {tool_name}, 参数: {parameters}")
+        try:
+            params = parameters if isinstance(parameters, dict) else {}
+
+            if tool_name == "search_graph":
+                # Function-style calls pass every value as a string; coerce limit.
+                try:
+                    limit = int(params.get("limit", 10))
+                except (TypeError, ValueError):
+                    limit = 10
+                result = self.zep_tools.search_graph(
+                    graph_id=self.graph_id,
+                    query=params.get("query", ""),
+                    limit=limit if limit > 0 else 10
+                )
+                return result.to_text()
+
+            elif tool_name == "get_graph_statistics":
+                result = self.zep_tools.get_graph_statistics(self.graph_id)
+
+            elif tool_name == "get_entity_summary":
+                result = self.zep_tools.get_entity_summary(
+                    graph_id=self.graph_id,
+                    entity_name=params.get("entity_name", "")
+                )
+
+            elif tool_name == "get_simulation_context":
+                # A missing or empty query falls back to the simulation requirement.
+                result = self.zep_tools.get_simulation_context(
+                    graph_id=self.graph_id,
+                    simulation_requirement=params.get("query") or self.simulation_requirement
+                )
+
+            elif tool_name == "get_entities_by_type":
+                nodes = self.zep_tools.get_entities_by_type(
+                    graph_id=self.graph_id,
+                    entity_type=params.get("entity_type", "")
+                )
+                result = [n.to_dict() for n in nodes]
+
+            else:
+                return f"未知工具: {tool_name}"
+
+            return json.dumps(result, ensure_ascii=False, indent=2)
+
+        except Exception as e:
+            logger.error(f"工具执行失败: {tool_name}, 错误: {str(e)}")
+            return f"工具执行失败: {str(e)}"
+
+    def _parse_tool_calls(self, response: str) -> List[Dict[str, Any]]:
+        """
+        Parse tool calls out of an LLM response.
+
+        Supported formats:
+        1. A JSON object, with or without wrapper markup around it:
+           {"name": "tool_name", "parameters": {"param1": "value1"}}
+        2. Function-call style:
+           [TOOL_CALL] tool_name(param1="value1", param2="value2")
+
+        Returns:
+            A list of {"name": str, "parameters": dict}. JSON objects without a
+            string "name" are skipped, so callers can index call["name"] safely.
+        """
+        tool_calls = []
+
+        # Format 1: decode each JSON object in place. A non-greedy "{.*?}" regex
+        # stops at the first "}" and so drops every call with nested parameters.
+        decoder = json.JSONDecoder()
+        pos = response.find("{")
+        while pos != -1:
+            try:
+                call_data, end = decoder.raw_decode(response, pos)
+            except json.JSONDecodeError:
+                call_data, end = None, pos + 1
+            if isinstance(call_data, dict) and isinstance(call_data.get("name"), str):
+                if not isinstance(call_data.get("parameters"), dict):
+                    call_data["parameters"] = {}
+                tool_calls.append(call_data)
+            # Resume after the decoded object, or just past a brace that failed.
+            pos = response.find("{", end)
+
+        # Format 2: function-call style
+        func_pattern = r'\[TOOL_CALL\]\s*(\w+)\s*\((.*?)\)'
+        for match in re.finditer(func_pattern, response, re.DOTALL):
+            # Only quoted key="value" pairs are recognised; values stay strings.
+            params = dict(re.findall(r'(\w+)\s*=\s*["\']([^"\']*)["\']', match.group(2)))
+            tool_calls.append({"name": match.group(1), "parameters": params})
+
+        return tool_calls
+
+ def _get_tools_description(self) -> str:
+ """生成工具描述文本"""
+ desc_parts = ["可用工具:"]
+ for name, tool in self.tools.items():
+ params_desc = ", ".join([f"{k}: {v}" for k, v in tool["parameters"].items()])
+ desc_parts.append(f"- {name}: {tool['description']}")
+ if params_desc:
+ desc_parts.append(f" 参数: {params_desc}")
+ return "\n".join(desc_parts)
+
+ def plan_outline(
+ self,
+ progress_callback: Optional[Callable] = None
+ ) -> ReportOutline:
+ """
+ 规划报告大纲
+
+ 使用LLM分析模拟需求,规划报告的目录结构
+
+ Args:
+ progress_callback: 进度回调函数
+
+ Returns:
+ ReportOutline: 报告大纲
+ """
+ logger.info("开始规划报告大纲...")
+
+ if progress_callback:
+ progress_callback("planning", 0, "正在分析模拟需求...")
+
+ # 首先获取模拟上下文
+ context = self.zep_tools.get_simulation_context(
+ graph_id=self.graph_id,
+ simulation_requirement=self.simulation_requirement
+ )
+
+ if progress_callback:
+ progress_callback("planning", 30, "正在生成报告大纲...")
+
+ # 构建规划prompt
+ system_prompt = """你是一个专业的舆情分析报告撰写专家。你需要根据用户的模拟需求和已有的知识图谱信息,规划一份精炼的模拟分析报告大纲。
+
+【重要】报告章节数量限制:
+- 报告最多包含5个主章节
+- 每个章节可以有0-2个子章节
+- 内容要精炼,避免冗余
+
+报告应聚焦以下核心内容(选择最相关的3-5项):
+1. 执行摘要 - 简要总结模拟结果和关键发现
+2. 模拟背景 - 描述模拟的初始条件和场景设定
+3. 关键发现 - 分析模拟中的重要发现和趋势
+4. 舆情分析 - 分析舆论走向、情绪变化、关键意见领袖等
+5. 建议与展望 - 基于分析结果提出建议
+
+请输出JSON格式的报告大纲,格式如下:
+{
+ "title": "报告标题",
+ "summary": "报告摘要(一句话概括)",
+ "sections": [
+ {
+ "title": "章节标题",
+ "description": "章节内容描述",
+ "subsections": [
+ {"title": "子章节标题", "description": "子章节描述"}
+ ]
+ }
+ ]
+}
+
+注意:sections数组最多包含5个元素!"""
+
+ user_prompt = f"""模拟需求:
+{self.simulation_requirement}
+
+已有的知识图谱信息:
+- 总节点数: {context.get('graph_statistics', {}).get('total_nodes', 0)}
+- 总边数: {context.get('graph_statistics', {}).get('total_edges', 0)}
+- 实体类型: {list(context.get('graph_statistics', {}).get('entity_types', {}).keys())}
+- 实体数量: {context.get('total_entities', 0)}
+
+相关事实:
+{json.dumps(context.get('related_facts', [])[:10], ensure_ascii=False, indent=2)}
+
+请根据以上信息,生成一份针对此模拟场景的报告大纲。
+
+【再次提醒】报告必须控制在最多5个章节以内,内容要精炼聚焦。"""
+
+ try:
+ response = self.llm.chat_json(
+ messages=[
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": user_prompt}
+ ],
+ temperature=0.3
+ )
+
+ if progress_callback:
+ progress_callback("planning", 80, "正在解析大纲结构...")
+
+ # 解析大纲
+ sections = []
+ for section_data in response.get("sections", []):
+ subsections = []
+ for sub_data in section_data.get("subsections", []):
+ subsections.append(ReportSection(
+ title=sub_data.get("title", ""),
+ content=""
+ ))
+
+ sections.append(ReportSection(
+ title=section_data.get("title", ""),
+ content="",
+ subsections=subsections
+ ))
+
+ outline = ReportOutline(
+ title=response.get("title", "模拟分析报告"),
+ summary=response.get("summary", ""),
+ sections=sections
+ )
+
+ if progress_callback:
+ progress_callback("planning", 100, "大纲规划完成")
+
+ logger.info(f"大纲规划完成: {len(sections)} 个章节")
+ return outline
+
+ except Exception as e:
+ logger.error(f"大纲规划失败: {str(e)}")
+ # 返回默认大纲(5个章节)
+ return ReportOutline(
+ title="模拟分析报告",
+ summary="基于模拟结果的分析报告",
+ sections=[
+ ReportSection(title="执行摘要"),
+ ReportSection(title="模拟背景与场景设定"),
+ ReportSection(title="关键发现与趋势分析"),
+ ReportSection(title="舆情走向与情绪演化"),
+ ReportSection(title="总结与建议")
+ ]
+ )
+
+ def _generate_section_react(
+ self,
+ section: ReportSection,
+ outline: ReportOutline,
+ previous_sections: List[str],
+ progress_callback: Optional[Callable] = None
+ ) -> str:
+ """
+ 使用ReACT模式生成单个章节内容
+
+ ReACT循环:
+ 1. Thought(思考)- 分析需要什么信息
+ 2. Action(行动)- 调用工具获取信息
+ 3. Observation(观察)- 分析工具返回结果
+ 4. 重复直到信息足够或达到最大次数
+ 5. Final Answer(最终回答)- 生成章节内容
+
+ Args:
+ section: 要生成的章节
+ outline: 完整大纲
+ previous_sections: 之前章节的内容(用于保持连贯性)
+ progress_callback: 进度回调
+
+ Returns:
+ 章节内容(Markdown格式)
+ """
+ logger.info(f"ReACT生成章节: {section.title}")
+
+ # 构建系统prompt
+ system_prompt = f"""你是一个专业的舆情分析报告撰写专家,正在撰写报告的一个章节。
+
+报告标题: {outline.title}
+报告摘要: {outline.summary}
+模拟需求: {self.simulation_requirement}
+
+当前要撰写的章节: {section.title}
+
+你可以使用以下工具来获取信息,每次最多调用{self.MAX_TOOL_CALLS_PER_SECTION}次:
+
+{self._get_tools_description()}
+
+请按照以下ReACT格式进行思考和行动:
+
+Thought: [分析当前需要什么信息来撰写这个章节]
+Action: [如果需要信息,调用工具]
+
+{{"name": "工具名称", "parameters": {{"参数名": "参数值"}}}}
+
+
+当收集到足够信息后,输出:
+Final Answer:
+[章节的完整Markdown内容]
+
+注意:
+1. 内容要专业、客观、有深度
+2. 引用具体的数据和事实
+3. 保持与其他章节的逻辑连贯性
+4. 使用适当的Markdown格式(列表、强调等)
+5. 不要重复前面章节已经详细描述的内容"""
+
+ # 构建用户prompt
+ previous_content = "\n\n".join(previous_sections) if previous_sections else "(这是第一个章节)"
+ user_prompt = f"""已完成的章节内容:
+{previous_content[:2000]}
+
+现在请撰写章节: {section.title}
+
+首先思考需要什么信息,然后调用工具获取,最后生成内容。"""
+
+ messages = [
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": user_prompt}
+ ]
+
+ # ReACT循环
+ tool_calls_count = 0
+ max_iterations = self.MAX_TOOL_CALLS_PER_SECTION + 2
+
+ for iteration in range(max_iterations):
+ if progress_callback:
+ progress_callback(
+ "generating",
+ int((iteration / max_iterations) * 100),
+ f"思考与行动中 ({tool_calls_count}/{self.MAX_TOOL_CALLS_PER_SECTION})"
+ )
+
+ # 调用LLM
+ response = self.llm.chat(
+ messages=messages,
+ temperature=0.5,
+ max_tokens=4096
+ )
+
+ logger.debug(f"LLM响应: {response[:200]}...")
+
+ # 检查是否有最终答案
+ if "Final Answer:" in response:
+ # 提取最终答案
+ final_answer = response.split("Final Answer:")[-1].strip()
+ logger.info(f"章节 {section.title} 生成完成")
+ return final_answer
+
+ # 解析工具调用
+ tool_calls = self._parse_tool_calls(response)
+
+ if not tool_calls:
+ # 没有工具调用也没有最终答案,提示生成最终答案
+ messages.append({"role": "assistant", "content": response})
+ messages.append({
+ "role": "user",
+ "content": "请基于已有信息,输出 Final Answer: 并生成章节内容。"
+ })
+ continue
+
+ # 执行工具调用
+ tool_results = []
+ for call in tool_calls:
+ if tool_calls_count >= self.MAX_TOOL_CALLS_PER_SECTION:
+ break
+
+ result = self._execute_tool(call["name"], call.get("parameters", {}))
+ tool_results.append(f"工具 {call['name']} 返回:\n{result}")
+ tool_calls_count += 1
+
+ # 将结果添加到消息
+ messages.append({"role": "assistant", "content": response})
+ messages.append({
+ "role": "user",
+ "content": f"Observation:\n" + "\n\n".join(tool_results) + "\n\n请继续思考或输出 Final Answer:"
+ })
+
+ # 达到最大迭代次数,强制生成内容
+ logger.warning(f"章节 {section.title} 达到最大迭代次数,强制生成")
+ messages.append({
+ "role": "user",
+ "content": "已达到工具调用限制,请直接输出 Final Answer: 并生成章节内容。"
+ })
+
+ response = self.llm.chat(
+ messages=messages,
+ temperature=0.5,
+ max_tokens=4096
+ )
+
+ if "Final Answer:" in response:
+ return response.split("Final Answer:")[-1].strip()
+
+ return response
+
+ def generate_report(
+ self,
+ progress_callback: Optional[Callable[[str, int, str], None]] = None
+ ) -> Report:
+ """
+ 生成完整报告(分章节实时输出)
+
+ 每个章节生成完成后立即保存到文件夹,不需要等待整个报告完成。
+ 文件结构:
+ reports/{report_id}/
+ meta.json - 报告元信息
+ outline.json - 报告大纲
+ progress.json - 生成进度
+ section_01.md - 第1章节
+ section_02.md - 第2章节
+ ...
+ full_report.md - 完整报告
+
+ Args:
+ progress_callback: 进度回调函数 (stage, progress, message)
+
+ Returns:
+ Report: 完整报告
+ """
+ import uuid
+
+ report_id = f"report_{uuid.uuid4().hex[:12]}"
+
+ report = Report(
+ report_id=report_id,
+ simulation_id=self.simulation_id,
+ graph_id=self.graph_id,
+ simulation_requirement=self.simulation_requirement,
+ status=ReportStatus.PENDING,
+ created_at=datetime.now().isoformat()
+ )
+
+ # 已完成的章节标题列表(用于进度追踪)
+ completed_section_titles = []
+
+ try:
+ # 初始化:创建报告文件夹并保存初始状态
+ ReportManager._ensure_report_folder(report_id)
+ ReportManager.update_progress(
+ report_id, "pending", 0, "初始化报告...",
+ completed_sections=[]
+ )
+ ReportManager.save_report(report)
+
+ # 阶段1: 规划大纲
+ report.status = ReportStatus.PLANNING
+ ReportManager.update_progress(
+ report_id, "planning", 5, "开始规划报告大纲...",
+ completed_sections=[]
+ )
+
+ if progress_callback:
+ progress_callback("planning", 0, "开始规划报告大纲...")
+
+ outline = self.plan_outline(
+ progress_callback=lambda stage, prog, msg:
+ progress_callback(stage, prog // 5, msg) if progress_callback else None
+ )
+ report.outline = outline
+
+ # 保存大纲到文件
+ ReportManager.save_outline(report_id, outline)
+ ReportManager.update_progress(
+ report_id, "planning", 15, f"大纲规划完成,共{len(outline.sections)}个章节",
+ completed_sections=[]
+ )
+ ReportManager.save_report(report)
+
+ logger.info(f"大纲已保存到文件: {report_id}/outline.json")
+
+ # 阶段2: 逐章节生成(分章节保存)
+ report.status = ReportStatus.GENERATING
+
+ total_sections = len(outline.sections)
+ generated_sections = [] # 保存内容用于上下文
+
+ for i, section in enumerate(outline.sections):
+ section_num = i + 1
+ base_progress = 20 + int((i / total_sections) * 70)
+
+ # 更新进度
+ ReportManager.update_progress(
+ report_id, "generating", base_progress,
+ f"正在生成章节: {section.title} ({section_num}/{total_sections})",
+ current_section=section.title,
+ completed_sections=completed_section_titles
+ )
+
+ if progress_callback:
+ progress_callback(
+ "generating",
+ base_progress,
+ f"正在生成章节: {section.title} ({section_num}/{total_sections})"
+ )
+
+ # 生成章节内容
+ section_content = self._generate_section_react(
+ section=section,
+ outline=outline,
+ previous_sections=generated_sections,
+ progress_callback=lambda stage, prog, msg:
+ progress_callback(
+ stage,
+ base_progress + int(prog * 0.7 / total_sections),
+ msg
+ ) if progress_callback else None
+ )
+
+ section.content = section_content
+ generated_sections.append(f"## {section.title}\n\n{section_content}")
+
+ # 【关键】立即保存章节到文件
+ ReportManager.save_section(report_id, section_num, section)
+ completed_section_titles.append(section.title)
+
+ logger.info(f"章节已保存: {report_id}/section_{section_num:02d}.md")
+
+ # 更新进度
+ ReportManager.update_progress(
+ report_id, "generating",
+ base_progress + int(70 / total_sections),
+ f"章节 {section.title} 已完成",
+ current_section=None,
+ completed_sections=completed_section_titles
+ )
+
+ # 生成并保存子章节
+ for j, subsection in enumerate(section.subsections):
+ subsection_num = j + 1
+
+ if progress_callback:
+ progress_callback(
+ "generating",
+ base_progress + int(((j + 1) / len(section.subsections)) * 5),
+ f"正在生成子章节: {subsection.title}"
+ )
+
+ ReportManager.update_progress(
+ report_id, "generating",
+ base_progress + int(((j + 1) / len(section.subsections)) * 5),
+ f"正在生成子章节: {subsection.title}",
+ current_section=subsection.title,
+ completed_sections=completed_section_titles
+ )
+
+ subsection_content = self._generate_section_react(
+ section=subsection,
+ outline=outline,
+ previous_sections=generated_sections,
+ progress_callback=None
+ )
+ subsection.content = subsection_content
+ generated_sections.append(f"### {subsection.title}\n\n{subsection_content}")
+
+ # 【关键】立即保存子章节到文件
+ ReportManager.save_section(
+ report_id, subsection_num, subsection,
+ is_subsection=True, parent_index=section_num
+ )
+ completed_section_titles.append(f" └─ {subsection.title}")
+
+ logger.info(f"子章节已保存: {report_id}/section_{section_num:02d}_{subsection_num:02d}.md")
+
+ # 阶段3: 组装完整报告
+ if progress_callback:
+ progress_callback("generating", 95, "正在组装完整报告...")
+
+ ReportManager.update_progress(
+ report_id, "generating", 95, "正在组装完整报告...",
+ completed_sections=completed_section_titles
+ )
+
+ # 使用ReportManager组装完整报告
+ report.markdown_content = ReportManager.assemble_full_report(report_id, outline)
+ report.status = ReportStatus.COMPLETED
+ report.completed_at = datetime.now().isoformat()
+
+ # 保存最终报告
+ ReportManager.save_report(report)
+ ReportManager.update_progress(
+ report_id, "completed", 100, "报告生成完成",
+ completed_sections=completed_section_titles
+ )
+
+ if progress_callback:
+ progress_callback("completed", 100, "报告生成完成")
+
+ logger.info(f"报告生成完成: {report_id}")
+ return report
+
+ except Exception as e:
+ logger.error(f"报告生成失败: {str(e)}")
+ report.status = ReportStatus.FAILED
+ report.error = str(e)
+
+ # 保存失败状态
+ try:
+ ReportManager.save_report(report)
+ ReportManager.update_progress(
+ report_id, "failed", -1, f"报告生成失败: {str(e)}",
+ completed_sections=completed_section_titles
+ )
+ except Exception:
+ pass # 忽略保存失败的错误
+
+ return report
+
+ def chat(
+ self,
+ message: str,
+ chat_history: List[Dict[str, str]] = None
+ ) -> Dict[str, Any]:
+ """
+ 与Report Agent对话
+
+ 在对话中Agent可以自主调用检索工具来回答问题
+
+ Args:
+ message: 用户消息
+ chat_history: 对话历史
+
+ Returns:
+ {
+ "response": "Agent回复",
+ "tool_calls": [调用的工具列表],
+ "sources": [信息来源]
+ }
+ """
+ logger.info(f"Report Agent对话: {message[:50]}...")
+
+ chat_history = chat_history or []
+
+ system_prompt = f"""你是一个专业的舆情分析助手,负责回答关于模拟分析报告的问题。
+
+模拟需求: {self.simulation_requirement}
+图谱ID: {self.graph_id}
+
+你可以使用以下工具来获取信息:
+
+{self._get_tools_description()}
+
+工具调用格式:
+
+{{"name": "工具名称", "parameters": {{"参数名": "参数值"}}}}
+
+
+回答要求:
+1. 基于事实和数据回答
+2. 引用具体信息来源
+3. 如果不确定,说明信息限制
+4. 保持专业和客观"""
+
+ # 构建消息
+ messages = [{"role": "system", "content": system_prompt}]
+
+ # 添加历史对话
+ for h in chat_history[-10:]: # 限制历史长度
+ messages.append(h)
+
+ messages.append({"role": "user", "content": message})
+
+ # ReACT循环
+ tool_calls_made = []
+ max_iterations = 3
+
+ for iteration in range(max_iterations):
+ response = self.llm.chat(
+ messages=messages,
+ temperature=0.5,
+ max_tokens=2048
+ )
+
+ # 解析工具调用
+ tool_calls = self._parse_tool_calls(response)
+
+ if not tool_calls:
+ # 没有工具调用,返回响应
+ # 清理响应中的工具调用标记
+ clean_response = re.sub(r'.*?', '', response, flags=re.DOTALL)
+ clean_response = re.sub(r'\[TOOL_CALL\].*?\)', '', clean_response)
+
+ return {
+ "response": clean_response.strip(),
+ "tool_calls": tool_calls_made,
+ "sources": []
+ }
+
+ # 执行工具调用
+ tool_results = []
+ for call in tool_calls:
+ result = self._execute_tool(call["name"], call.get("parameters", {}))
+ tool_results.append({
+ "tool": call["name"],
+ "result": result[:1000] # 限制长度
+ })
+ tool_calls_made.append(call)
+
+ # 将结果添加到消息
+ messages.append({"role": "assistant", "content": response})
+ observation = "工具调用结果:\n" + "\n\n".join([
+ f"[{r['tool']}]: {r['result']}" for r in tool_results
+ ])
+ messages.append({"role": "user", "content": observation + "\n\n请基于以上信息回答问题。"})
+
+ # 达到最大迭代,获取最终响应
+ final_response = self.llm.chat(
+ messages=messages,
+ temperature=0.5,
+ max_tokens=2048
+ )
+
+ return {
+ "response": final_response,
+ "tool_calls": tool_calls_made,
+ "sources": []
+ }
+
+
+class ReportManager:
+ """
+ 报告管理器
+
+ 负责报告的持久化存储和检索
+
+ 文件结构(分章节输出):
+ reports/
+ {report_id}/
+ meta.json - 报告元信息和状态
+ outline.json - 报告大纲
+ progress.json - 生成进度
+ section_01.md - 第1章节
+ section_02.md - 第2章节
+ ...
+ full_report.md - 完整报告
+ """
+
+ # 报告存储目录
+ REPORTS_DIR = os.path.join(Config.UPLOAD_FOLDER, 'reports')
+
+ @classmethod
+ def _ensure_reports_dir(cls):
+ """确保报告根目录存在"""
+ os.makedirs(cls.REPORTS_DIR, exist_ok=True)
+
+ @classmethod
+ def _get_report_folder(cls, report_id: str) -> str:
+ """获取报告文件夹路径"""
+ return os.path.join(cls.REPORTS_DIR, report_id)
+
+ @classmethod
+ def _ensure_report_folder(cls, report_id: str) -> str:
+ """确保报告文件夹存在并返回路径"""
+ folder = cls._get_report_folder(report_id)
+ os.makedirs(folder, exist_ok=True)
+ return folder
+
+ @classmethod
+ def _get_report_path(cls, report_id: str) -> str:
+ """获取报告元信息文件路径"""
+ return os.path.join(cls._get_report_folder(report_id), "meta.json")
+
+ @classmethod
+ def _get_report_markdown_path(cls, report_id: str) -> str:
+ """获取完整报告Markdown文件路径"""
+ return os.path.join(cls._get_report_folder(report_id), "full_report.md")
+
+ @classmethod
+ def _get_outline_path(cls, report_id: str) -> str:
+ """获取大纲文件路径"""
+ return os.path.join(cls._get_report_folder(report_id), "outline.json")
+
+ @classmethod
+ def _get_progress_path(cls, report_id: str) -> str:
+ """获取进度文件路径"""
+ return os.path.join(cls._get_report_folder(report_id), "progress.json")
+
+ @classmethod
+ def _get_section_path(cls, report_id: str, section_index: int) -> str:
+ """获取章节Markdown文件路径"""
+ return os.path.join(cls._get_report_folder(report_id), f"section_{section_index:02d}.md")
+
+ @classmethod
+ def save_outline(cls, report_id: str, outline: ReportOutline) -> None:
+ """
+ 保存报告大纲
+
+ 在规划阶段完成后立即调用
+ """
+ cls._ensure_report_folder(report_id)
+
+ with open(cls._get_outline_path(report_id), 'w', encoding='utf-8') as f:
+ json.dump(outline.to_dict(), f, ensure_ascii=False, indent=2)
+
+ logger.info(f"大纲已保存: {report_id}")
+
+ @classmethod
+ def save_section(
+ cls,
+ report_id: str,
+ section_index: int,
+ section: ReportSection,
+ is_subsection: bool = False,
+ parent_index: int = None
+ ) -> str:
+ """
+ 保存单个章节
+
+ 在每个章节生成完成后立即调用,实现分章节输出
+
+ Args:
+ report_id: 报告ID
+ section_index: 章节索引(从1开始)
+ section: 章节对象
+ is_subsection: 是否是子章节
+ parent_index: 父章节索引(子章节时使用)
+
+ Returns:
+ 保存的文件路径
+ """
+ cls._ensure_report_folder(report_id)
+
+ # 确定章节级别和标题格式
+ if is_subsection and parent_index is not None:
+ level = "###"
+ file_suffix = f"section_{parent_index:02d}_{section_index:02d}.md"
+ else:
+ level = "##"
+ file_suffix = f"section_{section_index:02d}.md"
+
+ # 构建章节Markdown内容
+ md_content = f"{level} {section.title}\n\n"
+ if section.content:
+ md_content += f"{section.content}\n\n"
+
+ # 保存文件
+ file_path = os.path.join(cls._get_report_folder(report_id), file_suffix)
+ with open(file_path, 'w', encoding='utf-8') as f:
+ f.write(md_content)
+
+ logger.info(f"章节已保存: {report_id}/{file_suffix}")
+ return file_path
+
+    @classmethod
+    def update_progress(
+        cls,
+        report_id: str,
+        status: str,
+        progress: int,
+        message: str,
+        current_section: str = None,
+        completed_sections: List[str] = None
+    ) -> None:
+        """
+        Update progress.json, which the frontend reads for live progress.
+        Written to a temp file and swapped in so readers never see partial JSON.
+        """
+        cls._ensure_report_folder(report_id)
+        progress_path = cls._get_progress_path(report_id)
+        tmp_path = f"{progress_path}.tmp"
+        progress_data = {
+            "status": status,
+            "progress": progress,
+            "message": message,
+            "current_section": current_section,
+            "completed_sections": completed_sections or [],
+            "updated_at": datetime.now().isoformat()
+        }
+        with open(tmp_path, 'w', encoding='utf-8') as f:
+            json.dump(progress_data, f, ensure_ascii=False, indent=2)
+        os.replace(tmp_path, progress_path)
+
+ @classmethod
+ def get_progress(cls, report_id: str) -> Optional[Dict[str, Any]]:
+ """获取报告生成进度"""
+ path = cls._get_progress_path(report_id)
+
+ if not os.path.exists(path):
+ return None
+
+ with open(path, 'r', encoding='utf-8') as f:
+ return json.load(f)
+
+    @classmethod
+    def get_generated_sections(cls, report_id: str) -> List[Dict[str, Any]]:
+        """
+        List the section files saved so far for a report, in filename order.
+
+        Only section_<n>.md and section_<n>_<m>.md files are read; anything else
+        in the folder is ignored. Each entry carries filename, section_index,
+        subsection_index (None for a top-level section), content, is_subsection.
+        """
+        folder = cls._get_report_folder(report_id)
+
+        if not os.path.exists(folder):
+            return []
+
+        sections = []
+        for filename in sorted(os.listdir(folder)):
+            # Strict match: a stray "section_*.md" must not crash int() below.
+            name_match = re.fullmatch(r'section_(\d+)(?:_(\d+))?\.md', filename)
+            if name_match is None:
+                continue
+            with open(os.path.join(folder, filename), 'r', encoding='utf-8') as f:
+                content = f.read()
+
+            subsection = name_match.group(2)
+            sections.append({
+                "filename": filename,
+                "section_index": int(name_match.group(1)),
+                "subsection_index": int(subsection) if subsection is not None else None,
+                "content": content,
+                "is_subsection": subsection is not None
+            })
+
+        return sections
+
+ @classmethod
+ def assemble_full_report(cls, report_id: str, outline: ReportOutline) -> str:
+ """
+ 组装完整报告
+
+ 从已保存的章节文件组装完整报告
+ """
+ folder = cls._get_report_folder(report_id)
+
+ # 构建报告头部
+ md_content = f"# {outline.title}\n\n"
+ md_content += f"> {outline.summary}\n\n"
+ md_content += f"---\n\n"
+
+ # 按顺序读取所有章节文件
+ sections = cls.get_generated_sections(report_id)
+ for section_info in sections:
+ md_content += section_info["content"]
+
+ # 保存完整报告
+ full_path = cls._get_report_markdown_path(report_id)
+ with open(full_path, 'w', encoding='utf-8') as f:
+ f.write(md_content)
+
+ logger.info(f"完整报告已组装: {report_id}")
+ return md_content
+
+ @classmethod
+ def save_report(cls, report: Report) -> None:
+ """保存报告元信息和完整报告"""
+ cls._ensure_report_folder(report.report_id)
+
+ # 保存元信息JSON
+ with open(cls._get_report_path(report.report_id), 'w', encoding='utf-8') as f:
+ json.dump(report.to_dict(), f, ensure_ascii=False, indent=2)
+
+ # 保存大纲
+ if report.outline:
+ cls.save_outline(report.report_id, report.outline)
+
+ # 保存完整Markdown报告
+ if report.markdown_content:
+ with open(cls._get_report_markdown_path(report.report_id), 'w', encoding='utf-8') as f:
+ f.write(report.markdown_content)
+
+ logger.info(f"报告已保存: {report.report_id}")
+
+ @classmethod
+ def get_report(cls, report_id: str) -> Optional[Report]:
+ """获取报告"""
+ path = cls._get_report_path(report_id)
+
+ if not os.path.exists(path):
+ # 兼容旧格式:检查直接存储在reports目录下的文件
+ old_path = os.path.join(cls.REPORTS_DIR, f"{report_id}.json")
+ if os.path.exists(old_path):
+ path = old_path
+ else:
+ return None
+
+ with open(path, 'r', encoding='utf-8') as f:
+ data = json.load(f)
+
+ # 重建Report对象
+ outline = None
+ if data.get('outline'):
+ outline_data = data['outline']
+ sections = []
+ for s in outline_data.get('sections', []):
+ subsections = [
+ ReportSection(title=sub['title'], content=sub.get('content', ''))
+ for sub in s.get('subsections', [])
+ ]
+ sections.append(ReportSection(
+ title=s['title'],
+ content=s.get('content', ''),
+ subsections=subsections
+ ))
+ outline = ReportOutline(
+ title=outline_data['title'],
+ summary=outline_data['summary'],
+ sections=sections
+ )
+
+ # 如果markdown_content为空,尝试从full_report.md读取
+ markdown_content = data.get('markdown_content', '')
+ if not markdown_content:
+ full_report_path = cls._get_report_markdown_path(report_id)
+ if os.path.exists(full_report_path):
+ with open(full_report_path, 'r', encoding='utf-8') as f:
+ markdown_content = f.read()
+
+ return Report(
+ report_id=data['report_id'],
+ simulation_id=data['simulation_id'],
+ graph_id=data['graph_id'],
+ simulation_requirement=data['simulation_requirement'],
+ status=ReportStatus(data['status']),
+ outline=outline,
+ markdown_content=markdown_content,
+ created_at=data.get('created_at', ''),
+ completed_at=data.get('completed_at', ''),
+ error=data.get('error')
+ )
+
+ @classmethod
+ def get_report_by_simulation(cls, simulation_id: str) -> Optional[Report]:
+ """根据模拟ID获取报告"""
+ cls._ensure_reports_dir()
+
+ for item in os.listdir(cls.REPORTS_DIR):
+ item_path = os.path.join(cls.REPORTS_DIR, item)
+ # 新格式:文件夹
+ if os.path.isdir(item_path):
+ report = cls.get_report(item)
+ if report and report.simulation_id == simulation_id:
+ return report
+ # 兼容旧格式:JSON文件
+ elif item.endswith('.json'):
+ report_id = item[:-5]
+ report = cls.get_report(report_id)
+ if report and report.simulation_id == simulation_id:
+ return report
+
+ return None
+
+ @classmethod
+ def list_reports(cls, simulation_id: Optional[str] = None, limit: int = 50) -> List[Report]:
+ """列出报告"""
+ cls._ensure_reports_dir()
+
+ reports = []
+ for item in os.listdir(cls.REPORTS_DIR):
+ item_path = os.path.join(cls.REPORTS_DIR, item)
+ # 新格式:文件夹
+ if os.path.isdir(item_path):
+ report = cls.get_report(item)
+ if report:
+ if simulation_id is None or report.simulation_id == simulation_id:
+ reports.append(report)
+ # 兼容旧格式:JSON文件
+ elif item.endswith('.json'):
+ report_id = item[:-5]
+ report = cls.get_report(report_id)
+ if report:
+ if simulation_id is None or report.simulation_id == simulation_id:
+ reports.append(report)
+
+ # 按创建时间倒序
+ reports.sort(key=lambda r: r.created_at, reverse=True)
+
+ return reports[:limit]
+
+ @classmethod
+ def delete_report(cls, report_id: str) -> bool:
+ """删除报告(整个文件夹)"""
+ import shutil
+
+ folder_path = cls._get_report_folder(report_id)
+
+ # 新格式:删除整个文件夹
+ if os.path.exists(folder_path) and os.path.isdir(folder_path):
+ shutil.rmtree(folder_path)
+ logger.info(f"报告文件夹已删除: {report_id}")
+ return True
+
+ # 兼容旧格式:删除单独的文件
+ deleted = False
+ old_json_path = os.path.join(cls.REPORTS_DIR, f"{report_id}.json")
+ old_md_path = os.path.join(cls.REPORTS_DIR, f"{report_id}.md")
+
+ if os.path.exists(old_json_path):
+ os.remove(old_json_path)
+ deleted = True
+ if os.path.exists(old_md_path):
+ os.remove(old_md_path)
+ deleted = True
+
+ return deleted
diff --git a/backend/app/services/zep_tools.py b/backend/app/services/zep_tools.py
new file mode 100644
index 0000000..166cc97
--- /dev/null
+++ b/backend/app/services/zep_tools.py
@@ -0,0 +1,621 @@
+"""
+Zep检索工具服务
+封装图谱搜索、节点读取、边查询等工具,供Report Agent使用
+"""
+
+import time
+from typing import Dict, Any, List, Optional
+from dataclasses import dataclass
+
+from zep_cloud.client import Zep
+
+from ..config import Config
+from ..utils.logger import get_logger
+
+logger = get_logger('mirofish.zep_tools')
+
+
+@dataclass
+class SearchResult:
+ """Result of a graph search: fact strings plus the edge/node dicts they were taken from."""
+ facts: List[str]
+ edges: List[Dict[str, Any]]
+ nodes: List[Dict[str, Any]]
+ query: str
+ total_count: int
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "facts": self.facts,
+ "edges": self.edges,
+ "nodes": self.nodes,
+ "query": self.query,
+ "total_count": self.total_count
+ }
+
+ def to_text(self) -> str:
+ """Render as plain text for an LLM; only the query, the count and the facts are included."""
+ text_parts = [f"搜索查询: {self.query}", f"找到 {self.total_count} 条相关信息"]
+
+ if self.facts:
+ text_parts.append("\n### 相关事实:")
+ for i, fact in enumerate(self.facts, 1):
+ text_parts.append(f"{i}. {fact}")
+
+ return "\n".join(text_parts)
+
+
+@dataclass
+class NodeInfo:
+ """Information about one graph node."""
+ uuid: str
+ name: str
+ labels: List[str]
+ summary: str
+ attributes: Dict[str, Any]
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "uuid": self.uuid,
+ "name": self.name,
+ "labels": self.labels,
+ "summary": self.summary,
+ "attributes": self.attributes
+ }
+
+ def to_text(self) -> str:
+ """Render as plain text; the type shown is the first label other than "Entity"/"Node"."""
+ entity_type = next((l for l in self.labels if l not in ["Entity", "Node"]), "未知类型")
+ return f"实体: {self.name} (类型: {entity_type})\n摘要: {self.summary}"
+
+
+@dataclass
+class EdgeInfo:
+ """Information about one graph edge (a named relation carrying a fact between two nodes)."""
+ uuid: str
+ name: str
+ fact: str
+ source_node_uuid: str
+ target_node_uuid: str
+ source_node_name: Optional[str] = None
+ target_node_name: Optional[str] = None
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "uuid": self.uuid,
+ "name": self.name,
+ "fact": self.fact,
+ "source_node_uuid": self.source_node_uuid,
+ "target_node_uuid": self.target_node_uuid,
+ "source_node_name": self.source_node_name,
+ "target_node_name": self.target_node_name
+ }
+
+ def to_text(self) -> str:
+ """Render as plain text; a missing node name is replaced by the first 8 characters of its UUID."""
+ source = self.source_node_name or self.source_node_uuid[:8]
+ target = self.target_node_name or self.target_node_uuid[:8]
+ return f"关系: {source} --[{self.name}]--> {target}\n事实: {self.fact}"
+
+
+class ZepToolsService:
+ """
+ Zep检索工具服务
+
+ 提供多种图谱检索工具,可以被Report Agent调用:
+ 1. search_graph - 图谱语义搜索
+ 2. get_all_nodes - 获取图谱所有节点
+ 3. get_all_edges - 获取图谱所有边
+ 4. get_node_detail - 获取节点详细信息
+ 5. get_node_edges - 获取节点相关的边
+ 6. get_entities_by_type - 按类型获取实体
+ 7. get_entity_summary - 获取实体的关系摘要
+ """
+
+ # 重试配置
+ MAX_RETRIES = 3
+ RETRY_DELAY = 2.0
+
+ def __init__(self, api_key: Optional[str] = None):
+ self.api_key = api_key or Config.ZEP_API_KEY  # an explicit key wins; otherwise use Config.ZEP_API_KEY
+ if not self.api_key:
+ raise ValueError("ZEP_API_KEY 未配置")
+
+ self.client = Zep(api_key=self.api_key)
+ logger.info("ZepToolsService 初始化完成")
+
+ def _call_with_retry(self, func, operation_name: str, max_retries: int = None):
+ """Call func() and retry on any exception, doubling the delay each time; re-raise the last error."""
+ max_retries = max_retries or self.MAX_RETRIES  # None or 0 falls back to MAX_RETRIES
+ last_exception = None
+ delay = self.RETRY_DELAY
+
+ for attempt in range(max_retries):
+ try:
+ return func()
+ except Exception as e:
+ last_exception = e
+ if attempt < max_retries - 1:
+ logger.warning(
+ f"Zep {operation_name} 第 {attempt + 1} 次尝试失败: {str(e)[:100]}, "
+ f"{delay:.1f}秒后重试..."
+ )
+ time.sleep(delay)
+ delay *= 2  # double the wait before the next attempt
+ else:
+ logger.error(f"Zep {operation_name} 在 {max_retries} 次尝试后仍失败: {str(e)}")
+
+ raise last_exception
+
+ def search_graph(
+ self,
+ graph_id: str,
+ query: str,
+ limit: int = 10,
+ scope: str = "edges"
+ ) -> SearchResult:
+ """
+ Semantic search over a graph.
+
+ Calls client.graph.search with the "cross_encoder" reranker, via the retry helper.
+ If anything in that path raises, falls back to local keyword matching (_local_search).
+
+ Args:
+ graph_id: graph ID (Standalone Graph)
+ query: search query
+ limit: result limit passed to the search call
+ scope: search scope, "edges" or "nodes"
+
+ Returns:
+ SearchResult: the search result
+ """
+ logger.info(f"图谱搜索: graph_id={graph_id}, query={query[:50]}...")
+
+ # Try the Zep Cloud search API first
+ try:
+ search_results = self._call_with_retry(
+ func=lambda: self.client.graph.search(
+ graph_id=graph_id,
+ query=query,
+ limit=limit,
+ scope=scope,
+ reranker="cross_encoder"
+ ),
+ operation_name=f"图谱搜索(graph={graph_id})"
+ )
+
+ facts = []
+ edges = []
+ nodes = []
+
+ # Collect edge hits; a non-empty edge fact is also recorded in facts
+ if hasattr(search_results, 'edges') and search_results.edges:
+ for edge in search_results.edges:
+ if hasattr(edge, 'fact') and edge.fact:
+ facts.append(edge.fact)
+ edges.append({
+ "uuid": getattr(edge, 'uuid_', None) or getattr(edge, 'uuid', ''),
+ "name": getattr(edge, 'name', ''),
+ "fact": getattr(edge, 'fact', ''),
+ "source_node_uuid": getattr(edge, 'source_node_uuid', ''),
+ "target_node_uuid": getattr(edge, 'target_node_uuid', ''),
+ })
+
+ # Collect node hits
+ if hasattr(search_results, 'nodes') and search_results.nodes:
+ for node in search_results.nodes:
+ nodes.append({
+ "uuid": getattr(node, 'uuid_', None) or getattr(node, 'uuid', ''),
+ "name": getattr(node, 'name', ''),
+ "labels": getattr(node, 'labels', []),
+ "summary": getattr(node, 'summary', ''),
+ })
+ # A non-empty node summary also counts as a fact
+ if hasattr(node, 'summary') and node.summary:
+ facts.append(f"[{node.name}]: {node.summary}")
+
+ logger.info(f"搜索完成: 找到 {len(facts)} 条相关事实")
+
+ return SearchResult(
+ facts=facts,
+ edges=edges,
+ nodes=nodes,
+ query=query,
+ total_count=len(facts)
+ )
+
+ except Exception as e:
+ logger.warning(f"Zep Search API失败,降级为本地搜索: {str(e)}")
+ # Fallback: local keyword-matching search
+ return self._local_search(graph_id, query, limit, scope)
+
+ def _local_search(
+ self,
+ graph_id: str,
+ query: str,
+ limit: int = 10,
+ scope: str = "edges"
+ ) -> SearchResult:
+ """
+ Local keyword-matching search (fallback for the Zep search API).
+
+ Fetches all edges and/or nodes and scores them locally; errors are logged, not raised.
+
+ Args:
+ graph_id: graph ID
+ query: search query
+ limit: maximum number of edges and of nodes kept (applied to each separately)
+ scope: search scope: "edges", "nodes" or "both"
+
+ Returns:
+ SearchResult: whatever was collected (possibly empty if fetching failed)
+ """
+ logger.info(f"使用本地搜索: query={query[:30]}...")
+
+ facts = []
+ edges_result = []
+ nodes_result = []
+
+ # Extract query keywords (naive: commas become spaces, split on whitespace, drop 1-char tokens)
+ query_lower = query.lower()
+ keywords = [w.strip() for w in query_lower.replace(',', ' ').replace(',', ' ').split() if len(w.strip()) > 1]
+
+ def match_score(text: str) -> int:
+ """Score text against the query: 100 if it contains the whole query, else 10 per matching keyword."""
+ if not text:
+ return 0
+ text_lower = text.lower()
+ # The whole query appears in the text
+ if query_lower in text_lower:
+ return 100
+ # Otherwise count keyword hits
+ score = 0
+ for keyword in keywords:
+ if keyword in text_lower:
+ score += 10
+ return score
+
+ try:
+ if scope in ["edges", "both"]:
+ # Fetch every edge and score it on fact + name
+ all_edges = self.get_all_edges(graph_id)
+ scored_edges = []
+ for edge in all_edges:
+ score = match_score(edge.fact) + match_score(edge.name)
+ if score > 0:
+ scored_edges.append((score, edge))
+
+ # Highest score first
+ scored_edges.sort(key=lambda x: x[0], reverse=True)
+
+ for score, edge in scored_edges[:limit]:
+ if edge.fact:
+ facts.append(edge.fact)
+ edges_result.append({
+ "uuid": edge.uuid,
+ "name": edge.name,
+ "fact": edge.fact,
+ "source_node_uuid": edge.source_node_uuid,
+ "target_node_uuid": edge.target_node_uuid,
+ })
+
+ if scope in ["nodes", "both"]:
+ # Fetch every node and score it on name + summary
+ all_nodes = self.get_all_nodes(graph_id)
+ scored_nodes = []
+ for node in all_nodes:
+ score = match_score(node.name) + match_score(node.summary)
+ if score > 0:
+ scored_nodes.append((score, node))
+
+ scored_nodes.sort(key=lambda x: x[0], reverse=True)
+
+ for score, node in scored_nodes[:limit]:
+ nodes_result.append({
+ "uuid": node.uuid,
+ "name": node.name,
+ "labels": node.labels,
+ "summary": node.summary,
+ })
+ if node.summary:
+ facts.append(f"[{node.name}]: {node.summary}")
+
+ logger.info(f"本地搜索完成: 找到 {len(facts)} 条相关事实")
+
+ except Exception as e:
+ logger.error(f"本地搜索失败: {str(e)}")
+
+ return SearchResult(
+ facts=facts,
+ edges=edges_result,
+ nodes=nodes_result,
+ query=query,
+ total_count=len(facts)
+ )
+
+ def get_all_nodes(self, graph_id: str) -> List[NodeInfo]:
+ """
+ Fetch all nodes of a graph (with retries) and convert them to NodeInfo.
+
+ Args:
+ graph_id: graph ID
+
+ Returns:
+ list of NodeInfo; missing name/labels/summary/attributes become empty values
+ """
+ logger.info(f"获取图谱 {graph_id} 的所有节点...")
+
+ nodes = self._call_with_retry(
+ func=lambda: self.client.graph.node.get_by_graph_id(graph_id=graph_id),
+ operation_name=f"获取节点(graph={graph_id})"
+ )
+
+ result = []
+ for node in nodes:
+ result.append(NodeInfo(
+ uuid=getattr(node, 'uuid_', None) or getattr(node, 'uuid', ''),
+ name=node.name or "",
+ labels=node.labels or [],
+ summary=node.summary or "",
+ attributes=node.attributes or {}
+ ))
+
+ logger.info(f"获取到 {len(result)} 个节点")
+ return result
+
+ def get_all_edges(self, graph_id: str) -> List[EdgeInfo]:
+ """
+ Fetch all edges of a graph (with retries) and convert them to EdgeInfo.
+
+ Args:
+ graph_id: graph ID
+
+ Returns:
+ list of EdgeInfo; node names are not filled in, missing fields become ""
+ """
+ logger.info(f"获取图谱 {graph_id} 的所有边...")
+
+ edges = self._call_with_retry(
+ func=lambda: self.client.graph.edge.get_by_graph_id(graph_id=graph_id),
+ operation_name=f"获取边(graph={graph_id})"
+ )
+
+ result = []
+ for edge in edges:
+ result.append(EdgeInfo(
+ uuid=getattr(edge, 'uuid_', None) or getattr(edge, 'uuid', ''),
+ name=edge.name or "",
+ fact=edge.fact or "",
+ source_node_uuid=edge.source_node_uuid or "",
+ target_node_uuid=edge.target_node_uuid or ""
+ ))
+
+ logger.info(f"获取到 {len(result)} 条边")
+ return result
+
+ def get_node_detail(self, node_uuid: str) -> Optional[NodeInfo]:
+ """
+ Fetch the details of a single node.
+
+ Args:
+ node_uuid: node UUID
+
+ Returns:
+ NodeInfo, or None when the lookup returns nothing or fails (the error is logged)
+ """
+ logger.info(f"获取节点详情: {node_uuid[:8]}...")
+
+ try:
+ node = self._call_with_retry(
+ func=lambda: self.client.graph.node.get(uuid_=node_uuid),
+ operation_name=f"获取节点详情(uuid={node_uuid[:8]}...)"
+ )
+
+ if not node:
+ return None
+
+ return NodeInfo(
+ uuid=getattr(node, 'uuid_', None) or getattr(node, 'uuid', ''),
+ name=node.name or "",
+ labels=node.labels or [],
+ summary=node.summary or "",
+ attributes=node.attributes or {}
+ )
+ except Exception as e:
+ logger.error(f"获取节点详情失败: {str(e)}")
+ return None
+
+ def get_node_edges(self, graph_id: str, node_uuid: str) -> List[EdgeInfo]:
+ """
+ Return every edge that touches the given node.
+
+ Fetches all edges of the graph and filters them locally; returns [] on failure.
+
+ Args:
+ graph_id: graph ID
+ node_uuid: node UUID
+
+ Returns:
+ list of EdgeInfo whose source or target is node_uuid
+ """
+ logger.info(f"获取节点 {node_uuid[:8]}... 的相关边")
+
+ try:
+ # Fetch all edges of the graph, then filter
+ all_edges = self.get_all_edges(graph_id)
+
+ result = []
+ for edge in all_edges:
+ # Keep the edge if the node is either its source or its target
+ if edge.source_node_uuid == node_uuid or edge.target_node_uuid == node_uuid:
+ result.append(edge)
+
+ logger.info(f"找到 {len(result)} 条与节点相关的边")
+ return result
+
+ except Exception as e:
+ logger.warning(f"获取节点边失败: {str(e)}")
+ return []
+
+ def get_entities_by_type(
+ self,
+ graph_id: str,
+ entity_type: str
+ ) -> List[NodeInfo]:
+ """
+ Return the nodes whose labels contain the given entity type.
+
+ Args:
+ graph_id: graph ID
+ entity_type: entity type (e.g. Student, PublicFigure); compared by exact label match
+
+ Returns:
+ list of matching NodeInfo
+ """
+ logger.info(f"获取类型为 {entity_type} 的实体...")
+
+ all_nodes = self.get_all_nodes(graph_id)
+
+ filtered = []
+ for node in all_nodes:
+ # Keep the node if its labels include the requested type
+ if entity_type in node.labels:
+ filtered.append(node)
+
+ logger.info(f"找到 {len(filtered)} 个 {entity_type} 类型的实体")
+ return filtered
+
+ def get_entity_summary(
+ self,
+ graph_id: str,
+ entity_name: str
+ ) -> Dict[str, Any]:
+ """
+ Build a relationship summary for the named entity.
+
+ Combines a graph search on the name with the edges of the node that carries that name.
+
+ Args:
+ graph_id: graph ID
+ entity_name: entity name (matched against node names case-insensitively)
+
+ Returns:
+ dict with entity_name, entity_info, related_facts, related_edges, total_relations
+ """
+ logger.info(f"获取实体 {entity_name} 的关系摘要...")
+
+ # First search for facts related to the entity (up to 20 results)
+ search_result = self.search_graph(
+ graph_id=graph_id,
+ query=entity_name,
+ limit=20
+ )
+
+ # Then look for the first node whose name equals entity_name, ignoring case
+ all_nodes = self.get_all_nodes(graph_id)
+ entity_node = None
+ for node in all_nodes:
+ if node.name.lower() == entity_name.lower():
+ entity_node = node
+ break
+
+ related_edges = []
+ if entity_node:
+ # Edges are only looked up when a matching node exists
+ related_edges = self.get_node_edges(graph_id, entity_node.uuid)
+
+ return {
+ "entity_name": entity_name,
+ "entity_info": entity_node.to_dict() if entity_node else None,
+ "related_facts": search_result.facts,
+ "related_edges": [e.to_dict() for e in related_edges],
+ "total_relations": len(related_edges)
+ }
+
+ def get_graph_statistics(self, graph_id: str) -> Dict[str, Any]:
+ """
+ Compute summary statistics for a graph.
+
+ Args:
+ graph_id: graph ID
+
+ Returns:
+ dict with graph_id, total_nodes, total_edges, entity_types, relation_types
+ """
+ logger.info(f"获取图谱 {graph_id} 的统计信息...")
+
+ nodes = self.get_all_nodes(graph_id)
+ edges = self.get_all_edges(graph_id)
+
+ # Count nodes per label, ignoring the generic "Entity" / "Node" labels
+ entity_types = {}
+ for node in nodes:
+ for label in node.labels:
+ if label not in ["Entity", "Node"]:
+ entity_types[label] = entity_types.get(label, 0) + 1
+
+ # Count edges per relation name
+ relation_types = {}
+ for edge in edges:
+ relation_types[edge.name] = relation_types.get(edge.name, 0) + 1
+
+ return {
+ "graph_id": graph_id,
+ "total_nodes": len(nodes),
+ "total_edges": len(edges),
+ "entity_types": entity_types,
+ "relation_types": relation_types
+ }
+
+ def get_simulation_context(
+ self,
+ graph_id: str,
+ simulation_requirement: str,
+ limit: int = 30
+ ) -> Dict[str, Any]:
+ """
+ Gather context for a simulation requirement.
+
+ Combines a graph search on the requirement, graph statistics and the typed entities.
+
+ Args:
+ graph_id: graph ID
+ simulation_requirement: description of the simulation requirement
+ limit: limit for the search call and for the returned entity list
+
+ Returns:
+ dict with simulation_requirement, related_facts, graph_statistics, entities, total_entities
+ """
+ logger.info(f"获取模拟上下文: {simulation_requirement[:50]}...")
+
+ # Search for facts related to the simulation requirement
+ search_result = self.search_graph(
+ graph_id=graph_id,
+ query=simulation_requirement,
+ limit=limit
+ )
+
+ # Graph statistics
+ stats = self.get_graph_statistics(graph_id)
+
+ # Fetch all nodes. NOTE(review): get_graph_statistics already fetched them, so this is a second full fetch
+ all_nodes = self.get_all_nodes(graph_id)
+
+ # Keep only nodes that have a label other than the generic "Entity" / "Node"
+ entities = []
+ for node in all_nodes:
+ custom_labels = [l for l in node.labels if l not in ["Entity", "Node"]]
+ if custom_labels:
+ entities.append({
+ "name": node.name,
+ "type": custom_labels[0],
+ "summary": node.summary
+ })
+
+ return {
+ "simulation_requirement": simulation_requirement,
+ "related_facts": search_result.facts,
+ "graph_statistics": stats,
+ "entities": entities[:limit],  # truncated to limit; total_entities keeps the full count
+ "total_entities": len(entities)
+ }
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 1a0d82d..80cc5fa 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -24,3 +24,8 @@ werkzeug>=3.0.0
oasis-ai>=0.1.0
camel-ai>=0.2.0
+# LangChain框架(用于Report Agent)
+langchain>=0.2.0
+langchain-core>=0.2.0
+langchain-openai>=0.1.0
+