From f0339f79ac82ff747464a5b35f34308cadf5ff64 Mon Sep 17 00:00:00 2001
From: zihanjian
Date: Wed, 4 Feb 2026 17:35:08 +0800
Subject: [PATCH] feat: refactor into an Agent architecture and implement the
 tool system
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 REFACTOR_PLAN.md            | 103 ++++++++
 ai_providers/ai_chatgpt.py  |  63 ++---
 ai_providers/ai_deepseek.py |  55 ++--
 ai_providers/ai_kimi.py     |  40 +--
 ai_providers/fallback.py    | 143 ++++++++++
 commands/handlers.py        | 505 ++++++++----------------------------
 function/func_summary.py    |  74 ++++++
 robot.py                    | 218 ++++++++--------
 tools/__init__.py           |  80 ++++++
 tools/history.py            | 189 ++++++++++++++
 tools/reminder.py           | 164 ++++++++++++
 tools/web_search.py         |  61 +++++
 12 files changed, 1113 insertions(+), 582 deletions(-)
 create mode 100644 REFACTOR_PLAN.md
 create mode 100644 ai_providers/fallback.py
 create mode 100644 tools/__init__.py
 create mode 100644 tools/history.py
 create mode 100644 tools/reminder.py
 create mode 100644 tools/web_search.py

diff --git a/REFACTOR_PLAN.md b/REFACTOR_PLAN.md
new file mode 100644
index 0000000..5a456e2
--- /dev/null
+++ b/REFACTOR_PLAN.md
@@ -0,0 +1,103 @@
+# Bubbles Refactoring Plan
+
+## Core Problem
+
+Bubbles is a one-shot intent classifier, not an Agent. The AI Router performs a single `chat/function` classification and stops: no multi-step reasoning, no autonomous decisions. That is the root of its rigidity.
+
+---
+
+## 1. Agent Loop (top priority)
+
+Turn the one-shot classification in `processMsg` into a tool-calling loop. The AI decides which tools to call, how many times, and when to stop.
+
+**Scope:** the main message-processing flow in `robot.py`; the routing logic in `commands/ai_router.py`
+
+**Target state:**
+
+```
+message arrives → Agent loop starts
+  → LLM returns a tool call → execute the tool → feed the result back to the LLM → keep reasoning
+  → LLM returns plain text → loop ends, send the reply
+  → max step count reached → force-stop
+```
+
+**Required safeguards:**
+- Maximum step count (guards against infinite loops)
+- Loop detection: abort when the same tool receives identical input 3 times in a row
+- Per-step timeout
+
+---
+
+## 2. Tool Standardization
+
+Define a uniform Tool interface and rewrite the existing features as standard tools that the Agent loop can call.
+
+**Scope:** new `tools/` package; refactor `commands/ai_functions.py` and `commands/reminder_router.py`
+
+**Tool interface definition:**
+
+```python
+class Tool:
+    name: str          # unique tool identifier
+    description: str   # capability description shown to the LLM
+    parameters: dict   # JSON Schema parameter definition
+
+    async def execute(self, params: dict, ctx: MessageContext) -> str:
+        """Run the tool and return a text result."""
+```
+
+**Existing features to rewrite:**
+- `reminder_hub` → `reminder_create` / `reminder_list` / `reminder_delete` (split apart, killing the second-level router)
+- `perplexity_search` → `web_search`
+- `handle_chitchat` is no longer a tool; it becomes the Agent loop's default plain-text output path
+
+**Tool descriptions go through the LLM's native function calling / tool_use protocol** instead of being spliced into the prompt string.
+
+---
+
+## 3. Model Fallback
+
+Today, when the model goes down, the request just dies. A fallback chain is mandatory.
+
+**Scope:** the model invocation layer in `robot.py`; the adapters under `ai_providers/`
+
+**Target state:**
+
+```yaml
+# config.yaml
+models:
+  default:
+    primary: deepseek
+    fallbacks: [chatgpt, kimi]
+```
+
+**Must implement:**
+- Distinguish retryable errors (429 rate limits, timeouts, server-side 500s) from non-retryable ones (401 invalid key)
+- Retryable errors: exponential backoff (2s initial, 30s max)
+- Non-retryable, or retries exhausted: switch to the next fallback model
+- Record a cooldown for each failed model so it is not retried in the short term
+
+---
+
+## 4. Context Compression
+
+`max_history` currently hard-truncates by message count and loses important early information.
+
+**Scope:** the history retrieval logic in `robot.py`; conversation construction in `commands/handlers.py`
+
+**Target state:**
+- Track the total token count of the current conversation
+- When it nears the model's context window limit, summarize and compress the early messages
+- Keep the latest N turns verbatim plus an LLM-generated summary of the earlier conversation
+- Replaces today's naive count-based truncation
+
+---
+
+## Execution Status
+
+```
+1. Agent loop            ✅ done: AI Router removed; the LLM calls tools autonomously via _execute_with_tools
+2. Tool standardization  ✅ done: tools/__init__.py, tools/reminder.py, tools/web_search.py, tools/history.py
+3. Model fallback        ✅ done: _handle_chitchat cascades through candidate models; ai_providers/fallback.py does retry/cooldown
+4. Context compression   ✅ done: func_summary.get_compressed_context(); a character budget replaces fixed-count truncation
+```
diff --git a/ai_providers/ai_chatgpt.py b/ai_providers/ai_chatgpt.py
index 9d7d4b7..7407016 100644
--- a/ai_providers/ai_chatgpt.py
+++ b/ai_providers/ai_chatgpt.py
@@ -82,32 +82,39 @@ class ChatGPT():
             api_messages.append({"role": 
"system", "content": f"{time_mk}{now_time}"}) - # 2. 获取并格式化历史消息 + # 2. 获取并格式化历史消息(使用上下文压缩) if self.message_summary and self.bot_wxid: - history = self.message_summary.get_messages(wxid) - - # -限制历史消息数量 - # 优先使用传入的特定限制,如果没有则使用模型默认限制 limit_to_use = specific_max_history if specific_max_history is not None else self.max_history_messages - self.LOG.debug(f"获取历史记录 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit_to_use}") - - if limit_to_use is not None and limit_to_use > 0: - history = history[-limit_to_use:] # 取最新的 N 条 - elif limit_to_use == 0: # 如果设置为0,则不包含历史 - history = [] - - self.LOG.debug(f"应用限制后历史条数: {len(history)}") + try: + limit_to_use = int(limit_to_use) if limit_to_use is not None else None + except (TypeError, ValueError): + limit_to_use = self.max_history_messages + + if limit_to_use == 0: + history = [] + context_summary = None + elif hasattr(self.message_summary, 'get_compressed_context'): + history, context_summary = self.message_summary.get_compressed_context( + wxid, max_context_chars=8000, max_recent=limit_to_use + ) + else: + history = self.message_summary.get_messages(wxid) + if limit_to_use and limit_to_use > 0: + history = history[-limit_to_use:] + context_summary = None + + if context_summary: + api_messages.append({"role": "system", "content": f"Earlier conversation context:\n{context_summary}"}) for msg in history: role = "assistant" if msg.get("sender_wxid") == self.bot_wxid else "user" content = msg.get('content', '') - if content: # 避免添加空内容 + if content: if role == "user": - sender_name = msg.get('sender', '未知用户') # 获取发送者名字,如果不存在则使用默认值 - formatted_content = f"{sender_name}: {content}" # 格式化内容,加入发送者名字 - api_messages.append({"role": role, "content": formatted_content}) - else: # 如果是助手(机器人自己)的消息,则不加名字 - api_messages.append({"role": role, "content": content}) + sender_name = msg.get('sender', '未知用户') + api_messages.append({"role": role, "content": f"{sender_name}: {content}"}) + else: + api_messages.append({"role": role, "content": content}) else: self.LOG.warning(f"无法为 wxid={wxid} 获取历史记录,因为 message_summary 或 bot_wxid 未设置。") @@ -130,18 +137,12 @@ class ChatGPT(): ) return response_text - except AuthenticationError: - self.LOG.error("OpenAI API 认证失败,请检查 API 密钥是否正确") - return "API认证失败" - except APIConnectionError: - self.LOG.error("无法连接到 OpenAI API,请检查网络连接") - return "网络连接错误" - except APIError as e1: - self.LOG.error(f"OpenAI API 返回了错误:{str(e1)}") - return f"API错误: {str(e1)}" - except Exception as e0: - self.LOG.error(f"发生未知错误:{str(e0)}", exc_info=True) - return "发生未知错误" + except (AuthenticationError, APIConnectionError, APIError) as e: + self.LOG.error(f"ChatGPT API 调用失败: {e}") + raise + except Exception as e: + self.LOG.error(f"ChatGPT 未知错误: {e}", exc_info=True) + raise def _execute_with_tools( self, diff --git a/ai_providers/ai_deepseek.py b/ai_providers/ai_deepseek.py index 502eb43..1959fd0 100644 --- a/ai_providers/ai_deepseek.py +++ b/ai_providers/ai_deepseek.py @@ -78,32 +78,39 @@ class DeepSeek(): api_messages.append({"role": "system", "content": f"{time_mk}{now_time}"}) - # 2. 获取并格式化历史消息 + # 2. 
获取并格式化历史消息(使用上下文压缩) if self.message_summary and self.bot_wxid: - history = self.message_summary.get_messages(wxid) - - # 限制历史消息数量 - # 优先使用传入的特定限制,如果没有则使用模型默认限制 limit_to_use = specific_max_history if specific_max_history is not None else self.max_history_messages - self.LOG.debug(f"获取历史记录 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit_to_use}") - - if limit_to_use is not None and limit_to_use > 0: - history = history[-limit_to_use:] # 取最新的 N 条 - elif limit_to_use == 0: # 如果设置为0,则不包含历史 - history = [] - - self.LOG.debug(f"应用限制后历史条数: {len(history)}") + try: + limit_to_use = int(limit_to_use) if limit_to_use is not None else None + except (TypeError, ValueError): + limit_to_use = self.max_history_messages + + if limit_to_use == 0: + history = [] + context_summary = None + elif hasattr(self.message_summary, 'get_compressed_context'): + history, context_summary = self.message_summary.get_compressed_context( + wxid, max_context_chars=8000, max_recent=limit_to_use + ) + else: + history = self.message_summary.get_messages(wxid) + if limit_to_use and limit_to_use > 0: + history = history[-limit_to_use:] + context_summary = None + + if context_summary: + api_messages.append({"role": "system", "content": f"Earlier conversation context:\n{context_summary}"}) for msg in history: role = "assistant" if msg.get("sender_wxid") == self.bot_wxid else "user" content = msg.get('content', '') if content: if role == "user": - sender_name = msg.get('sender', '未知用户') # 获取发送者名字 - formatted_content = f"{sender_name}: {content}" # 格式化内容 - api_messages.append({"role": role, "content": formatted_content}) - else: # 助手消息 - api_messages.append({"role": role, "content": content}) + sender_name = msg.get('sender', '未知用户') + api_messages.append({"role": role, "content": f"{sender_name}: {content}"}) + else: + api_messages.append({"role": role, "content": content}) else: self.LOG.warning(f"无法为 wxid={wxid} 获取历史记录,因为 message_summary 或 bot_wxid 未设置。") @@ -125,12 +132,12 @@ class DeepSeek(): ) return final_response - except (APIConnectionError, APIError, AuthenticationError) as e1: - self.LOG.error(f"DeepSeek API 返回了错误:{str(e1)}") - return f"DeepSeek API 返回了错误:{str(e1)}" - except Exception as e0: - self.LOG.error(f"发生未知错误:{str(e0)}", exc_info=True) - return "抱歉,处理您的请求时出现了错误" + except (APIConnectionError, APIError, AuthenticationError) as e: + self.LOG.error(f"DeepSeek API 调用失败: {e}") + raise + except Exception as e: + self.LOG.error(f"DeepSeek 未知错误: {e}", exc_info=True) + raise def _execute_with_tools( self, diff --git a/ai_providers/ai_kimi.py b/ai_providers/ai_kimi.py index 67052cd..d84b5f5 100644 --- a/ai_providers/ai_kimi.py +++ b/ai_providers/ai_kimi.py @@ -77,18 +77,27 @@ class Kimi: api_messages.append({"role": "system", "content": f"Current time is: {now_time}"}) if self.message_summary and self.bot_wxid: - history = self.message_summary.get_messages(wxid) - limit_to_use = specific_max_history if specific_max_history is not None else self.max_history_messages try: limit_to_use = int(limit_to_use) if limit_to_use is not None else None except (TypeError, ValueError): limit_to_use = self.max_history_messages - if limit_to_use is not None and limit_to_use > 0: - history = history[-limit_to_use:] - elif limit_to_use == 0: + if limit_to_use == 0: history = [] + context_summary = None + elif hasattr(self.message_summary, 'get_compressed_context'): + history, context_summary = self.message_summary.get_compressed_context( + wxid, max_context_chars=8000, max_recent=limit_to_use + ) + else: + history = 
self.message_summary.get_messages(wxid) + if limit_to_use and limit_to_use > 0: + history = history[-limit_to_use:] + context_summary = None + + if context_summary: + api_messages.append({"role": "system", "content": f"Earlier conversation context:\n{context_summary}"}) for msg in history: role = "assistant" if msg.get("sender_wxid") == self.bot_wxid else "user" @@ -97,8 +106,7 @@ class Kimi: continue if role == "user": sender_name = msg.get("sender", "未知用户") - formatted_content = f"{sender_name}: {content}" - api_messages.append({"role": role, "content": formatted_content}) + api_messages.append({"role": role, "content": f"{sender_name}: {content}"}) else: api_messages.append({"role": role, "content": content}) else: @@ -132,18 +140,12 @@ class Kimi: return response_text - except AuthenticationError: - self.LOG.error("Kimi API 认证失败,请检查 API 密钥是否正确") - return "Kimi API 认证失败,请检查配置。" - except APIConnectionError: - self.LOG.error("无法连接到 Kimi API,请检查网络或代理设置") - return "无法连接到 Kimi 服务,请稍后再试。" - except APIError as api_err: - self.LOG.error(f"Kimi API 返回错误:{api_err}") - return f"Kimi API 错误:{api_err}" - except Exception as exc: - self.LOG.error(f"Kimi 处理请求时出现未知错误:{exc}", exc_info=True) - return "处理请求时出现未知错误,请稍后再试。" + except (AuthenticationError, APIConnectionError, APIError) as e: + self.LOG.error(f"Kimi API 调用失败: {e}") + raise + except Exception as e: + self.LOG.error(f"Kimi 未知错误: {e}", exc_info=True) + raise def _execute_with_tools( self, diff --git a/ai_providers/fallback.py b/ai_providers/fallback.py new file mode 100644 index 0000000..224ae6c --- /dev/null +++ b/ai_providers/fallback.py @@ -0,0 +1,143 @@ +""" +模型 Fallback 机制 —— 主模型失败时自动切到备选模型。 + +参考 OpenClaw 的 model-fallback.ts 设计: + - 区分可重试错误(限流、超时、服务端 500)和不可重试错误(密钥无效) + - 可重试:指数退避重试 + - 不可重试或重试耗尽:切下一个 fallback 模型 + - 记录失败模型的冷却时间 +""" + +import logging +import time +from typing import Any, Callable, Dict, List, Optional, Tuple + +logger = logging.getLogger(__name__) + +# 冷却时间(秒):模型失败后暂时不再尝试 +_MODEL_COOLDOWN: Dict[int, float] = {} # model_id -> 冷却结束时间戳 +COOLDOWN_DURATION = 60 # 60秒冷却 + +# 重试参数 +RETRY_INITIAL_DELAY = 2.0 +RETRY_BACKOFF_FACTOR = 2.0 +RETRY_MAX_DELAY = 30.0 +MAX_RETRIES_PER_MODEL = 2 + + +def _is_retryable(error: Exception) -> bool: + """判断错误是否可重试。""" + error_str = str(error).lower() + error_type = type(error).__name__ + + # 限流 + if "rate" in error_str and "limit" in error_str: + return True + if "429" in error_str: + return True + # 超时 + if "timeout" in error_str or "timed out" in error_str: + return True + # 服务端错误 + if "500" in error_str or "502" in error_str or "503" in error_str: + return True + if "server" in error_str and "error" in error_str: + return True + # 连接错误 + if "connection" in error_str: + return True + # OpenAI 特定 + if error_type in ("APIConnectionError", "APITimeoutError", "InternalServerError"): + return True + + return False + + +def _is_in_cooldown(model_id: int) -> bool: + """检查模型是否在冷却中。""" + deadline = _MODEL_COOLDOWN.get(model_id) + if deadline is None: + return False + if time.time() < deadline: + return True + # 冷却结束,清除 + _MODEL_COOLDOWN.pop(model_id, None) + return False + + +def _set_cooldown(model_id: int) -> None: + """将模型加入冷却。""" + _MODEL_COOLDOWN[model_id] = time.time() + COOLDOWN_DURATION + logger.info(f"模型 {model_id} 进入冷却({COOLDOWN_DURATION}秒)") + + +def call_with_fallback( + primary_model_id: int, + chat_models: Dict[int, Any], + fallback_ids: List[int], + call_fn: Callable[[Any], str], +) -> Tuple[str, int]: + """ + 带 Fallback 的模型调用。 + + :param primary_model_id: 主模型 ID + :param chat_models: 
所有可用模型 {id: instance} + :param fallback_ids: 按优先级排序的 fallback 模型 ID 列表 + :param call_fn: 实际调用函数,接收模型实例,返回回复文本 + :return: (回复文本, 实际使用的模型ID) + """ + # 构建候选列表:主模型 + fallbacks,跳过冷却中的 + candidates = [] + for mid in [primary_model_id] + fallback_ids: + if mid not in chat_models: + continue + if mid in [c[0] for c in candidates]: + continue # 去重 + if _is_in_cooldown(mid): + logger.info(f"模型 {mid} 处于冷却中,跳过") + continue + candidates.append((mid, chat_models[mid])) + + if not candidates: + # 所有模型都在冷却,强制使用主模型 + if primary_model_id in chat_models: + candidates = [(primary_model_id, chat_models[primary_model_id])] + else: + return "所有模型暂时不可用,请稍后再试。", primary_model_id + + last_error = None + for model_id, model_instance in candidates: + # 对每个候选模型,尝试最多 MAX_RETRIES_PER_MODEL 次 + for attempt in range(MAX_RETRIES_PER_MODEL + 1): + try: + result = call_fn(model_instance) + if result: + return result, model_id + # 空结果视为失败,但不重试 + break + except Exception as e: + last_error = e + model_name = getattr(model_instance, '__class__', type(model_instance)).__name__ + logger.warning( + f"模型 {model_name}(ID:{model_id}) 第 {attempt + 1} 次调用失败: {e}" + ) + + if not _is_retryable(e): + logger.info(f"不可重试错误,跳过模型 {model_id}") + _set_cooldown(model_id) + break + + if attempt < MAX_RETRIES_PER_MODEL: + delay = min( + RETRY_INITIAL_DELAY * (RETRY_BACKOFF_FACTOR ** attempt), + RETRY_MAX_DELAY, + ) + logger.info(f"等待 {delay:.1f}s 后重试...") + time.sleep(delay) + else: + _set_cooldown(model_id) + + # 所有候选都失败了 + error_msg = f"模型调用失败: {last_error}" if last_error else "无法获取回复" + logger.error(error_msg) + return f"抱歉,服务暂时不可用,请稍后再试。", primary_model_id diff --git a/commands/handlers.py b/commands/handlers.py index dccccba..74ae750 100644 --- a/commands/handlers.py +++ b/commands/handlers.py @@ -1,8 +1,9 @@ import re from typing import Optional, Match, Dict, Any -import json # 确保已导入json -from datetime import datetime # 确保已导入datetime -import os # 导入os模块用于文件路径操作 +import json +from datetime import datetime +import os +import time as time_mod from function.func_persona import build_persona_system_prompt @@ -13,9 +14,10 @@ if TYPE_CHECKING: DEFAULT_CHAT_HISTORY = 30 + def handle_chitchat(ctx: 'MessageContext', match: Optional[Match]) -> bool: """ - 处理闲聊,调用AI模型生成回复 + Agent 入口 —— 处理用户消息,LLM 自主决定是否调用工具。 """ # 获取对应的AI模型 chat_model = None @@ -23,13 +25,13 @@ def handle_chitchat(ctx: 'MessageContext', match: Optional[Match]) -> bool: chat_model = ctx.chat elif ctx.robot and hasattr(ctx.robot, 'chat'): chat_model = ctx.robot.chat - + if not chat_model: if ctx.logger: - ctx.logger.error("没有可用的AI模型处理闲聊") + ctx.logger.error("没有可用的AI模型") ctx.send_text("抱歉,我现在无法进行对话。") return False - + # 获取特定的历史消息数量限制 raw_specific_max_history = getattr(ctx, 'specific_max_history', None) specific_max_history = None @@ -39,432 +41,93 @@ def handle_chitchat(ctx: 'MessageContext', match: Optional[Match]) -> bool: except (TypeError, ValueError): specific_max_history = None if specific_max_history is not None: - if specific_max_history < 10: - specific_max_history = 10 - elif specific_max_history > 300: - specific_max_history = 300 + specific_max_history = max(10, min(300, specific_max_history)) if specific_max_history is None: specific_max_history = DEFAULT_CHAT_HISTORY setattr(ctx, 'specific_max_history', specific_max_history) - if ctx.logger: - ctx.logger.debug(f"为 {ctx.get_receiver()} 使用特定历史限制: {specific_max_history}") - - # 处理引用图片情况 + + # ── 引用图片特殊处理 ────────────────────────────────── if getattr(ctx, 'is_quoted_image', False): - ctx.logger.info("检测到引用图片消息,尝试处理图片内容...") - - 
import os - from ai_providers.ai_chatgpt import ChatGPT - - # 确保是 ChatGPT 类型且支持图片处理 - support_vision = False - if isinstance(chat_model, ChatGPT): - if hasattr(chat_model, 'support_vision') and chat_model.support_vision: - support_vision = True - else: - # 检查模型名称判断是否支持视觉 - if hasattr(chat_model, 'model'): - model_name = getattr(chat_model, 'model', '') - support_vision = model_name == "gpt-4.1-mini" or model_name == "gpt-4o" or "-vision" in model_name - - if not support_vision: - ctx.send_text("抱歉,当前 AI 模型不支持处理图片。请联系管理员配置支持视觉的模型 (如 gpt-4-vision-preview、gpt-4o 等)。") - return True - - # 下载图片并处理 - try: - # 创建临时目录 - temp_dir = "temp/image_cache" - os.makedirs(temp_dir, exist_ok=True) - - # 下载图片 - ctx.logger.info(f"正在下载引用图片: msg_id={ctx.quoted_msg_id}") - image_path = ctx.wcf.download_image( - id=ctx.quoted_msg_id, - extra=ctx.quoted_image_extra, - dir=temp_dir, - timeout=30 - ) - - if not image_path or not os.path.exists(image_path): - ctx.logger.error(f"图片下载失败: {image_path}") - ctx.send_text("抱歉,无法下载图片进行分析。") - return True - - ctx.logger.info(f"图片下载成功: {image_path},准备分析...") - - # 调用 ChatGPT 分析图片 - try: - # 根据用户的提问构建 prompt - prompt = ctx.text - if not prompt or prompt.strip() == "": - prompt = "请详细描述这张图片中的内容" - - # 调用图片分析函数 - response = chat_model.get_image_description(image_path, prompt) - ctx.send_text(response) - - ctx.logger.info("图片分析完成并已发送回复") - except Exception as e: - ctx.logger.error(f"分析图片时出错: {e}") - ctx.send_text(f"分析图片时出错: {str(e)}") - - # 清理临时图片 - try: - if os.path.exists(image_path): - os.remove(image_path) - ctx.logger.info(f"临时图片已删除: {image_path}") - except Exception as e: - ctx.logger.error(f"删除临时图片出错: {e}") - - return True # 已处理,不执行后续的普通文本处理流程 - - except Exception as e: - ctx.logger.error(f"处理引用图片过程中出错: {e}") - ctx.send_text(f"处理图片时发生错误: {str(e)}") - return True # 已处理,即使出错也不执行后续普通文本处理 - - # 获取消息内容 + return _handle_quoted_image(ctx, chat_model) + + # ── 构建用户消息 ────────────────────────────────────── content = ctx.text sender_name = ctx.sender_name - - # 使用XML处理器格式化消息 + if ctx.robot and hasattr(ctx.robot, "xml_processor"): - # 创建格式化的聊天内容(带有引用消息等) if ctx.is_group: - # 处理群聊消息 msg_data = ctx.robot.xml_processor.extract_quoted_message(ctx.msg) - q_with_info = ctx.robot.xml_processor.format_message_for_ai(msg_data, sender_name) else: - # 处理私聊消息 msg_data = ctx.robot.xml_processor.extract_private_quoted_message(ctx.msg) - q_with_info = ctx.robot.xml_processor.format_message_for_ai(msg_data, sender_name) - + q_with_info = ctx.robot.xml_processor.format_message_for_ai(msg_data, sender_name) if not q_with_info: - import time - current_time = time.strftime("%H:%M", time.localtime()) + current_time = time_mod.strftime("%H:%M", time_mod.localtime()) q_with_info = f"[{current_time}] {sender_name}: {content or '[空内容]'}" else: - # 简单格式化 - import time - current_time = time.strftime("%H:%M", time.localtime()) + current_time = time_mod.strftime("%H:%M", time_mod.localtime()) q_with_info = f"[{current_time}] {sender_name}: {content or '[空内容]'}" - - if ctx.is_group and not ctx.is_at_bot and getattr(ctx, 'auto_random_reply', False): + + is_auto_random_reply = getattr(ctx, 'auto_random_reply', False) + + if ctx.is_group and not ctx.is_at_bot and is_auto_random_reply: latest_message_prompt = ( "# 群聊插话提醒\n" "你目前是在群聊里主动接话,没有人点名让你发言。\n" "请根据下面这句(或者你任选一句)最新消息插入一条自然、不突兀的中文回复,语气放松随和即可:\n" - f"“{q_with_info}”\n" + f"\u201c{q_with_info}\u201d\n" "不要重复任何已知的内容,提出新的思维碰撞(例如:基于上下文的新问题、不同角度的解释等,但是不要反驳任何内容),也不要显得过于正式。" ) else: latest_message_prompt = ( "# 本轮需要回复的用户及其最新信息\n" 
"请你基于下面这条最新收到的用户讯息(和该用户最近的历史消息),直接面向发送者进行自然的中文回复:\n" - f"“{q_with_info}”\n" + f"\u201c{q_with_info}\u201d\n" "请只针对该用户进行回复。" ) - # 获取AI回复 + # ── 构建工具列表 ────────────────────────────────────── + tools = None + tool_handler = None + + # 插嘴模式不使用工具,减少 token 消耗 + if not is_auto_random_reply: + from tools import tool_registry + # 导入工具模块以触发注册(仅首次生效) + import tools.history + import tools.reminder + import tools.web_search + + openai_tools = tool_registry.get_openai_tools() + if openai_tools: + tools = openai_tools + tool_handler = tool_registry.create_handler(ctx) + + # ── 构建系统提示 ────────────────────────────────────── + persona_text = getattr(ctx, 'persona', None) + system_prompt_override = None + if persona_text: + try: + system_prompt_override = build_persona_system_prompt(chat_model, persona_text) + except Exception as persona_exc: + if ctx.logger: + ctx.logger.error(f"构建人设系统提示失败: {persona_exc}", exc_info=True) + + # ── 调用 LLM(Agent 循环在 _execute_with_tools 中)── try: if ctx.logger: - ctx.logger.info(f"【发送内容】将以下消息发送给AI: \n{latest_message_prompt}") - - # 调用AI模型,传递特定历史限制 - tools = None - tool_handler = None - - # 插嘴模式下不使用 function call,减少 token 消耗 - is_auto_random_reply = getattr(ctx, 'auto_random_reply', False) - - if ctx.robot and getattr(ctx.robot, 'message_summary', None) and not is_auto_random_reply: - chat_id = ctx.get_receiver() - message_summary = ctx.robot.message_summary - - visible_history_limit = getattr(ctx, 'specific_max_history', DEFAULT_CHAT_HISTORY) - try: - visible_history_limit = int(visible_history_limit) - except (TypeError, ValueError): - visible_history_limit = DEFAULT_CHAT_HISTORY - if visible_history_limit < 1: - visible_history_limit = DEFAULT_CHAT_HISTORY - - history_lookup_tool = { - "type": "function", - "function": { - "name": "lookup_chat_history", - "description": ( - f"你目前只能看见最近的{visible_history_limit}条消息,所以不一定能设身处地地了解用户。" - "和人交流的过程中,掌握更多的上下文是非常重要的,这可以保证你的回答有温度、真实且有价值。" - "用户不会主动要求你去看上下文,但是你要自己判断需要看什么、看多少、看哪些上下文。" - "请你在回答之前,尽可能地通过查看历史记录来了解用户或事情的全貌,而如果需要查看历史记录消息,那么就请调用此函数。\n" - "调用时必须明确指定 mode(keywords / range / time),并按照以下说明提供参数:\n" - "1. mode=\"keywords\":最常用的模式,用于对关键词进行模糊检索,用户对某些消息进行更深入的理解,在历史记录中找到这些内容的上下文。需要提供 `keywords` 数组(2-4 个与核心相关的词或短语),系统会自动按最新匹配段落返回,函数的返回值中 `segments` 列表包含格式化的 \"时间 昵称 内容\" 行。\n" - f"2. mode=\"range\":用于获取某个倒数的区间内的连续消息块,用于快速找到最近的 n 条消息,只有在对**最近的**记录进行观察时使用。需要提供 `start_offset` 与 `end_offset`(均需 >{visible_history_limit},且 end_offset ≥ start_offset)。偏移基于最新消息的倒数编号,例如 {visible_history_limit + 1}~{visible_history_limit + 90} 表示排除当前可见的消息后,再向前取更多历史。\n" - "3. mode=\"time\":次常用的模式,用于对某段时间内的消息进行检索,比如当提到昨晚、前天、昨天、今早上、上周、去年之类的具体时间的时候使用。需要提供 `start_time`、`end_time`(格式如 2025-05-01 08:00 或 2025-05-01 08:00:00),函数将返回该时间范围内的所有消息。若区间不符合用户需求,可再次调用调整时间。\n" - "函数随时可以多次调用并组合使用:例如先用 keywords 找锚点,再用 range/time 取更大上下文。" - ), - "parameters": { - "type": "object", - "properties": { - "mode": { - "type": "string", - "description": "One of keywords, range, time.", - "enum": ["keywords", "range", "time"] - }, - "keywords": { - "type": "array", - "description": "Keywords for fuzzy search when mode=keywords.", - "items": {"type": "string"} - }, - "start_offset": { - "type": "integer", - "description": f"Smaller offset counted from the latest message (>{visible_history_limit}) when mode=range." - }, - "end_offset": { - "type": "integer", - "description": f"Larger offset counted from the latest message (>{visible_history_limit}) when mode=range." - }, - "start_time": { - "type": "string", - "description": "Start timestamp when mode=time (e.g., 2025-05-01 08:00[:00])." 
- }, - "end_time": { - "type": "string", - "description": "End timestamp when mode=time (e.g., 2025-05-01 12:00[:00])." - } - }, - "additionalProperties": False - } - } - } - - def handle_tool_call(tool_name: str, arguments: Dict[str, Any]) -> str: - try: - if tool_name != "lookup_chat_history": - return json.dumps({"error": f"Unknown tool '{tool_name}'"}, ensure_ascii=False) - - mode = (arguments.get("mode") or "").strip().lower() - keywords = arguments.get("keywords") - start_offset = arguments.get("start_offset") - end_offset = arguments.get("end_offset") - start_time = arguments.get("start_time") - end_time = arguments.get("end_time") - - inferred_mode = mode - if not inferred_mode: - if start_time and end_time: - inferred_mode = "time" - elif start_offset is not None and end_offset is not None: - inferred_mode = "range" - elif keywords: - inferred_mode = "keywords" - else: - inferred_mode = "keywords" - - print(f"[lookup_chat_history] inferred_mode={inferred_mode}, raw_args={arguments}") - if ctx.logger: - ctx.logger.info(f"[lookup_chat_history] inferred_mode={inferred_mode}, raw_args={arguments}") - - if inferred_mode == "keywords": - keywords = arguments.get("keywords", []) - if isinstance(keywords, str): - keywords = [keywords] - elif not isinstance(keywords, list): - keywords = [] - - cleaned_keywords = [] - for kw in keywords: - if kw is None: - continue - kw_str = str(kw).strip() - if kw_str: - if len(kw_str) == 1 and not kw_str.isdigit(): - continue - cleaned_keywords.append(kw_str) - - # 去重同时保持顺序 - seen = set() - deduped_keywords = [] - for kw in cleaned_keywords: - lower_kw = kw.lower() - if lower_kw not in seen: - seen.add(lower_kw) - deduped_keywords.append(kw) - - if not deduped_keywords: - return json.dumps({"error": "No valid keywords provided.", "results": []}, ensure_ascii=False) - - context_window = 10 - max_results = 20 - - print(f"[search_chat_history] chat_id={chat_id}, keywords={deduped_keywords}, " - f"context_window={context_window}, max_results={max_results}") - if ctx.logger: - ctx.logger.info( - f"[search_chat_history] keywords={deduped_keywords}, " - f"context_window={context_window}, max_results={max_results}" - ) - - search_results = message_summary.search_messages_with_context( - chat_id=chat_id, - keywords=deduped_keywords, - context_window=context_window, - max_groups=max_results, - exclude_recent=visible_history_limit - ) - - segments = [] - lines_seen = set() - for segment in search_results: - formatted = [] - for line in segment.get("formatted_messages", []): - if line not in lines_seen: - lines_seen.add(line) - formatted.append(line) - if not formatted: - continue - segments.append({ - "matched_keywords": segment.get("matched_keywords", []), - "messages": formatted - }) - - response_payload = { - "segments": segments, - "returned_groups": len(segments), - "keywords": deduped_keywords - } - - print(f"[search_chat_history] returned_groups={len(segments)}") - if ctx.logger: - ctx.logger.info(f"[search_chat_history] returned_groups={len(segments)}") - - if not segments: - response_payload["notice"] = "No messages matched the provided keywords." 
- - return json.dumps(response_payload, ensure_ascii=False) - - elif inferred_mode == "range": - if start_offset is None or end_offset is None: - return json.dumps({"error": "start_offset and end_offset are required."}, ensure_ascii=False) - - try: - start_offset = int(start_offset) - end_offset = int(end_offset) - except (TypeError, ValueError): - return json.dumps({"error": "start_offset and end_offset must be integers."}, ensure_ascii=False) - - if start_offset <= visible_history_limit or end_offset <= visible_history_limit: - return json.dumps( - {"error": f"Offsets must be greater than {visible_history_limit} to avoid visible messages."}, - ensure_ascii=False - ) - - if start_offset > end_offset: - start_offset, end_offset = end_offset, start_offset - - print(f"[fetch_chat_history_range] chat_id={chat_id}, start_offset={start_offset}, " - f"end_offset={end_offset}") - if ctx.logger: - ctx.logger.info( - f"[fetch_chat_history_range] start_offset={start_offset}, " - f"end_offset={end_offset}" - ) - - range_result = message_summary.get_messages_by_reverse_range( - chat_id=chat_id, - start_offset=start_offset, - end_offset=end_offset - ) - - response_payload = { - "start_offset": range_result.get("start_offset"), - "end_offset": range_result.get("end_offset"), - "messages": range_result.get("messages", []), - "returned_count": range_result.get("returned_count", 0), - "total_messages": range_result.get("total_messages", 0) - } - - print(f"[fetch_chat_history_range] returned_count={response_payload['returned_count']}") - if ctx.logger: - ctx.logger.info( - f"[fetch_chat_history_range] returned_count={response_payload['returned_count']}" - ) - - if response_payload["returned_count"] == 0: - response_payload["notice"] = "No messages available in the requested range." - - return json.dumps(response_payload, ensure_ascii=False) - - elif inferred_mode == "time": - if not start_time or not end_time: - return json.dumps({"error": "start_time and end_time are required."}, ensure_ascii=False) - - print(f"[fetch_chat_history_time_window] chat_id={chat_id}, start_time={start_time}, end_time={end_time}") - if ctx.logger: - ctx.logger.info( - f"[fetch_chat_history_time_window] start_time={start_time}, end_time={end_time}" - ) - - time_lines = message_summary.get_messages_by_time_window( - chat_id=chat_id, - start_time=start_time, - end_time=end_time - ) - - response_payload = { - "start_time": start_time, - "end_time": end_time, - "messages": time_lines, - "returned_count": len(time_lines) - } - - print(f"[fetch_chat_history_time_window] returned_count={response_payload['returned_count']}") - if ctx.logger: - ctx.logger.info( - f"[fetch_chat_history_time_window] returned_count={response_payload['returned_count']}" - ) - - if response_payload["returned_count"] == 0: - response_payload["notice"] = "No messages found within the requested time window." 
- - return json.dumps(response_payload, ensure_ascii=False) - - else: - return json.dumps({"error": f"Unsupported mode '{inferred_mode}'"}, ensure_ascii=False) - - except Exception as tool_exc: - if ctx.logger: - ctx.logger.error(f"历史搜索工具调用失败: {tool_exc}", exc_info=True) - return json.dumps( - {"error": f"History tool failed: {tool_exc.__class__.__name__}"}, - ensure_ascii=False - ) - - tools = [history_lookup_tool] - tool_handler = handle_tool_call - - persona_text = getattr(ctx, 'persona', None) - system_prompt_override = None - if persona_text: - try: - system_prompt_override = build_persona_system_prompt(chat_model, persona_text) - except Exception as persona_exc: - if ctx.logger: - ctx.logger.error(f"构建人设系统提示失败: {persona_exc}", exc_info=True) - system_prompt_override = None + tool_names = [t["function"]["name"] for t in tools] if tools else [] + ctx.logger.info(f"Agent 调用: tools={tool_names}") rsp = chat_model.get_answer( - question=latest_message_prompt, + question=latest_message_prompt, wxid=ctx.get_receiver(), system_prompt_override=system_prompt_override, specific_max_history=specific_max_history, tools=tools, tool_handler=tool_handler, - tool_max_iterations=10 + tool_max_iterations=10, ) - + if rsp: ctx.send_text(rsp, "") return True @@ -474,9 +137,59 @@ def handle_chitchat(ctx: 'MessageContext', match: Optional[Match]) -> bool: return False except Exception as e: if ctx.logger: - ctx.logger.error(f"获取AI回复时出错: {e}") + ctx.logger.error(f"获取AI回复时出错: {e}", exc_info=True) return False + +def _handle_quoted_image(ctx, chat_model) -> bool: + """处理引用图片消息。""" + if ctx.logger: + ctx.logger.info("检测到引用图片消息,尝试处理图片内容...") + + from ai_providers.ai_chatgpt import ChatGPT + + support_vision = False + if isinstance(chat_model, ChatGPT): + if hasattr(chat_model, 'support_vision') and chat_model.support_vision: + support_vision = True + elif hasattr(chat_model, 'model'): + model_name = getattr(chat_model, 'model', '') + support_vision = model_name in ("gpt-4.1-mini", "gpt-4o") or "-vision" in model_name + + if not support_vision: + ctx.send_text("抱歉,当前 AI 模型不支持处理图片。请联系管理员配置支持视觉的模型。") + return True + + try: + temp_dir = "temp/image_cache" + os.makedirs(temp_dir, exist_ok=True) + + image_path = ctx.wcf.download_image( + id=ctx.quoted_msg_id, extra=ctx.quoted_image_extra, + dir=temp_dir, timeout=30, + ) + + if not image_path or not os.path.exists(image_path): + ctx.send_text("抱歉,无法下载图片进行分析。") + return True + + prompt = ctx.text if ctx.text and ctx.text.strip() else "请详细描述这张图片中的内容" + response = chat_model.get_image_description(image_path, prompt) + ctx.send_text(response) + + try: + if os.path.exists(image_path): + os.remove(image_path) + except Exception: + pass + return True + + except Exception as e: + if ctx.logger: + ctx.logger.error(f"处理引用图片出错: {e}", exc_info=True) + ctx.send_text(f"处理图片时发生错误: {str(e)}") + return True + def handle_perplexity_ask(ctx: 'MessageContext', match: Optional[Match]) -> bool: """ 处理 "ask" 命令,调用 Perplexity AI diff --git a/function/func_summary.py b/function/func_summary.py index 864ed64..dd3a747 100644 --- a/function/func_summary.py +++ b/function/func_summary.py @@ -784,3 +784,77 @@ class MessageSummary: collected.reverse() return collected + + # ── 上下文压缩 ──────────────────────────────────────── + + def get_compressed_context(self, chat_id, max_context_chars=8000, max_recent=None): + """返回压缩后的上下文:近期完整消息 + 早期消息摘要。 + + 使用字符预算而非固定条数,短消息多保留、长消息少保留,充分利用上下文窗口。 + + Args: + chat_id: 聊天 ID + max_context_chars: 近期消息的字符预算(粗略对应 token 数的 2 倍) + max_recent: 近期消息条数硬上限,None 表示不限制 + + 
Returns: + tuple: (recent_messages, summary_text) + recent_messages: 按时间升序的消息 dict 列表 + summary_text: 早期消息的压缩摘要,无需压缩时为 None + """ + messages = self.get_messages(chat_id) + if not messages: + return [], None + + # 从最新消息倒序填充,直到字符预算或条数上限耗尽 + char_budget = max_context_chars + recent = [] + cutoff_idx = len(messages) + + for i in range(len(messages) - 1, -1, -1): + msg = messages[i] + content = msg.get("content", "") + if _is_internal_tool_message(content): + continue + sender = msg.get("sender", "") + msg_chars = len(sender) + 2 + len(content) # "sender: content" + + if char_budget - msg_chars < 0 and recent: + break # 预算耗尽 + + char_budget -= msg_chars + recent.insert(0, msg) + cutoff_idx = i + + if max_recent and len(recent) >= max_recent: + break + + # 没有更早的消息,无需压缩 + older = [m for m in messages[:cutoff_idx] + if not _is_internal_tool_message(m.get("content", ""))] + if not older: + return recent, None + + # 压缩早期消息 + summary_budget = min(2000, max(500, char_budget)) + summary = self._compress_messages(older, summary_budget) + return recent, summary + + def _compress_messages(self, messages, max_chars=2000): + """将消息列表压缩成简短文本,保留最近的信息优先。""" + lines = [] + for msg in messages: + line = f"[{msg.get('time', '')}] {msg.get('sender', '')}: {msg.get('content', '')}" + lines.append(line) + + text = "\n".join(lines) + if len(text) <= max_chars: + return text + + # 超预算:保留最近的部分(靠后的消息更重要) + text = text[-max_chars:] + newline_idx = text.find("\n") + if 0 < newline_idx < 100: + text = text[newline_idx + 1:] + + return f"(earlier messages omitted)\n{text}" diff --git a/robot.py b/robot.py index 6e206d2..4b8d7bd 100644 --- a/robot.py +++ b/robot.py @@ -6,9 +6,7 @@ import time import xml.etree.ElementTree as ET from queue import Empty from threading import Thread -import os import random -import shutil import copy from image.img_manager import ImageGenerationManager @@ -38,10 +36,6 @@ from commands.handlers import handle_chitchat # 导入闲聊处理函数 from commands.keyword_triggers import KeywordTriggerProcessor from commands.message_forwarder import MessageForwarder -# 导入AI路由系统 -from commands.ai_router import ai_router -import commands.ai_functions # 导入以注册所有AI功能 - __version__ = "39.2.4.0" @@ -264,8 +258,8 @@ class Robot(Job): # 初始化图像生成管理器 self.image_manager = ImageGenerationManager(self.config, self.wcf, self.LOG, self.sendTextMsg) - # 初始化AI路由器 - self.LOG.info(f"AI路由系统初始化完成,共加载 {len(ai_router.functions)} 个AI功能") + # 工具系统在首次 handle_chitchat 调用时自动加载 + self.LOG.info("Agent 工具系统就绪(延迟加载)") # 初始化提醒管理器 try: @@ -363,99 +357,64 @@ class Robot(Job): return if ctx.reasoning_requested: - self.LOG.info("检测到推理模式触发词,跳过AI路由,直接进入闲聊推理模式。") + self.LOG.info("检测到推理模式触发词,直接进入推理模式。") self._handle_chitchat(ctx, None) return - handled = False - - # 5. 
优先尝试使用AI路由器处理消息(仅限私聊或@机器人) - if (msg.from_group() and group_enabled and msg.is_at(self.wxid)) or not msg.from_group(): - self.LOG.debug(f"[AI路由调试] 准备调用AI路由器处理消息: {msg.content}") - handled = ai_router.dispatch(ctx) - self.LOG.debug(f"[AI路由调试] AI路由器处理结果: {handled}") - router_decision = getattr(ctx, 'router_decision', None) - if router_decision: - action_type = router_decision.get("action_type") - if action_type == "chat": - if router_decision.get("enable_reasoning"): - self.LOG.info("AI路由器请求启用推理模式处理聊天消息") - ctx.reasoning_requested = ctx.reasoning_requested or bool(router_decision.get("enable_reasoning")) - else: - if ctx.reasoning_requested: - self.LOG.debug("AI路由器选择了非聊天模式,关闭推理模式") - ctx.reasoning_requested = False - if handled: - self.LOG.info("消息已由AI路由器处理") - self.LOG.debug("[AI路由调试] 消息已成功由AI路由器处理") - return + # 5. 特殊消息处理(非 AI 决策) + if msg.type == 37: # 好友请求 + if getattr(self.config, "AUTO_ACCEPT_FRIEND_REQUEST", False): + self.LOG.info("检测到好友请求,自动通过。") + self.autoAcceptFriendRequest(msg) else: - self.LOG.warning("[AI路由调试] AI路由器未处理该消息") + self.LOG.info("检测到好友请求,保持待处理。") + return - # 6. 如果AI路由器未处理,则进行特殊逻辑处理 - if not handled: - # 7.1 好友请求自动处理 - if msg.type == 37: # 好友请求 - if getattr(self.config, "AUTO_ACCEPT_FRIEND_REQUEST", False): - self.LOG.info("检测到好友请求,自动通过开关已启用,准备同意。") - self.autoAcceptFriendRequest(msg) - else: - self.LOG.info("检测到好友请求,自动通过开关已关闭,保持待处理状态。") - return - - # 7.2 系统消息处理 - elif msg.type == 10000: - # 7.2.1 处理新成员入群 - if ( - "加入了群聊" in msg.content - and msg.from_group() - and msg.roomid in getattr(self.config, "GROUPS", []) - ): - new_member_match = re.search(r'"(.+?)"邀请"(.+?)"加入了群聊', msg.content) - if new_member_match: - inviter = new_member_match.group(1) # 邀请人 - new_member = new_member_match.group(2) # 新成员 - # 使用配置文件中的欢迎语,支持变量替换 - welcome_msg = self.config.WELCOME_MSG.format(new_member=new_member, inviter=inviter) - self.sendTextMsg(welcome_msg, msg.roomid) - self.LOG.info(f"已发送欢迎消息给新成员 {new_member} 在群 {msg.roomid}") - return - # 7.2.2 处理新好友添加 - elif "你已添加了" in msg.content: - self.sayHiToNewFriend(msg) - return - - # 7.3 群聊消息,且配置了响应该群 - if msg.from_group() and msg.roomid in self.config.GROUPS: - # 如果在群里被@了,但AI路由器未处理,则进行闲聊 - if msg.is_at(self.wxid): - # 调用handle_chitchat函数处理闲聊,传递完整的上下文 - self._handle_chitchat(ctx, None) - else: - can_auto_reply = ( - not msg.from_self() - and ctx.text - and (msg.type == 1 or (msg.type == 49 and ctx.text)) - ) - if can_auto_reply: - rate = self._prepare_group_random_reply_current_rate(msg.roomid) - if rate > 0: - rand_val = random.random() - if rand_val < rate: - self.LOG.info( - f"触发群聊主动闲聊回复: 群={msg.roomid}, 当前概率={rate:.2f}, 随机值={rand_val:.2f}" - ) - setattr(ctx, 'auto_random_reply', True) - self._handle_chitchat(ctx, None) - self._apply_group_random_reply_decay(msg.roomid) - - # 7.4 私聊消息,未被命令处理,进行闲聊 - elif not msg.from_group() and not msg.from_self(): - # 检查是否是文本消息(type 1)或者是包含用户输入的类型49消息 - if msg.type == 1 or (msg.type == 49 and ctx.text): - self.LOG.info(f"准备回复私聊消息: 类型={msg.type}, 文本内容='{ctx.text}'") - # 调用handle_chitchat函数处理闲聊,传递完整的上下文 - self._handle_chitchat(ctx, None) + if msg.type == 10000: # 系统消息 + if ( + "加入了群聊" in msg.content + and msg.from_group() + and msg.roomid in getattr(self.config, "GROUPS", []) + ): + new_member_match = re.search(r'"(.+?)"邀请"(.+?)"加入了群聊', msg.content) + if new_member_match: + inviter = new_member_match.group(1) + new_member = new_member_match.group(2) + welcome_msg = self.config.WELCOME_MSG.format(new_member=new_member, inviter=inviter) + self.sendTextMsg(welcome_msg, msg.roomid) + return + + if msg.type 
== 10000 and "你已添加了" in msg.content: + self.sayHiToNewFriend(msg) + return + + # 6. Agent 响应:LLM 自主决定调什么工具 + # 6.1 群聊:@机器人 或 随机插嘴 + if msg.from_group() and msg.roomid in self.config.GROUPS: + if msg.is_at(self.wxid): + self._handle_chitchat(ctx, None) + else: + can_auto_reply = ( + not msg.from_self() + and ctx.text + and (msg.type == 1 or (msg.type == 49 and ctx.text)) + ) + if can_auto_reply: + rate = self._prepare_group_random_reply_current_rate(msg.roomid) + if rate > 0: + rand_val = random.random() + if rand_val < rate: + self.LOG.info( + f"触发群聊主动闲聊: 群={msg.roomid}, 概率={rate:.2f}, 随机值={rand_val:.2f}" + ) + setattr(ctx, 'auto_random_reply', True) + self._handle_chitchat(ctx, None) + self._apply_group_random_reply_decay(msg.roomid) + + # 6.2 私聊 + elif not msg.from_group() and not msg.from_self(): + if msg.type == 1 or (msg.type == 49 and ctx.text): + self._handle_chitchat(ctx, None) except Exception as e: self.LOG.error(f"处理消息时发生错误: {str(e)}", exc_info=True) @@ -668,38 +627,73 @@ class Robot(Job): return None return self.reasoning_chat_models.get(model_id) + def _get_fallback_model_ids(self) -> list: + """从配置中读取全局 fallback 模型 ID 列表。""" + if not hasattr(self.config, "GROUP_MODELS"): + return [] + raw = self.config.GROUP_MODELS.get("fallbacks", []) + if isinstance(raw, list): + return [int(x) for x in raw if isinstance(x, (int, float, str))] + return [] + def _handle_chitchat(self, ctx, match=None): - """统一处理闲聊,自动切换推理模型""" - # force_reasoning 配置会强制使用推理模型 + """统一处理消息,支持推理模式切换和模型 Fallback。""" force_reasoning = bool(getattr(ctx, 'force_reasoning', False)) reasoning_requested = bool(getattr(ctx, 'reasoning_requested', False)) or force_reasoning - previous_ctx_chat = getattr(ctx, 'chat', None) - reasoning_chat = None + original_chat = getattr(ctx, 'chat', None) if reasoning_requested: if force_reasoning: - self.LOG.info("群配置了 force_reasoning,闲聊将使用推理模型。") + self.LOG.info("群配置了 force_reasoning,将使用推理模型。") else: self.LOG.info("检测到推理模式请求,将启用深度思考。") ctx.send_text("正在深度思考,请稍候...", record_message=False) reasoning_chat = self._get_reasoning_chat_model() if reasoning_chat: ctx.chat = reasoning_chat - model_label = self._describe_chat_model(reasoning_chat, reasoning=True) - self.LOG.debug(f"使用推理模型 {model_label} 处理消息") else: - self.LOG.warning("当前模型未配置推理模型,使用默认模型处理深度思考请求") + self.LOG.warning("当前模型未配置推理模型,使用默认模型") + + # 构建候选模型列表:当前模型 + fallback + primary_id = getattr(self, 'current_model_id', None) + fallback_ids = self._get_fallback_model_ids() + candidate_ids = [] + if primary_id is not None: + candidate_ids.append(primary_id) + for fid in fallback_ids: + if fid not in candidate_ids and fid in self.chat_models: + candidate_ids.append(fid) handled = False - try: - handled = handle_chitchat(ctx, match) - finally: - if reasoning_chat and previous_ctx_chat is not None: - ctx.chat = previous_ctx_chat + for i, model_id in enumerate(candidate_ids): + if i > 0: + # 切换到 fallback 模型 + fallback_model = self.chat_models[model_id] + if reasoning_requested: + fallback_reasoning = self.reasoning_chat_models.get(model_id) + ctx.chat = fallback_reasoning or fallback_model + else: + ctx.chat = fallback_model + model_name = getattr(ctx.chat, '__class__', type(ctx.chat)).__name__ + self.LOG.info(f"Fallback: 切换到模型 {model_name}(ID:{model_id})") - if reasoning_requested and not handled: - self.LOG.warning("推理模式处理消息失败,向用户返回降级提示") - ctx.send_text("抱歉,深度思考暂时遇到问题,请稍后再试。") + try: + handled = handle_chitchat(ctx, match) + if handled: + break + except Exception as e: + self.LOG.warning(f"模型 {model_id} 调用失败: {e}") + continue + + # 
恢复原始模型 + if original_chat is not None: + ctx.chat = original_chat + + if not handled: + if reasoning_requested: + ctx.send_text("抱歉,深度思考暂时遇到问题,请稍后再试。") + else: + ctx.send_text("抱歉,服务暂时不可用,请稍后再试。") return handled diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 0000000..29c34bc --- /dev/null +++ b/tools/__init__.py @@ -0,0 +1,80 @@ +""" +工具系统 —— 让 LLM 在 Agent 循环中自主调用工具。 + +每个 Tool 提供 OpenAI function-calling 格式的 schema 和一个同步执行函数。 +ToolRegistry 汇总所有工具,生成 tools 列表和统一的 tool_handler。 +""" + +import json +import logging +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class Tool: + """LLM 可调用的工具。""" + name: str + description: str + parameters: dict # JSON Schema + handler: Callable[..., str] = None # (ctx, **kwargs) -> str + + def to_openai_schema(self) -> dict: + return { + "type": "function", + "function": { + "name": self.name, + "description": self.description, + "parameters": self.parameters, + }, + } + + +class ToolRegistry: + """收集工具,为 Agent 循环提供 tools + tool_handler。""" + + def __init__(self): + self._tools: Dict[str, Tool] = {} + + def register(self, tool: Tool) -> None: + self._tools[tool.name] = tool + logger.info(f"注册工具: {tool.name}") + + def get(self, name: str) -> Optional[Tool]: + return self._tools.get(name) + + @property + def tools(self) -> Dict[str, Tool]: + return dict(self._tools) + + def get_openai_tools(self) -> List[dict]: + """返回所有工具的 OpenAI function-calling schema 列表。""" + return [t.to_openai_schema() for t in self._tools.values()] + + def create_handler(self, ctx: Any) -> Callable[[str, dict], str]: + """创建一个绑定了消息上下文的 tool_handler 函数。""" + registry = self._tools + + def handler(tool_name: str, arguments: dict) -> str: + tool = registry.get(tool_name) + if not tool: + return json.dumps( + {"error": f"Unknown tool: {tool_name}"}, + ensure_ascii=False, + ) + try: + result = tool.handler(ctx, **arguments) + if not isinstance(result, str): + result = json.dumps(result, ensure_ascii=False) + return result + except Exception as e: + logger.error(f"工具 {tool_name} 执行失败: {e}", exc_info=True) + return json.dumps({"error": str(e)}, ensure_ascii=False) + + return handler + + +# ── 全局工具注册表 ────────────────────────────────────────── +tool_registry = ToolRegistry() diff --git a/tools/history.py b/tools/history.py new file mode 100644 index 0000000..c9ad78a --- /dev/null +++ b/tools/history.py @@ -0,0 +1,189 @@ +"""聊天历史查询工具 —— 从 handlers.py 的内联定义中提取而来。 + +支持三种查询模式: + keywords — 关键词模糊搜索 + range — 按倒序偏移取连续消息 + time — 按时间窗口取消息 +""" + +import json + +from tools import Tool, tool_registry + +DEFAULT_VISIBLE_LIMIT = 30 + + +def _handle_lookup_chat_history(ctx, mode: str = "", keywords: list = None, + start_offset: int = None, end_offset: int = None, + start_time: str = None, end_time: str = None, + **_) -> str: + message_summary = getattr(ctx.robot, "message_summary", None) if ctx.robot else None + if not message_summary: + return json.dumps({"error": "消息历史功能不可用"}, ensure_ascii=False) + + chat_id = ctx.get_receiver() + visible_limit = DEFAULT_VISIBLE_LIMIT + raw = getattr(ctx, "specific_max_history", None) + if raw is not None: + try: + visible_limit = int(raw) + except (TypeError, ValueError): + pass + + # 推断模式 + mode = (mode or "").strip().lower() + if not mode: + if start_time and end_time: + mode = "time" + elif start_offset is not None and end_offset is not None: + mode = "range" + else: + mode = "keywords" + + # ── keywords 
──────────────────────────────────────────── + if mode == "keywords": + if isinstance(keywords, str): + keywords = [keywords] + elif not isinstance(keywords, list): + keywords = [] + + cleaned = [] + seen = set() + for kw in keywords: + if kw is None: + continue + s = str(kw).strip() + if s and (len(s) > 1 or s.isdigit()): + low = s.lower() + if low not in seen: + seen.add(low) + cleaned.append(s) + + if not cleaned: + return json.dumps({"error": "未提供有效关键词", "results": []}, ensure_ascii=False) + + search_results = message_summary.search_messages_with_context( + chat_id=chat_id, + keywords=cleaned, + context_window=10, + max_groups=20, + exclude_recent=visible_limit, + ) + + segments = [] + lines_seen = set() + for seg in search_results: + formatted = [l for l in seg.get("formatted_messages", []) if l not in lines_seen] + lines_seen.update(formatted) + if formatted: + segments.append({ + "matched_keywords": seg.get("matched_keywords", []), + "messages": formatted, + }) + + payload = {"segments": segments, "returned_groups": len(segments), "keywords": cleaned} + if not segments: + payload["notice"] = "未找到匹配的消息。" + return json.dumps(payload, ensure_ascii=False) + + # ── range ─────────────────────────────────────────────── + if mode == "range": + if start_offset is None or end_offset is None: + return json.dumps({"error": "range 模式需要 start_offset 和 end_offset"}, ensure_ascii=False) + try: + start_offset, end_offset = int(start_offset), int(end_offset) + except (TypeError, ValueError): + return json.dumps({"error": "start_offset 和 end_offset 必须是整数"}, ensure_ascii=False) + + if start_offset <= visible_limit or end_offset <= visible_limit: + return json.dumps( + {"error": f"偏移量必须大于 {visible_limit} 以排除当前可见消息"}, + ensure_ascii=False, + ) + if start_offset > end_offset: + start_offset, end_offset = end_offset, start_offset + + result = message_summary.get_messages_by_reverse_range( + chat_id=chat_id, start_offset=start_offset, end_offset=end_offset, + ) + payload = { + "start_offset": result.get("start_offset"), + "end_offset": result.get("end_offset"), + "messages": result.get("messages", []), + "returned_count": result.get("returned_count", 0), + "total_messages": result.get("total_messages", 0), + } + if payload["returned_count"] == 0: + payload["notice"] = "请求范围内没有消息。" + return json.dumps(payload, ensure_ascii=False) + + # ── time ──────────────────────────────────────────────── + if mode == "time": + if not start_time or not end_time: + return json.dumps({"error": "time 模式需要 start_time 和 end_time"}, ensure_ascii=False) + + time_lines = message_summary.get_messages_by_time_window( + chat_id=chat_id, start_time=start_time, end_time=end_time, + ) + payload = { + "start_time": start_time, + "end_time": end_time, + "messages": time_lines, + "returned_count": len(time_lines), + } + if not time_lines: + payload["notice"] = "该时间范围内没有消息。" + return json.dumps(payload, ensure_ascii=False) + + return json.dumps({"error": f"不支持的模式: {mode}"}, ensure_ascii=False) + + +# ── 注册 ──────────────────────────────────────────────────── + +tool_registry.register(Tool( + name="lookup_chat_history", + description=( + "查询聊天历史记录。你当前只能看到最近的消息,调用此工具可以回溯更早的上下文。" + "支持三种模式:\n" + "1. mode=\"keywords\" — 用关键词模糊搜索历史消息,返回匹配片段及上下文。" + " 需要 keywords 数组(2-4 个关键词)。\n" + "2. mode=\"range\" — 按倒序偏移获取连续消息块。" + " 需要 start_offset 和 end_offset(均需大于当前可见消息数)。\n" + "3. 
mode=\"time\" — 按时间窗口获取消息。" + " 需要 start_time 和 end_time(格式如 2025-05-01 08:00)。\n" + "可多次调用,例如先用 keywords 找到锚点,再用 range/time 扩展上下文。" + ), + parameters={ + "type": "object", + "properties": { + "mode": { + "type": "string", + "enum": ["keywords", "range", "time"], + "description": "查询模式", + }, + "keywords": { + "type": "array", + "items": {"type": "string"}, + "description": "mode=keywords 时的搜索关键词", + }, + "start_offset": { + "type": "integer", + "description": "mode=range 时的起始偏移(从最新消息倒数)", + }, + "end_offset": { + "type": "integer", + "description": "mode=range 时的结束偏移", + }, + "start_time": { + "type": "string", + "description": "mode=time 时的开始时间 (YYYY-MM-DD HH:MM)", + }, + "end_time": { + "type": "string", + "description": "mode=time 时的结束时间 (YYYY-MM-DD HH:MM)", + }, + }, + "additionalProperties": False, + }, + handler=_handle_lookup_chat_history, +)) diff --git a/tools/reminder.py b/tools/reminder.py new file mode 100644 index 0000000..cae13b1 --- /dev/null +++ b/tools/reminder.py @@ -0,0 +1,164 @@ +"""提醒工具 —— 创建 / 查看 / 删除提醒。 + +LLM 直接传入结构化参数,不再需要二级路由或二次 AI 解析。 +""" + +import json +from datetime import datetime + +from tools import Tool, tool_registry + + +# ── 创建提醒 ──────────────────────────────────────────────── + +def _handle_reminder_create(ctx, type: str = "once", time: str = "", + content: str = "", weekday: int = None, **_) -> str: + if not hasattr(ctx.robot, "reminder_manager"): + return json.dumps({"error": "提醒管理器未初始化"}, ensure_ascii=False) + + if not time or not content: + return json.dumps({"error": "缺少必要字段: time 和 content"}, ensure_ascii=False) + + if len(content.strip()) < 2: + return json.dumps({"error": "提醒内容太短"}, ensure_ascii=False) + + # 校验时间格式 + if type == "once": + parsed_dt = None + for fmt in ("%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"): + try: + parsed_dt = datetime.strptime(time, fmt) + break + except ValueError: + continue + if not parsed_dt: + return json.dumps({"error": f"once 类型时间格式应为 YYYY-MM-DD HH:MM,收到: {time}"}, ensure_ascii=False) + if parsed_dt < datetime.now(): + return json.dumps({"error": f"时间 {time} 已过去,请使用未来的时间"}, ensure_ascii=False) + time = parsed_dt.strftime("%Y-%m-%d %H:%M") + + elif type in ("daily", "weekly"): + parsed_time = None + for fmt in ("%H:%M", "%H:%M:%S"): + try: + parsed_time = datetime.strptime(time, fmt) + break + except ValueError: + continue + if not parsed_time: + return json.dumps({"error": f"daily/weekly 类型时间格式应为 HH:MM,收到: {time}"}, ensure_ascii=False) + time = parsed_time.strftime("%H:%M") + else: + return json.dumps({"error": f"不支持的提醒类型: {type}"}, ensure_ascii=False) + + if type == "weekly": + if weekday is None or not (isinstance(weekday, int) and 0 <= weekday <= 6): + return json.dumps({"error": "weekly 类型需要 weekday 参数 (0=周一 … 6=周日)"}, ensure_ascii=False) + + data = {"type": type, "time": time, "content": content, "extra": {}} + if weekday is not None: + data["weekday"] = weekday + + roomid = ctx.msg.roomid if ctx.is_group else None + success, result = ctx.robot.reminder_manager.add_reminder(ctx.msg.sender, data, roomid=roomid) + + if success: + type_label = {"once": "一次性", "daily": "每日", "weekly": "每周"}.get(type, type) + return json.dumps({"success": True, "id": result, + "message": f"已创建{type_label}提醒: {time} - {content}"}, ensure_ascii=False) + return json.dumps({"success": False, "error": result}, ensure_ascii=False) + + +# ── 查看提醒 ──────────────────────────────────────────────── + +def _handle_reminder_list(ctx, **_) -> str: + if not hasattr(ctx.robot, "reminder_manager"): + return json.dumps({"error": "提醒管理器未初始化"}, 
+# ── 查看提醒 ────────────────────────────────────────────────
+
+def _handle_reminder_list(ctx, **_) -> str:
+    if not hasattr(ctx.robot, "reminder_manager"):
+        return json.dumps({"error": "提醒管理器未初始化"}, ensure_ascii=False)
+
+    reminders = ctx.robot.reminder_manager.list_reminders(ctx.msg.sender)
+    if not reminders:
+        return json.dumps({"reminders": [], "message": "当前没有任何提醒"}, ensure_ascii=False)
+    return json.dumps({"reminders": reminders, "count": len(reminders)}, ensure_ascii=False)
+
+
+# ── 删除提醒 ────────────────────────────────────────────────
+
+def _handle_reminder_delete(ctx, reminder_id: str = "", delete_all: bool = False, **_) -> str:
+    if not hasattr(ctx.robot, "reminder_manager"):
+        return json.dumps({"error": "提醒管理器未初始化"}, ensure_ascii=False)
+
+    if delete_all:
+        success, message, count = ctx.robot.reminder_manager.delete_all_reminders(ctx.msg.sender)
+        return json.dumps({"success": success, "message": message, "deleted_count": count}, ensure_ascii=False)
+
+    if not reminder_id:
+        return json.dumps({"error": "请提供 reminder_id,或设置 delete_all=true 删除全部"}, ensure_ascii=False)
+
+    success, message = ctx.robot.reminder_manager.delete_reminder(ctx.msg.sender, reminder_id)
+    return json.dumps({"success": success, "message": message}, ensure_ascii=False)
+
+
+# ── 注册 ──────────────────────────────────────────────────
+
+tool_registry.register(Tool(
+    name="reminder_create",
+    description=(
+        "创建提醒。支持 once(一次性)、daily(每日)、weekly(每周) 三种类型。"
+        "当前时间已在对话上下文中提供,请据此计算目标时间。"
+    ),
+    parameters={
+        "type": "object",
+        "properties": {
+            "type": {
+                "type": "string",
+                "enum": ["once", "daily", "weekly"],
+                "description": "提醒类型",
+            },
+            "time": {
+                "type": "string",
+                "description": "once → YYYY-MM-DD HH:MM;daily/weekly → HH:MM",
+            },
+            "content": {
+                "type": "string",
+                "description": "提醒内容",
+            },
+            "weekday": {
+                "type": "integer",
+                "description": "仅 weekly 需要。0=周一 … 6=周日",
+            },
+        },
+        "required": ["type", "time", "content"],
+        "additionalProperties": False,
+    },
+    handler=_handle_reminder_create,
+))
+
+tool_registry.register(Tool(
+    name="reminder_list",
+    description="查看当前用户的所有提醒列表。",
+    parameters={"type": "object", "properties": {}, "additionalProperties": False},
+    handler=_handle_reminder_list,
+))
+
+tool_registry.register(Tool(
+    name="reminder_delete",
+    description=(
+        "删除提醒。需要先调用 reminder_list 获取 ID,再用 reminder_id 精确删除;"
+        "或设置 delete_all=true 一次性删除全部。"
+    ),
+    parameters={
+        "type": "object",
+        "properties": {
+            "reminder_id": {
+                "type": "string",
+                "description": "要删除的提醒完整 ID",
+            },
+            "delete_all": {
+                "type": "boolean",
+                "description": "是否删除该用户全部提醒",
+            },
+        },
+        "additionalProperties": False,
+    },
+    handler=_handle_reminder_delete,
+))
diff --git a/tools/web_search.py b/tools/web_search.py
new file mode 100644
index 0000000..384aa83
--- /dev/null
+++ b/tools/web_search.py
@@ -0,0 +1,61 @@
+"""网络搜索工具 —— 通过 Perplexity 联网搜索。
+
+直接调用 perplexity.get_answer() 获取同步结果,
+结果回传给 LLM 做综合回答,而非直接发送给用户。
+"""
+
+import json
+import re
+
+from tools import Tool, tool_registry
+
+
+def _handle_web_search(ctx, query: str = "", deep_research: bool = False, **_) -> str:
+    if not query:
+        return json.dumps({"error": "请提供搜索关键词"}, ensure_ascii=False)
+
+    perplexity_instance = getattr(ctx.robot, "perplexity", None)
+    if not perplexity_instance:
+        return json.dumps({"error": "Perplexity 搜索功能不可用,未配置或未初始化"}, ensure_ascii=False)
+
+    try:
+        chat_id = ctx.get_receiver()
+        response = perplexity_instance.get_answer(query, chat_id, deep_research=deep_research)
+
+        if not response:
+            return json.dumps({"error": "搜索无结果"}, ensure_ascii=False)
+
+        # 清理 <think> 标签(reasoning 模型可能返回)
+        cleaned = re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL).strip()
+        if not cleaned:
+            cleaned = response
+
+        return json.dumps({"result": cleaned}, ensure_ascii=False)
+
+    except Exception as e:
+        return json.dumps({"error": f"搜索失败: {e}"}, ensure_ascii=False)
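Reasoning-tier models reached through Perplexity may prepend a `<think>…</think>` block to their answer; the handler strips it before the result re-enters the agent loop, and falls back to the raw text when the entire response is reasoning. A quick standalone check of that cleanup logic; the sample strings are illustrative:

```python
import re

THINK = re.compile(r"<think>.*?</think>", flags=re.DOTALL)

response = "<think>需要先联网查一下再回答。</think>今天适合户外活动。"
assert THINK.sub("", response).strip() == "今天适合户外活动。"

# An all-reasoning response strips to "", so _handle_web_search keeps the
# original text instead of handing an empty result back to the LLM.
only_think = "<think>反复推理,没有给出正文。</think>"
cleaned = THINK.sub("", only_think).strip()
print(cleaned or only_think)
```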
+tool_registry.register(Tool(
+    name="web_search",
+    description=(
+        "在网络上搜索信息。用于回答需要最新数据、实时信息或你不确定的事实性问题。"
+        "deep_research 仅在问题非常复杂、需要深度研究时才开启。"
+    ),
+    parameters={
+        "type": "object",
+        "properties": {
+            "query": {
+                "type": "string",
+                "description": "搜索关键词或问题",
+            },
+            "deep_research": {
+                "type": "boolean",
+                "description": "是否启用深度研究模式(耗时较长,仅用于复杂问题)",
+            },
+        },
+        "required": ["query"],
+        "additionalProperties": False,
+    },
+    handler=_handle_web_search,
+))
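For reference, this is the loop contract that `handle_chitchat` relies on when it passes `tools=tool_registry.get_openai_tools()` and `tool_handler=tool_registry.create_handler(ctx)` into `get_answer`. The providers' `_execute_with_tools` bodies are not shown in this patch, so the following is only a minimal sketch of such a loop against an OpenAI-compatible client, including two of the safeguards the plan calls for (step cap, identical-call detection); names and return strings are illustrative:

```python
import json

def execute_with_tools(client, model, messages, tools, tool_handler, max_iterations=10):
    """Minimal agent-loop sketch: keep calling the model until it answers in
    plain text, executing any requested tools in between."""
    recent_calls = []  # (name, raw-arguments) of recent tool calls
    for _ in range(max_iterations):
        resp = client.chat.completions.create(
            model=model, messages=messages, tools=tools)
        msg = resp.choices[0].message
        if not msg.tool_calls:            # plain text: the loop is done
            return msg.content
        messages.append(msg)              # keep the assistant turn that requested tools
        for call in msg.tool_calls:
            key = (call.function.name, call.function.arguments)
            recent_calls.append(key)
            if recent_calls[-3:].count(key) == 3:  # same tool, same input, 3x in a row
                return "工具调用陷入循环,已中断。"
            result = tool_handler(call.function.name,
                                  json.loads(call.function.arguments or "{}"))
            messages.append({"role": "tool",
                             "tool_call_id": call.id,
                             "content": result})
    return "已达到最大工具调用步数。"  # forced stop, per the plan's safeguard
```

The third safeguard in the plan, a per-step timeout, would wrap both the `create(...)` call and each `tool_handler(...)` invocation.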