MiroFish/backend/README.md
666ghj e9b49e38bf Refactor activity logging and update Zep integration for real-time processing
- Changed the activity logging format to remove simulation prefixes, ensuring clearer natural language descriptions.
- Updated the `ZepGraphMemoryUpdater` to send activities individually, enhancing the accuracy of entity and relationship extraction.
- Adjusted the README.md to reflect these changes in activity processing and Zep integration, providing clearer instructions for users.
2025-12-05 18:20:33 +08:00

49 KiB
Raw Blame History

MiroFish Backend - 详细技术文档

目录


项目简介

MiroFish Backend 是一个基于 Flask 的后端服务,用于社交媒体舆论模拟。系统核心功能包括:

  1. 知识图谱构建: 从文档中提取实体和关系,使用 Zep Cloud 构建知识图谱
  2. 本体生成: 使用 LLM 自动分析文档并生成适合舆论模拟的实体类型和关系类型
  3. Agent人设生成: 基于图谱实体,使用 LLM 生成详细的社交媒体用户人设
  4. 模拟配置智能生成: 使用 LLM 根据需求自动生成模拟参数(时间、活跃度、事件等)
  5. 双平台模拟: 支持 Twitter 和 Reddit 双平台并行舆论模拟(基于 OASIS 框架)
  6. 图谱记忆动态更新: 可选功能,将模拟中Agent的活动实时更新到Zep图谱,让图谱"记住"模拟过程

技术架构

┌─────────────────────────────────────────────────────────────┐
│                        MiroFish Backend                       │
├─────────────────────────────────────────────────────────────┤
│  Flask Web Framework + CORS                                  │
│  ┌────────────────┐  ┌──────────────┐  ┌─────────────────┐ │
│  │  API层         │  │  服务层      │  │  模型层         │ │
│  │  - graph.py    │→ │  - 本体生成  │→ │  - Project      │ │
│  │  - simulation  │  │  - 图谱构建  │  │  - Task         │ │
│  └────────────────┘  │  - 实体读取  │  └─────────────────┘ │
│                      │  - 人设生成  │                        │
│                      │  - 配置生成  │                        │
│                      │  - 模拟运行  │                        │
│                      └──────────────┘                        │
├─────────────────────────────────────────────────────────────┤
│  外部服务集成                                                │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐   │
│  │ Zep Cloud│  │ LLM API  │  │  OASIS   │  │  文件系统│   │
│  │ 知识图谱 │  │ (OpenAI) │  │  社交模拟│  │  存储    │   │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘   │
└─────────────────────────────────────────────────────────────┘

核心流程

  1. 图谱构建流程:

    上传文档 → 提取文本 → LLM生成本体 → 文本分块 → Zep构建图谱
    
  2. 模拟准备流程:

    创建模拟 → 读取图谱实体 → LLM生成人设 → LLM生成配置 → 准备完成
    
  3. 模拟运行流程:

    启动模拟 → 运行OASIS脚本 → 实时监控 → 记录动作 → (可选)更新Zep图谱记忆 → 状态查询
    

技术栈

核心框架

  • Flask 3.0+: Web 框架
  • Flask-CORS: 跨域支持

AI & 知识图谱

  • Zep Cloud SDK 2.0+: 知识图谱构建与管理
  • OpenAI SDK 1.0+: LLM 调用(支持 OpenAI 兼容接口)
  • OASIS-AI: 社交媒体模拟框架
  • CAMEL-AI: Agent 行为模拟

数据处理

  • PyMuPDF (fitz): PDF 文本提取
  • Pydantic 2.0+: 数据验证
  • Python-dotenv: 环境变量管理

文件处理

  • Werkzeug 3.0+: 文件上传处理

项目结构

backend/
├── run.py                      # 启动入口
├── requirements.txt            # Python依赖
├── .env                        # 环境配置(需创建)
├── logs/                       # 日志文件
│   └── YYYY-MM-DD.log
├── uploads/                    # 数据存储
│   ├── projects/               # 项目数据
│   │   └── proj_xxx/
│   │       ├── project.json    # 项目元数据
│   │       ├── files/          # 上传的文件
│   │       └── extracted_text.txt  # 提取的文本
│   └── simulations/            # 模拟数据
│       └── sim_xxx/
│           ├── state.json      # 模拟状态
│           ├── simulation_config.json  # 模拟配置
│           ├── reddit_profiles.json    # Reddit人设
│           ├── twitter_profiles.csv    # Twitter人设
│           ├── run_state.json  # 运行状态
│           ├── simulation.log  # 主日志
│           ├── twitter/        # Twitter数据
│           │   ├── actions.jsonl
│           │   └── twitter_simulation.db
│           └── reddit/         # Reddit数据
│               ├── actions.jsonl
│               └── reddit_simulation.db
├── scripts/                    # 模拟运行脚本
│   ├── run_twitter_simulation.py
│   ├── run_reddit_simulation.py
│   ├── run_parallel_simulation.py
│   └── action_logger.py
└── app/
    ├── __init__.py            # Flask应用工厂
    ├── config.py              # 配置管理
    ├── api/                   # API路由
    │   ├── __init__.py
    │   ├── graph.py           # 图谱相关接口
    │   └── simulation.py      # 模拟相关接口
    ├── models/                # 数据模型
    │   ├── __init__.py
    │   ├── project.py         # 项目模型
    │   └── task.py            # 任务模型
    ├── services/              # 业务服务
    │   ├── __init__.py
    │   ├── ontology_generator.py          # 本体生成
    │   ├── graph_builder.py               # 图谱构建
    │   ├── text_processor.py              # 文本处理
    │   ├── zep_entity_reader.py           # 实体读取
    │   ├── oasis_profile_generator.py     # 人设生成
    │   ├── simulation_config_generator.py # 配置生成
    │   ├── simulation_manager.py          # 模拟管理
    │   ├── simulation_runner.py           # 模拟运行
    │   └── zep_graph_memory_updater.py    # 图谱记忆动态更新
    └── utils/                 # 工具类
        ├── __init__.py
        ├── file_parser.py     # 文件解析
        ├── llm_client.py      # LLM客户端
        ├── logger.py          # 日志配置
        └── retry.py           # 重试机制

