From 08f417f3b7274de25ac01a5cb878a14ce40b804c Mon Sep 17 00:00:00 2001 From: 666ghj <670939375@qq.com> Date: Fri, 28 Nov 2025 17:21:08 +0800 Subject: [PATCH] Introduce Project ID for context management, finalizing the stateful API pipeline from file submission to graph construction. --- .env.example | 8 + .gitignore | 10 +- backend/README.md | 243 ++++++++ backend/app/__init__.py | 52 ++ backend/app/api/__init__.py | 10 + backend/app/api/graph.py | 657 +++++++++++++++++++++ backend/app/config.py | 53 ++ backend/app/models/__init__.py | 9 + backend/app/models/project.py | 305 ++++++++++ backend/app/models/task.py | 178 ++++++ backend/app/services/__init__.py | 10 + backend/app/services/graph_builder.py | 457 ++++++++++++++ backend/app/services/ontology_generator.py | 361 +++++++++++ backend/app/services/text_processor.py | 71 +++ backend/app/utils/__init__.py | 9 + backend/app/utils/file_parser.py | 141 +++++ backend/app/utils/llm_client.py | 91 +++ backend/app/utils/logger.py | 107 ++++ backend/requirements.txt | 22 + backend/run.py | 57 ++ 20 files changed, 2850 insertions(+), 1 deletion(-) create mode 100644 .env.example create mode 100644 backend/README.md create mode 100644 backend/app/__init__.py create mode 100644 backend/app/api/__init__.py create mode 100644 backend/app/api/graph.py create mode 100644 backend/app/config.py create mode 100644 backend/app/models/__init__.py create mode 100644 backend/app/models/project.py create mode 100644 backend/app/models/task.py create mode 100644 backend/app/services/__init__.py create mode 100644 backend/app/services/graph_builder.py create mode 100644 backend/app/services/ontology_generator.py create mode 100644 backend/app/services/text_processor.py create mode 100644 backend/app/utils/__init__.py create mode 100644 backend/app/utils/file_parser.py create mode 100644 backend/app/utils/llm_client.py create mode 100644 backend/app/utils/logger.py create mode 100644 backend/requirements.txt create mode 100644 
backend/run.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..86d3783 --- /dev/null +++ b/.env.example @@ -0,0 +1,8 @@ +# Zep Cloud API Key +# 从 https://app.getzep.com 获取 +ZEP_API_KEY= + +# LLM API Key +LLM_API_KEY= +LLM_BASE_URL= +LLM_MODEL_NAME= \ No newline at end of file diff --git a/.gitignore b/.gitignore index 009b597..951e75e 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ .env.development.local .env.test.local .env.production.local +__pycache__/ .vscode .idea @@ -19,4 +20,11 @@ .cursor/ mydoc/ -mytest/ \ No newline at end of file +mytest/ + +# 日志文件 +backend/logs/ +*.log + +# 上传文件 +backend/uploads/ \ No newline at end of file diff --git a/backend/README.md b/backend/README.md new file mode 100644 index 0000000..cd5fac0 --- /dev/null +++ b/backend/README.md @@ -0,0 +1,243 @@ +# MiroFish Backend + +社会舆论模拟系统后端服务,基于Flask框架。 + +## 项目结构 + +``` +backend/ +├── app/ +│ ├── __init__.py # Flask应用工厂 +│ ├── config.py # 配置管理 +│ ├── api/ # API路由 +│ │ ├── __init__.py +│ │ └── graph.py # 图谱相关接口 +│ ├── services/ # 业务逻辑层 +│ │ ├── ontology_generator.py # 本体生成服务 +│ │ ├── graph_builder.py # 图谱构建服务 +│ │ └── text_processor.py # 文本处理服务 +│ ├── models/ # 数据模型 +│ │ ├── task.py # 任务状态管理 +│ │ └── project.py # 项目上下文管理 +│ └── utils/ # 工具模块 +│ ├── file_parser.py # 文件解析 +│ └── llm_client.py # LLM客户端 +├── requirements.txt +└── run.py # 启动入口 +``` + +## 安装 + +```bash +conda activate MiroFish +cd backend +pip install -r requirements.txt +``` + +## 配置 + +在项目根目录 `MiroFish/.env` 中配置: + +```bash +# LLM配置 +LLM_API_KEY=your-llm-api-key +LLM_BASE_URL=https://openrouter.ai/api/v1 +LLM_MODEL_NAME=gpt-4o-mini + +# Zep配置 +ZEP_API_KEY=your-zep-api-key +``` + +## 启动服务 + +```bash +python run.py +``` + +服务默认运行在 http://localhost:5000 + +--- + +## API接口 + +### 核心工作流程 + +``` +1. 创建项目(可选) + POST /api/graph/project/create + +2. 上传文件 + 生成本体(接口1) + POST /api/graph/ontology/generate + → 返回 project_id + +3. 
构建图谱(接口2) + POST /api/graph/build + → 传入 project_id + → 返回 task_id + +4. 查询任务进度 + GET /api/graph/task/{task_id} + +5. 获取图谱数据 + GET /api/graph/data/{graph_id} +``` + +--- + +### 接口1:生成本体定义 + +**POST** `/api/graph/ontology/generate` + +上传文档,分析生成适合社会模拟的实体和关系类型定义。 + +**请求(form-data):** + +| 字段 | 类型 | 必填 | 说明 | +|------|------|------|------| +| `files` | File | ✅ | PDF/MD/TXT文件,可多个 | +| `simulation_requirement` | Text | ✅ | 模拟需求描述 | +| `project_name` | Text | ❌ | 项目名称 | +| `additional_context` | Text | ❌ | 额外说明 | + +**响应示例:** +```json +{ + "success": true, + "data": { + "project_id": "proj_abc123def456", + "project_name": "武汉大学舆情分析", + "ontology": { + "entity_types": [ + { + "name": "Person", + "description": "Individuals who can express opinions", + "attributes": [...] + } + ], + "edge_types": [ + { + "name": "AFFILIATED_WITH", + "description": "Indicates affiliation", + "source_targets": [...] + } + ] + }, + "analysis_summary": "分析说明...", + "files": [ + {"filename": "报告.pdf", "size": 123456} + ], + "total_text_length": 20833 + } +} +``` + +--- + +### 接口2:构建图谱 + +**POST** `/api/graph/build` + +根据 `project_id` 构建Zep知识图谱(异步任务)。 + +**请求(JSON):** +```json +{ + "project_id": "proj_abc123def456", + "graph_name": "图谱名称", + "chunk_size": 500, + "chunk_overlap": 50 +} +``` + +| 字段 | 类型 | 必填 | 说明 | +|------|------|------|------| +| `project_id` | string | ✅ | 来自接口1的返回 | +| `graph_name` | string | ❌ | 图谱名称 | +| `chunk_size` | int | ❌ | 文本块大小,默认500 | +| `chunk_overlap` | int | ❌ | 块重叠字符,默认50 | + +**响应:** +```json +{ + "success": true, + "data": { + "project_id": "proj_abc123def456", + "task_id": "task_xyz789", + "message": "图谱构建任务已启动" + } +} +``` + +--- + +### 任务状态查询 + +**GET** `/api/graph/task/{task_id}` + +```json +{ + "success": true, + "data": { + "task_id": "task_xyz789", + "status": "processing", + "progress": 45, + "message": "添加文本块 (15/30)...", + "result": null + } +} +``` + +**状态值:** +- `pending` - 等待中 +- `processing` - 处理中 +- `completed` - 已完成 +- `failed` - 失败 + +--- + 
+### 项目管理接口 + +| 方法 | 路径 | 说明 | +|------|------|------| +| POST | `/api/graph/project/create` | 创建项目 | +| GET | `/api/graph/project/{project_id}` | 获取项目详情 | +| GET | `/api/graph/project/list` | 列出所有项目 | +| DELETE | `/api/graph/project/{project_id}` | 删除项目 | + +--- + +### 图谱数据接口 + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/graph/data/{graph_id}` | 获取图谱节点和边 | +| DELETE | `/api/graph/delete/{graph_id}` | 删除Zep图谱 | + +--- + +## 实体设计原则 + +本系统专为社会舆论模拟设计,实体必须是: + +**✅ 可以是:** +- 具体的个人(有名有姓) +- 注册的公司、组织、机构 +- 媒体机构 +- 政府部门 + +**❌ 不可以是:** +- 抽象概念(如"技术"、"创新") +- 情绪、观点、趋势 +- 泛指的群体(如"用户"、"消费者") + +这是因为后续需要模拟各实体对舆论的反应和传播,抽象概念无法参与这种模拟。 + +--- + +## 项目状态流转 + +``` +created → ontology_generated → graph_building → graph_completed + ↓ + failed +``` diff --git a/backend/app/__init__.py b/backend/app/__init__.py new file mode 100644 index 0000000..a46d2a2 --- /dev/null +++ b/backend/app/__init__.py @@ -0,0 +1,52 @@ +""" +MiroFish Backend - Flask应用工厂 +""" + +from flask import Flask, request +from flask_cors import CORS + +from .config import Config +from .utils.logger import setup_logger, get_logger + + +def create_app(config_class=Config): + """Flask应用工厂函数""" + app = Flask(__name__) + app.config.from_object(config_class) + + # 设置日志 + logger = setup_logger('mirofish') + logger.info("=" * 50) + logger.info("MiroFish Backend 启动中...") + logger.info("=" * 50) + + # 启用CORS + CORS(app, resources={r"/api/*": {"origins": "*"}}) + + # 请求日志中间件 + @app.before_request + def log_request(): + logger = get_logger('mirofish.request') + logger.debug(f"请求: {request.method} {request.path}") + if request.content_type and 'json' in request.content_type: + logger.debug(f"请求体: {request.get_json(silent=True)}") + + @app.after_request + def log_response(response): + logger = get_logger('mirofish.request') + logger.debug(f"响应: {response.status_code}") + return response + + # 注册蓝图 + from .api import graph_bp + app.register_blueprint(graph_bp, url_prefix='/api/graph') + + # 健康检查 + @app.route('/health') 
+ def health(): + return {'status': 'ok', 'service': 'MiroFish Backend'} + + logger.info("MiroFish Backend 启动完成") + + return app + diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py new file mode 100644 index 0000000..9723f92 --- /dev/null +++ b/backend/app/api/__init__.py @@ -0,0 +1,10 @@ +""" +API路由模块 +""" + +from flask import Blueprint + +graph_bp = Blueprint('graph', __name__) + +from . import graph # noqa: E402, F401 + diff --git a/backend/app/api/graph.py b/backend/app/api/graph.py new file mode 100644 index 0000000..0e2b0c9 --- /dev/null +++ b/backend/app/api/graph.py @@ -0,0 +1,657 @@ +""" +图谱相关API路由 +采用项目上下文机制,服务端持久化状态 +""" + +import os +import traceback +import threading +from flask import request, jsonify + +from . import graph_bp +from ..config import Config +from ..services.ontology_generator import OntologyGenerator +from ..services.graph_builder import GraphBuilderService +from ..services.text_processor import TextProcessor +from ..utils.file_parser import FileParser +from ..utils.logger import get_logger +from ..models.task import TaskManager, TaskStatus +from ..models.project import ProjectManager, ProjectStatus + +# 获取日志器 +logger = get_logger('mirofish.api') + + +def allowed_file(filename: str) -> bool: + """检查文件扩展名是否允许""" + if not filename or '.' not in filename: + return False + ext = os.path.splitext(filename)[1].lower().lstrip('.') + return ext in Config.ALLOWED_EXTENSIONS + + +# ============== 项目管理接口 ============== + +@graph_bp.route('/project/create', methods=['POST']) +def create_project(): + """ + 创建新项目 + + 请求(JSON): + { + "name": "项目名称" // 可选 + } + + 返回: + { + "success": true, + "data": { + "project_id": "proj_xxxx", + "name": "...", + "status": "created", + ... 
+ } + } + """ + try: + data = request.get_json() or {} + name = data.get('name', 'Unnamed Project') + + project = ProjectManager.create_project(name=name) + + return jsonify({ + "success": True, + "data": project.to_dict() + }) + + except Exception as e: + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +@graph_bp.route('/project/', methods=['GET']) +def get_project(project_id: str): + """ + 获取项目详情 + """ + project = ProjectManager.get_project(project_id) + + if not project: + return jsonify({ + "success": False, + "error": f"项目不存在: {project_id}" + }), 404 + + return jsonify({ + "success": True, + "data": project.to_dict() + }) + + +@graph_bp.route('/project/list', methods=['GET']) +def list_projects(): + """ + 列出所有项目 + """ + limit = request.args.get('limit', 50, type=int) + projects = ProjectManager.list_projects(limit=limit) + + return jsonify({ + "success": True, + "data": [p.to_dict() for p in projects], + "count": len(projects) + }) + + +@graph_bp.route('/project/', methods=['DELETE']) +def delete_project(project_id: str): + """ + 删除项目 + """ + success = ProjectManager.delete_project(project_id) + + if not success: + return jsonify({ + "success": False, + "error": f"项目不存在或删除失败: {project_id}" + }), 404 + + return jsonify({ + "success": True, + "message": f"项目已删除: {project_id}" + }) + + +@graph_bp.route('/project//reset', methods=['POST']) +def reset_project(project_id: str): + """ + 重置项目状态(用于重新构建图谱) + """ + project = ProjectManager.get_project(project_id) + + if not project: + return jsonify({ + "success": False, + "error": f"项目不存在: {project_id}" + }), 404 + + # 重置到本体已生成状态 + if project.ontology: + project.status = ProjectStatus.ONTOLOGY_GENERATED + else: + project.status = ProjectStatus.CREATED + + project.graph_id = None + project.graph_build_task_id = None + project.error = None + ProjectManager.save_project(project) + + return jsonify({ + "success": True, + "message": f"项目已重置: {project_id}", + "data": 
project.to_dict() + }) + + +# ============== 接口1:上传文件并生成本体 ============== + +@graph_bp.route('/ontology/generate', methods=['POST']) +def generate_ontology(): + """ + 接口1:上传文件,分析生成本体定义 + + 请求方式:multipart/form-data + + 参数: + files: 上传的文件(PDF/MD/TXT),可多个 + simulation_requirement: 模拟需求描述(必填) + project_name: 项目名称(可选) + additional_context: 额外说明(可选) + + 返回: + { + "success": true, + "data": { + "project_id": "proj_xxxx", + "ontology": { + "entity_types": [...], + "edge_types": [...], + "analysis_summary": "..." + }, + "files": [...], + "total_text_length": 12345 + } + } + """ + try: + logger.info("=== 开始生成本体定义 ===") + + # 获取参数 + simulation_requirement = request.form.get('simulation_requirement', '') + project_name = request.form.get('project_name', 'Unnamed Project') + additional_context = request.form.get('additional_context', '') + + logger.debug(f"项目名称: {project_name}") + logger.debug(f"模拟需求: {simulation_requirement[:100]}...") + + if not simulation_requirement: + return jsonify({ + "success": False, + "error": "请提供模拟需求描述 (simulation_requirement)" + }), 400 + + # 获取上传的文件 + uploaded_files = request.files.getlist('files') + if not uploaded_files or all(not f.filename for f in uploaded_files): + return jsonify({ + "success": False, + "error": "请至少上传一个文档文件" + }), 400 + + # 创建项目 + project = ProjectManager.create_project(name=project_name) + project.simulation_requirement = simulation_requirement + logger.info(f"创建项目: {project.project_id}") + + # 保存文件并提取文本 + document_texts = [] + all_text = "" + + for file in uploaded_files: + if file and file.filename and allowed_file(file.filename): + # 保存文件到项目目录 + file_info = ProjectManager.save_file_to_project( + project.project_id, + file, + file.filename + ) + project.files.append({ + "filename": file_info["original_filename"], + "size": file_info["size"] + }) + + # 提取文本 + text = FileParser.extract_text(file_info["path"]) + text = TextProcessor.preprocess_text(text) + document_texts.append(text) + all_text += f"\n\n=== 
{file_info['original_filename']} ===\n{text}" + + if not document_texts: + ProjectManager.delete_project(project.project_id) + return jsonify({ + "success": False, + "error": "没有成功处理任何文档,请检查文件格式" + }), 400 + + # 保存提取的文本 + project.total_text_length = len(all_text) + ProjectManager.save_extracted_text(project.project_id, all_text) + logger.info(f"文本提取完成,共 {len(all_text)} 字符") + + # 生成本体 + logger.info("调用 LLM 生成本体定义...") + generator = OntologyGenerator() + ontology = generator.generate( + document_texts=document_texts, + simulation_requirement=simulation_requirement, + additional_context=additional_context if additional_context else None + ) + + # 保存本体到项目 + entity_count = len(ontology.get("entity_types", [])) + edge_count = len(ontology.get("edge_types", [])) + logger.info(f"本体生成完成: {entity_count} 个实体类型, {edge_count} 个关系类型") + + project.ontology = { + "entity_types": ontology.get("entity_types", []), + "edge_types": ontology.get("edge_types", []) + } + project.analysis_summary = ontology.get("analysis_summary", "") + project.status = ProjectStatus.ONTOLOGY_GENERATED + ProjectManager.save_project(project) + logger.info(f"=== 本体生成完成 === 项目ID: {project.project_id}") + + return jsonify({ + "success": True, + "data": { + "project_id": project.project_id, + "project_name": project.name, + "ontology": project.ontology, + "analysis_summary": project.analysis_summary, + "files": project.files, + "total_text_length": project.total_text_length + } + }) + + except Exception as e: + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ============== 接口2:构建图谱 ============== + +@graph_bp.route('/build', methods=['POST']) +def build_graph(): + """ + 接口2:根据project_id构建图谱 + + 请求(JSON): + { + "project_id": "proj_xxxx", // 必填,来自接口1 + "graph_name": "图谱名称", // 可选 + "chunk_size": 500, // 可选,默认500 + "chunk_overlap": 50 // 可选,默认50 + } + + 返回: + { + "success": true, + "data": { + "project_id": "proj_xxxx", + "task_id": "task_xxxx", + 
"message": "图谱构建任务已启动" + } + } + """ + try: + logger.info("=== 开始构建图谱 ===") + + # 检查配置 + errors = [] + if not Config.ZEP_API_KEY: + errors.append("ZEP_API_KEY未配置") + if errors: + logger.error(f"配置错误: {errors}") + return jsonify({ + "success": False, + "error": "配置错误: " + "; ".join(errors) + }), 500 + + # 解析请求 + data = request.get_json() or {} + project_id = data.get('project_id') + logger.debug(f"请求参数: project_id={project_id}") + + if not project_id: + return jsonify({ + "success": False, + "error": "请提供 project_id" + }), 400 + + # 获取项目 + project = ProjectManager.get_project(project_id) + if not project: + return jsonify({ + "success": False, + "error": f"项目不存在: {project_id}" + }), 404 + + # 检查项目状态 + force = data.get('force', False) # 强制重新构建 + + if project.status == ProjectStatus.CREATED: + return jsonify({ + "success": False, + "error": "项目尚未生成本体,请先调用 /ontology/generate" + }), 400 + + if project.status == ProjectStatus.GRAPH_BUILDING and not force: + return jsonify({ + "success": False, + "error": "图谱正在构建中,请勿重复提交。如需强制重建,请添加 force: true", + "task_id": project.graph_build_task_id + }), 400 + + # 如果强制重建,重置状态 + if force and project.status in [ProjectStatus.GRAPH_BUILDING, ProjectStatus.FAILED, ProjectStatus.GRAPH_COMPLETED]: + project.status = ProjectStatus.ONTOLOGY_GENERATED + project.graph_id = None + project.graph_build_task_id = None + project.error = None + + # 获取配置 + graph_name = data.get('graph_name', project.name or 'MiroFish Graph') + chunk_size = data.get('chunk_size', project.chunk_size or Config.DEFAULT_CHUNK_SIZE) + chunk_overlap = data.get('chunk_overlap', project.chunk_overlap or Config.DEFAULT_CHUNK_OVERLAP) + + # 更新项目配置 + project.chunk_size = chunk_size + project.chunk_overlap = chunk_overlap + + # 获取提取的文本 + text = ProjectManager.get_extracted_text(project_id) + if not text: + return jsonify({ + "success": False, + "error": "未找到提取的文本内容" + }), 400 + + # 获取本体 + ontology = project.ontology + if not ontology: + return jsonify({ + "success": False, + 
"error": "未找到本体定义" + }), 400 + + # 创建异步任务 + task_manager = TaskManager() + task_id = task_manager.create_task(f"构建图谱: {graph_name}") + logger.info(f"创建图谱构建任务: task_id={task_id}, project_id={project_id}") + + # 更新项目状态 + project.status = ProjectStatus.GRAPH_BUILDING + project.graph_build_task_id = task_id + ProjectManager.save_project(project) + + # 启动后台任务 + def build_task(): + build_logger = get_logger('mirofish.build') + try: + build_logger.info(f"[{task_id}] 开始构建图谱...") + task_manager.update_task( + task_id, + status=TaskStatus.PROCESSING, + message="初始化图谱构建服务..." + ) + + # 创建图谱构建服务 + builder = GraphBuilderService(api_key=Config.ZEP_API_KEY) + + # 分块 + task_manager.update_task( + task_id, + message="文本分块中...", + progress=5 + ) + chunks = TextProcessor.split_text( + text, + chunk_size=chunk_size, + overlap=chunk_overlap + ) + total_chunks = len(chunks) + + # 创建图谱 + task_manager.update_task( + task_id, + message="创建Zep图谱...", + progress=10 + ) + graph_id = builder.create_graph(name=graph_name) + + # 更新项目的graph_id + project.graph_id = graph_id + ProjectManager.save_project(project) + + # 设置本体 + task_manager.update_task( + task_id, + message="设置本体定义...", + progress=15 + ) + builder.set_ontology(graph_id, ontology) + + # 添加文本(progress_callback 签名是 (msg, progress_ratio)) + def add_progress_callback(msg, progress_ratio): + progress = 15 + int(progress_ratio * 40) # 15% - 55% + task_manager.update_task( + task_id, + message=msg, + progress=progress + ) + + task_manager.update_task( + task_id, + message=f"开始添加 {total_chunks} 个文本块...", + progress=15 + ) + + episode_uuids = builder.add_text_batches( + graph_id, + chunks, + batch_size=3, + progress_callback=add_progress_callback + ) + + # 等待Zep处理完成(查询每个episode的processed状态) + task_manager.update_task( + task_id, + message="等待Zep处理数据...", + progress=55 + ) + + def wait_progress_callback(msg, progress_ratio): + progress = 55 + int(progress_ratio * 35) # 55% - 90% + task_manager.update_task( + task_id, + message=msg, + 
progress=progress + ) + + builder._wait_for_episodes(episode_uuids, wait_progress_callback) + + # 获取图谱数据 + task_manager.update_task( + task_id, + message="获取图谱数据...", + progress=95 + ) + graph_data = builder.get_graph_data(graph_id) + + # 更新项目状态 + project.status = ProjectStatus.GRAPH_COMPLETED + ProjectManager.save_project(project) + + node_count = graph_data.get("node_count", 0) + edge_count = graph_data.get("edge_count", 0) + build_logger.info(f"[{task_id}] 图谱构建完成: graph_id={graph_id}, 节点={node_count}, 边={edge_count}") + + # 完成 + task_manager.update_task( + task_id, + status=TaskStatus.COMPLETED, + message="图谱构建完成", + progress=100, + result={ + "project_id": project_id, + "graph_id": graph_id, + "node_count": node_count, + "edge_count": edge_count, + "chunk_count": total_chunks + } + ) + + except Exception as e: + # 更新项目状态为失败 + build_logger.error(f"[{task_id}] 图谱构建失败: {str(e)}") + build_logger.debug(traceback.format_exc()) + + project.status = ProjectStatus.FAILED + project.error = str(e) + ProjectManager.save_project(project) + + task_manager.update_task( + task_id, + status=TaskStatus.FAILED, + message=f"构建失败: {str(e)}", + error=traceback.format_exc() + ) + + # 启动后台线程 + thread = threading.Thread(target=build_task, daemon=True) + thread.start() + + return jsonify({ + "success": True, + "data": { + "project_id": project_id, + "task_id": task_id, + "message": "图谱构建任务已启动,请通过 /task/{task_id} 查询进度" + } + }) + + except Exception as e: + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ============== 任务查询接口 ============== + +@graph_bp.route('/task/', methods=['GET']) +def get_task(task_id: str): + """ + 查询任务状态 + """ + task = TaskManager().get_task(task_id) + + if not task: + return jsonify({ + "success": False, + "error": f"任务不存在: {task_id}" + }), 404 + + return jsonify({ + "success": True, + "data": task.to_dict() + }) + + +@graph_bp.route('/tasks', methods=['GET']) +def list_tasks(): + """ + 列出所有任务 + """ + 
tasks = TaskManager().list_tasks() + + return jsonify({ + "success": True, + "data": [t.to_dict() for t in tasks], + "count": len(tasks) + }) + + +# ============== 图谱数据接口 ============== + +@graph_bp.route('/data/', methods=['GET']) +def get_graph_data(graph_id: str): + """ + 获取图谱数据(节点和边) + """ + try: + if not Config.ZEP_API_KEY: + return jsonify({ + "success": False, + "error": "ZEP_API_KEY未配置" + }), 500 + + builder = GraphBuilderService(api_key=Config.ZEP_API_KEY) + graph_data = builder.get_graph_data(graph_id) + + return jsonify({ + "success": True, + "data": graph_data + }) + + except Exception as e: + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +@graph_bp.route('/delete/', methods=['DELETE']) +def delete_graph(graph_id: str): + """ + 删除Zep图谱 + """ + try: + if not Config.ZEP_API_KEY: + return jsonify({ + "success": False, + "error": "ZEP_API_KEY未配置" + }), 500 + + builder = GraphBuilderService(api_key=Config.ZEP_API_KEY) + builder.delete_graph(graph_id) + + return jsonify({ + "success": True, + "message": f"图谱已删除: {graph_id}" + }) + + except Exception as e: + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 diff --git a/backend/app/config.py b/backend/app/config.py new file mode 100644 index 0000000..32ad513 --- /dev/null +++ b/backend/app/config.py @@ -0,0 +1,53 @@ +""" +配置管理 +统一从项目根目录的 .env 文件加载配置 +""" + +import os +from dotenv import load_dotenv + +# 加载项目根目录的 .env 文件 +# 路径: MiroFish/.env (相对于 backend/app/config.py) +project_root_env = os.path.join(os.path.dirname(__file__), '../../.env') + +if os.path.exists(project_root_env): + load_dotenv(project_root_env) +else: + # 如果根目录没有 .env,尝试加载环境变量(用于生产环境) + load_dotenv() + + +class Config: + """Flask配置类""" + + # Flask配置 + SECRET_KEY = os.environ.get('SECRET_KEY', 'mirofish-secret-key') + DEBUG = os.environ.get('FLASK_DEBUG', 'True').lower() == 'true' + + # LLM配置(统一使用OpenAI格式) + LLM_API_KEY = 
os.environ.get('LLM_API_KEY') + LLM_BASE_URL = os.environ.get('LLM_BASE_URL', 'https://api.openai.com/v1') + LLM_MODEL_NAME = os.environ.get('LLM_MODEL_NAME', 'gpt-4o-mini') + + # Zep配置 + ZEP_API_KEY = os.environ.get('ZEP_API_KEY') + + # 文件上传配置 + MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50MB + UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), '../uploads') + ALLOWED_EXTENSIONS = {'pdf', 'md', 'txt', 'markdown'} + + # 文本处理配置 + DEFAULT_CHUNK_SIZE = 500 # 默认切块大小 + DEFAULT_CHUNK_OVERLAP = 50 # 默认重叠大小 + + @classmethod + def validate(cls): + """验证必要配置""" + errors = [] + if not cls.LLM_API_KEY: + errors.append("LLM_API_KEY 未配置") + if not cls.ZEP_API_KEY: + errors.append("ZEP_API_KEY 未配置") + return errors + diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py new file mode 100644 index 0000000..55bec61 --- /dev/null +++ b/backend/app/models/__init__.py @@ -0,0 +1,9 @@ +""" +数据模型模块 +""" + +from .task import TaskManager, TaskStatus +from .project import Project, ProjectStatus, ProjectManager + +__all__ = ['TaskManager', 'TaskStatus', 'Project', 'ProjectStatus', 'ProjectManager'] + diff --git a/backend/app/models/project.py b/backend/app/models/project.py new file mode 100644 index 0000000..0897893 --- /dev/null +++ b/backend/app/models/project.py @@ -0,0 +1,305 @@ +""" +项目上下文管理 +用于在服务端持久化项目状态,避免前端在接口间传递大量数据 +""" + +import os +import json +import uuid +import shutil +from datetime import datetime +from typing import Dict, Any, List, Optional +from enum import Enum +from dataclasses import dataclass, field, asdict +from ..config import Config + + +class ProjectStatus(str, Enum): + """项目状态""" + CREATED = "created" # 刚创建,文件已上传 + ONTOLOGY_GENERATED = "ontology_generated" # 本体已生成 + GRAPH_BUILDING = "graph_building" # 图谱构建中 + GRAPH_COMPLETED = "graph_completed" # 图谱构建完成 + FAILED = "failed" # 失败 + + +@dataclass +class Project: + """项目数据模型""" + project_id: str + name: str + status: ProjectStatus + created_at: str + updated_at: str + + # 文件信息 + files: 
List[Dict[str, str]] = field(default_factory=list) # [{filename, path, size}] + total_text_length: int = 0 + + # 本体信息(接口1生成后填充) + ontology: Optional[Dict[str, Any]] = None + analysis_summary: Optional[str] = None + + # 图谱信息(接口2完成后填充) + graph_id: Optional[str] = None + graph_build_task_id: Optional[str] = None + + # 配置 + simulation_requirement: Optional[str] = None + chunk_size: int = 500 + chunk_overlap: int = 50 + + # 错误信息 + error: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """转换为字典""" + return { + "project_id": self.project_id, + "name": self.name, + "status": self.status.value if isinstance(self.status, ProjectStatus) else self.status, + "created_at": self.created_at, + "updated_at": self.updated_at, + "files": self.files, + "total_text_length": self.total_text_length, + "ontology": self.ontology, + "analysis_summary": self.analysis_summary, + "graph_id": self.graph_id, + "graph_build_task_id": self.graph_build_task_id, + "simulation_requirement": self.simulation_requirement, + "chunk_size": self.chunk_size, + "chunk_overlap": self.chunk_overlap, + "error": self.error + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'Project': + """从字典创建""" + status = data.get('status', 'created') + if isinstance(status, str): + status = ProjectStatus(status) + + return cls( + project_id=data['project_id'], + name=data.get('name', 'Unnamed Project'), + status=status, + created_at=data.get('created_at', ''), + updated_at=data.get('updated_at', ''), + files=data.get('files', []), + total_text_length=data.get('total_text_length', 0), + ontology=data.get('ontology'), + analysis_summary=data.get('analysis_summary'), + graph_id=data.get('graph_id'), + graph_build_task_id=data.get('graph_build_task_id'), + simulation_requirement=data.get('simulation_requirement'), + chunk_size=data.get('chunk_size', 500), + chunk_overlap=data.get('chunk_overlap', 50), + error=data.get('error') + ) + + +class ProjectManager: + """项目管理器 - 负责项目的持久化存储和检索""" + + # 
项目存储根目录 + PROJECTS_DIR = os.path.join(Config.UPLOAD_FOLDER, 'projects') + + @classmethod + def _ensure_projects_dir(cls): + """确保项目目录存在""" + os.makedirs(cls.PROJECTS_DIR, exist_ok=True) + + @classmethod + def _get_project_dir(cls, project_id: str) -> str: + """获取项目目录路径""" + return os.path.join(cls.PROJECTS_DIR, project_id) + + @classmethod + def _get_project_meta_path(cls, project_id: str) -> str: + """获取项目元数据文件路径""" + return os.path.join(cls._get_project_dir(project_id), 'project.json') + + @classmethod + def _get_project_files_dir(cls, project_id: str) -> str: + """获取项目文件存储目录""" + return os.path.join(cls._get_project_dir(project_id), 'files') + + @classmethod + def _get_project_text_path(cls, project_id: str) -> str: + """获取项目提取文本存储路径""" + return os.path.join(cls._get_project_dir(project_id), 'extracted_text.txt') + + @classmethod + def create_project(cls, name: str = "Unnamed Project") -> Project: + """ + 创建新项目 + + Args: + name: 项目名称 + + Returns: + 新创建的Project对象 + """ + cls._ensure_projects_dir() + + project_id = f"proj_{uuid.uuid4().hex[:12]}" + now = datetime.now().isoformat() + + project = Project( + project_id=project_id, + name=name, + status=ProjectStatus.CREATED, + created_at=now, + updated_at=now + ) + + # 创建项目目录结构 + project_dir = cls._get_project_dir(project_id) + files_dir = cls._get_project_files_dir(project_id) + os.makedirs(project_dir, exist_ok=True) + os.makedirs(files_dir, exist_ok=True) + + # 保存项目元数据 + cls.save_project(project) + + return project + + @classmethod + def save_project(cls, project: Project) -> None: + """保存项目元数据""" + project.updated_at = datetime.now().isoformat() + meta_path = cls._get_project_meta_path(project.project_id) + + with open(meta_path, 'w', encoding='utf-8') as f: + json.dump(project.to_dict(), f, ensure_ascii=False, indent=2) + + @classmethod + def get_project(cls, project_id: str) -> Optional[Project]: + """ + 获取项目 + + Args: + project_id: 项目ID + + Returns: + Project对象,如果不存在返回None + """ + meta_path = 
cls._get_project_meta_path(project_id) + + if not os.path.exists(meta_path): + return None + + with open(meta_path, 'r', encoding='utf-8') as f: + data = json.load(f) + + return Project.from_dict(data) + + @classmethod + def list_projects(cls, limit: int = 50) -> List[Project]: + """ + 列出所有项目 + + Args: + limit: 返回数量限制 + + Returns: + 项目列表,按创建时间倒序 + """ + cls._ensure_projects_dir() + + projects = [] + for project_id in os.listdir(cls.PROJECTS_DIR): + project = cls.get_project(project_id) + if project: + projects.append(project) + + # 按创建时间倒序排序 + projects.sort(key=lambda p: p.created_at, reverse=True) + + return projects[:limit] + + @classmethod + def delete_project(cls, project_id: str) -> bool: + """ + 删除项目及其所有文件 + + Args: + project_id: 项目ID + + Returns: + 是否删除成功 + """ + project_dir = cls._get_project_dir(project_id) + + if not os.path.exists(project_dir): + return False + + shutil.rmtree(project_dir) + return True + + @classmethod + def save_file_to_project(cls, project_id: str, file_storage, original_filename: str) -> Dict[str, str]: + """ + 保存上传的文件到项目目录 + + Args: + project_id: 项目ID + file_storage: Flask的FileStorage对象 + original_filename: 原始文件名 + + Returns: + 文件信息字典 {filename, path, size} + """ + files_dir = cls._get_project_files_dir(project_id) + os.makedirs(files_dir, exist_ok=True) + + # 生成安全的文件名 + ext = os.path.splitext(original_filename)[1].lower() + safe_filename = f"{uuid.uuid4().hex[:8]}{ext}" + file_path = os.path.join(files_dir, safe_filename) + + # 保存文件 + file_storage.save(file_path) + + # 获取文件大小 + file_size = os.path.getsize(file_path) + + return { + "original_filename": original_filename, + "saved_filename": safe_filename, + "path": file_path, + "size": file_size + } + + @classmethod + def save_extracted_text(cls, project_id: str, text: str) -> None: + """保存提取的文本""" + text_path = cls._get_project_text_path(project_id) + with open(text_path, 'w', encoding='utf-8') as f: + f.write(text) + + @classmethod + def get_extracted_text(cls, project_id: str) -> 
"""
Task state tracking for long-running jobs (e.g. graph construction).
"""

import uuid
import threading
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, Any, Optional
from dataclasses import dataclass, field


class TaskStatus(str, Enum):
    """Lifecycle states a background task can be in."""
    PENDING = "pending"        # queued, not started
    PROCESSING = "processing"  # worker is running
    COMPLETED = "completed"    # finished successfully
    FAILED = "failed"          # finished with an error


@dataclass
class Task:
    """In-memory record of a single background task."""
    task_id: str
    task_type: str
    status: TaskStatus
    created_at: datetime
    updated_at: datetime
    progress: int = 0                 # percentage, 0-100
    message: str = ""                 # human-readable status line
    result: Optional[Dict] = None     # payload set on success
    error: Optional[str] = None       # error text set on failure
    metadata: Dict = field(default_factory=dict)  # caller-supplied extras

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the task to a JSON-friendly dict."""
        return {
            "task_id": self.task_id,
            "task_type": self.task_type,
            "status": self.status.value,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "progress": self.progress,
            "message": self.message,
            "result": self.result,
            "error": self.error,
            "metadata": self.metadata,
        }


