refactor(agent): remove the legacy AI router and feature handlers, migrate to the agent architecture

- Delete the legacy AI router module (ai_router.py) and its feature handlers (ai_functions.py, handlers.py)
- Remove the deprecated reminder_router second-level router
- Clean up the deprecated-module notes in __init__.py
- Update the message-handling logic in robot.py to use the new _handle_chitchat_async
- Delete REFACTOR_PLAN.md now that the refactoring plan is complete

zihanjian
2026-02-25 12:26:26 +08:00
parent 0ac82471d4
commit 307499b18b
7 changed files with 57 additions and 1188 deletions


@@ -1,103 +0,0 @@
# Bubbles Refactoring Plan
## Core Problem
Bubbles is a single-shot intent classifier, not an agent. The AI Router performs one `chat/function` classification and stops; it cannot do multi-step reasoning or make decisions on its own. This is the root cause of its rigidity.
---
## I. Agent Loop (Highest Priority)
Replace the single classification in `processMsg` with a tool-calling loop. The AI decides which tools to call, how many times, and when to stop.
**Scope:** the main message-handling flow in `robot.py`, plus the routing logic in `commands/ai_router.py`
**Target state:**
```
message arrives → agent loop starts
  → LLM returns a tool call → execute the tool → feed the result back to the LLM → continue reasoning
  → LLM returns plain text → loop ends, send the reply
  → max step count reached → force-stop
```
**Required safeguards:**
- Maximum step limit (prevents infinite loops)
- Dead-loop detection: abort when the same tool receives identical input 3 times in a row
- Per-step timeout
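All three safeguards fit naturally into the loop itself. A minimal sketch follows; the `llm.chat` signature, the reply shape, and the tool registry here are illustrative assumptions, not the real Bubbles API:

```python
import asyncio
import json

MAX_STEPS = 8        # hard cap on loop iterations
REPEAT_LIMIT = 3     # identical tool call 3x in a row -> abort
STEP_TIMEOUT = 30    # seconds allowed per tool execution

async def agent_loop(llm, tools, messages):
    """Drive the tool-calling loop until the LLM answers in plain text."""
    last_sig, repeats = None, 0
    for _ in range(MAX_STEPS):
        reply = await llm.chat(messages, tools=tools)
        if not reply.tool_calls:                  # plain text ends the loop
            return reply.content
        call = reply.tool_calls[0]
        sig = (call.name, json.dumps(call.arguments, sort_keys=True))
        repeats = repeats + 1 if sig == last_sig else 1
        last_sig = sig
        if repeats >= REPEAT_LIMIT:               # dead-loop detection
            return "工具调用陷入循环,已中断。"
        try:                                      # per-step timeout
            result = await asyncio.wait_for(
                tools[call.name](**call.arguments), timeout=STEP_TIMEOUT)
        except asyncio.TimeoutError:
            result = json.dumps({"error": "tool timed out"}, ensure_ascii=False)
        messages.append({"role": "tool", "name": call.name, "content": result})
    return "已达到最大步数,强制结束。"           # max-step force stop
```

The message bookkeeping is simplified (a real loop also records the assistant's tool-call message), but the three exit paths match the target state above.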
---
## II. Tool Standardization
Define a unified Tool interface and rewrite the existing features as standard tools the agent loop can call.
**Scope:** a new `tools/` directory; rewrite `commands/ai_functions.py` and `commands/reminder_router.py`
**Tool interface definition:**
```python
class Tool:
    name: str          # unique tool identifier
    description: str   # capability description shown to the LLM
    parameters: dict   # JSON Schema parameter definition
    async def execute(self, params: dict, ctx: MessageContext) -> str:
        """Execute the tool and return a text result"""
```
**Existing features to rewrite:**
- `reminder_hub` → `reminder_create` / `reminder_list` / `reminder_delete` (split apart, eliminating the second-level router)
- `perplexity_search` → `web_search`
- `handle_chitchat` is no longer a tool; it becomes the agent loop's default text-output path
**Tool descriptions go through the LLM's native function calling / tool_use protocol** instead of being concatenated into the prompt string.
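For illustration, a tool conforming to this interface could look like the following sketch. The search backend is stubbed out; a real implementation would call the Perplexity adapter, and `MessageContext` is a stand-in here:

```python
import asyncio
import json

class MessageContext:
    """Stand-in for the real commands.context.MessageContext."""
    pass

class EchoSearchTool:
    """Toy web_search-shaped tool; a real one performs the network search."""
    name = "web_search"
    description = "在网络上搜索信息"
    parameters = {
        "type": "object",
        "properties": {"query": {"type": "string", "description": "搜索关键词"}},
        "required": ["query"],
    }

    async def execute(self, params: dict, ctx: MessageContext) -> str:
        query = (params.get("query") or "").strip()
        if not query:
            return json.dumps({"error": "请提供搜索关键词"}, ensure_ascii=False)
        # A real implementation would query the search backend here.
        return json.dumps({"result": f"stub results for {query}"}, ensure_ascii=False)
```

Because `parameters` is plain JSON Schema, the same object can be handed directly to an OpenAI-style `tools` array without re-describing it in the prompt.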
---
## III. Model Fallback
Today, when the current model goes down, the bot goes down with it. A fallback chain is required.
**Scope:** the model-invocation layer in `robot.py`, plus the adapters under `ai_providers/`
**Target state:**
```yaml
# config.yaml
models:
  default:
    primary: deepseek
    fallbacks: [chatgpt, kimi]
```
**Must implement:**
- Distinguish retryable errors (429 rate limiting, timeouts, server-side 500) from non-retryable errors (401 invalid key)
- Retryable errors: retry with exponential backoff (initial 2s, max 30s)
- Non-retryable, or retries exhausted: switch to the next fallback model
- Record a cooldown for each failed model and skip it for a short period
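A minimal sketch of the chain; the `ModelError` type and `ask` method are illustrative assumptions, and injecting `now`/`sleep` just keeps the backoff testable:

```python
import time

class ModelError(Exception):
    """Carries the provider's HTTP-style status code."""
    def __init__(self, status):
        super().__init__(status)
        self.status = status

RETRYABLE = {429, 500, 502, 503}   # rate limits / server errors; timeouts map here too
MAX_RETRIES = 3
COOLDOWN = 300                     # seconds a failed model stays benched

def call_with_fallback(models, prompt, cooldowns, now=time.time, sleep=time.sleep):
    """Walk the fallback chain; retry retryable errors with exponential backoff."""
    for model in models:
        if now() < cooldowns.get(model.name, 0):
            continue                        # still cooling down, skip
        delay = 2.0
        for _ in range(MAX_RETRIES):
            try:
                return model.ask(prompt)
            except ModelError as e:
                if e.status not in RETRYABLE:
                    break                   # e.g. 401: jump straight to the next model
                sleep(min(delay, 30))       # backoff: 2s, 4s, 8s, ... capped at 30s
                delay *= 2
        cooldowns[model.name] = now() + COOLDOWN
    raise RuntimeError("all candidate models failed")
```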
---
## IV. Context Compression
The current `max_history` truncates by a hard message count, losing important early information.
**Scope:** the history-retrieval logic in `robot.py`, plus the conversation building in `commands/handlers.py`
**Target state:**
- Monitor the total token count of the current conversation
- When approaching the model's context-window limit, summarize and compress the earliest messages
- Keep the most recent N turns verbatim, plus an LLM-generated summary of the earlier conversation
- Replaces the current naive count-based truncation
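A compression pass along these lines could be sketched as follows, with the `summarize` callback standing in for the LLM-generated summary:

```python
def compress_history(messages, count_tokens, budget, keep_recent=6, summarize=None):
    """Keep the last keep_recent messages verbatim; fold older ones into a summary."""
    total = sum(count_tokens(m) for m in messages)
    if total <= budget or len(messages) <= keep_recent:
        return list(messages)               # under budget: nothing to do
    old, recent = messages[:-keep_recent], messages[-keep_recent:]
    summary = summarize(old) if summarize else f"(省略早期消息 {len(old)} 条)"
    return [{"role": "system", "content": f"早期对话摘要:{summary}"}] + recent
```

The `count_tokens` callback lets the caller choose a real tokenizer or, as the shipped version does, a cheaper character-count budget.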
---
## Execution Status
```
I.   Tool standardization ✅ done (tools/__init__.py, tools/reminder.py, tools/web_search.py, tools/history.py)
II.  Agent loop           ✅ done (AI Router removed; the LLM calls tools autonomously via _execute_with_tools)
III. Model fallback       ✅ done (_handle_chitchat cascades through candidate models; ai_providers/fallback.py handles retry/cooldown)
IV.  Context compression  ✅ done (func_summary.get_compressed_context(); a character budget replaces fixed-count truncation)
```


@@ -4,14 +4,11 @@
Module overview:
- context: message context class (MessageContext)
- handlers: feature handler functions (kept for legacy compatibility)
- keyword_triggers: keyword triggers
- message_forwarder: message forwarder
New architecture (agent/):
The new architecture has moved to the agent/ directory:
- agent.loop: Agent Loop core
- agent.context: AgentContext
- agent.tools: tool definitions and registration
Deprecated (kept for compatibility):
- ai_router: AI routing core -> replaced by agent.loop
- ai_functions: feature registration for the AI router -> replaced by agent.tools
"""
"""


@@ -1,166 +0,0 @@
"""
AI路由功能注册
将需要通过AI路由的功能在这里注册
"""
from .ai_router import ai_router
from .context import MessageContext
# ======== 提醒功能(一级交给二级路由) ========
@ai_router.register(
name="reminder_hub",
description="处理提醒相关需求,会进一步判断是创建、查看还是删除提醒,并自动执行。",
examples=[
"提醒我明天上午十点开会",
"看看今天有哪些提醒",
"删除下午三点的提醒"
],
params_description="原始提醒类请求内容"
)
def ai_handle_reminder_hub(ctx: MessageContext, params: str) -> bool:
from .reminder_router import reminder_router
from .handlers import handle_reminder, handle_list_reminders, handle_delete_reminder
original_text = params.strip() if isinstance(params, str) and params.strip() else ctx.text or ""
decision = reminder_router.route(ctx, original_text)
if not decision:
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text("抱歉,暂时无法理解提醒请求,可以换一种说法吗?", at_list)
return True
action = decision.action
payload = decision.params or original_text
if action == "list":
return handle_list_reminders(ctx, None)
if action == "delete":
original_content = ctx.msg.content
ctx.msg.content = f"删除提醒 {payload}".strip()
try:
return handle_delete_reminder(ctx, None)
finally:
ctx.msg.content = original_content
# Default: treat it as creating a reminder
original_content = ctx.msg.content
ctx.msg.content = payload if payload.startswith("提醒我") else f"提醒我{payload}"
try:
return handle_reminder(ctx, None)
finally:
ctx.msg.content = original_content
# ======== Perplexity search feature ========
@ai_router.register(
name="perplexity_search",
description="在网络上搜索任何问题",
examples=[
"搜索Python最新特性",
'深圳天气咋样',
'{"query":"量子计算发展历史的详细研究报告", "deep_research": true}'
],
params_description="可直接填写搜索内容;只有当问题确实十分复杂、需要长时间联网深度研究时,才在 params 中使用 JSON 字段,如 {\"query\":\"主题\", \"deep_research\": true},否则保持默认以节省时间和费用。"
)
def ai_handle_perplexity(ctx: MessageContext, params: str) -> bool:
"""AI路由的Perplexity搜索处理"""
import json
original_params = params
deep_research = False
query = ""
if isinstance(params, dict):
query = params.get("query") or params.get("q") or ""
mode = params.get("mode") or params.get("research_mode")
deep_research = bool(
params.get("deep_research")
or params.get("full_research")
or (isinstance(mode, str) and mode.lower() in {"deep", "full", "research"})
)
else:
params = str(params or "").strip()
if not params:
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text("请告诉我你想搜索什么内容", at_list)
return True
if params.startswith("{"):
try:
parsed = json.loads(params)
if isinstance(parsed, dict):
query = parsed.get("query") or parsed.get("q") or ""
mode = parsed.get("mode") or parsed.get("research_mode")
deep_research = bool(
parsed.get("deep_research")
or parsed.get("full_research")
or (isinstance(mode, str) and mode.lower() in {"deep", "full", "research"})
)
except json.JSONDecodeError:
query = params
if not query:
query = params
if not isinstance(query, str):
query = str(query or "")
query = query.strip()
if not query:
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text("请告诉我你想搜索什么内容", at_list)
return True
# Get the Perplexity instance
perplexity_instance = getattr(ctx.robot, 'perplexity', None)
if not perplexity_instance:
ctx.send_text("❌ Perplexity搜索功能当前不可用")
return True
# Call Perplexity to handle the query
content_for_perplexity = query
chat_id = ctx.get_receiver()
sender_wxid = ctx.msg.sender
room_id = ctx.msg.roomid if ctx.is_group else None
is_group = ctx.is_group
was_handled, fallback_prompt = perplexity_instance.process_message(
content=content_for_perplexity,
chat_id=chat_id,
sender=sender_wxid,
roomid=room_id,
from_group=is_group,
send_text_func=ctx.send_text,
enable_full_research=deep_research
)
# If Perplexity could not handle it, fall back to the default AI
if not was_handled and fallback_prompt:
chat_model = getattr(ctx, 'chat', None) or (getattr(ctx.robot, 'chat', None) if ctx.robot else None)
if chat_model:
try:
import time
current_time = time.strftime("%H:%M", time.localtime())
if isinstance(original_params, str):
formatted_request = original_params
else:
try:
formatted_request = json.dumps(original_params, ensure_ascii=False)
except Exception:
formatted_request = str(original_params)
q_with_info = f"[{current_time}] {ctx.sender_name}: {formatted_request}"
rsp = chat_model.get_answer(
question=q_with_info,
wxid=ctx.get_receiver(),
system_prompt_override=fallback_prompt
)
if rsp:
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text(rsp, at_list)
return True
except Exception as e:
if ctx.logger:
ctx.logger.error(f"默认AI处理失败: {e}")
return was_handled


@@ -1,271 +0,0 @@
import re
import json
import logging
from typing import Dict, Callable, Optional, Any, Tuple
from dataclasses import dataclass, field
from .context import MessageContext
logger = logging.getLogger(__name__)
ROUTING_HISTORY_LIMIT = 30
CHAT_HISTORY_MIN = 10
CHAT_HISTORY_MAX = 300
@dataclass
class AIFunction:
"""AI可调用的功能定义"""
name: str # 功能唯一标识名
handler: Callable # 处理函数
description: str # 功能描述给AI看的
examples: list[str] = field(default_factory=list) # 示例用法
params_description: str = "" # 参数说明
class AIRouter:
"""AI智能路由器"""
def __init__(self):
self.functions: Dict[str, AIFunction] = {}
self.logger = logger
def register(self, name: str, description: str, examples: list[str] = None, params_description: str = ""):
"""
Decorator that registers a feature with the AI router.
@ai_router.register(
name="reminder_set",
description="设置提醒",
examples=["提醒我下午3点开会", "每天早上8点提醒我吃早饭"],
params_description="提醒时间和内容"
)
def handle_reminder(ctx: MessageContext, params: str) -> bool:
# implement the reminder-setting logic
pass
"""
def decorator(func: Callable) -> Callable:
ai_func = AIFunction(
name=name,
handler=func,
description=description,
examples=examples or [],
params_description=params_description
)
self.functions[name] = ai_func
self.logger.info(f"AI路由器注册功能: {name} - {description}")
return func
return decorator
def _build_ai_prompt(self) -> str:
"""构建给AI的系统提示词包含所有可用功能的信息"""
prompt = """你是一个智能路由助手。根据用户的输入判断用户的意图并返回JSON格式的响应。
### 注意:
1. 你需要优先判断自己是否可以直接回答用户的问题,如果你可以直接回答,则返回 "chat",无需返回 "function"
2. 如果用户输入中包含多个功能,请优先匹配最符合用户意图的功能。如果无法判断,则返回 "chat"
3. 优先考虑使用 chat 处理,需要外部资料或其他功能逻辑时,再返回 "function"
### 可用的功能列表:
"""
for name, func in self.functions.items():
prompt += f"\n- {name}: {func.description}"
if func.params_description:
prompt += f"\n 参数: {func.params_description}"
if func.examples:
prompt += f"\n 示例: {', '.join(func.examples[:3])}"
prompt += "\n"
prompt += """
请你分析用户输入,严格按照以下格式返回JSON
### 返回格式:
1. 如果用户只是聊天或者不匹配任何功能,返回:
{
"action_type": "chat"
}
2. 如果用户需要使用上述功能之一,返回:
{
"action_type": "function",
"function_name": "上述功能列表中的功能名",
"params": "从用户输入中提取的参数"
}
3. 另外,请判断该问题需不需要被认真对待,如果是比较严肃的问题,需要被认真对待,那么请通过参数配置开启深度思考,需要额外提供:
{
"action_type": "chat",
"enable_reasoning": true
}
#### 示例:
- 用户输入"提醒我下午3点开会" -> {"action_type": "function", "function_name": "reminder_hub", "params": "提醒我下午3点开会"}
- 用户输入"查看我的提醒" -> {"action_type": "function", "function_name": "reminder_hub", "params": "查看我的提醒"}
- 用户输入"你好" -> {"action_type": "chat"}
- 用户输入"帮我认真想想这道题" -> {"action_type": "chat", "enable_reasoning": true}
- 用户输入"查一下Python教程" -> {"action_type": "function", "function_name": "perplexity_search", "params": "Python教程"}
#### 格式注意事项:
1. action_type 只能是 "function" 或 "chat"
2. 只返回JSON无需其他解释
3. function_name 必须完全匹配上述功能列表中的名称
"""
return prompt
def route(self, ctx: MessageContext) -> Tuple[bool, Optional[Dict[str, Any]]]:
"""
Make the AI routing decision.
Returns: (success flag, AI decision result)
"""
self.logger.debug(f"[AI路由器] route方法被调用")
if not ctx.text:
self.logger.debug("[AI路由器] ctx.text为空返回False")
return False, None
# Get the AI model
chat_model = getattr(ctx, 'chat', None)
if not chat_model:
chat_model = getattr(ctx.robot, 'chat', None) if ctx.robot else None
if not chat_model:
self.logger.error("[AI路由器] 无可用的AI模型")
return False, None
self.logger.debug(f"[AI路由器] 找到AI模型: {type(chat_model)}")
try:
# Build the system prompt
system_prompt = self._build_ai_prompt()
self.logger.debug(f"[AI路由器] 已构建系统提示词,长度: {len(system_prompt)}")
# Let the AI analyze the user's intent
user_input = f"用户输入:{ctx.text}"
self.logger.debug(f"[AI路由器] 准备调用AI分析意图: {user_input}")
ai_response = chat_model.get_answer(
user_input,
wxid=ctx.get_receiver(),
system_prompt_override=system_prompt,
specific_max_history=ROUTING_HISTORY_LIMIT
)
self.logger.debug(f"[AI路由器] AI响应: {ai_response}")
# Parse the JSON returned by the AI
json_match = re.search(r'\{.*\}', ai_response, re.DOTALL)
if not json_match:
self.logger.warning(f"AI路由器无法从AI响应中提取JSON - {ai_response}")
return False, None
decision = json.loads(json_match.group(0))
# Validate the decision format
action_type = decision.get("action_type")
if action_type not in ["chat", "function"]:
self.logger.warning(f"AI路由器未知的action_type - {action_type}")
return False, None
# For function calls, validate the function name
if action_type == "function":
function_name = decision.get("function_name")
if function_name not in self.functions:
self.logger.warning(f"AI路由器未知的功能名 - {function_name}")
return False, None
else:
# In chat mode, check whether reasoning was requested
if "enable_reasoning" in decision:
raw_value = decision.get("enable_reasoning")
if isinstance(raw_value, str):
decision["enable_reasoning"] = raw_value.strip().lower() in ("true", "1", "yes", "y")
else:
decision["enable_reasoning"] = bool(raw_value)
self.logger.info(f"AI路由决策: {decision}")
return True, decision
except json.JSONDecodeError as e:
self.logger.error(f"AI路由器解析JSON失败 - {e}")
return False, None
except Exception as e:
self.logger.error(f"AI路由器处理异常 - {e}")
return False, None
def _check_permission(self, ctx: MessageContext) -> bool:
"""
Check whether the AI routing feature may be used.
:param ctx: message context
:return: whether permission is granted
"""
# Check whether the AI router is enabled
ai_router_config = getattr(ctx.config, 'AI_ROUTER', {})
if not ai_router_config.get('enable', True):
self.logger.info("AI路由功能已禁用")
return False
# Private chats are always allowed
if not ctx.is_group:
return True
# Group chats must pass the whitelist check
allowed_groups = ai_router_config.get('allowed_groups', [])
current_group = ctx.get_receiver()
if current_group in allowed_groups:
self.logger.info(f"群聊 {current_group} 在AI路由白名单中允许使用")
return True
else:
self.logger.info(f"群聊 {current_group} 不在AI路由白名单中禁止使用")
return False
def dispatch(self, ctx: MessageContext) -> bool:
"""
Dispatch via the AI router.
Returns: whether the message was handled successfully
"""
self.logger.debug(f"[AI路由器] dispatch被调用消息内容: {ctx.text}")
# Check permission
if not self._check_permission(ctx):
self.logger.info("[AI路由器] 权限检查失败返回False")
return False
# Get the AI routing decision
success, decision = self.route(ctx)
ctx.router_decision = decision if success else None
self.logger.debug(f"[AI路由器] route返回 - success: {success}, decision: {decision}")
if not success or not decision:
self.logger.info("[AI路由器] route失败或无决策返回False")
return False
action_type = decision.get("action_type")
# Chat intent: return False so downstream handlers take over
if action_type == "chat":
self.logger.info("AI路由器识别为聊天意图交给聊天处理器处理。")
return False
# Function call
if action_type == "function":
function_name = decision.get("function_name")
params = decision.get("params", "")
func = self.functions.get(function_name)
if not func:
self.logger.error(f"AI路由器功能 {function_name} 未找到")
return False
try:
self.logger.info(f"AI路由器调用功能 {function_name},参数: {params}")
result = func.handler(ctx, params)
return result
except Exception as e:
self.logger.error(f"AI路由器执行功能 {function_name} 出错 - {e}")
return False
return False
# Create the global AI router instance
ai_router = AIRouter()


@@ -1,508 +0,0 @@
import json
import logging
import os
import re
import time as time_mod
from datetime import datetime
from typing import Optional, Match, TYPE_CHECKING
from function.func_persona import build_persona_system_prompt
if TYPE_CHECKING:
from .context import MessageContext
logger = logging.getLogger(__name__)
DEFAULT_CHAT_HISTORY = 30
DEFAULT_VISIBLE_LIMIT = 30
# ══════════════════════════════════════════════════════════
# Tool handler functions
# ══════════════════════════════════════════════════════════
def _web_search(ctx, query: str = "", deep_research: bool = False, **_) -> str:
perplexity_instance = getattr(ctx.robot, "perplexity", None)
if not perplexity_instance:
return json.dumps({"error": "Perplexity 搜索功能不可用,未配置或未初始化"}, ensure_ascii=False)
if not query:
return json.dumps({"error": "请提供搜索关键词"}, ensure_ascii=False)
try:
response = perplexity_instance.get_answer(query, ctx.get_receiver(), deep_research=deep_research)
if not response:
return json.dumps({"error": "搜索无结果"}, ensure_ascii=False)
cleaned = re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL).strip()
return json.dumps({"result": cleaned or response}, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": f"搜索失败: {e}"}, ensure_ascii=False)
def _reminder_create(ctx, type: str = "once", time: str = "",
content: str = "", weekday: int = None, **_) -> str:
if not hasattr(ctx.robot, "reminder_manager"):
return json.dumps({"error": "提醒管理器未初始化"}, ensure_ascii=False)
if not time or not content:
return json.dumps({"error": "缺少必要字段: time 和 content"}, ensure_ascii=False)
if len(content.strip()) < 2:
return json.dumps({"error": "提醒内容太短"}, ensure_ascii=False)
if type == "once":
parsed_dt = None
for fmt in ("%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"):
try:
parsed_dt = datetime.strptime(time, fmt)
break
except ValueError:
continue
if not parsed_dt:
return json.dumps({"error": f"once 类型时间格式应为 YYYY-MM-DD HH:MM收到: {time}"}, ensure_ascii=False)
if parsed_dt < datetime.now():
return json.dumps({"error": f"时间 {time} 已过去,请使用未来的时间"}, ensure_ascii=False)
time = parsed_dt.strftime("%Y-%m-%d %H:%M")
elif type in ("daily", "weekly"):
parsed_time = None
for fmt in ("%H:%M", "%H:%M:%S"):
try:
parsed_time = datetime.strptime(time, fmt)
break
except ValueError:
continue
if not parsed_time:
return json.dumps({"error": f"daily/weekly 类型时间格式应为 HH:MM收到: {time}"}, ensure_ascii=False)
time = parsed_time.strftime("%H:%M")
else:
return json.dumps({"error": f"不支持的提醒类型: {type}"}, ensure_ascii=False)
if type == "weekly" and (weekday is None or not (isinstance(weekday, int) and 0 <= weekday <= 6)):
return json.dumps({"error": "weekly 类型需要 weekday 参数 (0=周一 … 6=周日)"}, ensure_ascii=False)
data = {"type": type, "time": time, "content": content, "extra": {}}
if weekday is not None:
data["weekday"] = weekday
roomid = ctx.msg.roomid if ctx.is_group else None
success, result = ctx.robot.reminder_manager.add_reminder(ctx.msg.sender, data, roomid=roomid)
if success:
type_label = {"once": "一次性", "daily": "每日", "weekly": "每周"}.get(type, type)
return json.dumps({"success": True, "id": result,
"message": f"已创建{type_label}提醒: {time} - {content}"}, ensure_ascii=False)
return json.dumps({"success": False, "error": result}, ensure_ascii=False)
def _reminder_list(ctx, **_) -> str:
if not hasattr(ctx.robot, "reminder_manager"):
return json.dumps({"error": "提醒管理器未初始化"}, ensure_ascii=False)
reminders = ctx.robot.reminder_manager.list_reminders(ctx.msg.sender)
if not reminders:
return json.dumps({"reminders": [], "message": "当前没有任何提醒"}, ensure_ascii=False)
return json.dumps({"reminders": reminders, "count": len(reminders)}, ensure_ascii=False)
def _reminder_delete(ctx, reminder_id: str = "", delete_all: bool = False, **_) -> str:
if not hasattr(ctx.robot, "reminder_manager"):
return json.dumps({"error": "提醒管理器未初始化"}, ensure_ascii=False)
if delete_all:
success, message, count = ctx.robot.reminder_manager.delete_all_reminders(ctx.msg.sender)
return json.dumps({"success": success, "message": message, "deleted_count": count}, ensure_ascii=False)
if not reminder_id:
return json.dumps({"error": "请提供 reminder_id或设置 delete_all=true 删除全部"}, ensure_ascii=False)
success, message = ctx.robot.reminder_manager.delete_reminder(ctx.msg.sender, reminder_id)
return json.dumps({"success": success, "message": message}, ensure_ascii=False)
def _lookup_chat_history(ctx, mode: str = "", keywords: list = None,
start_offset: int = None, end_offset: int = None,
start_time: str = None, end_time: str = None, **_) -> str:
message_summary = getattr(ctx.robot, "message_summary", None) if ctx.robot else None
if not message_summary:
return json.dumps({"error": "消息历史功能不可用"}, ensure_ascii=False)
chat_id = ctx.get_receiver()
visible_limit = DEFAULT_VISIBLE_LIMIT
raw = getattr(ctx, "specific_max_history", None)
if raw is not None:
try:
visible_limit = int(raw)
except (TypeError, ValueError):
pass
mode = (mode or "").strip().lower()
if not mode:
if start_time and end_time:
mode = "time"
elif start_offset is not None and end_offset is not None:
mode = "range"
else:
mode = "keywords"
if mode == "keywords":
if isinstance(keywords, str):
keywords = [keywords]
elif not isinstance(keywords, list):
keywords = []
cleaned = []
seen = set()
for kw in keywords:
if kw is None:
continue
s = str(kw).strip()
if s and (len(s) > 1 or s.isdigit()):
low = s.lower()
if low not in seen:
seen.add(low)
cleaned.append(s)
if not cleaned:
return json.dumps({"error": "未提供有效关键词", "results": []}, ensure_ascii=False)
search_results = message_summary.search_messages_with_context(
chat_id=chat_id, keywords=cleaned, context_window=10,
max_groups=20, exclude_recent=visible_limit,
)
segments, lines_seen = [], set()
for seg in search_results:
formatted = [l for l in seg.get("formatted_messages", []) if l not in lines_seen]
lines_seen.update(formatted)
if formatted:
segments.append({"matched_keywords": seg.get("matched_keywords", []), "messages": formatted})
payload = {"segments": segments, "returned_groups": len(segments), "keywords": cleaned}
if not segments:
payload["notice"] = "未找到匹配的消息。"
return json.dumps(payload, ensure_ascii=False)
if mode == "range":
if start_offset is None or end_offset is None:
return json.dumps({"error": "range 模式需要 start_offset 和 end_offset"}, ensure_ascii=False)
try:
start_offset, end_offset = int(start_offset), int(end_offset)
except (TypeError, ValueError):
return json.dumps({"error": "start_offset 和 end_offset 必须是整数"}, ensure_ascii=False)
if start_offset <= visible_limit or end_offset <= visible_limit:
return json.dumps({"error": f"偏移量必须大于 {visible_limit} 以排除当前可见消息"}, ensure_ascii=False)
if start_offset > end_offset:
start_offset, end_offset = end_offset, start_offset
result = message_summary.get_messages_by_reverse_range(
chat_id=chat_id, start_offset=start_offset, end_offset=end_offset,
)
payload = {
"start_offset": result.get("start_offset"), "end_offset": result.get("end_offset"),
"messages": result.get("messages", []), "returned_count": result.get("returned_count", 0),
"total_messages": result.get("total_messages", 0),
}
if payload["returned_count"] == 0:
payload["notice"] = "请求范围内没有消息。"
return json.dumps(payload, ensure_ascii=False)
if mode == "time":
if not start_time or not end_time:
return json.dumps({"error": "time 模式需要 start_time 和 end_time"}, ensure_ascii=False)
time_lines = message_summary.get_messages_by_time_window(
chat_id=chat_id, start_time=start_time, end_time=end_time,
)
payload = {"start_time": start_time, "end_time": end_time,
"messages": time_lines, "returned_count": len(time_lines)}
if not time_lines:
payload["notice"] = "该时间范围内没有消息。"
return json.dumps(payload, ensure_ascii=False)
return json.dumps({"error": f"不支持的模式: {mode}"}, ensure_ascii=False)
# ══════════════════════════════════════════════════════════
# Tool registry
# ══════════════════════════════════════════════════════════
TOOLS = {
"web_search": {
"handler": _web_search,
"description": "在网络上搜索信息。用于回答需要最新数据、实时信息或你不确定的事实性问题。deep_research 仅在问题非常复杂、需要深度研究时才开启。",
"status_text": "正在联网搜索: ",
"status_arg": "query",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "搜索关键词或问题"},
"deep_research": {"type": "boolean", "description": "是否启用深度研究模式(耗时较长,仅用于复杂问题)"},
},
"required": ["query"],
"additionalProperties": False,
},
},
"reminder_create": {
"handler": _reminder_create,
"description": "创建提醒。支持 once(一次性)、daily(每日)、weekly(每周) 三种类型。当前时间已在对话上下文中提供,请据此计算目标时间。",
"status_text": "正在设置提醒...",
"parameters": {
"type": "object",
"properties": {
"type": {"type": "string", "enum": ["once", "daily", "weekly"], "description": "提醒类型"},
"time": {"type": "string", "description": "once → YYYY-MM-DD HH:MMdaily/weekly → HH:MM"},
"content": {"type": "string", "description": "提醒内容"},
"weekday": {"type": "integer", "description": "仅 weekly 需要。0=周一 … 6=周日"},
},
"required": ["type", "time", "content"],
"additionalProperties": False,
},
},
"reminder_list": {
"handler": _reminder_list,
"description": "查看当前用户的所有提醒列表。",
"parameters": {"type": "object", "properties": {}, "additionalProperties": False},
},
"reminder_delete": {
"handler": _reminder_delete,
"description": "删除提醒。需要先调用 reminder_list 获取 ID再用 reminder_id 精确删除;或设置 delete_all=true 一次性删除全部。",
"parameters": {
"type": "object",
"properties": {
"reminder_id": {"type": "string", "description": "要删除的提醒完整 ID"},
"delete_all": {"type": "boolean", "description": "是否删除该用户全部提醒"},
},
"additionalProperties": False,
},
},
"lookup_chat_history": {
"handler": _lookup_chat_history,
"description": "查询聊天历史记录。你当前只能看到最近的消息,调用此工具可以回溯更早的上下文。支持 keywords/range/time 三种模式。",
"status_text": "正在翻阅聊天记录: ",
"status_arg": "keywords",
"parameters": {
"type": "object",
"properties": {
"mode": {"type": "string", "enum": ["keywords", "range", "time"], "description": "查询模式"},
"keywords": {"type": "array", "items": {"type": "string"}, "description": "mode=keywords 时的搜索关键词"},
"start_offset": {"type": "integer", "description": "mode=range 时的起始偏移(从最新消息倒数)"},
"end_offset": {"type": "integer", "description": "mode=range 时的结束偏移"},
"start_time": {"type": "string", "description": "mode=time 时的开始时间 (YYYY-MM-DD HH:MM)"},
"end_time": {"type": "string", "description": "mode=time 时的结束时间 (YYYY-MM-DD HH:MM)"},
},
"additionalProperties": False,
},
},
}
def _get_openai_tools():
return [
{"type": "function", "function": {"name": n, "description": s["description"], "parameters": s["parameters"]}}
for n, s in TOOLS.items()
]
def _create_tool_handler(ctx):
def _send_status(spec, arguments):
status = spec.get("status_text", "")
if not status:
return
try:
arg_name = spec.get("status_arg", "")
if arg_name:
val = arguments.get(arg_name)
if val is not None:
if isinstance(val, list):
val = "".join(str(k) for k in val[:3])
status = f"{status}{val}"
ctx.send_text(status, record_message=False)
except Exception:
pass
def handler(tool_name, arguments):
spec = TOOLS.get(tool_name)
if not spec:
return json.dumps({"error": f"Unknown tool: {tool_name}"}, ensure_ascii=False)
_send_status(spec, arguments)
try:
result = spec["handler"](ctx, **arguments)
if not isinstance(result, str):
result = json.dumps(result, ensure_ascii=False)
return result
except Exception as e:
logger.error(f"工具 {tool_name} 执行失败: {e}", exc_info=True)
return json.dumps({"error": str(e)}, ensure_ascii=False)
return handler
# ══════════════════════════════════════════════════════════
# Agent entry point
# ══════════════════════════════════════════════════════════
def handle_chitchat(ctx: 'MessageContext', match: Optional[Match]) -> bool:
"""Agent 入口 —— 处理用户消息LLM 自主决定是否调用工具。"""
chat_model = None
if hasattr(ctx, 'chat'):
chat_model = ctx.chat
elif ctx.robot and hasattr(ctx.robot, 'chat'):
chat_model = ctx.robot.chat
if not chat_model:
if ctx.logger:
ctx.logger.error("没有可用的AI模型")
ctx.send_text("抱歉,我现在无法进行对话。")
return False
# History message count limit
raw_specific_max_history = getattr(ctx, 'specific_max_history', None)
specific_max_history = None
if raw_specific_max_history is not None:
try:
specific_max_history = int(raw_specific_max_history)
except (TypeError, ValueError):
specific_max_history = None
if specific_max_history is not None:
specific_max_history = max(10, min(300, specific_max_history))
if specific_max_history is None:
specific_max_history = DEFAULT_CHAT_HISTORY
setattr(ctx, 'specific_max_history', specific_max_history)
# ── Quoted-image special handling ────────────────────
if getattr(ctx, 'is_quoted_image', False):
return _handle_quoted_image(ctx, chat_model)
# ── Build the user message ───────────────────────────
content = ctx.text
sender_name = ctx.sender_name
if ctx.robot and hasattr(ctx.robot, "xml_processor"):
if ctx.is_group:
msg_data = ctx.robot.xml_processor.extract_quoted_message(ctx.msg)
else:
msg_data = ctx.robot.xml_processor.extract_private_quoted_message(ctx.msg)
q_with_info = ctx.robot.xml_processor.format_message_for_ai(msg_data, sender_name)
if not q_with_info:
current_time = time_mod.strftime("%H:%M", time_mod.localtime())
q_with_info = f"[{current_time}] {sender_name}: {content or '[空内容]'}"
else:
current_time = time_mod.strftime("%H:%M", time_mod.localtime())
q_with_info = f"[{current_time}] {sender_name}: {content or '[空内容]'}"
is_auto_random_reply = getattr(ctx, 'auto_random_reply', False)
if ctx.is_group and not ctx.is_at_bot and is_auto_random_reply:
latest_message_prompt = (
"# 群聊插话提醒\n"
"你目前是在群聊里主动接话,没有人点名让你发言。\n"
"请根据下面这句(或者你任选一句)最新消息插入一条自然、不突兀的中文回复,语气放松随和即可:\n"
f"\u201c{q_with_info}\u201d\n"
"不要重复任何已知的内容,提出新的思维碰撞(例如:基于上下文的新问题、不同角度的解释等,但是不要反驳任何内容),也不要显得过于正式。"
)
else:
latest_message_prompt = (
"# 本轮需要回复的用户及其最新信息\n"
"请你基于下面这条最新收到的用户讯息(和该用户最近的历史消息),直接面向发送者进行自然的中文回复:\n"
f"\u201c{q_with_info}\u201d\n"
"请只针对该用户进行回复。"
)
# ── Build the tool list ──────────────────────────────
tools = None
tool_handler = None
if not is_auto_random_reply:
openai_tools = _get_openai_tools()
if openai_tools:
tools = openai_tools
tool_handler = _create_tool_handler(ctx)
# ── Build the system prompt ──────────────────────────
persona_text = getattr(ctx, 'persona', None)
system_prompt_override = None
tool_guidance = ""
if tools:
tool_guidance = (
"\n\n## 工具使用指引\n"
"你可以调用工具来辅助回答,以下是决策原则:\n"
"- 用户询问需要最新信息、实时数据、或你不确定的事实 → 调用 web_search\n"
"- 用户想设置/查看/删除提醒 → 调用 reminder_create / reminder_list / reminder_delete\n"
"- 用户提到之前聊过的内容、或你需要回顾更早的对话 → 调用 lookup_chat_history\n"
"- 日常闲聊、观点讨论、情感交流 → 直接回复,不需要调用任何工具\n"
"你可以在一次对话中多次调用工具,每次调用的结果会反馈给你继续推理。"
)
if persona_text:
try:
base_prompt = build_persona_system_prompt(chat_model, persona_text)
system_prompt_override = base_prompt + tool_guidance if base_prompt else tool_guidance or None
except Exception as persona_exc:
if ctx.logger:
ctx.logger.error(f"构建人设系统提示失败: {persona_exc}", exc_info=True)
system_prompt_override = tool_guidance or None
elif tool_guidance:
system_prompt_override = tool_guidance
# ── Call the LLM ─────────────────────────────────────
try:
if ctx.logger:
tool_names = [t["function"]["name"] for t in tools] if tools else []
ctx.logger.info(f"Agent 调用: tools={tool_names}")
rsp = chat_model.get_answer(
question=latest_message_prompt,
wxid=ctx.get_receiver(),
system_prompt_override=system_prompt_override,
specific_max_history=specific_max_history,
tools=tools,
tool_handler=tool_handler,
tool_max_iterations=20,
)
if rsp:
ctx.send_text(rsp, "")
return True
else:
if ctx.logger:
ctx.logger.error("无法从AI获得答案")
return False
except Exception as e:
if ctx.logger:
ctx.logger.error(f"获取AI回复时出错: {e}", exc_info=True)
return False
def _handle_quoted_image(ctx, chat_model) -> bool:
"""处理引用图片消息。"""
if ctx.logger:
ctx.logger.info("检测到引用图片消息,尝试处理图片内容...")
from ai_providers.ai_chatgpt import ChatGPT
support_vision = False
if isinstance(chat_model, ChatGPT):
if hasattr(chat_model, 'support_vision') and chat_model.support_vision:
support_vision = True
elif hasattr(chat_model, 'model'):
model_name = getattr(chat_model, 'model', '')
support_vision = model_name in ("gpt-4.1-mini", "gpt-4o") or "-vision" in model_name
if not support_vision:
ctx.send_text("抱歉,当前 AI 模型不支持处理图片。请联系管理员配置支持视觉的模型。")
return True
try:
temp_dir = "temp/image_cache"
os.makedirs(temp_dir, exist_ok=True)
image_path = ctx.wcf.download_image(
id=ctx.quoted_msg_id, extra=ctx.quoted_image_extra,
dir=temp_dir, timeout=30,
)
if not image_path or not os.path.exists(image_path):
ctx.send_text("抱歉,无法下载图片进行分析。")
return True
prompt = ctx.text if ctx.text and ctx.text.strip() else "请详细描述这张图片中的内容"
response = chat_model.get_image_description(image_path, prompt)
ctx.send_text(response)
try:
if os.path.exists(image_path):
os.remove(image_path)
except Exception:
pass
return True
except Exception as e:
if ctx.logger:
ctx.logger.error(f"处理引用图片出错: {e}", exc_info=True)
ctx.send_text(f"处理图片时发生错误: {str(e)}")
return True


@@ -1,78 +0,0 @@
import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple

from .context import MessageContext

REMINDER_ROUTER_HISTORY_LIMIT = 10


@dataclass
class ReminderDecision:
    action: str
    params: str = ""


class ReminderRouter:
    """二级提醒路由器,用于在提醒场景下判定具体操作"""

    def __init__(self) -> None:
        self.logger = logging.getLogger(__name__ + ".ReminderRouter")

    def _build_prompt(self) -> str:
        return (
            "你是提醒助手的路由器。根据用户关于提醒的说法,判断应该执行哪个操作,并返回 JSON。\n\n"
            "### 可执行的操作:\n"
            "- create:创建新的提醒,需要从用户话语中提取完整的提醒内容(包括时间、人称、事项等)。\n"
            "- list:查询当前用户的所有提醒,当用户想要查看、看看、列出、有哪些提醒时使用。\n"
            "- delete:删除提醒,当用户想取消、删除、移除某个提醒时使用。需要根据用户给出的描述、关键字或者编号帮助定位哪条提醒。\n\n"
            "### 返回格式:\n"
            "{\n"
            '  "action": "create" | "list" | "delete",\n'
            '  "content": "从用户话语中提取或保留的关键信息(删除或新增时必填)"\n'
            "}\n\n"
            "注意:只返回 JSON,不要包含多余文字。若无法识别,返回 create 并把原句放进 content。"
        )

    def route(self, ctx: MessageContext, original_text: str) -> Optional[ReminderDecision]:
        chat_model = getattr(ctx, "chat", None) or getattr(ctx.robot, "chat", None)
        if not chat_model:
            self.logger.error("提醒路由器:缺少可用的聊天模型。")
            return None
        prompt = self._build_prompt()
        user_input = f"用户关于提醒的输入:{original_text}"
        try:
            ai_response = chat_model.get_answer(
                user_input,
                wxid=ctx.get_receiver(),
                system_prompt_override=prompt,
                specific_max_history=REMINDER_ROUTER_HISTORY_LIMIT,
            )
            self.logger.debug("提醒路由器原始响应: %s", ai_response)
            decision = json.loads(json_response(ai_response))
            action = decision.get("action", "").strip().lower()
            content = decision.get("content", "").strip()
            if action not in {"create", "list", "delete"}:
                self.logger.warning("提醒路由器:未知动作 %s,默认为 create。", action)
                action = "create"
            return ReminderDecision(action=action, params=content)
        except Exception as exc:
            self.logger.error("提醒路由器解析失败: %s", exc, exc_info=True)
            return None


def json_response(raw: str) -> str:
    """从模型返回的文本中提取 JSON。"""
    try:
        start = raw.index("{")
        end = raw.rindex("}") + 1
        return raw[start:end]
    except ValueError:
        return "{}"


reminder_router = ReminderRouter()
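The two-level router deleted above is the direct target of the plan's "工具标准化" step: once `reminder_create` / `reminder_list` / `reminder_delete` are standalone tools exposed through the model's native function calling, no second JSON-routing prompt is needed. A minimal sketch of that shape, assuming the plan's `Tool` contract; `ReminderService` and all wiring here are hypothetical illustrations, not the project's real API:

```python
# Sketch only: maps the deleted ReminderRouter's "create" branch onto the
# standardized Tool interface from the refactor plan. ReminderService is a
# hypothetical in-memory stand-in for the real reminder backend.
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ReminderService:
    """Hypothetical in-memory reminder store, used only for illustration."""
    items: List[str] = field(default_factory=list)

    def create(self, content: str) -> str:
        self.items.append(content)
        return f"已创建提醒:{content}"

    def list_all(self) -> str:
        if not self.items:
            return "当前没有提醒。"
        return "\n".join(f"{i + 1}. {r}" for i, r in enumerate(self.items))


class ReminderCreateTool:
    """One concrete tool replacing the router's "create" branch.

    The agent loop advertises name/description/parameters via the LLM's
    tool_use protocol, so the routing prompt above disappears entirely.
    """
    name = "reminder_create"
    description = "创建新的提醒,需包含时间和事项"
    parameters = {
        "type": "object",
        "properties": {"content": {"type": "string", "description": "完整的提醒内容"}},
        "required": ["content"],
    }

    def __init__(self, service: ReminderService) -> None:
        self.service = service

    async def execute(self, params: Dict[str, Any], ctx: Any = None) -> str:
        # The LLM fills params according to the JSON Schema above.
        return self.service.create(params["content"])
```

`list` and `delete` would become sibling tool classes with the same interface, so the agent loop can pick among them per call instead of a one-shot classification.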

robot.py

@@ -34,7 +34,7 @@ from function.func_xml_process import XmlProcessor
# 导入上下文及常用处理函数
from commands.context import MessageContext
from commands.handlers import handle_chitchat # 保留旧接口用于兼容
# 旧的 handle_chitchat 已被 _handle_chitchat_async 取代
from commands.keyword_triggers import KeywordTriggerProcessor
from commands.message_forwarder import MessageForwarder
@@ -651,6 +651,10 @@ class Robot(Job):
    async def _handle_chitchat_async(self, ctx, auto_random_reply: bool = False) -> bool:
        """异步处理闲聊 - 使用 Agent Loop 架构"""
        # 引用图片特殊处理
        if getattr(ctx, 'is_quoted_image', False):
            return await self._handle_quoted_image_async(ctx)

        force_reasoning = bool(getattr(ctx, 'force_reasoning', False))
        reasoning_requested = bool(getattr(ctx, 'reasoning_requested', False)) or force_reasoning
        is_auto_random_reply = bool(getattr(ctx, 'auto_random_reply', False))
@@ -817,66 +821,60 @@ class Robot(Job):
await self.send_text_async("抱歉,服务暂时不可用,请稍后再试。", chat_id)
return False
    def _handle_chitchat(self, ctx, match=None):
        """同步版本 - 保持向后兼容,内部调用旧逻辑"""
        force_reasoning = bool(getattr(ctx, 'force_reasoning', False))
        reasoning_requested = bool(getattr(ctx, 'reasoning_requested', False)) or force_reasoning
        original_chat = getattr(ctx, 'chat', None)
        if reasoning_requested:
            if force_reasoning:
                self.LOG.info("群配置了 force_reasoning,将使用推理模型。")
            else:
                self.LOG.info("检测到推理模式请求,将启用深度思考。")
                ctx.send_text("正在深度思考,请稍候...", record_message=False)
            reasoning_chat = self._get_reasoning_chat_model()
            if reasoning_chat:
                ctx.chat = reasoning_chat
            else:
                self.LOG.warning("当前模型未配置推理模型,使用默认模型")
        # 构建候选模型列表:当前模型 + fallback
        primary_id = getattr(self, 'current_model_id', None)
        fallback_ids = self._get_fallback_model_ids()
        candidate_ids = []
        if primary_id is not None:
            candidate_ids.append(primary_id)
        for fid in fallback_ids:
            if fid not in candidate_ids and fid in self.chat_models:
                candidate_ids.append(fid)
        handled = False
        for i, model_id in enumerate(candidate_ids):
            if i > 0:
                # 切换到 fallback 模型
                fallback_model = self.chat_models[model_id]
                if reasoning_requested:
                    fallback_reasoning = self.reasoning_chat_models.get(model_id)
                    ctx.chat = fallback_reasoning or fallback_model
                else:
                    ctx.chat = fallback_model
                model_name = getattr(ctx.chat, '__class__', type(ctx.chat)).__name__
                self.LOG.info(f"Fallback: 切换到模型 {model_name}(ID:{model_id})")
            try:
                handled = handle_chitchat(ctx, match)
                if handled:
                    break
            except Exception as e:
                self.LOG.warning(f"模型 {model_id} 调用失败: {e}")
                continue
        # 恢复原始模型
        if original_chat is not None:
            ctx.chat = original_chat
        if not handled:
            if reasoning_requested:
                ctx.send_text("抱歉,深度思考暂时遇到问题,请稍后再试。")
            else:
                ctx.send_text("抱歉,服务暂时不可用,请稍后再试。")
        return handled

    async def _handle_quoted_image_async(self, ctx) -> bool:
        """异步处理引用图片消息"""
        import os
        self.LOG.info("检测到引用图片消息,尝试处理图片内容...")
        chat_model = getattr(ctx, 'chat', None) or self.chat
        support_vision = False
        if isinstance(chat_model, ChatGPT):
            if hasattr(chat_model, 'support_vision') and chat_model.support_vision:
                support_vision = True
            elif hasattr(chat_model, 'model'):
                model_name = getattr(chat_model, 'model', '')
                support_vision = model_name in ("gpt-4.1-mini", "gpt-4o") or "-vision" in model_name
        if not support_vision:
            await self.send_text_async(
                "抱歉,当前 AI 模型不支持处理图片。请联系管理员配置支持视觉的模型。",
                ctx.get_receiver()
            )
            return True
        try:
            temp_dir = "temp/image_cache"
            os.makedirs(temp_dir, exist_ok=True)
            image_path = await asyncio.to_thread(
                ctx.wcf.download_image,
                id=ctx.quoted_msg_id,
                extra=ctx.quoted_image_extra,
                dir=temp_dir,
                timeout=30
            )
            if not image_path or not os.path.exists(image_path):
                await self.send_text_async("抱歉,无法下载图片进行分析。", ctx.get_receiver())
                return True
            prompt = ctx.text if ctx.text and ctx.text.strip() else "请详细描述这张图片中的内容"
            response = await asyncio.to_thread(chat_model.get_image_description, image_path, prompt)
            await self.send_text_async(response, ctx.get_receiver())
            try:
                if os.path.exists(image_path):
                    os.remove(image_path)
            except Exception:
                pass
            return True
        except Exception as e:
            self.LOG.error(f"处理引用图片出错: {e}", exc_info=True)
            await self.send_text_async(f"处理图片时发生错误: {str(e)}", ctx.get_receiver())
            return True
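The deleted sync fallback loop switches models on any exception; the plan's "模型 Fallback" section additionally calls for distinguishing retryable errors (429 / timeout / 5xx) from fatal ones (401) and for exponential backoff before switching. A minimal sketch of that policy, assuming hypothetical `RetryableError` / `FatalError` classes and plain callables in place of the real provider adapters:

```python
# Sketch of the plan's fallback policy: retryable errors back off exponentially
# (2s -> 4s -> ... capped at 30s), fatal errors skip straight to the next model.
# The error classes and the call signature are assumptions, not the project's API.
import time


class RetryableError(Exception):
    """E.g. 429 rate limit, timeout, server-side 500."""


class FatalError(Exception):
    """E.g. 401 invalid key: retrying the same model is pointless."""


def call_with_fallback(models, prompt, max_retries=3, base_delay=2.0, max_delay=30.0):
    for model in models:
        delay = base_delay
        for attempt in range(max_retries):
            try:
                return model(prompt)
            except FatalError:
                break  # switch to the next model immediately
            except RetryableError:
                if attempt == max_retries - 1:
                    break  # retries exhausted: fall through to next model
                time.sleep(min(delay, max_delay))
                delay *= 2  # exponential backoff
    raise RuntimeError("all models failed")
```

The plan's per-model cooldown would sit on top of this: a failed model records a timestamp and is filtered out of `models` until the cooldown expires.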
def _describe_chat_model(self, chat_model, reasoning: bool = False) -> str:
"""根据配置返回模型名称,默认回退到实例类名"""