核心功能模块

1. 图谱构建模块

功能: 从文档构建知识图谱

流程:

  1. 上传文档(PDF/TXT/MD)
  2. 提取文本内容
  3. LLM分析生成本体(实体类型+关系类型)
  4. 文本分块(chunk_size=500, overlap=50)
  5. 调用 Zep API 构建图谱
  6. 等待 Zep 处理完成
  7. 返回图谱ID和统计信息

核心服务:

  • OntologyGenerator: 本体生成
  • GraphBuilderService: 图谱构建
  • TextProcessor: 文本处理

2. 模拟准备模块

功能: 准备舆论模拟所需的所有数据

流程:

  1. 创建模拟(指定project_id和graph_id)
  2. 从 Zep 图谱读取并过滤实体
  3. 为每个实体生成 OASIS Agent Profile(支持并行)
  4. 使用 LLM 智能生成模拟配置(时间/活跃度/事件)
  5. 保存配置文件和人设文件

核心服务:

  • ZepEntityReader: 实体读取与过滤
  • OasisProfileGenerator: Agent人设生成
  • SimulationConfigGenerator: 模拟配置生成
  • SimulationManager: 模拟管理

3. 模拟运行模块

功能: 运行 Twitter/Reddit 双平台舆论模拟

流程:

  1. 检查模拟准备状态
  2. 启动 OASIS 模拟进程(subprocess)
  3. 监控进程运行状态
  4. 解析动作日志(actions.jsonl)
  5. (可选)将Agent活动实时更新到Zep图谱
  6. 实时更新运行状态
  7. 支持停止/暂停/恢复

核心服务:

  • SimulationRunner: 模拟运行器
  • ZepGraphMemoryUpdater: 图谱记忆动态更新器

API接口文档

图谱管理接口

1. 生成本体

接口: POST /api/graph/ontology/generate

请求类型: multipart/form-data

请求参数:

参数 类型 必填 说明
files File[] 上传的文档(PDF/MD/TXT)
simulation_requirement String 模拟需求描述
project_name String 项目名称
additional_context String 额外说明

返回示例:

{
  "success": true,
  "data": {
    "project_id": "proj_33469c670f56",
    "project_name": "学术不端事件模拟",
    "ontology": {
      "entity_types": [
        {
          "name": "Student",
          "description": "Students involved in the event",
          "attributes": [
            {"name": "full_name", "type": "text", "description": "Student full name"},
            {"name": "major", "type": "text", "description": "Major field"}
          ],
          "examples": ["张三", "李四"]
        },
        {
          "name": "Professor",
          "description": "Faculty members",
          "attributes": [...]
        },
        ...
        {
          "name": "Person",
          "description": "Any individual person not fitting other specific person types",
          "attributes": [...]
        },
        {
          "name": "Organization",
          "description": "Any organization not fitting other specific types",
          "attributes": [...]
        }
      ],
      "edge_types": [
        {
          "name": "STUDIES_AT",
          "description": "Student studies at university",
          "source_targets": [
            {"source": "Student", "target": "University"}
          ],
          "attributes": []
        },
        ...
      ]
    },
    "analysis_summary": "文档涉及学术不端事件...",
    "files": [
      {"filename": "document.pdf", "size": 102400}
    ],
    "total_text_length": 12345
  }
}

说明:

  • 本体设计必须包含10个实体类型,最后2个为兜底类型(PersonOrganization)
  • 实体类型必须是现实中可以发声的主体
  • 属性名不能使用保留字(name, uuid, group_id, created_at, summary)

2. 构建图谱

接口: POST /api/graph/build

请求类型: application/json

请求参数:

{
  "project_id": "proj_33469c670f56",
  "graph_name": "学术不端事件图谱",
  "chunk_size": 500,
  "chunk_overlap": 50,
  "force": false
}
参数 类型 必填 默认值 说明
project_id String - 项目ID(来自接口1)
graph_name String 项目名称 图谱名称
chunk_size Integer 500 文本块大小
chunk_overlap Integer 50 块重叠大小
force Boolean false 强制重新构建

返回示例:

{
  "success": true,
  "data": {
    "project_id": "proj_33469c670f56",
    "task_id": "a1b2c3d4-e5f6-...",
    "message": "图谱构建任务已启动,请通过 /task/{task_id} 查询进度"
  }
}

异步任务: 此接口立即返回task_id,实际构建在后台进行


3. 查询任务状态

接口: GET /api/graph/task/{task_id}

返回示例:

{
  "success": true,
  "data": {
    "task_id": "a1b2c3d4-e5f6-...",
    "task_type": "graph_build",
    "status": "processing",
    "created_at": "2025-12-02T10:00:00",
    "updated_at": "2025-12-02T10:05:00",
    "progress": 45,
    "message": "Zep处理中... 10/30 完成",
    "result": null,
    "error": null,
    "metadata": {
      "project_id": "proj_33469c670f56"
    }
  }
}

状态值:

  • pending: 等待中
  • processing: 处理中
  • completed: 已完成
  • failed: 失败

4. 获取图谱数据

接口: GET /api/graph/data/{graph_id}

返回示例:

{
  "success": true,
  "data": {
    "graph_id": "mirofish_abc123",
    "nodes": [
      {
        "uuid": "node-uuid-1",
        "name": "张三",
        "labels": ["Entity", "Student"],
        "summary": "某大学计算机专业学生",
        "attributes": {
          "full_name": "张三",
          "major": "计算机科学"
        }
      },
      ...
    ],
    "edges": [
      {
        "uuid": "edge-uuid-1",
        "name": "STUDIES_AT",
        "fact": "张三就读于某大学",
        "source_node_uuid": "node-uuid-1",
        "target_node_uuid": "node-uuid-2",
        "attributes": {}
      },
      ...
    ],
    "node_count": 50,
    "edge_count": 120
  }
}

5. 项目管理接口

获取项目: GET /api/graph/project/{project_id}