class TaskManager:
    """
    Thread-safe, process-wide task registry.

    Implemented as a singleton so every service shares one registry.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking: only the first caller ever constructs
        # the instance; subsequent callers get the cached one lock-free.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    created = super().__new__(cls)
                    created._tasks: Dict[str, Task] = {}
                    created._task_lock = threading.Lock()
                    cls._instance = created
        return cls._instance

    def create_task(self, task_type: str, metadata: Optional[Dict] = None) -> str:
        """
        Register a new PENDING task.

        Args:
            task_type: caller-defined task category.
            metadata: optional extra context stored with the task.

        Returns:
            The generated task id.
        """
        new_id = str(uuid.uuid4())
        stamp = datetime.now()
        record = Task(
            task_id=new_id,
            task_type=task_type,
            status=TaskStatus.PENDING,
            created_at=stamp,
            updated_at=stamp,
            metadata=metadata or {},
        )
        with self._task_lock:
            self._tasks[new_id] = record
        return new_id

    def get_task(self, task_id: str) -> Optional[Task]:
        """Return the task record, or None if the id is unknown."""
        with self._task_lock:
            return self._tasks.get(task_id)

    def update_task(
        self,
        task_id: str,
        status: Optional[TaskStatus] = None,
        progress: Optional[int] = None,
        message: Optional[str] = None,
        result: Optional[Dict] = None,
        error: Optional[str] = None
    ):
        """
        Update any subset of a task's mutable fields.

        Unknown task ids are ignored silently; only non-None arguments
        are applied. `updated_at` is always refreshed.
        """
        with self._task_lock:
            record = self._tasks.get(task_id)
            if record is None:
                return
            record.updated_at = datetime.now()
            for attr, value in (
                ("status", status),
                ("progress", progress),
                ("message", message),
                ("result", result),
                ("error", error),
            ):
                if value is not None:
                    setattr(record, attr, value)

    def complete_task(self, task_id: str, result: Dict):
        """Mark a task as successfully finished with its result payload."""
        self.update_task(
            task_id,
            status=TaskStatus.COMPLETED,
            progress=100,
            message="任务完成",
            result=result
        )

    def fail_task(self, task_id: str, error: str):
        """Mark a task as failed with an error description."""
        self.update_task(
            task_id,
            status=TaskStatus.FAILED,
            message="任务失败",
            error=error
        )

    def list_tasks(self, task_type: Optional[str] = None) -> list:
        """
        List tasks as dicts, newest first, optionally filtered by type.
        """
        with self._task_lock:
            snapshot = [
                t for t in self._tasks.values()
                if not task_type or t.task_type == task_type
            ]
        ordered = sorted(snapshot, key=lambda t: t.created_at, reverse=True)
        return [t.to_dict() for t in ordered]

    def cleanup_old_tasks(self, max_age_hours: int = 24):
        """Drop finished (completed/failed) tasks older than the cutoff."""
        cutoff = datetime.now() - timedelta(hours=max_age_hours)
        finished = (TaskStatus.COMPLETED, TaskStatus.FAILED)
        with self._task_lock:
            stale = [
                tid for tid, t in self._tasks.items()
                if t.created_at < cutoff and t.status in finished
            ]
            for tid in stale:
                del self._tasks[tid]
class GraphBuilderService:
    """
    Graph construction service (interface 2).

    Drives the full pipeline against the Zep Cloud API: create a
    standalone graph, install a dynamically generated ontology, stream
    text chunks in batches, poll episode processing, and expose the
    resulting nodes/edges.
    """

    def __init__(self, api_key: Optional[str] = None):
        """
        Args:
            api_key: Zep API key; defaults to Config.ZEP_API_KEY.

        Raises:
            ValueError: when no API key is configured.
        """
        self.api_key = api_key or Config.ZEP_API_KEY
        if not self.api_key:
            raise ValueError("ZEP_API_KEY 未配置")

        self.client = Zep(api_key=self.api_key)
        self.task_manager = TaskManager()

    def build_graph_async(
        self,
        text: str,
        ontology: Dict[str, Any],
        graph_name: str = "MiroFish Graph",
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        batch_size: int = 3
    ) -> str:
        """
        Build a graph in a background thread.

        Args:
            text: input text.
            ontology: ontology definition (output of interface 1).
            graph_name: display name for the graph.
            chunk_size: characters per text chunk.
            chunk_overlap: overlap between consecutive chunks.
            batch_size: chunks sent per Zep batch request.

        Returns:
            Task id usable with TaskManager for progress polling.
        """
        task_id = self.task_manager.create_task(
            task_type="graph_build",
            metadata={
                "graph_name": graph_name,
                "chunk_size": chunk_size,
                "text_length": len(text),
            }
        )

        # Daemon thread: building must not block request handling nor
        # keep the process alive at shutdown.
        thread = threading.Thread(
            target=self._build_graph_worker,
            args=(task_id, text, ontology, graph_name, chunk_size, chunk_overlap, batch_size)
        )
        thread.daemon = True
        thread.start()

        return task_id

    def _build_graph_worker(
        self,
        task_id: str,
        text: str,
        ontology: Dict[str, Any],
        graph_name: str,
        chunk_size: int,
        chunk_overlap: int,
        batch_size: int
    ):
        """Background thread body: run the full build pipeline for one task."""
        try:
            self.task_manager.update_task(
                task_id,
                status=TaskStatus.PROCESSING,
                progress=5,
                message="开始构建图谱..."
            )

            # 1. Create the standalone graph.
            graph_id = self.create_graph(graph_name)
            self.task_manager.update_task(
                task_id,
                progress=10,
                message=f"图谱已创建: {graph_id}"
            )

            # 2. Install the ontology.
            self.set_ontology(graph_id, ontology)
            self.task_manager.update_task(
                task_id,
                progress=15,
                message="本体已设置"
            )

            # 3. Chunk the text.
            chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
            total_chunks = len(chunks)
            self.task_manager.update_task(
                task_id,
                progress=20,
                message=f"文本已分割为 {total_chunks} 个块"
            )

            # 4. Send the chunks in batches (maps onto 20-60% of the bar).
            # BUG FIX: `prog` is a 0..1 fraction, so the original
            # `int(prog * 0.4)` always truncated to 0 and progress stuck
            # at 20; scale by 40 to actually span 20-60.
            episode_uuids = self.add_text_batches(
                graph_id, chunks, batch_size,
                lambda msg, prog: self.task_manager.update_task(
                    task_id,
                    progress=20 + int(prog * 40),  # 20-60%
                    message=msg
                )
            )

            # 5. Poll until Zep finishes processing (60-90% of the bar).
            self.task_manager.update_task(
                task_id,
                progress=60,
                message="等待Zep处理数据..."
            )

            # Same fraction-scaling fix as above: *30, not *0.3.
            self._wait_for_episodes(
                episode_uuids,
                lambda msg, prog: self.task_manager.update_task(
                    task_id,
                    progress=60 + int(prog * 30),  # 60-90%
                    message=msg
                )
            )

            # 6. Collect summary statistics about the finished graph.
            self.task_manager.update_task(
                task_id,
                progress=90,
                message="获取图谱信息..."
            )

            graph_info = self._get_graph_info(graph_id)

            self.task_manager.complete_task(task_id, {
                "graph_id": graph_id,
                "graph_info": graph_info.to_dict(),
                "chunks_processed": total_chunks,
            })

        except Exception as e:
            # Worker boundary: capture the full traceback into the task
            # record so the API can surface it to the client.
            import traceback
            error_msg = f"{str(e)}\n{traceback.format_exc()}"
            self.task_manager.fail_task(task_id, error_msg)

    def create_graph(self, name: str) -> str:
        """Create a Zep graph with a random mirofish-prefixed id (public)."""
        graph_id = f"mirofish_{uuid.uuid4().hex[:16]}"

        self.client.graph.create(
            graph_id=graph_id,
            name=name,
            description="MiroFish Social Simulation Graph"
        )

        return graph_id

    def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
        """
        Install entity/edge type definitions on a graph (public).

        Dynamically builds pydantic model classes from the ontology dict
        (interface-1 output) and registers them via graph.set_ontology.
        """
        # Deferred imports: pydantic/zep model machinery is only needed here.
        # (The redundant local `from typing import Optional`, which shadowed
        # the module-level import, has been removed.)
        from pydantic import Field
        from zep_cloud.external_clients.ontology import EntityModel, EntityText, EdgeModel

        # Attribute names Zep reserves on every node/edge.
        RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

        def safe_attr_name(attr_name: str) -> str:
            """Prefix reserved names so they survive as custom attributes."""
            if attr_name.lower() in RESERVED_NAMES:
                return f"entity_{attr_name}"
            return attr_name

        # Dynamically build entity model classes. Pydantic v2 requires
        # both Field defaults and matching __annotations__ entries.
        entity_types = {}
        for entity_def in ontology.get("entity_types", []):
            name = entity_def["name"]
            description = entity_def.get("description", f"A {name} entity.")

            attrs = {"__doc__": description}
            annotations = {}

            for attr_def in entity_def.get("attributes", []):
                attr_name = safe_attr_name(attr_def["name"])
                attr_desc = attr_def.get("description", attr_name)
                attrs[attr_name] = Field(description=attr_desc, default=None)
                annotations[attr_name] = Optional[EntityText]

            attrs["__annotations__"] = annotations

            # `type()` picks up __doc__ from attrs, so no separate
            # __doc__ assignment is needed afterwards.
            entity_types[name] = type(name, (EntityModel,), attrs)

        # Dynamically build edge model classes.
        edge_definitions = {}
        for edge_def in ontology.get("edge_types", []):
            name = edge_def["name"]
            description = edge_def.get("description", f"A {name} relationship.")

            attrs = {"__doc__": description}
            annotations = {}

            for attr_def in edge_def.get("attributes", []):
                attr_name = safe_attr_name(attr_def["name"])
                attr_desc = attr_def.get("description", attr_name)
                attrs[attr_name] = Field(description=attr_desc, default=None)
                annotations[attr_name] = Optional[str]  # edge attributes are plain strings

            attrs["__annotations__"] = annotations

            # UPPER_SNAKE_CASE relation name -> PascalCase class name.
            class_name = ''.join(word.capitalize() for word in name.split('_'))
            edge_class = type(class_name, (EdgeModel,), attrs)

            source_targets = [
                EntityEdgeSourceTarget(
                    source=st.get("source", "Entity"),
                    target=st.get("target", "Entity")
                )
                for st in edge_def.get("source_targets", [])
            ]

            # Zep requires at least one source/target pairing per edge type.
            if source_targets:
                edge_definitions[name] = (edge_class, source_targets)

        if entity_types or edge_definitions:
            self.client.graph.set_ontology(
                graph_ids=[graph_id],
                entities=entity_types if entity_types else None,
                edges=edge_definitions if edge_definitions else None,
            )

    def add_text_batches(
        self,
        graph_id: str,
        chunks: List[str],
        batch_size: int = 3,
        progress_callback: Optional[Callable] = None
    ) -> List[str]:
        """
        Send text chunks to the graph in batches.

        Returns:
            The uuid of every episode Zep created, for later polling.
        """
        episode_uuids = []
        total_chunks = len(chunks)

        for i in range(0, total_chunks, batch_size):
            batch_chunks = chunks[i:i + batch_size]
            batch_num = i // batch_size + 1
            total_batches = (total_chunks + batch_size - 1) // batch_size

            if progress_callback:
                progress = (i + len(batch_chunks)) / total_chunks
                progress_callback(
                    f"发送第 {batch_num}/{total_batches} 批数据 ({len(batch_chunks)} 块)...",
                    progress
                )

            episodes = [
                EpisodeData(data=chunk, type="text")
                for chunk in batch_chunks
            ]

            try:
                batch_result = self.client.graph.add_batch(
                    graph_id=graph_id,
                    episodes=episodes
                )

                # Collect episode uuids; SDK versions differ on the
                # attribute name (`uuid_` vs `uuid`).
                if batch_result and isinstance(batch_result, list):
                    for ep in batch_result:
                        ep_uuid = getattr(ep, 'uuid_', None) or getattr(ep, 'uuid', None)
                        if ep_uuid:
                            episode_uuids.append(ep_uuid)

                # Simple rate limiting between batch requests.
                time.sleep(1)

            except Exception as e:
                if progress_callback:
                    progress_callback(f"批次 {batch_num} 发送失败: {str(e)}", 0)
                raise

        return episode_uuids

    def _wait_for_episodes(
        self,
        episode_uuids: List[str],
        progress_callback: Optional[Callable] = None,
        timeout: int = 600
    ):
        """
        Poll each episode's `processed` flag until all are done or the
        timeout elapses.
        """
        if not episode_uuids:
            if progress_callback:
                progress_callback("无需等待(没有 episode)", 1.0)
            return

        start_time = time.time()
        pending_episodes = set(episode_uuids)
        completed_count = 0
        total_episodes = len(episode_uuids)

        if progress_callback:
            progress_callback(f"开始等待 {total_episodes} 个文本块处理...", 0)

        while pending_episodes:
            if time.time() - start_time > timeout:
                if progress_callback:
                    progress_callback(
                        f"部分文本块超时,已完成 {completed_count}/{total_episodes}",
                        completed_count / total_episodes
                    )
                break

            for ep_uuid in list(pending_episodes):
                try:
                    episode = self.client.graph.episode.get(uuid_=ep_uuid)
                    is_processed = getattr(episode, 'processed', False)

                    if is_processed:
                        pending_episodes.remove(ep_uuid)
                        completed_count += 1

                except Exception:
                    # Best-effort polling: a transient API error on one
                    # episode must not abort the wait; retry next round.
                    continue

            elapsed = int(time.time() - start_time)
            if progress_callback:
                progress_callback(
                    f"Zep处理中... {completed_count}/{total_episodes} 完成, {len(pending_episodes)} 待处理 ({elapsed}秒)",
                    completed_count / total_episodes if total_episodes > 0 else 0
                )

            if pending_episodes:
                time.sleep(3)  # poll every 3 seconds

        if progress_callback:
            progress_callback(f"处理完成: {completed_count}/{total_episodes}", 1.0)

    def _get_graph_info(self, graph_id: str) -> "GraphInfo":
        """Summarize a graph: node/edge counts and distinct entity labels."""
        nodes = self.client.graph.node.get_by_graph_id(graph_id=graph_id)
        edges = self.client.graph.edge.get_by_graph_id(graph_id=graph_id)

        # Collect custom entity labels, skipping Zep's generic ones.
        entity_types = set()
        for node in nodes:
            if node.labels:
                for label in node.labels:
                    if label not in ["Entity", "Node"]:
                        entity_types.add(label)

        return GraphInfo(
            graph_id=graph_id,
            node_count=len(nodes),
            edge_count=len(edges),
            entity_types=list(entity_types)
        )

    def get_graph_data(self, graph_id: str) -> Dict[str, Any]:
        """
        Fetch the full graph content.

        Args:
            graph_id: graph identifier.

        Returns:
            Dict with `nodes`, `edges` and their counts.
        """
        nodes = self.client.graph.node.get_by_graph_id(graph_id=graph_id)
        edges = self.client.graph.edge.get_by_graph_id(graph_id=graph_id)

        nodes_data = []
        for node in nodes:
            nodes_data.append({
                "uuid": node.uuid_,
                "name": node.name,
                "labels": node.labels or [],
                "summary": node.summary or "",
                "attributes": node.attributes or {},
            })

        edges_data = []
        for edge in edges:
            edges_data.append({
                "uuid": edge.uuid_,
                "name": edge.name or "",
                "fact": edge.fact or "",
                "source_node_uuid": edge.source_node_uuid,
                "target_node_uuid": edge.target_node_uuid,
                "attributes": edge.attributes or {},
            })

        return {
            "graph_id": graph_id,
            "nodes": nodes_data,
            "edges": edges_data,
            "node_count": len(nodes_data),
            "edge_count": len(edges_data),
        }

    def delete_graph(self, graph_id: str):
        """Delete the graph from Zep."""
        self.client.graph.delete(graph_id=graph_id)
a/backend/app/services/ontology_generator.py b/backend/app/services/ontology_generator.py new file mode 100644 index 0000000..a64f919 --- /dev/null +++ b/backend/app/services/ontology_generator.py @@ -0,0 +1,361 @@ +""" +本体生成服务 +接口1:分析文本内容,生成适合社会模拟的实体和关系类型定义 +""" + +import json +from typing import Dict, Any, List, Optional +from ..utils.llm_client import LLMClient + + +# 本体生成的系统提示词 +ONTOLOGY_SYSTEM_PROMPT = """你是一个专业的知识图谱本体设计专家。你的任务是分析给定的文本内容和模拟需求,设计适合**社交媒体舆论模拟**的实体类型和关系类型。 + +**重要:你必须输出有效的JSON格式数据,不要输出任何其他内容。** + +## 核心任务背景 + +我们正在构建一个**社交媒体舆论模拟系统**。在这个系统中: +- 每个实体都是一个可以在社交媒体上发声、互动、传播信息的"账号"或"主体" +- 实体之间会相互影响、转发、评论、回应 +- 我们需要模拟舆论事件中各方的反应和信息传播路径 + +因此,**实体必须是现实中真实存在的、可以在社媒上发声和互动的主体**: + +**可以是(鼓励多样化划分)**: +- 具体的个人(公众人物、当事人、意见领袖、专家学者) +- 公司、企业(包括其官方账号) +- 组织机构(大学、协会、NGO、工会等) +- 政府部门、监管机构 +- 媒体机构(报纸、电视台、自媒体、网站) +- 社交媒体平台本身 +- 特定群体代表(如校友会、粉丝团、维权群体等) + +**不可以是**: +- 抽象概念(如"舆论"、"情绪"、"趋势") +- 主题/话题(如"学术诚信"、"教育改革") +- 观点/态度(如"支持方"、"反对方") +- 泛指群体(如"网友"、"公众"、"学生群体") + +## 输出格式 + +请输出JSON格式,包含以下结构: + +```json +{ + "entity_types": [ + { + "name": "实体类型名称(英文,PascalCase)", + "description": "简短描述(英文,不超过100字符)", + "attributes": [ + { + "name": "属性名(英文,snake_case)", + "type": "text", + "description": "属性描述" + } + ], + "examples": ["示例实体1", "示例实体2"] + } + ], + "edge_types": [ + { + "name": "关系类型名称(英文,UPPER_SNAKE_CASE)", + "description": "简短描述(英文,不超过100字符)", + "source_targets": [ + {"source": "源实体类型", "target": "目标实体类型"} + ], + "attributes": [] + } + ], + "analysis_summary": "对文本内容的简要分析说明(中文)" +} +``` + +## 设计指南 + +1. **实体类型设计(重要!请尽量多划分)**: + - **数量要求:至少5个,最多10个实体类型** + - 每个实体类型代表一类可以在社媒上发声的主体 + - 尽量细分不同角色,例如: + - 不要只用"Person",可以细分为"PublicFigure"、"Expert"、"Whistleblower"等 + - 不要只用"Organization",可以细分为"University"、"Company"、"NGO"等 + - description必须清晰说明什么样的实体应该被提取 + - 每个类型提供2-3个具体示例 + +2. 
class OntologyGenerator:
    """
    Ontology generator (interface 1).

    Feeds the document text and the simulation requirement to the LLM
    and turns its JSON reply into validated entity/edge type
    definitions for social-media opinion simulation.
    """

    # Upper bound on characters forwarded to the LLM (50k). Only the
    # ontology-analysis prompt is truncated; graph construction still
    # consumes the full text.
    MAX_TEXT_LENGTH_FOR_LLM = 50000

    def __init__(self, llm_client: Optional["LLMClient"] = None):
        self.llm_client = llm_client or LLMClient()

    def generate(
        self,
        document_texts: List[str],
        simulation_requirement: str,
        additional_context: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Produce an ontology definition from the given documents.

        Args:
            document_texts: list of document texts.
            simulation_requirement: description of the simulation goal.
            additional_context: optional extra notes for the LLM.

        Returns:
            Validated ontology dict (entity_types, edge_types, ...).
        """
        prompt = self._build_user_message(
            document_texts,
            simulation_requirement,
            additional_context
        )

        raw = self.llm_client.chat_json(
            messages=[
                {"role": "system", "content": ONTOLOGY_SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=0.3,
            max_tokens=4096
        )

        return self._validate_and_process(raw)

    def _build_user_message(
        self,
        document_texts: List[str],
        simulation_requirement: str,
        additional_context: Optional[str]
    ) -> str:
        """Assemble the user prompt from requirement, documents and notes."""
        joined = "\n\n---\n\n".join(document_texts)
        full_length = len(joined)

        # Cap what the LLM sees; append a note explaining the cut.
        if full_length > self.MAX_TEXT_LENGTH_FOR_LLM:
            joined = joined[:self.MAX_TEXT_LENGTH_FOR_LLM]
            joined += f"\n\n...(原文共{full_length}字,已截取前{self.MAX_TEXT_LENGTH_FOR_LLM}字用于本体分析)..."

        message = f"""## 模拟需求

{simulation_requirement}

## 文档内容

{joined}
"""

        if additional_context:
            message += f"""
## 额外说明

{additional_context}
"""

        message += """
请根据以上内容,设计适合社会舆论模拟的实体类型和关系类型。
记住:所有实体类型必须是现实中可以发声的主体,不能是抽象概念。
"""

        return message

    def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Fill in missing fields and clamp descriptions to 100 chars."""
        result.setdefault("entity_types", [])
        result.setdefault("edge_types", [])
        result.setdefault("analysis_summary", "")

        for entity in result["entity_types"]:
            entity.setdefault("attributes", [])
            entity.setdefault("examples", [])
            desc = entity.get("description", "")
            if len(desc) > 100:
                entity["description"] = desc[:97] + "..."

        for edge in result["edge_types"]:
            edge.setdefault("source_targets", [])
            edge.setdefault("attributes", [])
            desc = edge.get("description", "")
            if len(desc) > 100:
                edge["description"] = desc[:97] + "..."

        return result

    @staticmethod
    def _pascal(snake_name: str) -> str:
        """UPPER_SNAKE_CASE relation name -> PascalCase class name."""
        return ''.join(part.capitalize() for part in snake_name.split('_'))

    @staticmethod
    def _attr_field_lines(attr_defs) -> list:
        """Render pydantic Field lines for a list of attribute definitions."""
        rendered = []
        for attr in attr_defs:
            field_name = attr["name"]
            field_desc = attr.get("description", field_name)
            rendered.append(f'    {field_name}: EntityText = Field(')
            rendered.append(f'        description="{field_desc}",')
            rendered.append(f'        default=None')
            rendered.append(f'    )')
        return rendered

    def generate_python_code(self, ontology: Dict[str, Any]) -> str:
        """
        Render the ontology as Python source (an ontology.py-style module).

        Args:
            ontology: ontology definition dict.

        Returns:
            Python source code as a string.
        """
        out = [
            '"""',
            '自定义实体类型定义',
            '由MiroFish自动生成,用于社会舆论模拟',
            '"""',
            '',
            'from pydantic import Field',
            'from zep_cloud.external_clients.ontology import EntityModel, EntityText, EdgeModel',
            '',
            '',
            '# ============== 实体类型定义 ==============',
            '',
        ]

        # One pydantic model class per entity type.
        for spec in ontology.get("entity_types", []):
            ename = spec["name"]
            edesc = spec.get("description", f"A {ename} entity.")
            out.append(f'class {ename}(EntityModel):')
            out.append(f'    """{edesc}"""')
            fields = self._attr_field_lines(spec.get("attributes", []))
            out.extend(fields if fields else ['    pass'])
            out.extend(['', ''])

        out.append('# ============== 关系类型定义 ==============')
        out.append('')

        # One pydantic model class per edge type.
        for spec in ontology.get("edge_types", []):
            rname = spec["name"]
            rdesc = spec.get("description", f"A {rname} relationship.")
            out.append(f'class {self._pascal(rname)}(EdgeModel):')
            out.append(f'    """{rdesc}"""')
            fields = self._attr_field_lines(spec.get("attributes", []))
            out.extend(fields if fields else ['    pass'])
            out.extend(['', ''])

        # Lookup tables mapping names to the generated classes.
        out.append('# ============== 类型配置 ==============')
        out.append('')
        out.append('ENTITY_TYPES = {')
        for spec in ontology.get("entity_types", []):
            out.append(f'    "{spec["name"]}": {spec["name"]},')
        out.append('}')
        out.append('')
        out.append('EDGE_TYPES = {')
        for spec in ontology.get("edge_types", []):
            out.append(f'    "{spec["name"]}": {self._pascal(spec["name"])},')
        out.append('}')
        out.append('')

        # Allowed source/target entity pairings per edge type.
        out.append('EDGE_SOURCE_TARGETS = {')
        for spec in ontology.get("edge_types", []):
            pairs = spec.get("source_targets", [])
            if pairs:
                rendered = ', '.join(
                    f'{{"source": "{p.get("source", "Entity")}", "target": "{p.get("target", "Entity")}"}}'
                    for p in pairs
                )
                out.append(f'    "{spec["name"]}": [{rendered}],')
        out.append('}')

        return '\n'.join(out)
"""
File parsing utilities.

Extracts plain text from PDF, Markdown and TXT files, and provides a
sentence-boundary-aware chunking helper for downstream graph building.
"""

import os
from pathlib import Path
from typing import List, Optional


class FileParser:
    """Text extractor for the supported document formats."""

    SUPPORTED_EXTENSIONS = {'.pdf', '.md', '.markdown', '.txt'}

    @classmethod
    def extract_text(cls, file_path: str) -> str:
        """
        Extract text from a single file.

        Args:
            file_path: path of the file to read.

        Returns:
            The extracted text content.

        Raises:
            FileNotFoundError: if the path does not exist.
            ValueError: if the extension is unsupported.
        """
        path = Path(file_path)

        if not path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")

        suffix = path.suffix.lower()

        if suffix not in cls.SUPPORTED_EXTENSIONS:
            raise ValueError(f"不支持的文件格式: {suffix}")

        if suffix == '.pdf':
            return cls._extract_from_pdf(file_path)
        if suffix in {'.md', '.markdown'}:
            return cls._extract_from_md(file_path)
        if suffix == '.txt':
            return cls._extract_from_txt(file_path)

        raise ValueError(f"无法处理的文件格式: {suffix}")

    @staticmethod
    def _extract_from_pdf(file_path: str) -> str:
        """Extract text from a PDF, page by page (requires PyMuPDF)."""
        try:
            import fitz  # PyMuPDF
        except ImportError:
            raise ImportError("需要安装PyMuPDF: pip install PyMuPDF")

        text_parts = []
        with fitz.open(file_path) as doc:
            for page in doc:
                text = page.get_text()
                if text.strip():  # skip blank pages
                    text_parts.append(text)

        return "\n\n".join(text_parts)

    @staticmethod
    def _extract_from_md(file_path: str) -> str:
        """Read a Markdown file verbatim (markup is kept as-is)."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    @staticmethod
    def _extract_from_txt(file_path: str) -> str:
        """Read a plain-text file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    @classmethod
    def extract_from_multiple(cls, file_paths: List[str]) -> str:
        """
        Extract and concatenate text from several files.

        Each document is prefixed with a header naming it; a file that
        fails to parse contributes an error header instead of aborting
        the whole batch.

        Args:
            file_paths: list of file paths.

        Returns:
            The combined text.
        """
        all_texts = []

        for i, file_path in enumerate(file_paths, 1):
            try:
                text = cls.extract_text(file_path)
                filename = Path(file_path).name
                # BUG FIX: the header previously hard-coded a placeholder
                # instead of interpolating the computed `filename`.
                all_texts.append(f"=== 文档 {i}: {filename} ===\n{text}")
            except Exception as e:
                all_texts.append(f"=== 文档 {i}: {file_path} (提取失败: {str(e)}) ===")

        return "\n\n".join(all_texts)


def split_text_into_chunks(
    text: str,
    chunk_size: int = 500,
    overlap: int = 50
) -> List[str]:
    """
    Split text into overlapping chunks, preferring sentence boundaries.

    Args:
        text: source text.
        chunk_size: target characters per chunk.
        overlap: characters shared between consecutive chunks.

    Returns:
        List of non-empty chunks (empty list for blank input).
    """
    if len(text) <= chunk_size:
        return [text] if text.strip() else []

    chunks = []
    start = 0
    length = len(text)

    while start < length:
        end = start + chunk_size

        # Prefer to cut at a sentence boundary, but only if it keeps the
        # chunk reasonably sized (> 30% of chunk_size).
        if end < length:
            for sep in ['。', '!', '?', '.\n', '!\n', '?\n', '\n\n', '. ', '! ', '? ']:
                last_sep = text[start:end].rfind(sep)
                if last_sep != -1 and last_sep > chunk_size * 0.3:
                    end = start + last_sep + len(sep)
                    break

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)

        if end >= length:
            start = length
        else:
            # ROBUSTNESS: guarantee forward progress even when overlap is
            # misconfigured (>= effective chunk length), which previously
            # could loop forever.
            next_start = end - overlap
            start = next_start if next_start > start else end

    return chunks
