From a0c97b1344141337bfaf077ef720c11e7e763bd6 Mon Sep 17 00:00:00 2001
From: 666ghj <670939375@qq.com>
Date: Tue, 9 Dec 2025 16:06:53 +0800
Subject: [PATCH] Enhance Report Agent and Zep Tools with new search
functionalities
- Introduced new core search tools in the Report Agent: InsightForge for deep insights, PanoramaSearch for comprehensive views, and QuickSearch for rapid queries.
- Updated the Report Agent to prioritize tool usage for data retrieval, ensuring all report content is based on simulation results rather than internal knowledge.
- Enhanced the ZepToolsService with methods for InsightForge and PanoramaSearch, allowing for multi-dimensional queries and historical data retrieval.
- Improved documentation to reflect the new functionalities and usage guidelines for the Report Agent and Zep tools.
---
backend/app/services/report_agent.py | 440 ++++++++++++++++-----
backend/app/services/zep_tools.py | 568 ++++++++++++++++++++++++++-
2 files changed, 889 insertions(+), 119 deletions(-)
diff --git a/backend/app/services/report_agent.py b/backend/app/services/report_agent.py
index 3ec4be3..1c54b5d 100644
--- a/backend/app/services/report_agent.py
+++ b/backend/app/services/report_agent.py
@@ -21,7 +21,12 @@ from enum import Enum
from ..config import Config
from ..utils.llm_client import LLMClient
from ..utils.logger import get_logger
-from .zep_tools import ZepToolsService, SearchResult
+from .zep_tools import (
+ ZepToolsService,
+ SearchResult,
+ InsightForgeResult,
+ PanoramaResult
+)
logger = get_logger('mirofish.report_agent')
@@ -120,19 +125,23 @@ class ReportAgent:
2. 生成阶段:逐章节生成内容,每章节可多次调用工具获取信息
3. 反思阶段:检查内容完整性和准确性
- 工具(MCP封装):
- - search_graph: 图谱语义搜索
- - get_graph_statistics: 获取图谱统计
- - get_entity_summary: 获取实体摘要
- - get_simulation_context: 获取模拟上下文
+ 【核心检索工具 - 优化后】
+ - insight_forge: 深度洞察检索(最强大,自动分解问题,多维度检索)
+ - panorama_search: 广度搜索(获取全貌,包括历史/过期内容)
+ - quick_search: 简单搜索(快速检索)
+
+ 【重要】Report Agent必须优先调用工具获取模拟数据,而非使用自身知识!
"""
- # 最大工具调用次数(每个章节)
- MAX_TOOL_CALLS_PER_SECTION = 5
+ # 最大工具调用次数(每个章节)- 增加上限以鼓励更多检索
+ MAX_TOOL_CALLS_PER_SECTION = 10
# 最大反思轮数
MAX_REFLECTION_ROUNDS = 2
+ # 对话中的最大工具调用次数
+ MAX_TOOL_CALLS_PER_CHAT = 8
+
def __init__(
self,
graph_id: str,
@@ -164,51 +173,88 @@ class ReportAgent:
logger.info(f"ReportAgent 初始化完成: graph_id={graph_id}, simulation_id={simulation_id}")
def _define_tools(self) -> Dict[str, Dict[str, Any]]:
- """定义可用工具"""
+ """
+ 定义可用工具
+
+ 【重要】这三个工具是专门为从模拟图谱中检索信息设计的,
+ 必须优先使用这些工具获取数据,而不是使用LLM自身的知识!
+ """
return {
- "search_graph": {
- "name": "search_graph",
- "description": "在知识图谱中搜索相关信息。输入搜索查询,返回与查询相关的事实和关系。",
+ "insight_forge": {
+ "name": "insight_forge",
+ "description": """【深度洞察检索 - 最强大的检索工具】
+这是我们最强大的检索函数,专为深度分析设计。它会:
+1. 自动将你的问题分解为多个子问题
+2. 从多个维度检索模拟图谱中的信息
+3. 整合语义搜索、实体分析、关系链追踪的结果
+4. 返回最全面、最深度的检索内容
+
+【使用场景】
+- 需要深入分析某个话题
+- 需要了解事件的多个方面
+- 需要获取支撑报告章节的丰富素材
+
+【返回内容】
+- 相关事实原文(可直接引用)
+- 核心实体洞察
+- 关系链分析""",
+ "parameters": {
+ "query": "你想深入分析的问题或话题",
+ "report_context": "当前报告章节的上下文(可选,有助于生成更精准的子问题)"
+ },
+ "priority": "high"
+ },
+ "panorama_search": {
+ "name": "panorama_search",
+ "description": """【广度搜索 - 获取全貌视图】
+这个工具用于获取模拟结果的完整全貌,特别适合了解事件演变过程。它会:
+1. 获取所有相关节点和关系
+2. 区分当前有效的事实和历史/过期的事实
+3. 帮助你了解舆情是如何演变的
+
+【使用场景】
+- 需要了解事件的完整发展脉络
+- 需要对比不同阶段的舆情变化
+- 需要获取全面的实体和关系信息
+
+【返回内容】
+- 当前有效事实(模拟最新结果)
+- 历史/过期事实(演变记录)
+- 所有涉及的实体""",
+ "parameters": {
+ "query": "搜索查询,用于相关性排序",
+ "include_expired": "是否包含过期/历史内容(默认True)"
+ },
+ "priority": "medium"
+ },
+ "quick_search": {
+ "name": "quick_search",
+ "description": """【简单搜索 - 快速检索】
+轻量级的快速检索工具,适合简单、直接的信息查询。
+
+【使用场景】
+- 需要快速查找某个具体信息
+- 需要验证某个事实
+- 简单的信息检索
+
+【返回内容】
+- 与查询最相关的事实列表""",
"parameters": {
"query": "搜索查询字符串",
"limit": "返回结果数量(可选,默认10)"
- }
- },
- "get_graph_statistics": {
- "name": "get_graph_statistics",
- "description": "获取知识图谱的统计信息,包括节点数量、边数量、实体类型分布等。",
- "parameters": {}
- },
- "get_entity_summary": {
- "name": "get_entity_summary",
- "description": "获取指定实体的详细信息和关系摘要。",
- "parameters": {
- "entity_name": "实体名称"
- }
- },
- "get_simulation_context": {
- "name": "get_simulation_context",
- "description": "获取与模拟需求相关的上下文信息,包括相关事实、实体列表等。",
- "parameters": {
- "query": "额外的查询条件(可选)"
- }
- },
- "get_entities_by_type": {
- "name": "get_entities_by_type",
- "description": "按类型获取实体列表,如获取所有Student类型或PublicFigure类型的实体。",
- "parameters": {
- "entity_type": "实体类型名称"
- }
+ },
+ "priority": "low"
}
}
- def _execute_tool(self, tool_name: str, parameters: Dict[str, Any]) -> str:
+ def _execute_tool(self, tool_name: str, parameters: Dict[str, Any], report_context: str = "") -> str:
"""
执行工具调用
Args:
tool_name: 工具名称
parameters: 工具参数
+ report_context: 报告上下文(用于InsightForge)
Returns:
工具执行结果(文本格式)
@@ -216,16 +262,53 @@ class ReportAgent:
logger.info(f"执行工具: {tool_name}, 参数: {parameters}")
try:
- if tool_name == "search_graph":
+ # ========== 核心检索工具(优化后) ==========
+
+ if tool_name == "insight_forge":
+ # 深度洞察检索 - 最强大的工具
+ query = parameters.get("query", "")
+ ctx = parameters.get("report_context", "") or report_context
+ result = self.zep_tools.insight_forge(
+ graph_id=self.graph_id,
+ query=query,
+ simulation_requirement=self.simulation_requirement,
+ report_context=ctx
+ )
+ return result.to_text()
+
+ elif tool_name == "panorama_search":
+ # 广度搜索 - 获取全貌
+ query = parameters.get("query", "")
+ include_expired = parameters.get("include_expired", True)
+ if isinstance(include_expired, str):
+ include_expired = include_expired.lower() in ['true', '1', 'yes']
+ result = self.zep_tools.panorama_search(
+ graph_id=self.graph_id,
+ query=query,
+ include_expired=include_expired
+ )
+ return result.to_text()
+
+ elif tool_name == "quick_search":
+ # 简单搜索 - 快速检索
query = parameters.get("query", "")
limit = parameters.get("limit", 10)
- result = self.zep_tools.search_graph(
+ if isinstance(limit, str):
+ limit = int(limit)
+ result = self.zep_tools.quick_search(
graph_id=self.graph_id,
query=query,
limit=limit
)
return result.to_text()
+ # ========== 向后兼容的旧工具(内部重定向到新工具) ==========
+
+ elif tool_name == "search_graph":
+ # 重定向到 quick_search
+ logger.info("search_graph 已重定向到 quick_search")
+ return self._execute_tool("quick_search", parameters, report_context)
+
elif tool_name == "get_graph_statistics":
result = self.zep_tools.get_graph_statistics(self.graph_id)
return json.dumps(result, ensure_ascii=False, indent=2)
@@ -239,12 +322,10 @@ class ReportAgent:
return json.dumps(result, ensure_ascii=False, indent=2)
elif tool_name == "get_simulation_context":
+ # 重定向到 insight_forge,因为它更强大
+ logger.info("get_simulation_context 已重定向到 insight_forge")
query = parameters.get("query", self.simulation_requirement)
- result = self.zep_tools.get_simulation_context(
- graph_id=self.graph_id,
- simulation_requirement=query
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
+ return self._execute_tool("insight_forge", {"query": query}, report_context)
elif tool_name == "get_entities_by_type":
entity_type = parameters.get("entity_type", "")
@@ -256,7 +337,7 @@ class ReportAgent:
return json.dumps(result, ensure_ascii=False, indent=2)
else:
- return f"未知工具: {tool_name}"
+ return f"未知工具: {tool_name}。请使用以下工具之一: insight_forge, panorama_search, quick_search"
except Exception as e:
logger.error(f"工具执行失败: {tool_name}, 错误: {str(e)}")
@@ -473,7 +554,7 @@ class ReportAgent:
"""
logger.info(f"ReACT生成章节: {section.title}")
- # 构建系统prompt
+ # 构建系统prompt - 优化后强调工具使用和引用原文
system_prompt = f"""你是一个专业的舆情分析报告撰写专家,正在撰写报告的一个章节。
报告标题: {outline.title}
@@ -482,53 +563,103 @@ class ReportAgent:
当前要撰写的章节: {section.title}
-你可以使用以下工具来获取信息,每次最多调用{self.MAX_TOOL_CALLS_PER_SECTION}次:
+═══════════════════════════════════════════════════════════════
+【最重要的规则 - 必须遵守】
+═══════════════════════════════════════════════════════════════
+
+1. 【必须调用工具获取数据】
+ - 你正在撰写的是基于模拟结果的分析报告
+ - 所有内容必须来自模拟图谱中的真实数据
+ - 禁止使用你自己的知识来编写报告内容
+ - 每个章节至少调用1-3次工具获取相关信息
+
+2. 【必须引用模拟结果原文】
+ - 检索到的事实原文是最有价值的内容
+ - 在报告中使用引用格式展示这些原文,例如:
+ > "原文内容..."
+ - 这些原文证明了模拟的真实效果
+
+3. 【尊重模拟结果】
+ - 报告内容必须反映模拟中实际发生的情况
+ - 不要添加模拟中不存在的信息
+ - 如果某方面信息不足,如实说明
+
+═══════════════════════════════════════════════════════════════
+【可用检索工具】(建议每章节调用2-5次)
+═══════════════════════════════════════════════════════════════
{self._get_tools_description()}
-请按照以下ReACT格式进行思考和行动:
+【工具使用建议】
+- insight_forge: 用于深度分析,会自动分解问题并多维度检索
+- panorama_search: 用于了解全貌和演变过程
+- quick_search: 用于快速验证某个具体信息
-Thought: [分析当前需要什么信息来撰写这个章节]
-Action: [如果需要信息,调用工具]
-
-{{"name": "工具名称", "parameters": {{"参数名": "参数值"}}}}
-
+═══════════════════════════════════════════════════════════════
+【ReACT工作流程】
+═══════════════════════════════════════════════════════════════
-当收集到足够信息后,输出:
-Final Answer:
-[章节的完整Markdown内容]
+1. Thought: [分析需要什么信息,规划检索策略]
+2. Action: [调用工具获取信息]
+
+ {{"name": "工具名称", "parameters": {{"参数名": "参数值"}}}}
+
+3. Observation: [分析工具返回的结果]
+4. 重复步骤1-3,直到收集到足够信息(建议2-5轮)
+5. Final Answer: [基于检索结果撰写章节内容]
-注意:
-1. 内容要专业、客观、有深度
-2. 引用具体的数据和事实
-3. 保持与其他章节的逻辑连贯性
-4. 使用适当的Markdown格式(列表、强调等)
-5. 不要重复前面章节已经详细描述的内容"""
+═══════════════════════════════════════════════════════════════
+【章节内容要求】
+═══════════════════════════════════════════════════════════════
- # 构建用户prompt
+1. 内容必须基于工具检索到的模拟数据
+2. 大量引用原文来展示模拟效果
+3. 使用Markdown格式:
+ - 使用 > 引用重要原文
+ - 使用 **粗体** 强调关键信息
+ - 使用列表组织要点
+4. 保持与其他章节的逻辑连贯性
+5. 不要重复前面章节已详细描述的内容"""
+
+ # 构建用户prompt - 强调必须调用工具
previous_content = "\n\n".join(previous_sections) if previous_sections else "(这是第一个章节)"
- user_prompt = f"""已完成的章节内容:
+ user_prompt = f"""已完成的章节内容(参考以保持连贯性):
{previous_content[:2000]}
-现在请撰写章节: {section.title}
+═══════════════════════════════════════════════════════════════
+【当前任务】撰写章节: {section.title}
+═══════════════════════════════════════════════════════════════
-首先思考需要什么信息,然后调用工具获取,最后生成内容。"""
+【重要提醒】
+1. 开始前必须先调用工具获取模拟数据!
+2. 推荐先使用 insight_forge 进行深度检索
+3. 如需了解全貌可使用 panorama_search
+4. 报告内容必须来自检索结果,不要使用自己的知识
+
+请开始:
+1. 首先思考(Thought)这个章节需要什么信息
+2. 然后调用工具(Action)获取模拟数据
+3. 收集足够信息后输出 Final Answer"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
- # ReACT循环
+ # ReACT循环 - 优化后增加工具调用次数
tool_calls_count = 0
- max_iterations = self.MAX_TOOL_CALLS_PER_SECTION + 2
+ max_iterations = self.MAX_TOOL_CALLS_PER_SECTION + 3 # 增加迭代次数
+ min_tool_calls = 2 # 最少工具调用次数
+
+ # 报告上下文,用于InsightForge的子问题生成
+ report_context = f"章节标题: {section.title}\n模拟需求: {self.simulation_requirement}"
for iteration in range(max_iterations):
if progress_callback:
progress_callback(
"generating",
int((iteration / max_iterations) * 100),
- f"思考与行动中 ({tool_calls_count}/{self.MAX_TOOL_CALLS_PER_SECTION})"
+ f"深度检索与撰写中 ({tool_calls_count}/{self.MAX_TOOL_CALLS_PER_SECTION})"
)
# 调用LLM
@@ -542,21 +673,55 @@ Final Answer:
# 检查是否有最终答案
if "Final Answer:" in response:
+ # 如果工具调用次数不足,提醒需要更多检索
+ if tool_calls_count < min_tool_calls:
+ messages.append({"role": "assistant", "content": response})
+ messages.append({
+ "role": "user",
+ "content": f"""【注意】你只调用了{tool_calls_count}次工具,信息可能不够充分。
+
+请再调用1-2次工具来获取更多模拟数据,然后再输出 Final Answer。
+建议:
+- 使用 insight_forge 深度检索更多细节
+- 使用 panorama_search 了解事件全貌
+
+记住:报告内容必须来自模拟结果,而不是你的知识!"""
+ })
+ continue
+
# 提取最终答案
final_answer = response.split("Final Answer:")[-1].strip()
- logger.info(f"章节 {section.title} 生成完成")
+ logger.info(f"章节 {section.title} 生成完成(工具调用: {tool_calls_count}次)")
return final_answer
# 解析工具调用
tool_calls = self._parse_tool_calls(response)
if not tool_calls:
- # 没有工具调用也没有最终答案,提示生成最终答案
+ # 没有工具调用也没有最终答案
messages.append({"role": "assistant", "content": response})
- messages.append({
- "role": "user",
- "content": "请基于已有信息,输出 Final Answer: 并生成章节内容。"
- })
+
+ if tool_calls_count < min_tool_calls:
+ # 还没有足够的工具调用,强烈提示需要调用工具
+ messages.append({
+ "role": "user",
+ "content": f"""【重要】你还没有调用足够的工具来获取模拟数据!
+
+当前只调用了 {tool_calls_count} 次工具,至少需要 {min_tool_calls} 次。
+
+请立即调用工具获取信息:
+
+{{"name": "insight_forge", "parameters": {{"query": "{section.title}相关的模拟结果和分析"}}}}
+
+
+【记住】报告内容必须100%来自模拟结果,不能使用你自己的知识!"""
+ })
+ else:
+ # 已有足够调用,可以生成最终答案
+ messages.append({
+ "role": "user",
+ "content": "你已经获取了足够的模拟数据。请基于检索到的信息,输出 Final Answer: 并撰写章节内容。\n\n【重要】内容必须大量引用检索到的原文,使用 > 格式引用。"
+ })
continue
# 执行工具调用
@@ -565,15 +730,29 @@ Final Answer:
if tool_calls_count >= self.MAX_TOOL_CALLS_PER_SECTION:
break
- result = self._execute_tool(call["name"], call.get("parameters", {}))
- tool_results.append(f"工具 {call['name']} 返回:\n{result}")
+ result = self._execute_tool(
+ call["name"],
+ call.get("parameters", {}),
+ report_context=report_context
+ )
+ tool_results.append(f"═══ 工具 {call['name']} 返回 ═══\n{result}")
tool_calls_count += 1
# 将结果添加到消息
messages.append({"role": "assistant", "content": response})
messages.append({
"role": "user",
- "content": f"Observation:\n" + "\n\n".join(tool_results) + "\n\n请继续思考或输出 Final Answer:"
+ "content": f"""Observation(检索结果):
+
+{"".join(tool_results)}
+
+═══════════════════════════════════════════════════════════════
+【下一步行动】
+- 如果信息充分:输出 Final Answer 并撰写章节内容(必须引用上述原文)
+- 如果需要更多信息:继续调用工具检索
+
+已调用工具 {tool_calls_count}/{self.MAX_TOOL_CALLS_PER_SECTION} 次
+═══════════════════════════════════════════════════════════════"""
})
# 达到最大迭代次数,强制生成内容
@@ -833,23 +1012,52 @@ Final Answer:
system_prompt = f"""你是一个专业的舆情分析助手,负责回答关于模拟分析报告的问题。
+═══════════════════════════════════════════════════════════════
+【背景信息】
+═══════════════════════════════════════════════════════════════
模拟需求: {self.simulation_requirement}
图谱ID: {self.graph_id}
-你可以使用以下工具来获取信息:
+═══════════════════════════════════════════════════════════════
+【最重要的规则 - 必须遵守】
+═══════════════════════════════════════════════════════════════
+
+1. 【必须调用工具获取数据】
+ - 你的回答必须基于模拟图谱中的真实数据
+ - 禁止使用你自己的知识来回答问题
+ - 每次回答前至少调用1次工具获取相关信息
+
+2. 【必须引用模拟结果原文】
+ - 检索到的事实原文是最有价值的内容
+ - 在回答中使用引用格式展示这些原文,例如:
+ > "原文内容..."
+ - 原文引用证明了答案的可靠性
+
+3. 【尊重模拟结果】
+ - 回答必须反映模拟中实际发生的情况
+ - 不要添加模拟中不存在的信息
+ - 如果某方面信息不足,如实说明
+
+═══════════════════════════════════════════════════════════════
+【可用检索工具】
+═══════════════════════════════════════════════════════════════
{self._get_tools_description()}
-工具调用格式:
+【工具调用格式】
{{"name": "工具名称", "parameters": {{"参数名": "参数值"}}}}
-回答要求:
-1. 基于事实和数据回答
-2. 引用具体信息来源
-3. 如果不确定,说明信息限制
-4. 保持专业和客观"""
+═══════════════════════════════════════════════════════════════
+【回答要求】
+═══════════════════════════════════════════════════════════════
+
+1. 先调用工具获取模拟数据,再回答问题
+2. 大量引用检索到的原文
+3. 使用 > 格式引用重要内容
+4. 如果信息不足,如实说明限制
+5. 保持专业和客观"""
# 构建消息
messages = [{"role": "system", "content": system_prompt}]
@@ -858,11 +1066,18 @@ Final Answer:
for h in chat_history[-10:]: # 限制历史长度
messages.append(h)
- messages.append({"role": "user", "content": message})
+ # 添加用户消息,强调需要先检索
+ messages.append({
+ "role": "user",
+ "content": f"""{message}
+
+【提醒】请先调用工具获取模拟数据,再回答问题。推荐使用 insight_forge 进行深度检索。"""
+ })
- # ReACT循环
+ # ReACT循环 - 增加迭代次数以支持更多工具调用
tool_calls_made = []
- max_iterations = 3
+ max_iterations = self.MAX_TOOL_CALLS_PER_CHAT
+ min_tool_calls = 1 # 最少工具调用次数
for iteration in range(max_iterations):
response = self.llm.chat(
@@ -875,33 +1090,54 @@ Final Answer:
tool_calls = self._parse_tool_calls(response)
if not tool_calls:
- # 没有工具调用,返回响应
- # 清理响应中的工具调用标记
+ # 没有工具调用
+ if len(tool_calls_made) < min_tool_calls and iteration < 2:
+ # 还没有调用过工具,强烈提示需要先检索
+ messages.append({"role": "assistant", "content": response})
+ messages.append({
+ "role": "user",
+ "content": f"""【重要】你还没有调用工具获取模拟数据!
+
+请先调用工具检索相关信息:
+
+{{"name": "insight_forge", "parameters": {{"query": "{message[:100]}"}}}}
+
+
+【记住】回答必须基于模拟结果,不能使用你自己的知识!"""
+ })
+ continue
+
+ # 已有工具调用,清理响应并返回
clean_response = re.sub(r'.*?', '', response, flags=re.DOTALL)
clean_response = re.sub(r'\[TOOL_CALL\].*?\)', '', clean_response)
return {
"response": clean_response.strip(),
"tool_calls": tool_calls_made,
- "sources": []
+ "sources": [tc.get("parameters", {}).get("query", "") for tc in tool_calls_made]
}
# 执行工具调用
tool_results = []
for call in tool_calls:
+ if len(tool_calls_made) >= self.MAX_TOOL_CALLS_PER_CHAT:
+ break
result = self._execute_tool(call["name"], call.get("parameters", {}))
tool_results.append({
"tool": call["name"],
- "result": result[:1000] # 限制长度
+ "result": result[:2000] # 增加结果长度限制
})
tool_calls_made.append(call)
# 将结果添加到消息
messages.append({"role": "assistant", "content": response})
- observation = "工具调用结果:\n" + "\n\n".join([
- f"[{r['tool']}]: {r['result']}" for r in tool_results
+ observation = "═══ 检索结果 ═══\n" + "\n\n".join([
+ f"【{r['tool']}】\n{r['result']}" for r in tool_results
])
- messages.append({"role": "user", "content": observation + "\n\n请基于以上信息回答问题。"})
+ messages.append({
+ "role": "user",
+ "content": observation + "\n\n请基于以上模拟数据回答问题。\n【重要】请在回答中引用检索到的原文,使用 > 格式。"
+ })
# 达到最大迭代,获取最终响应
final_response = self.llm.chat(
@@ -910,10 +1146,14 @@ Final Answer:
max_tokens=2048
)
+ # 清理响应
+ clean_response = re.sub(r'.*?', '', final_response, flags=re.DOTALL)
+ clean_response = re.sub(r'\[TOOL_CALL\].*?\)', '', clean_response)
+
return {
- "response": final_response,
+ "response": clean_response.strip(),
"tool_calls": tool_calls_made,
- "sources": []
+ "sources": [tc.get("parameters", {}).get("query", "") for tc in tool_calls_made]
}
diff --git a/backend/app/services/zep_tools.py b/backend/app/services/zep_tools.py
index 166cc97..df101e5 100644
--- a/backend/app/services/zep_tools.py
+++ b/backend/app/services/zep_tools.py
@@ -1,16 +1,23 @@
"""
Zep检索工具服务
封装图谱搜索、节点读取、边查询等工具,供Report Agent使用
+
+核心检索工具(优化后):
+1. InsightForge(深度洞察检索)- 最强大的混合检索,自动生成子问题并多维度检索
+2. PanoramaSearch(广度搜索)- 获取全貌,包括过期内容
+3. QuickSearch(简单搜索)- 快速检索
"""
import time
+import json
from typing import Dict, Any, List, Optional
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from zep_cloud.client import Zep
from ..config import Config
from ..utils.logger import get_logger
+from ..utils.llm_client import LLMClient
logger = get_logger('mirofish.zep_tools')
@@ -79,6 +86,11 @@ class EdgeInfo:
target_node_uuid: str
source_node_name: Optional[str] = None
target_node_name: Optional[str] = None
+ # 时间信息
+ created_at: Optional[str] = None
+ valid_at: Optional[str] = None
+ invalid_at: Optional[str] = None
+ expired_at: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
@@ -88,42 +100,231 @@ class EdgeInfo:
"source_node_uuid": self.source_node_uuid,
"target_node_uuid": self.target_node_uuid,
"source_node_name": self.source_node_name,
- "target_node_name": self.target_node_name
+ "target_node_name": self.target_node_name,
+ "created_at": self.created_at,
+ "valid_at": self.valid_at,
+ "invalid_at": self.invalid_at,
+ "expired_at": self.expired_at
+ }
+
+ def to_text(self, include_temporal: bool = False) -> str:
+ """转换为文本格式"""
+ source = self.source_node_name or self.source_node_uuid[:8]
+ target = self.target_node_name or self.target_node_uuid[:8]
+ base_text = f"关系: {source} --[{self.name}]--> {target}\n事实: {self.fact}"
+
+ if include_temporal:
+ valid_at = self.valid_at or "未知"
+ invalid_at = self.invalid_at or "至今"
+ base_text += f"\n时效: {valid_at} - {invalid_at}"
+ if self.expired_at:
+ base_text += f" (已过期: {self.expired_at})"
+
+ return base_text
+
+ @property
+ def is_expired(self) -> bool:
+ """是否已过期"""
+ return self.expired_at is not None
+
+ @property
+ def is_invalid(self) -> bool:
+ """是否已失效"""
+ return self.invalid_at is not None
+
+
+@dataclass
+class InsightForgeResult:
+ """
+ 深度洞察检索结果 (InsightForge)
+ 包含多个子问题的检索结果,以及综合分析
+ """
+ query: str
+ simulation_requirement: str
+ sub_queries: List[str]
+
+ # 各维度检索结果
+ semantic_facts: List[str] = field(default_factory=list) # 语义搜索结果
+ entity_insights: List[Dict[str, Any]] = field(default_factory=list) # 实体洞察
+ relationship_chains: List[str] = field(default_factory=list) # 关系链
+
+ # 统计信息
+ total_facts: int = 0
+ total_entities: int = 0
+ total_relationships: int = 0
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "query": self.query,
+ "simulation_requirement": self.simulation_requirement,
+ "sub_queries": self.sub_queries,
+ "semantic_facts": self.semantic_facts,
+ "entity_insights": self.entity_insights,
+ "relationship_chains": self.relationship_chains,
+ "total_facts": self.total_facts,
+ "total_entities": self.total_entities,
+ "total_relationships": self.total_relationships
+ }
+
+ def to_text(self) -> str:
+ """转换为详细的文本格式,供LLM理解"""
+ text_parts = [
+ f"## 深度洞察检索结果",
+ f"原始问题: {self.query}",
+ f"模拟需求: {self.simulation_requirement}",
+ f"\n### 检索统计",
+ f"- 相关事实: {self.total_facts}条",
+ f"- 涉及实体: {self.total_entities}个",
+ f"- 关系链: {self.total_relationships}条"
+ ]
+
+ # 子问题
+ if self.sub_queries:
+ text_parts.append(f"\n### 分析的子问题")
+ for i, sq in enumerate(self.sub_queries, 1):
+ text_parts.append(f"{i}. {sq}")
+
+ # 语义搜索结果
+ if self.semantic_facts:
+ text_parts.append(f"\n### 【关键事实】(请在报告中引用这些原文)")
+ for i, fact in enumerate(self.semantic_facts, 1):
+ text_parts.append(f"{i}. \"{fact}\"")
+
+ # 实体洞察
+ if self.entity_insights:
+ text_parts.append(f"\n### 【核心实体】")
+ for entity in self.entity_insights:
+ text_parts.append(f"- **{entity.get('name', '未知')}** ({entity.get('type', '实体')})")
+ if entity.get('summary'):
+ text_parts.append(f" 摘要: \"{entity.get('summary')}\"")
+ if entity.get('related_facts'):
+ text_parts.append(f" 相关事实: {len(entity.get('related_facts', []))}条")
+
+ # 关系链
+ if self.relationship_chains:
+ text_parts.append(f"\n### 【关系链】")
+ for chain in self.relationship_chains:
+ text_parts.append(f"- {chain}")
+
+ return "\n".join(text_parts)
+
+
+@dataclass
+class PanoramaResult:
+ """
+ 广度搜索结果 (Panorama)
+ 包含所有相关信息,包括过期内容
+ """
+ query: str
+
+ # 全部节点
+ all_nodes: List[NodeInfo] = field(default_factory=list)
+ # 全部边(包括过期的)
+ all_edges: List[EdgeInfo] = field(default_factory=list)
+ # 当前有效的事实
+ active_facts: List[str] = field(default_factory=list)
+ # 已过期/失效的事实(历史记录)
+ historical_facts: List[str] = field(default_factory=list)
+
+ # 统计
+ total_nodes: int = 0
+ total_edges: int = 0
+ active_count: int = 0
+ historical_count: int = 0
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "query": self.query,
+ "all_nodes": [n.to_dict() for n in self.all_nodes],
+ "all_edges": [e.to_dict() for e in self.all_edges],
+ "active_facts": self.active_facts,
+ "historical_facts": self.historical_facts,
+ "total_nodes": self.total_nodes,
+ "total_edges": self.total_edges,
+ "active_count": self.active_count,
+ "historical_count": self.historical_count
}
def to_text(self) -> str:
"""转换为文本格式"""
- source = self.source_node_name or self.source_node_uuid[:8]
- target = self.target_node_name or self.target_node_uuid[:8]
- return f"关系: {source} --[{self.name}]--> {target}\n事实: {self.fact}"
+ text_parts = [
+ f"## 广度搜索结果(全貌视图)",
+ f"查询: {self.query}",
+ f"\n### 统计信息",
+ f"- 总节点数: {self.total_nodes}",
+ f"- 总边数: {self.total_edges}",
+ f"- 当前有效事实: {self.active_count}条",
+ f"- 历史/过期事实: {self.historical_count}条"
+ ]
+
+ # 当前有效的事实
+ if self.active_facts:
+ text_parts.append(f"\n### 【当前有效事实】(模拟结果原文)")
+ for i, fact in enumerate(self.active_facts[:30], 1):
+ text_parts.append(f"{i}. \"{fact}\"")
+ if len(self.active_facts) > 30:
+ text_parts.append(f"... 还有 {len(self.active_facts) - 30} 条")
+
+ # 历史/过期事实
+ if self.historical_facts:
+ text_parts.append(f"\n### 【历史/过期事实】(演变过程记录)")
+ for i, fact in enumerate(self.historical_facts[:20], 1):
+ text_parts.append(f"{i}. \"{fact}\"")
+ if len(self.historical_facts) > 20:
+ text_parts.append(f"... 还有 {len(self.historical_facts) - 20} 条")
+
+ # 关键实体
+ if self.all_nodes:
+ text_parts.append(f"\n### 【涉及实体】")
+ for node in self.all_nodes[:20]:
+ entity_type = next((l for l in node.labels if l not in ["Entity", "Node"]), "实体")
+ text_parts.append(f"- **{node.name}** ({entity_type})")
+ if len(self.all_nodes) > 20:
+ text_parts.append(f"... 还有 {len(self.all_nodes) - 20} 个实体")
+
+ return "\n".join(text_parts)
class ZepToolsService:
"""
Zep检索工具服务
- 提供多种图谱检索工具,可以被Report Agent调用:
- 1. search_graph - 图谱语义搜索
- 2. get_all_nodes - 获取图谱所有节点
- 3. get_all_edges - 获取图谱所有边
- 4. get_node_detail - 获取节点详细信息
- 5. get_node_edges - 获取节点相关的边
- 6. get_entities_by_type - 按类型获取实体
- 7. get_entity_summary - 获取实体的关系摘要
+ 【核心检索工具 - 优化后】
+ 1. insight_forge - 深度洞察检索(最强大,自动生成子问题,多维度检索)
+ 2. panorama_search - 广度搜索(获取全貌,包括过期内容)
+ 3. quick_search - 简单搜索(快速检索)
+
+ 【基础工具】
+ - search_graph - 图谱语义搜索
+ - get_all_nodes - 获取图谱所有节点
+ - get_all_edges - 获取图谱所有边(含时间信息)
+ - get_node_detail - 获取节点详细信息
+ - get_node_edges - 获取节点相关的边
+ - get_entities_by_type - 按类型获取实体
+ - get_entity_summary - 获取实体的关系摘要
"""
# 重试配置
MAX_RETRIES = 3
RETRY_DELAY = 2.0
- def __init__(self, api_key: Optional[str] = None):
+ def __init__(self, api_key: Optional[str] = None, llm_client: Optional[LLMClient] = None):
self.api_key = api_key or Config.ZEP_API_KEY
if not self.api_key:
raise ValueError("ZEP_API_KEY 未配置")
self.client = Zep(api_key=self.api_key)
+ # LLM客户端用于InsightForge生成子问题
+ self._llm_client = llm_client
logger.info("ZepToolsService 初始化完成")
+ @property
+ def llm(self) -> LLMClient:
+ """延迟初始化LLM客户端"""
+ if self._llm_client is None:
+ self._llm_client = LLMClient()
+ return self._llm_client
+
def _call_with_retry(self, func, operation_name: str, max_retries: int = None):
"""带重试机制的API调用"""
max_retries = max_retries or self.MAX_RETRIES
@@ -363,15 +564,16 @@ class ZepToolsService:
logger.info(f"获取到 {len(result)} 个节点")
return result
- def get_all_edges(self, graph_id: str) -> List[EdgeInfo]:
+ def get_all_edges(self, graph_id: str, include_temporal: bool = True) -> List[EdgeInfo]:
"""
- 获取图谱的所有边
+ 获取图谱的所有边(包含时间信息)
Args:
graph_id: 图谱ID
+ include_temporal: 是否包含时间信息(默认True)
Returns:
- 边列表
+ 边列表(包含created_at, valid_at, invalid_at, expired_at)
"""
logger.info(f"获取图谱 {graph_id} 的所有边...")
@@ -382,13 +584,22 @@ class ZepToolsService:
result = []
for edge in edges:
- result.append(EdgeInfo(
+ edge_info = EdgeInfo(
uuid=getattr(edge, 'uuid_', None) or getattr(edge, 'uuid', ''),
name=edge.name or "",
fact=edge.fact or "",
source_node_uuid=edge.source_node_uuid or "",
target_node_uuid=edge.target_node_uuid or ""
- ))
+ )
+
+ # 添加时间信息
+ if include_temporal:
+ edge_info.created_at = getattr(edge, 'created_at', None)
+ edge_info.valid_at = getattr(edge, 'valid_at', None)
+ edge_info.invalid_at = getattr(edge, 'invalid_at', None)
+ edge_info.expired_at = getattr(edge, 'expired_at', None)
+
+ result.append(edge_info)
logger.info(f"获取到 {len(result)} 条边")
return result
@@ -619,3 +830,322 @@ class ZepToolsService:
"entities": entities[:limit], # 限制数量
"total_entities": len(entities)
}
+
+ # ========== 核心检索工具(优化后) ==========
+
+ def insight_forge(
+ self,
+ graph_id: str,
+ query: str,
+ simulation_requirement: str,
+ report_context: str = "",
+ max_sub_queries: int = 5
+ ) -> InsightForgeResult:
+ """
+ 【InsightForge - 深度洞察检索】
+
+ 最强大的混合检索函数,自动分解问题并多维度检索:
+ 1. 使用LLM将问题分解为多个子问题
+ 2. 对每个子问题进行语义搜索
+ 3. 提取相关实体并获取其详细信息
+ 4. 追踪关系链
+ 5. 整合所有结果,生成深度洞察
+
+ Args:
+ graph_id: 图谱ID
+ query: 用户问题
+ simulation_requirement: 模拟需求描述
+ report_context: 报告上下文(可选,用于更精准的子问题生成)
+ max_sub_queries: 最大子问题数量
+
+ Returns:
+ InsightForgeResult: 深度洞察检索结果
+ """
+ logger.info(f"InsightForge 深度洞察检索: {query[:50]}...")
+
+ result = InsightForgeResult(
+ query=query,
+ simulation_requirement=simulation_requirement,
+ sub_queries=[]
+ )
+
+ # Step 1: 使用LLM生成子问题
+ sub_queries = self._generate_sub_queries(
+ query=query,
+ simulation_requirement=simulation_requirement,
+ report_context=report_context,
+ max_queries=max_sub_queries
+ )
+ result.sub_queries = sub_queries
+ logger.info(f"生成 {len(sub_queries)} 个子问题")
+
+ # Step 2: 对每个子问题进行语义搜索
+ all_facts = []
+ all_edges = []
+ seen_facts = set()
+
+ for sub_query in sub_queries:
+ search_result = self.search_graph(
+ graph_id=graph_id,
+ query=sub_query,
+ limit=15,
+ scope="edges"
+ )
+
+ for fact in search_result.facts:
+ if fact not in seen_facts:
+ all_facts.append(fact)
+ seen_facts.add(fact)
+
+ all_edges.extend(search_result.edges)
+
+ # 对原始问题也进行搜索
+ main_search = self.search_graph(
+ graph_id=graph_id,
+ query=query,
+ limit=20,
+ scope="edges"
+ )
+ for fact in main_search.facts:
+ if fact not in seen_facts:
+ all_facts.append(fact)
+ seen_facts.add(fact)
+
+ result.semantic_facts = all_facts
+ result.total_facts = len(all_facts)
+
+ # Step 3: 提取相关实体并获取详细信息
+ all_nodes = self.get_all_nodes(graph_id)
+ node_map = {n.uuid: n for n in all_nodes}
+
+ # 从边中提取涉及的实体
+ entity_uuids = set()
+ for edge_data in all_edges:
+ if isinstance(edge_data, dict):
+ entity_uuids.add(edge_data.get('source_node_uuid', ''))
+ entity_uuids.add(edge_data.get('target_node_uuid', ''))
+
+ # 获取实体详情
+ entity_insights = []
+ for uuid in list(entity_uuids)[:30]: # 限制数量
+ if uuid in node_map:
+ node = node_map[uuid]
+ entity_type = next((l for l in node.labels if l not in ["Entity", "Node"]), "实体")
+
+ # 获取该实体相关的事实
+ related_facts = [
+ f for f in all_facts
+ if node.name.lower() in f.lower()
+ ]
+
+ entity_insights.append({
+ "uuid": node.uuid,
+ "name": node.name,
+ "type": entity_type,
+ "summary": node.summary,
+ "related_facts": related_facts[:5]
+ })
+
+ result.entity_insights = entity_insights
+ result.total_entities = len(entity_insights)
+
+ # Step 4: 构建关系链
+ relationship_chains = []
+ for edge_data in all_edges[:20]:
+ if isinstance(edge_data, dict):
+ source_uuid = edge_data.get('source_node_uuid', '')
+ target_uuid = edge_data.get('target_node_uuid', '')
+ relation_name = edge_data.get('name', '')
+
+ source_name = node_map.get(source_uuid, NodeInfo('', '', [], '', {})).name or source_uuid[:8]
+ target_name = node_map.get(target_uuid, NodeInfo('', '', [], '', {})).name or target_uuid[:8]
+
+ chain = f"{source_name} --[{relation_name}]--> {target_name}"
+ if chain not in relationship_chains:
+ relationship_chains.append(chain)
+
+ result.relationship_chains = relationship_chains
+ result.total_relationships = len(relationship_chains)
+
+ logger.info(f"InsightForge完成: {result.total_facts}条事实, {result.total_entities}个实体, {result.total_relationships}条关系")
+ return result
+
+ def _generate_sub_queries(
+ self,
+ query: str,
+ simulation_requirement: str,
+ report_context: str = "",
+ max_queries: int = 5
+ ) -> List[str]:
+ """
+ 使用LLM生成子问题
+
+ 将复杂问题分解为多个可以独立检索的子问题
+ """
+ system_prompt = """你是一个专业的问题分析专家。你的任务是将一个复杂问题分解为多个可以独立检索的子问题。
+
+要求:
+1. 每个子问题应该足够具体,可以在知识图谱中检索到相关信息
+2. 子问题应该覆盖原问题的不同维度(如:谁、什么、为什么、怎么样、何时、何地)
+3. 子问题应该与模拟场景相关
+4. 返回JSON格式:{"sub_queries": ["子问题1", "子问题2", ...]}"""
+
+ user_prompt = f"""模拟需求背景:
+{simulation_requirement}
+
+{f"报告上下文:{report_context[:500]}" if report_context else ""}
+
+请将以下问题分解为{max_queries}个子问题:
+{query}
+
+返回JSON格式的子问题列表。"""
+
+ try:
+ response = self.llm.chat_json(
+ messages=[
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": user_prompt}
+ ],
+ temperature=0.3
+ )
+
+ sub_queries = response.get("sub_queries", [])
+ # 确保是字符串列表
+ return [str(sq) for sq in sub_queries[:max_queries]]
+
+ except Exception as e:
+ logger.warning(f"生成子问题失败: {str(e)},使用默认子问题")
+ # 降级:返回基于原问题的变体
+ return [
+ query,
+ f"{query} 的主要参与者",
+ f"{query} 的原因和影响",
+ f"{query} 的发展过程"
+ ][:max_queries]
+
+ def panorama_search(
+ self,
+ graph_id: str,
+ query: str,
+ include_expired: bool = True,
+ limit: int = 50
+ ) -> PanoramaResult:
+ """
+ 【PanoramaSearch - 广度搜索】
+
+ 获取全貌视图,包括所有相关内容和历史/过期信息:
+ 1. 获取所有相关节点
+ 2. 获取所有边(包括已过期/失效的)
+ 3. 分类整理当前有效和历史信息
+
+ 这个工具适用于需要了解事件全貌、追踪演变过程的场景。
+
+ Args:
+ graph_id: 图谱ID
+ query: 搜索查询(用于相关性排序)
+ include_expired: 是否包含过期内容(默认True)
+ limit: 返回结果数量限制
+
+ Returns:
+ PanoramaResult: 广度搜索结果
+ """
+ logger.info(f"PanoramaSearch 广度搜索: {query[:50]}...")
+
+ result = PanoramaResult(query=query)
+
+ # 获取所有节点
+ all_nodes = self.get_all_nodes(graph_id)
+ node_map = {n.uuid: n for n in all_nodes}
+ result.all_nodes = all_nodes
+ result.total_nodes = len(all_nodes)
+
+ # 获取所有边(包含时间信息)
+ all_edges = self.get_all_edges(graph_id, include_temporal=True)
+ result.all_edges = all_edges
+ result.total_edges = len(all_edges)
+
+ # 分类事实
+ active_facts = []
+ historical_facts = []
+
+ for edge in all_edges:
+ if not edge.fact:
+ continue
+
+ # 为事实添加实体名称
+ source_name = node_map.get(edge.source_node_uuid, NodeInfo('', '', [], '', {})).name or edge.source_node_uuid[:8]
+ target_name = node_map.get(edge.target_node_uuid, NodeInfo('', '', [], '', {})).name or edge.target_node_uuid[:8]
+
+ # 判断是否过期/失效
+ is_historical = edge.is_expired or edge.is_invalid
+
+ if is_historical:
+ # 历史/过期事实,添加时间标记
+ valid_at = edge.valid_at or "未知"
+ invalid_at = edge.invalid_at or edge.expired_at or "未知"
+ fact_with_time = f"[{valid_at} - {invalid_at}] {edge.fact}"
+ historical_facts.append(fact_with_time)
+ else:
+ # 当前有效事实
+ active_facts.append(edge.fact)
+
+ # 基于查询进行相关性排序
+ query_lower = query.lower()
+ keywords = [w.strip() for w in query_lower.replace(',', ' ').replace(',', ' ').split() if len(w.strip()) > 1]
+
+ def relevance_score(fact: str) -> int:
+ fact_lower = fact.lower()
+ score = 0
+ if query_lower in fact_lower:
+ score += 100
+ for kw in keywords:
+ if kw in fact_lower:
+ score += 10
+ return score
+
+ # 排序并限制数量
+ active_facts.sort(key=relevance_score, reverse=True)
+ historical_facts.sort(key=relevance_score, reverse=True)
+
+ result.active_facts = active_facts[:limit]
+ result.historical_facts = historical_facts[:limit] if include_expired else []
+ result.active_count = len(active_facts)
+ result.historical_count = len(historical_facts)
+
+ logger.info(f"PanoramaSearch完成: {result.active_count}条有效, {result.historical_count}条历史")
+ return result
+
+ def quick_search(
+ self,
+ graph_id: str,
+ query: str,
+ limit: int = 10
+ ) -> SearchResult:
+ """
+ 【QuickSearch - 简单搜索】
+
+ 快速、轻量级的检索工具:
+ 1. 直接调用Zep语义搜索
+ 2. 返回最相关的结果
+ 3. 适用于简单、直接的检索需求
+
+ Args:
+ graph_id: 图谱ID
+ query: 搜索查询
+ limit: 返回结果数量
+
+ Returns:
+ SearchResult: 搜索结果
+ """
+ logger.info(f"QuickSearch 简单搜索: {query[:50]}...")
+
+ # 直接调用现有的search_graph方法
+ result = self.search_graph(
+ graph_id=graph_id,
+ query=query,
+ limit=limit,
+ scope="edges"
+ )
+
+ logger.info(f"QuickSearch完成: {result.total_count}条结果")
+ return result