列出项目: GET /api/graph/project/list?limit=50

删除项目: DELETE /api/graph/project/{project_id}

重置项目: POST /api/graph/project/{project_id}/reset


模拟管理接口

1. 创建模拟

接口: POST /api/simulation/create

请求参数:

{
  "project_id": "proj_33469c670f56",
  "graph_id": "mirofish_abc123",
  "enable_twitter": true,
  "enable_reddit": true
}

返回示例:

{
  "success": true,
  "data": {
    "simulation_id": "sim_10b494550540",
    "project_id": "proj_33469c670f56",
    "graph_id": "mirofish_abc123",
    "status": "created",
    "enable_twitter": true,
    "enable_reddit": true,
    "created_at": "2025-12-02T10:00:00"
  }
}

2. 准备模拟

接口: POST /api/simulation/prepare

请求参数:

{
  "simulation_id": "sim_10b494550540",
  "entity_types": ["Student", "Professor"],
  "use_llm_for_profiles": true,
  "parallel_profile_count": 5,
  "force_regenerate": false
}
参数 类型 必填 默认值 说明
simulation_id String - 模拟ID
entity_types String[] null 指定实体类型(为空则全部)
use_llm_for_profiles Boolean true 是否用LLM生成详细人设
parallel_profile_count Integer 5 并行生成人设数量
force_regenerate Boolean false 强制重新生成

返回示例:

{
  "success": true,
  "data": {
    "simulation_id": "sim_10b494550540",
    "task_id": "task_xyz789",
    "status": "preparing",
    "message": "准备任务已启动",
    "already_prepared": false
  }
}

特性:

  • 自动检测已完成的准备工作,避免重复生成
  • 支持并行生成人设(默认5个并发)
  • 支持强制重新生成

3. 查询准备进度

接口: POST /api/simulation/prepare/status

请求参数:

{
  "task_id": "task_xyz789",
  "simulation_id": "sim_10b494550540"
}

返回示例:

{
  "success": true,
  "data": {
    "task_id": "task_xyz789",
    "status": "processing",
    "progress": 45,
    "message": "[2/4] 生成Agent配置: 5/15 - 已完成 Student: 张三",
    "progress_detail": {
      "current_stage": "generating_profiles",
      "current_stage_name": "生成Agent人设",
      "stage_index": 2,
      "total_stages": 4,
      "stage_progress": 33,
      "current_item": 5,
      "total_items": 15,
      "item_description": "已完成 Student: 张三"
    },
    "already_prepared": false
  }
}

进度阶段:

  1. reading: 读取图谱实体 (0-20%)
  2. generating_profiles: 生成Agent人设 (20-70%)
  3. generating_config: 生成模拟配置 (70-90%)
  4. copying_scripts: 准备模拟脚本 (90-100%)

4. 启动模拟

接口: POST /api/simulation/start

请求参数:

{
  "simulation_id": "sim_10b494550540",
  "platform": "parallel",
  "max_rounds": 100,
  "enable_graph_memory_update": false
}
参数 类型 必填 默认值 说明
simulation_id String - 模拟ID
platform String parallel 运行平台: twitter/reddit/parallel
max_rounds Integer - 最大模拟轮数,用于截断过长的模拟
enable_graph_memory_update Boolean false 是否将Agent活动动态更新到Zep图谱

返回示例:

{
  "success": true,
  "data": {
    "simulation_id": "sim_10b494550540",
    "runner_status": "running",
    "process_pid": 12345,
    "twitter_running": true,
    "reddit_running": true,
    "started_at": "2025-12-02T11:00:00",
    "total_rounds": 100,
    "max_rounds_applied": 100,
    "graph_memory_update_enabled": true,
    "graph_id": "mirofish_abc123"
  }
}

说明:

  • max_rounds_applied 字段仅在指定了 max_rounds 参数时返回
  • graph_memory_update_enabledgraph_id 字段在启用图谱记忆更新时返回

图谱记忆更新功能说明:

启用 enable_graph_memory_update 后:

  • 模拟中所有Agent的活动(发帖、评论、点赞、转发等)会实时更新到Zep图谱
  • 每条活动单独发送,确保Zep能正确解析实体和关系
  • 活动会被转换为自然语言描述,例如:张三: 发布了一条帖子:「...」
  • Zep会自动从文本中提取实体和关系,丰富图谱知识
  • 需要项目已构建有效的图谱(graph_id)

5. 停止模拟

接口: POST /api/simulation/stop

请求参数:

{
  "simulation_id": "sim_10b494550540"
}

返回示例:

{
  "success": true,
  "data": {
    "simulation_id": "sim_10b494550540",
    "runner_status": "stopped",
    "completed_at": "2025-12-02T12:00:00"
  }
}

6. 获取运行状态

接口: GET /api/simulation/{simulation_id}/run-status

返回示例:

{
  "success": true,
  "data": {
    "simulation_id": "sim_10b494550540",
    "runner_status": "running",
    "current_round": 5,
    "total_rounds": 144,
    "progress_percent": 3.5,
    "simulated_hours": 2,
    "total_simulation_hours": 72,
    "twitter_running": true,
    "reddit_running": true,
    "twitter_actions_count": 150,
    "reddit_actions_count": 200,
    "total_actions_count": 350,
    "started_at": "2025-12-02T11:00:00",
    "updated_at": "2025-12-02T11:30:00"
  }
}

7. 获取详细状态(含最近动作)

接口: GET /api/simulation/{simulation_id}/run-status/detail

返回示例:

{
  "success": true,
  "data": {
    ... (基本状态同上) ...,
    "recent_actions": [
      {
        "round_num": 5,
        "timestamp": "2025-12-02T11:30:15",
        "platform": "twitter",
        "agent_id": 3,
        "agent_name": "张三_123",
        "action_type": "CREATE_POST",
        "action_args": {
          "content": "对学术不端事件的看法..."
        },
        "result": "post_id_123",
        "success": true
      },
      ...
    ]
  }
}

8. 其他接口

获取实体列表: GET /api/simulation/entities/{graph_id}