"""
Logging setup for the backend.

Creates loggers that write detailed DEBUG records to a rotating,
date-named file under ``logs/`` and concise INFO+ records to the
console.
"""

import os
import logging
from datetime import datetime
from logging.handlers import RotatingFileHandler


# Directory receiving log files: <backend>/logs (three levels above this file).
LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'logs')


def setup_logger(name: str = 'mirofish', level: int = logging.DEBUG) -> logging.Logger:
    """
    Configure and return a logger with file + console handlers.

    Args:
        name: logger name.
        level: logger level.

    Returns:
        The configured logger. Calling this again for the same name is
        a no-op: existing handlers are kept, never duplicated.
    """
    os.makedirs(LOG_DIR, exist_ok=True)

    log = logging.getLogger(name)
    log.setLevel(level)

    if log.handlers:
        # Already configured — do not stack duplicate handlers.
        return log

    detailed = logging.Formatter(
        '[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    concise = logging.Formatter(
        '[%(asctime)s] %(levelname)s: %(message)s',
        datefmt='%H:%M:%S'
    )

    # Detailed DEBUG file, one file per day, rotated at 10MB, 5 backups.
    to_file = RotatingFileHandler(
        os.path.join(LOG_DIR, datetime.now().strftime('%Y-%m-%d') + '.log'),
        maxBytes=10 * 1024 * 1024,
        backupCount=5,
        encoding='utf-8'
    )
    to_file.setLevel(logging.DEBUG)
    to_file.setFormatter(detailed)

    # Console shows INFO and above only.
    to_console = logging.StreamHandler()
    to_console.setLevel(logging.INFO)
    to_console.setFormatter(concise)

    log.addHandler(to_file)
    log.addHandler(to_console)

    return log