获取模拟配置: GET /api/simulation/{simulation_id}/config

获取Agent人设: GET /api/simulation/{simulation_id}/profiles?platform=reddit

获取动作历史: GET /api/simulation/{simulation_id}/actions?limit=100&platform=twitter

获取时间线: GET /api/simulation/{simulation_id}/timeline?start_round=0&end_round=10

获取Agent统计: GET /api/simulation/{simulation_id}/agent-stats

获取帖子: GET /api/simulation/{simulation_id}/posts?platform=reddit&limit=50

获取评论: GET /api/simulation/{simulation_id}/comments?post_id=123


数据模型

1. Project (项目模型)

文件: app/models/project.py

字段:

project_id: str              # 项目ID (proj_xxx)
name: str                    # 项目名称
status: ProjectStatus        # 状态
created_at: str              # 创建时间
updated_at: str              # 更新时间

# 文件信息
files: List[Dict]            # 上传的文件列表
total_text_length: int       # 文本总长度

# 本体信息
ontology: Dict               # 实体类型和关系类型
analysis_summary: str        # 分析摘要

# 图谱信息
graph_id: str                # Zep图谱ID
graph_build_task_id: str     # 构建任务ID

# 配置
simulation_requirement: str  # 模拟需求
chunk_size: int              # 文本块大小
chunk_overlap: int           # 块重叠大小

# 错误信息
error: str                   # 错误描述

状态枚举:

CREATED = "created"                      # 已创建
ONTOLOGY_GENERATED = "ontology_generated"  # 本体已生成
GRAPH_BUILDING = "graph_building"        # 图谱构建中
GRAPH_COMPLETED = "graph_completed"      # 图谱已完成
FAILED = "failed"                        # 失败

2. Task (任务模型)

文件: app/models/task.py

字段:

task_id: str                 # 任务ID (UUID)
task_type: str               # 任务类型
status: TaskStatus           # 状态
created_at: datetime         # 创建时间
updated_at: datetime         # 更新时间
progress: int                # 进度 (0-100)
message: str                 # 状态消息
result: Dict                 # 任务结果
error: str                   # 错误信息
metadata: Dict               # 元数据
progress_detail: Dict        # 详细进度

状态枚举:

PENDING = "pending"          # 等待中
PROCESSING = "processing"    # 处理中
COMPLETED = "completed"      # 已完成
FAILED = "failed"            # 失败

3. SimulationState (模拟状态)

文件: app/services/simulation_manager.py

字段:

simulation_id: str           # 模拟ID (sim_xxx)
project_id: str              # 项目ID
graph_id: str                # 图谱ID
enable_twitter: bool         # 启用Twitter
enable_reddit: bool          # 启用Reddit
status: SimulationStatus     # 状态
entities_count: int          # 实体数量
profiles_count: int          # 人设数量
entity_types: List[str]      # 实体类型列表
config_generated: bool       # 配置已生成
config_reasoning: str        # 配置推理说明
current_round: int           # 当前轮次
twitter_status: str          # Twitter状态
reddit_status: str           # Reddit状态
created_at: str              # 创建时间
updated_at: str              # 更新时间
error: str                   # 错误信息

4. EntityNode (实体节点)

文件: app/services/zep_entity_reader.py

字段:

uuid: str                    # 实体UUID
name: str                    # 实体名称
labels: List[str]            # 标签列表
summary: str                 # 摘要
attributes: Dict             # 属性字典
related_edges: List[Dict]    # 相关边信息
related_nodes: List[Dict]    # 关联节点信息

5. OasisAgentProfile (Agent人设)

文件: app/services/oasis_profile_generator.py

字段:

user_id: int                 # 用户ID
user_name: str               # 用户名
name: str                    # 真实姓名
bio: str                     # 简介 (200字)
persona: str                 # 详细人设 (2000字)
karma: int                   # Reddit积分
friend_count: int            # Twitter好友数
follower_count: int          # 粉丝数
statuses_count: int          # 发帖数
age: int                     # 年龄
gender: str                  # 性别 (male/female/other)
mbti: str                    # MBTI类型
country: str                 # 国家
profession: str              # 职业
interested_topics: List[str] # 兴趣话题
source_entity_uuid: str      # 来源实体UUID
source_entity_type: str      # 来源实体类型
created_at: str              # 创建时间

6. SimulationParameters (模拟参数)

文件: app/services/simulation_config_generator.py

字段:

simulation_id: str           # 模拟ID
project_id: str              # 项目ID
graph_id: str                # 图谱ID
simulation_requirement: str  # 模拟需求

# 时间配置
time_config: TimeSimulationConfig
  ├── total_simulation_hours: int        # 总时长(小时)
  ├── minutes_per_round: int             # 每轮分钟数
  ├── agents_per_hour_min: int           # 每小时最少激活Agent数
  ├── agents_per_hour_max: int           # 每小时最多激活Agent数
  ├── peak_hours: List[int]              # 高峰时段 [19,20,21,22]
  ├── off_peak_hours: List[int]          # 低谷时段 [0,1,2,3,4,5]
  ├── morning_hours: List[int]           # 早间时段 [6,7,8]
  ├── work_hours: List[int]              # 工作时段 [9-18]
  ├── peak_activity_multiplier: float    # 高峰活跃度系数 1.5
  ├── off_peak_activity_multiplier: float # 低谷活跃度系数 0.05
  ├── morning_activity_multiplier: float # 早间活跃度系数 0.4
  └── work_activity_multiplier: float    # 工作时段活跃度系数 0.7

# Agent配置列表
agent_configs: List[AgentActivityConfig]
  ├── agent_id: int              # Agent ID
  ├── entity_uuid: str           # 实体UUID
  ├── entity_name: str           # 实体名称
  ├── entity_type: str           # 实体类型
  ├── activity_level: float      # 活跃度 (0.0-1.0)
  ├── posts_per_hour: float      # 每小时发帖数
  ├── comments_per_hour: float   # 每小时评论数
  ├── active_hours: List[int]    # 活跃时间段
  ├── response_delay_min: int    # 最小响应延迟(分钟)
  ├── response_delay_max: int    # 最大响应延迟(分钟)
  ├── sentiment_bias: float      # 情感倾向 (-1.0到1.0)
  ├── stance: str                # 立场 (supportive/opposing/neutral/observer)
  └── influence_weight: float    # 影响力权重

# 事件配置
event_config: EventConfig
  ├── initial_posts: List[Dict]  # 初始帖子
  ├── scheduled_events: List[Dict] # 定时事件
  ├── hot_topics: List[str]      # 热点话题
  └── narrative_direction: str   # 舆论方向

# 平台配置
twitter_config: PlatformConfig
reddit_config: PlatformConfig
  ├── platform: str              # 平台名称
  ├── recency_weight: float      # 时间新鲜度权重
  ├── popularity_weight: float   # 热度权重
  ├── relevance_weight: float    # 相关性权重
  ├── viral_threshold: int       # 病毒传播阈值
  └── echo_chamber_strength: float # 回声室效应强度

# LLM配置
llm_model: str               # LLM模型名称
llm_base_url: str            # LLM API地址
generated_at: str            # 生成时间
generation_reasoning: str    # LLM推理说明

服务层详解

1. OntologyGenerator (本体生成器)

文件: app/services/ontology_generator.py

功能: 使用LLM分析文档内容,生成适合舆论模拟的实体类型和关系类型

核心方法:

def generate(
    document_texts: List[str],
    simulation_requirement: str,
    additional_context: Optional[str] = None
) -> Dict[str, Any]:
    """
    生成本体定义
    
    Returns:
        {
            "entity_types": [...],  # 10个实体类型(最后2个为Person和Organization)
            "edge_types": [...],     # 6-10个关系类型
            "analysis_summary": "..." # 分析摘要
        }
    """

设计原则:

  • 必须返回10个实体类型,最后2个为兜底类型
  • 实体必须是现实中可以发声的主体(人/组织)
  • 属性名不能使用Zep保留字
  • 关系类型要反映社交媒体互动

LLM提示词要点:

  • 系统角色: 知识图谱本体设计专家
  • 任务背景: 社交媒体舆论模拟
  • 输出格式: 严格的JSON结构
  • 实体类型层次: 具体类型(8个) + 兜底类型(2个)

2. GraphBuilderService (图谱构建服务)

文件: app/services/graph_builder.py

功能: 调用Zep API构建知识图谱

核心方法:

def create_graph(name: str) -> str:
    """创建Zep图谱"""

def set_ontology(graph_id: str, ontology: Dict):
    """设置图谱本体(动态创建Pydantic类)"""

def add_text_batches(
    graph_id: str, 
    chunks: List[str], 
    batch_size: int = 3,
    progress_callback: Optional[Callable] = None
) -> List[str]:
    """分批添加文本,返回episode UUIDs"""

def _wait_for_episodes(
    episode_uuids: List[str],
    progress_callback: Optional[Callable] = None,
    timeout: int = 600
):
    """等待所有episode处理完成"""

def get_graph_data(graph_id: str) -> Dict:
    """获取完整图谱数据(节点和边)"""

关键技术点:

  1. 动态类创建: 根据本体定义动态创建Pydantic类
  2. 批量上传: 避免一次性提交大量数据
  3. 异步等待: 轮询episode的processed状态
  4. 容错重试: 所有API调用带重试机制

3. ZepEntityReader (实体读取器)

文件: app/services/zep_entity_reader.py

功能: 从Zep图谱读取并过滤实体

核心方法:

def get_all_nodes(graph_id: str) -> List[Dict]:
    """获取所有节点(带重试)"""

def get_all_edges(graph_id: str) -> List[Dict]:
    """获取所有边(带重试)"""

def filter_defined_entities(
    graph_id: str,
    defined_entity_types: Optional[List[str]] = None,
    enrich_with_edges: bool = True
) -> FilteredEntities:
    """
    筛选符合预定义类型的实体
    
    筛选逻辑:
    - 只保留Labels中包含除"Entity"和"Node"外的自定义标签的节点
    - 如果指定了entity_types,只保留匹配的类型
    - 可选:获取每个实体的相关边和关联节点
    """

def get_entity_with_context(
    graph_id: str, 
    entity_uuid: str
) -> Optional[EntityNode]:
    """获取单个实体及其完整上下文"""

容错机制:

  • 所有Zep API调用带3次重试
  • 使用指数退避策略
  • 详细的日志记录

4. OasisProfileGenerator (人设生成器)

文件: app/services/oasis_profile_generator.py

功能: 将图谱实体转换为OASIS Agent Profile

核心方法:

def generate_profile_from_entity(
    entity: EntityNode, 
    user_id: int,
    use_llm: bool = True
) -> OasisAgentProfile:
    """
    从实体生成Agent人设
    
    步骤:
    1. 构建实体上下文(属性+边+关联节点+Zep检索)
    2. 使用LLM生成详细人设(2000字persona)
    3. 返回OasisAgentProfile对象
    """

def generate_profiles_from_entities(
    entities: List[EntityNode],
    use_llm: bool = True,
    progress_callback: Optional[callable] = None,
    graph_id: Optional[str] = None,
    parallel_count: int = 5
) -> List[OasisAgentProfile]:
    """
    批量生成人设(支持并行)
    
    特性:
    - 并行生成(默认5个并发)
    - Zep混合检索增强上下文
    - 区分个人实体和机构实体
    - 容错处理(失败则使用规则生成)
    """

LLM提示词设计:

  • 个人实体: 生成2000字详细人设(基本信息+背景+性格+社交行为+立场观点+个人记忆)
  • 机构实体: 生成官方账号设定(机构信息+账号定位+发言风格+发布内容+立场态度+机构记忆)
  • 输出格式: JSON (bio, persona, age, gender, mbti, country, profession, interested_topics)

容错措施:

  1. LLM调用失败:最多重试3次
  2. JSON解析失败:尝试修复JSON
  3. 完全失败:使用规则生成基础人设

5. SimulationConfigGenerator (配置生成器)