def get_logger(name: str = 'mirofish') -> logging.Logger:
    """
    Return the named logger, configuring it on first use.

    Args:
        name: logger name.

    Returns:
        The logger instance.
    """
    existing = logging.getLogger(name)
    if existing.handlers:
        return existing
    return setup_logger(name)


# Default application-wide logger, configured at import time.
logger = setup_logger()


# Thin module-level shortcuts mirroring the logging API.
def debug(msg, *args, **kwargs):
    logger.debug(msg, *args, **kwargs)

def info(msg, *args, **kwargs):
    logger.info(msg, *args, **kwargs)

def warning(msg, *args, **kwargs):
    logger.warning(msg, *args, **kwargs)

def error(msg, *args, **kwargs):
    logger.error(msg, *args, **kwargs)

def critical(msg, *args, **kwargs):
    logger.critical(msg, *args, **kwargs)
+ debug = Config.DEBUG + + print(f""" +╔══════════════════════════════════════════════════╗ +║ MiroFish Backend Server ║ +╠══════════════════════════════════════════════════╣ +║ Running on: http://{host}:{port} +║ Debug mode: {debug} +║ +║ API Endpoints: +║ POST /api/graph/ontology/generate - 生成本体 +║ POST /api/graph/build - 构建图谱 +║ GET /api/graph/task/ - 查询任务 +║ GET /api/graph/tasks - 列出任务 +║ GET /api/graph/data/ - 获取图数据 +║ DELETE /api/graph/delete/- 删除图谱 +╚══════════════════════════════════════════════════╝ + """) + + # 启动服务 + app.run(host=host, port=port, debug=debug, threaded=True) + + +if __name__ == '__main__': + main() +