文件: app/services/simulation_config_generator.py

功能: 使用LLM智能生成模拟配置参数

核心方法:

def generate_config(
    simulation_id: str,
    project_id: str,
    graph_id: str,
    simulation_requirement: str,
    document_text: str,
    entities: List[EntityNode],
    enable_twitter: bool = True,
    enable_reddit: bool = True,
    progress_callback: Optional[Callable] = None,
) -> SimulationParameters:
    """
    智能生成完整模拟配置
    
    分步生成策略(避免一次性生成过长):
    1. 生成时间配置(符合中国人作息)
    2. 生成事件配置(热点话题+初始帖子)
    3. 分批生成Agent配置(每批15个)
    4. 生成平台配置
    """

时间配置特点:

  • 高峰时段: 19-22点(活跃度系数1.5)
  • 低谷时段: 0-5点(活跃度系数0.05)
  • 早间时段: 6-8点(活跃度系数0.4)
  • 工作时段: 9-18点(活跃度系数0.7)

Agent配置规则:

  • 官方机构: 活跃度低(0.1-0.3),工作时间活动,响应慢,影响力高(2.5-3.0)
  • 媒体: 活跃度中(0.4-0.6),全天活动,响应快,影响力高(2.0-2.5)
  • 个人/学生: 活跃度高(0.6-0.9),晚间活动,响应快,影响力低(0.8-1.2)
  • 专家/教授: 活跃度中(0.4-0.6),工作+晚间,影响力中高(1.5-2.0)

6. SimulationManager (模拟管理器)

文件: app/services/simulation_manager.py

功能: 管理模拟的完整生命周期

核心方法:

def create_simulation(
    project_id: str,
    graph_id: str,
    enable_twitter: bool = True,
    enable_reddit: bool = True,
) -> SimulationState:
    """创建新模拟"""

def prepare_simulation(
    simulation_id: str,
    simulation_requirement: str,
    document_text: str,
    defined_entity_types: Optional[List[str]] = None,
    use_llm_for_profiles: bool = True,
    progress_callback: Optional[callable] = None,
    parallel_profile_count: int = 3
) -> SimulationState:
    """
    准备模拟环境(全程自动化)
    
    步骤:
    1. 读取并过滤图谱实体
    2. 并行生成Agent人设(带Zep检索增强)
    3. LLM智能生成模拟配置
    4. 保存配置和人设文件
    """

def get_simulation(simulation_id: str) -> Optional[SimulationState]:
    """获取模拟状态"""

def list_simulations(project_id: Optional[str] = None) -> List[SimulationState]:
    """列出所有模拟"""

数据存储:

uploads/simulations/sim_xxx/
├── state.json                  # 模拟状态
├── simulation_config.json      # 模拟配置(LLM生成)
├── reddit_profiles.json        # Reddit人设(JSON格式)
├── twitter_profiles.csv        # Twitter人设(CSV格式)
├── run_state.json              # 运行状态
├── simulation.log              # 主日志
├── twitter/
│   ├── actions.jsonl           # Twitter动作日志
│   └── twitter_simulation.db   # Twitter数据库
└── reddit/
    ├── actions.jsonl           # Reddit动作日志
    └── reddit_simulation.db    # Reddit数据库

7. SimulationRunner (模拟运行器)

文件: app/services/simulation_runner.py

功能: 在后台运行OASIS模拟并实时监控

核心方法:

@classmethod
def start_simulation(
    cls,
    simulation_id: str,
    platform: str = "parallel"
) -> SimulationRunState:
    """
    启动模拟
    
    步骤:
    1. 启动模拟进程(subprocess)
    2. 创建监控线程
    3. 解析动作日志
    4. 实时更新状态
    """

@classmethod
def stop_simulation(cls, simulation_id: str) -> SimulationRunState:
    """
    停止模拟
    
    使用进程组终止(确保子进程也被终止)
    """

@classmethod
def get_run_state(cls, simulation_id: str) -> Optional[SimulationRunState]:
    """获取运行状态"""

@classmethod
def get_actions(
    cls,
    simulation_id: str,
    limit: int = 100,
    offset: int = 0,
    platform: Optional[str] = None,
    agent_id: Optional[int] = None,
    round_num: Optional[int] = None
) -> List[AgentAction]:
    """获取动作历史(支持过滤)"""

@classmethod
def cleanup_all_simulations(cls):
    """清理所有运行中的模拟进程(服务器关闭时调用)"""

进程管理:

  • 使用subprocess.Popen启动模拟脚本
  • 使用start_new_session=True创建新进程组
  • 使用os.killpg终止整个进程组
  • 支持优雅关闭(SIGTERM)和强制终止(SIGKILL)

日志解析:

  • 实时读取twitter/actions.jsonlreddit/actions.jsonl
  • 解析每个Agent的动作记录
  • 更新运行状态和进度
  • 保存最近50个动作用于前端展示

8. ZepGraphMemoryUpdater (图谱记忆更新器)

文件: app/services/zep_graph_memory_updater.py

功能: 将模拟中的Agent活动动态更新到Zep图谱

核心类:

class AgentActivity:
    """Agent活动记录"""
    platform: str           # twitter / reddit
    agent_id: int
    agent_name: str
    action_type: str        # CREATE_POST, LIKE_POST, etc.
    action_args: Dict
    round_num: int
    timestamp: str
    
    def to_episode_text(self) -> str:
        """
        将活动转换为自然语言描述(不添加模拟前缀)
        
        示例输出:
        - "张三: 发布了一条帖子:「官方声明:...」"
        - "李四: 在帖子#5下评论道「我认为...」"
        - "王五: 引用帖子#3并评论「同意」"
        """
class ZepGraphMemoryUpdater:
    """
    图谱记忆更新器
    
    特性:
    - 逐条发送活动到Zep,确保图谱正确解析
    - 后台线程异步处理,不阻塞主模拟流程
    - 带重试的API调用(MAX_RETRIES=3)
    - 自动跳过DO_NOTHING类型的活动
    - 发送间隔控制(SEND_INTERVAL=0.5秒)
    """
    
    def start(self):
        """启动后台工作线程"""
    
    def stop(self):
        """停止并发送剩余活动"""
    
    def add_activity(self, activity: AgentActivity):
        """添加活动到队列"""
    
    def add_activity_from_dict(self, data: Dict, platform: str):
        """从动作日志字典添加活动"""
    
    def get_stats(self) -> Dict:
        """获取统计信息(total_activities, total_sent, failed_count等)"""
class ZepGraphMemoryManager:
    """
    管理多个模拟的更新器实例
    """
    
    @classmethod
    def create_updater(cls, simulation_id: str, graph_id: str) -> ZepGraphMemoryUpdater:
        """为模拟创建并启动更新器"""
    
    @classmethod
    def get_updater(cls, simulation_id: str) -> Optional[ZepGraphMemoryUpdater]:
        """获取模拟的更新器"""
    
    @classmethod
    def stop_updater(cls, simulation_id: str):
        """停止并移除模拟的更新器"""
    
    @classmethod
    def stop_all(cls):
        """停止所有更新器(服务器关闭时调用)"""

活动类型转换:

action_type 转换后的描述
CREATE_POST 发布了一条帖子:「{content}」
LIKE_POST 点赞了帖子#{post_id}
DISLIKE_POST 踩了帖子#{post_id}
REPOST 转发了帖子#{post_id}
QUOTE_POST 引用帖子#{quoted_id}并评论:「{content}」
FOLLOW 关注了用户#{user_id}
CREATE_COMMENT 在帖子#{post_id}下评论道:「{content}」
LIKE_COMMENT 点赞了评论#{comment_id}
SEARCH_POSTS 搜索了「{query}」
MUTE 屏蔽了用户#{user_id}

使用示例:

# 在启动模拟时启用图谱记忆更新
POST /api/simulation/start
{
    "simulation_id": "sim_xxx",
    "enable_graph_memory_update": true
}

启用后,模拟中的活动会被逐条转换为自然语言描述并发送到Zep:

上级: 发布了一条帖子:「官方声明:经复核并结合司法判决,校方决定撤销对肖某某的处分。学校向当事人致以正式歉意...」
全国顶尖新闻传播学院的大学: 发布了一条帖子:「武汉大学官方发布:学校已决定撤销此前对当事人的处分...」
全国考生: 引用帖子#5并评论
教师代表: 在帖子#2下评论道「此事暴露出高校在程序正义上的问题...」

每条活动单独发送,确保Zep能正确从文本中提取实体(如人名、机构名)和关系,丰富图谱知识。


工具类

1. FileParser (文件解析器)

文件: app/utils/file_parser.py

功能: 从PDF/MD/TXT文件提取文本

支持格式:

  • PDF: 使用PyMuPDF
  • Markdown: 直接读取
  • TXT: 直接读取

核心方法:

@classmethod
def extract_text(cls, file_path: str) -> str:
    """从文件提取文本"""

@classmethod
def extract_from_multiple(cls, file_paths: List[str]) -> str:
    """从多个文件提取并合并文本"""

def split_text_into_chunks(
    text: str, 
    chunk_size: int = 500, 
    overlap: int = 50
) -> List[str]:
    """
    文本分块
    
    特点:
    - 尝试在句子边界分割
    - 支持中英文句子结束符
    - 块之间有重叠(overlap)
    """

2. LLMClient (LLM客户端)

文件: app/utils/llm_client.py

功能: 统一的LLM调用封装(OpenAI格式)

核心方法:

def chat(
    self,
    messages: List[Dict[str, str]],
    temperature: float = 0.7,
    max_tokens: int = 4096,
    response_format: Optional[Dict] = None
) -> str:
    """发送聊天请求"""

def chat_json(
    self,
    messages: List[Dict[str, str]],
    temperature: float = 0.3,
    max_tokens: int = 4096
) -> Dict[str, Any]:
    """发送聊天请求并返回JSON"""

配置:

  • Config.LLM_API_KEY读取API密钥
  • Config.LLM_BASE_URL读取API地址
  • Config.LLM_MODEL_NAME读取模型名称

3. Logger (日志管理)

文件: app/utils/logger.py

功能: 统一的日志配置

特点:

  • 双输出:控制台(INFO+) + 文件(DEBUG+)
  • 按日期命名日志文件
  • 日志轮转(10MB,保留5个备份)
  • 详细格式(文件) + 简洁格式(控制台)

使用方法:

from app.utils.logger import get_logger

logger = get_logger('mirofish.mymodule')
logger.debug("调试信息")
logger.info("普通信息")
logger.warning("警告")
logger.error("错误")

4. Retry (重试机制)

文件: app/utils/retry.py

功能: API调用重试装饰器

核心方法:

@retry_with_backoff(
    max_retries=3,
    initial_delay=1.0,
    backoff_factor=2.0,
    exceptions=(ConnectionError, TimeoutError)
)
def call_api():
    ...

特点:

  • 指数退避
  • 随机抖动(避免雷击)
  • 自定义异常类型
  • 重试回调

配置说明

环境变量配置

在项目根目录创建.env文件:

# Flask配置
FLASK_DEBUG=True
FLASK_HOST=0.0.0.0
FLASK_PORT=5001
SECRET_KEY=your-secret-key

# LLM配置(OpenAI兼容接口)
LLM_API_KEY=sk-xxx
LLM_BASE_URL=https://api.openai.com/v1
LLM_MODEL_NAME=gpt-4o-mini

# Zep配置
ZEP_API_KEY=z_xxx

# OASIS模拟配置
OASIS_DEFAULT_MAX_ROUNDS=10

配置项说明

配置项 类型 默认值 说明
FLASK_DEBUG Boolean True 调试模式
FLASK_HOST String 0.0.0.0 监听地址
FLASK_PORT Integer 5001 监听端口
SECRET_KEY String - Flask密钥
LLM_API_KEY String - LLM API密钥(必填)
LLM_BASE_URL String https://api.openai.com/v1 LLM API地址
LLM_MODEL_NAME String gpt-4o-mini LLM模型名称
ZEP_API_KEY String - Zep API密钥(必填)
OASIS_DEFAULT_MAX_ROUNDS Integer 10 默认模拟轮数

运行指南

1. 环境准备

# 1. 激活conda环境
conda activate MiroFish

# 2. 安装依赖
cd backend
pip install -r requirements.txt

# 3. 配置环境变量
cp .env.example .env
# 编辑.env文件,填入API密钥

2. 启动服务

# 启动Flask服务
python run.py

服务启动后访问:

3. 使用流程

完整流程示例:

# Step 1: 上传文档并生成本体
curl -X POST http://localhost:5001/api/graph/ontology/generate \
  -F "files=@document.pdf" \
  -F "simulation_requirement=模拟学术不端事件的舆论发展" \
  -F "project_name=学术不端事件"

# 返回: project_id, ontology

# Step 2: 构建图谱
curl -X POST http://localhost:5001/api/graph/build \
  -H "Content-Type: application/json" \
  -d '{
    "project_id": "proj_xxx",
    "graph_name": "学术不端事件图谱"
  }'

# 返回: task_id

# Step 3: 查询构建进度
curl http://localhost:5001/api/graph/task/{task_id}

# 等待status=completed, 获取graph_id

# Step 4: 创建模拟
curl -X POST http://localhost:5001/api/simulation/create \
  -H "Content-Type: application/json" \
  -d '{
    "project_id": "proj_xxx",
    "graph_id": "mirofish_xxx"
  }'

# 返回: simulation_id

# Step 5: 准备模拟
curl -X POST http://localhost:5001/api/simulation/prepare \
  -H "Content-Type: application/json" \
  -d '{
    "simulation_id": "sim_xxx",
    "use_llm_for_profiles": true,
    "parallel_profile_count": 5
  }'

# 返回: task_id

# Step 6: 查询准备进度
curl -X POST http://localhost:5001/api/simulation/prepare/status \
  -H "Content-Type: application/json" \
  -d '{
    "task_id": "task_xxx",
    "simulation_id": "sim_xxx"
  }'

# 等待status=completed

# Step 7: 启动模拟可选参数max_rounds限制轮数enable_graph_memory_update启用图谱记忆更新
curl -X POST http://localhost:5001/api/simulation/start \
  -H "Content-Type: application/json" \
  -d '{
    "simulation_id": "sim_xxx",
    "platform": "parallel",
    "max_rounds": 50,
    "enable_graph_memory_update": true
  }'

# Step 8: 实时查询运行状态
curl http://localhost:5001/api/simulation/{sim_xxx}/run-status

# Step 9: 停止模拟
curl -X POST http://localhost:5001/api/simulation/stop \
  -H "Content-Type: application/json" \
  -d '{
    "simulation_id": "sim_xxx"
  }'

开发指南

添加新的实体类型

  1. 修改本体生成提示词(app/services/ontology_generator.py)
  2. 更新实体类型参考列表
  3. 测试本体生成

添加新的平台支持

  1. app/services/oasis_profile_generator.py添加平台格式转换方法
  2. app/services/simulation_manager.py更新文件保存逻辑
  3. scripts/目录添加平台模拟脚本
  4. 更新SimulationRunner的平台检测逻辑

自定义LLM提示词

主要提示词文件:

  • 本体生成: app/services/ontology_generator.pyONTOLOGY_SYSTEM_PROMPT
  • 人设生成: app/services/oasis_profile_generator.py_build_individual_persona_prompt
  • 配置生成: app/services/simulation_config_generator.py_generate_time_config

调试技巧

  1. 查看日志:

    tail -f logs/$(date +%Y-%m-%d).log
    
  2. 测试API:

    # 使用httpie
    http POST localhost:5001/api/graph/ontology/generate \
      files@document.pdf \
      simulation_requirement="测试需求"
    
  3. 调试模式:

    # 在代码中添加断点
    import pdb; pdb.set_trace()
    

常见问题

Q1: Zep API调用失败

原因: API密钥错误或网络问题

解决:

  1. 检查.env中的ZEP_API_KEY
  2. 测试Zep连接:
    from zep_cloud.client import Zep
    client = Zep(api_key="your-key")
    client.graph.list()
    
  3. 查看日志中的详细错误信息

Q2: LLM生成的JSON解析失败

原因: LLM输出被截断或格式不正确

解决:

  • 系统已实现JSON修复逻辑
  • 如仍失败,会自动回退到规则生成
  • 可调整temperature参数降低随机性

Q3: 模拟进程启动失败

原因: conda环境未激活或依赖缺失

解决:

# 确保在MiroFish环境中
conda activate MiroFish

# 检查OASIS依赖
pip install oasis-ai camel-ai

Q4: 内存不足

原因: 大型文档或大量实体

解决:

  1. 减小chunk_size
  2. 限制entity_types数量
  3. 使用更小的LLM模型
  4. 增加系统内存

Q5: 文件上传失败

原因: 文件大小超过限制或格式不支持

解决:

  • 检查Config.MAX_CONTENT_LENGTH(默认50MB)
  • 支持格式:PDF/MD/TXT
  • 确保文件编码为UTF-8

性能优化建议

  1. 并行处理:

    • 人设生成并行数:parallel_profile_count=5
    • Zep批量上传:batch_size=3
  2. 缓存策略:

    • 项目状态已持久化到文件
    • 任务状态使用内存缓存
  3. 容错重试:

    • Zep API调用:3次重试
    • LLM API调用:3次重试
  4. 日志管理:

    • 日志文件自动轮转
    • 控制台只显示INFO+

贡献指南

代码规范

  1. 遵循PEP 8
  2. 使用类型注解
  3. 添加docstring
  4. 编写单元测试

提交规范

feat: 添加新功能
fix: 修复bug
docs: 更新文档
refactor: 重构代码
test: 添加测试

许可证

MIT License


联系方式

  • 项目地址: [GitHub链接]
  • 问题反馈: [Issues链接]
  • 技术文档: 见本README

最后更新: 2025-12-05 版本: v1.1.0