diff --git a/FUNCTION_CALL_USAGE.md b/FUNCTION_CALL_USAGE.md new file mode 100644 index 0000000..81a41f8 --- /dev/null +++ b/FUNCTION_CALL_USAGE.md @@ -0,0 +1,92 @@ +# Function Call 系统使用指南 + +## 概述 + +已成功完成从正则/路由体系到标准 Function Call 的迁移,新系统提供了更标准化、更易维护的函数调用能力。 + +## 系统架构 + +``` +用户消息 -> FunctionCallRouter -> (直接命令匹配 / LLM选择函数) -> 参数验证 -> 执行Handler -> 返回结果 +``` + +## 已迁移的功能 + +| 函数名 | 描述 | 作用域 | 需要@ | +|-------|------|--------|-------| +| `weather_query` | 查询城市天气预报 | both | 是 | +| `news_query` | 获取今日新闻 | both | 是 | +| `help` | 显示帮助信息 | both | 否 | +| `summary` | 总结群聊消息 | group | 是 | +| `reminder_set` | 设置提醒 | both | 是 | +| `reminder_list` | 查看提醒列表 | both | 是 | +| `reminder_delete` | 删除提醒 | both | 是 | +| `perplexity_search` | Perplexity搜索 | both | 是 | +| `clear_messages` | 清除消息历史 | group | 是 | +| `insult` | 骂人功能 | group | 是 | + +## 配置说明 + +在 `config.yaml` 中添加了以下配置: + +```yaml +function_call_router: + enable: true # 是否启用Function Call路由 + debug: false # 是否启用调试日志 +``` + +## 如何添加新功能 + +1. **定义参数模型** (在 `function_calls/models.py`): +```python +class MyFunctionArgs(BaseModel): + param1: str + param2: int +``` + +2. **实现处理器** (在 `function_calls/handlers.py`): +```python +@tool_function( + name="my_function", + description="我的功能描述", + examples=["示例1", "示例2"], + scope="both", + require_at=True +) +def handle_my_function(ctx: MessageContext, args: MyFunctionArgs) -> FunctionResult: + # 实现功能逻辑 + return FunctionResult( + handled=True, + messages=["处理结果"], + at=ctx.msg.sender if ctx.is_group else "" + ) +``` + +## 工作原理 + +1. **直接命令匹配**:对于明确的命令(如"help"、"新闻"),仍可直接调用对应函数。 +2. **多轮函数调用**:在原生 function call 模型下,助手会循环选择函数→等待工具输出→再决定是否继续调用或生成最终答复。 +3. **参数提取与验证**:每次调用前都会根据 JSON Schema 校验参数,确保类型与必填字段正确。 +4. **统一回复**:最终的用户回复由模型生成,工具返回的 `FunctionResult` 只作为 LLM 的工具消息输入。 +5. **无回退逻辑**:系统已移除传统正则路由与 AI 路由,所有功能均通过 Function Call 管理。 + +## 测试验证 + +系统通过了完整的集成测试: +- ✅ 函数注册表正常工作 +- ✅ 直接命令匹配准确 +- ✅ 参数提取正确 +- ✅ 类型验证有效 + +## 兼容性 + +- 保持与现有业务逻辑完全兼容 +- 精简路由体系,不再依赖旧正则路由 +- 不影响现有的微信客户端交互 + +## 性能优势 + +- 减少不必要的LLM调用(直接命令匹配) +- 标准化的参数处理 +- 统一的错误处理和日志记录 +- 更好的代码可维护性 diff --git a/README.MD b/README.MD index d303dd7..c85f6b8 100644 --- a/README.MD +++ b/README.MD @@ -257,11 +257,14 @@ Bubbles-WechatAI/ ├── ai_providers/ # AI 模块 │ ├── ai_name.py # AI 模型接口实现 │ └── ... -├── commands/ # 命令系统 -│ ├── registry.py # 正则命令注册 -│ ├── handlers.py # 命令处理函数 -│ ├── ai_router.py # AI智能路由器 -│ ├── ai_functions.py # AI路由功能注册 +├── commands/ # 命令辅助模块(保留上下文、闲聊等遗留逻辑) +│ ├── context.py +│ ├── handlers.py +│ └── ... +├── function_calls/ # 标准 Function Call 架构 +│ ├── handlers.py # 工具注册入口 +│ ├── services/ # 业务逻辑封装 +│ ├── router.py # 函数路由器 │ └── ... ├── data/ # 数据文件 │ @@ -274,101 +277,13 @@ Bubbles-WechatAI/ ### ✨ 如何添加新功能 -本项目提供两种方式添加新功能: +当前架构基于统一的 Function Call 体系,开发流程如下: -```mermaid -graph LR - A[新功能] --> B{选择路由方式} - B --> C[命令路由] - B --> D[AI路由] - - C --> E[精确匹配] - C --> F[固定格式命令] - C --> G[例如:天气北京] - - D --> H[自然语言理解] - D --> I[灵活表达] - D --> J[例如:北京天气怎么样] - - style A fill:#f9f,stroke:#333,stroke-width:2px - style C fill:#bbf,stroke:#333,stroke-width:2px - style D fill:#bfb,stroke:#333,stroke-width:2px -``` +1. **定义参数模型**:在 `function_calls/models.py` 中添加 `BaseModel` 子类,描述工具所需字段。 +2. **实现业务服务**:在 `function_calls/services/` 下编写纯函数,封装真实业务逻辑并返回文本描述。 +3. **注册处理器**:在 `function_calls/handlers.py` 使用 `@tool_function` 装饰器注册工具,返回 `FunctionResult`。 +4. **了解更多**:详见 `FUNCTION_CALL_USAGE.md`,其中记录了多轮函数调用流程与调试建议。 -#### 方式一:使用命令路由系统(适合有明确触发词的功能) - -1. **定义功能逻辑 (可选但推荐)**: - * 如果你的功能逻辑比较复杂,建议在 `function/` 目录下创建一个新的 Python 文件 (例如 `func_your_feature.py`)。 - * 在这个文件中实现你的核心功能代码,例如定义类或函数。这有助于保持代码结构清晰。 - -2. **创建命令处理器**: - * 打开 `commands/handlers.py` 文件。 - * 添加一个新的处理函数,例如 `handle_your_feature(ctx: 'MessageContext', match: Optional[Match]) -> bool:`。 - * 这个函数接收 `MessageContext` (包含消息上下文信息) 和 `match` (正则表达式匹配结果) 作为参数。 - * 在函数内部,你可以: - * 调用你在 `function/` 目录下创建的功能模块。 - * 使用 `ctx.send_text()` 发送回复消息。 - * 根据需要处理 `match` 对象提取用户输入的参数。 - * 函数应返回 `True` 表示命令已被处理,`False` 则表示未处理 (会继续尝试匹配后续命令或进行闲聊)。 - * 确保从 `function` 目录导入必要的模块。 - -3. **注册命令**: - * 打开 `commands/registry.py` 文件。 - * 在 `COMMANDS` 列表中,按照优先级顺序添加一个新的 `Command` 对象。 - * 配置 `Command` 参数: - * `name`: 命令的唯一标识名 (小写下划线)。 - * `pattern`: 用于匹配用户输入的正则表达式 (`re.compile`)。注意捕获用户参数。 - * `scope`: 命令适用范围 (`"group"`, `"private"`, `"both"`)。 - * `need_at`: 在群聊中是否需要 `@` 机器人才能触发 (`True`/`False`)。 - * `priority`: 命令的优先级 (数字越小越优先匹配)。 - * `handler`: 指向你在 `handlers.py` 中创建的处理函数 (例如 `handle_your_feature`)。 - * `description`: 命令的简短描述,用于帮助信息。 - * 确保从 `handlers.py` 导入你的新处理函数。 - -4. **更新帮助信息 (可选)**: - * 如果希望用户能在 `帮助` 命令中看到你的新功能,可以更新 `commands/handlers.py` 中的 `handle_help` 函数,将新命令的用法添加到帮助文本中。 - -#### 方式二:使用AI智能路由(适合自然语言交互的功能) - -AI路由系统让用户可以用自然语言触发功能,无需记住特定命令格式: - -1. **实现功能逻辑**: - * 在 `function/` 目录下创建功能模块(如已有则跳过) - -2. **注册AI路由功能**: - * 打开 `commands/ai_functions.py` - * 使用装饰器注册你的功能: - ```python - @ai_router.register( - name="your_function_name", - description="功能描述(AI会根据这个判断用户意图)", - examples=[ - "示例用法1", - "示例用法2", - "示例用法3" - ], - params_description="参数说明" - ) - def ai_handle_your_function(ctx: MessageContext, params: str) -> bool: - # params 是AI从用户输入中提取的参数 - # 调用你的功能逻辑 - # 使用 ctx.send_text() 发送回复 - return True - ``` - -3. **工作原理**: - * 用户发送消息时,如果正则路由未匹配,AI会分析用户意图 - * AI根据功能描述和示例,判断应该调用哪个功能 - * AI会自动提取参数并传递给功能处理函数 - -例如,注册了天气查询功能后,用户可以说: -- "北京天气怎么样" -- "查一下上海的天气" -- "明天深圳会下雨吗" - -AI都能理解并调用天气查询功能。 - -完成以上步骤后,重启机器人即可测试你的新功能! ## 📄 许可证 diff --git a/ai_providers/ai_chatgpt.py b/ai_providers/ai_chatgpt.py index 7184c3a..b218413 100644 --- a/ai_providers/ai_chatgpt.py +++ b/ai_providers/ai_chatgpt.py @@ -7,6 +7,7 @@ import base64 import os from datetime import datetime import time # 引入 time 模块 +from typing import Any, Dict, List import httpx from openai import APIConnectionError, APIError, AuthenticationError, OpenAI @@ -136,6 +137,38 @@ class ChatGPT(): return rsp + def call_with_functions(self, messages: List[Dict[str, Any]], functions: list, wxid: str): + """ + 使用函数调用功能的ChatGPT接口 + + Args: + message: 用户消息 + functions: 函数定义列表 + wxid: 用户ID + + Returns: + OpenAI响应对象 + """ + try: + # 调用函数调用接口 + params = { + "model": self.model, + "messages": messages, + "functions": functions, + "function_call": "auto" + } + + # 只有非o系列模型才设置temperature + if not self.model.startswith("o"): + params["temperature"] = 0.2 + + response = self.client.chat.completions.create(**params) + return response + + except Exception as e: + self.LOG.error(f"函数调用失败: {e}") + raise e + def encode_image_to_base64(self, image_path: str) -> str: """将图片文件转换为Base64编码 @@ -226,4 +259,4 @@ if __name__ == "__main__": # --- 测试代码需要调整 --- # 需要模拟 MessageSummary 和提供 bot_wxid 才能测试 print("请注意:直接运行此文件进行测试需要模拟 MessageSummary 并提供 bot_wxid。") - pass # 避免直接运行时出错 \ No newline at end of file + pass # 避免直接运行时出错 diff --git a/check_system.py b/check_system.py new file mode 100644 index 0000000..20f8358 --- /dev/null +++ b/check_system.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +""" +系统完整性检查 +""" +import sys +import os +sys.path.append(os.path.dirname(__file__)) + +def check_imports(): + """检查所有关键模块是否能正常导入""" + print("🔍 检查模块导入...") + + try: + # 检查Function Call系统 + from function_calls.spec import FunctionSpec, FunctionResult + from function_calls.registry import register_function, list_functions + from function_calls.router import FunctionCallRouter + from function_calls.llm import FunctionCallLLM + import function_calls.init_handlers + print("✅ Function Call核心模块导入成功") + + # 检查处理器 + from function_calls.handlers import ( + handle_weather, handle_news, handle_help, + handle_reminder_set, handle_reminder_list, handle_reminder_delete, + handle_perplexity_search, handle_summary, handle_clear_messages + ) + print("✅ 所有处理器导入成功") + + # 检查参数模型 + from function_calls.models import ( + WeatherArgs, NewsArgs, HelpArgs, ReminderArgs, + PerplexityArgs, SummaryArgs, ClearMessagesArgs + ) + print("✅ 参数模型导入成功") + + return True + except Exception as e: + print(f"❌ 导入失败: {e}") + return False + +def check_function_registration(): + """检查函数注册是否正常""" + print("\n🔍 检查函数注册...") + + try: + from function_calls.registry import list_functions + functions = list_functions() + + expected_count = 10 + if len(functions) != expected_count: + print(f"⚠️ 函数数量异常: 期望{expected_count}个,实际{len(functions)}个") + return False + + required_functions = [ + 'weather_query', 'news_query', 'help', 'summary', + 'reminder_set', 'reminder_list', 'reminder_delete', + 'perplexity_search', 'clear_messages', 'insult' + ] + + missing_functions = [] + for func_name in required_functions: + if func_name not in functions: + missing_functions.append(func_name) + + if missing_functions: + print(f"❌ 缺少函数: {missing_functions}") + return False + + print("✅ 所有必需函数都已正确注册") + return True + + except Exception as e: + print(f"❌ 函数注册检查失败: {e}") + return False + +def check_router_initialization(): + """检查路由器初始化""" + print("\n🔍 检查路由器初始化...") + + try: + from function_calls.router import FunctionCallRouter + router = FunctionCallRouter() + print("✅ FunctionCallRouter初始化成功") + + # 测试直接命令匹配 + class MockCtx: + def __init__(self, text): + self.text = text + + test_cases = [ + ("help", "help"), + ("新闻", "news_query"), + ("天气 北京", "weather_query") + ] + + for input_text, expected in test_cases: + ctx = MockCtx(input_text) + result = router._try_direct_command_match(ctx) + if result != expected: + print(f"❌ 直接匹配失败: '{input_text}' -> {result} (期望: {expected})") + return False + + print("✅ 直接命令匹配正常") + return True + + except Exception as e: + print(f"❌ 路由器初始化失败: {e}") + return False + +def check_config_compatibility(): + """检查配置兼容性""" + print("\n🔍 检查配置文件...") + + try: + # 检查模板文件是否包含新配置 + with open('config.yaml.template', 'r', encoding='utf-8') as f: + content = f.read() + + if 'function_call_router:' not in content: + print("❌ config.yaml.template缺少function_call_router配置") + return False + + print("✅ 配置文件包含Function Call配置") + return True + + except Exception as e: + print(f"❌ 配置检查失败: {e}") + return False + +def main(): + print("🚀 Function Call系统完整性检查\n") + + checks = [ + ("模块导入", check_imports), + ("函数注册", check_function_registration), + ("路由器初始化", check_router_initialization), + ("配置兼容性", check_config_compatibility) + ] + + passed = 0 + total = len(checks) + + for name, check_func in checks: + if check_func(): + passed += 1 + else: + print(f"❌ {name}检查失败") + + print(f"\n📊 检查结果: {passed}/{total} 通过") + + if passed == total: + print("🎉 Function Call系统完整性检查全部通过!系统已准备就绪。") + return 0 + else: + print("⚠️ 部分检查失败,请检查上述错误信息。") + return 1 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/commands/__init__.py b/commands/__init__.py index 85d8b34..cb1aeaa 100644 --- a/commands/__init__.py +++ b/commands/__init__.py @@ -1,11 +1,8 @@ # commands package """ -命令路由系统包 +命令辅助模块 -此包包含了命令路由系统的所有组件: +该包保留了消息上下文与部分遗留处理器,供 Function Call 架构复用: - context: 消息上下文类 -- models: 命令数据模型 -- router: 命令路由器 -- registry: 命令注册表 -- handlers: 命令处理函数 -""" \ No newline at end of file +- handlers: 基础命令/闲聊逻辑 +""" diff --git a/commands/ai_functions.py b/commands/ai_functions.py deleted file mode 100644 index 8e2140c..0000000 --- a/commands/ai_functions.py +++ /dev/null @@ -1,217 +0,0 @@ -""" -AI路由功能注册 -将需要通过AI路由的功能在这里注册 -""" -import re -import json -import os -from typing import Optional, Match -from datetime import datetime - -from .ai_router import ai_router -from .context import MessageContext - -# ======== 天气功能 ======== -@ai_router.register( - name="weather_query", - description="查询城市未来五天的简要天气预报", - examples=["北京天气怎么样", "上海天气"], - params_description="城市名称" -) -def ai_handle_weather(ctx: MessageContext, params: str) -> bool: - """AI路由的天气查询处理""" - city_name = params.strip() - if not city_name: - ctx.send_text("🤔 请告诉我你想查询哪个城市的天气") - return True - - # 加载城市代码 - city_codes = {} - city_code_path = os.path.join(os.path.dirname(__file__), '..', 'function', 'main_city.json') - try: - with open(city_code_path, 'r', encoding='utf-8') as f: - city_codes = json.load(f) - except Exception as e: - if ctx.logger: - ctx.logger.error(f"加载城市代码文件失败: {e}") - ctx.send_text("⚠️ 抱歉,天气功能暂时不可用") - return True - - # 查找城市代码 - city_code = city_codes.get(city_name) - if not city_code: - # 尝试模糊匹配 - for name, code in city_codes.items(): - if city_name in name: - city_code = code - city_name = name - break - - if not city_code: - ctx.send_text(f"😕 找不到城市 '{city_name}' 的天气信息") - return True - - # 获取天气信息 - try: - from function.func_weather import Weather - weather_info = Weather(city_code).get_weather(include_forecast=True) - ctx.send_text(weather_info) - return True - except Exception as e: - if ctx.logger: - ctx.logger.error(f"获取天气信息失败: {e}") - ctx.send_text(f"😥 获取 {city_name} 天气时遇到问题") - return True - -# ======== 新闻功能 ======== -@ai_router.register( - name="news_query", - description="获取今日新闻", - examples=["看看今天的新闻", "今日要闻"], - params_description="无需参数" -) -def ai_handle_news(ctx: MessageContext, params: str) -> bool: - """AI路由的新闻查询处理""" - try: - from function.func_news import News - news_instance = News() - is_today, news_content = news_instance.get_important_news() - - if is_today: - ctx.send_text(f"📰 今日要闻来啦:\n{news_content}") - else: - if news_content: - ctx.send_text(f"ℹ️ 今日新闻暂未发布,为您找到最近的一条新闻:\n{news_content}") - else: - ctx.send_text("❌ 获取新闻失败,请稍后重试") - - return True - except Exception as e: - if ctx.logger: - ctx.logger.error(f"获取新闻失败: {e}") - ctx.send_text("❌ 获取新闻时发生错误") - return True - -# ======== 提醒功能 ======== -@ai_router.register( - name="reminder_set", - description="设置提醒", - examples=["提醒我明天下午3点开会", "每天早上8点提醒我吃早餐"], - params_description="时间和内容" -) -def ai_handle_reminder_set(ctx: MessageContext, params: str) -> bool: - """AI路由的提醒设置处理""" - if not params.strip(): - at_list = ctx.msg.sender if ctx.is_group else "" - ctx.send_text("请告诉我需要提醒什么内容和时间呀~", at_list) - return True - - # 调用原有的提醒处理逻辑 - from .handlers import handle_reminder - - # 临时修改消息内容以适配原有处理器 - original_content = ctx.msg.content - ctx.msg.content = f"提醒我{params}" - - # handle_reminder不使用match参数,直接传None - result = handle_reminder(ctx, None) - - # 恢复原始内容 - ctx.msg.content = original_content - - return result - -@ai_router.register( - name="reminder_list", - description="查看所有提醒", - examples=["查看我的提醒", "我有哪些提醒"], - params_description="无需参数" -) -def ai_handle_reminder_list(ctx: MessageContext, params: str) -> bool: - """AI路由的提醒列表查看处理""" - from .handlers import handle_list_reminders - return handle_list_reminders(ctx, None) - -@ai_router.register( - name="reminder_delete", - description="删除提醒", - examples=["删除开会的提醒", "取消明天的提醒"], - params_description="提醒描述" -) -def ai_handle_reminder_delete(ctx: MessageContext, params: str) -> bool: - """AI路由的提醒删除处理""" - # 调用原有的删除提醒逻辑 - from .handlers import handle_delete_reminder - - # 临时修改消息内容 - original_content = ctx.msg.content - ctx.msg.content = f"删除提醒 {params}" - - # handle_delete_reminder不使用match参数,直接传None - result = handle_delete_reminder(ctx, None) - - # 恢复原始内容 - ctx.msg.content = original_content - - return result - -# ======== Perplexity搜索功能 ======== -@ai_router.register( - name="perplexity_search", - description="搜索查询资料并深度研究某个专业问题", - examples=["搜索Python最新特性", "查查机器学习教程"], - params_description="搜索内容" -) -def ai_handle_perplexity(ctx: MessageContext, params: str) -> bool: - """AI路由的Perplexity搜索处理""" - if not params.strip(): - at_list = ctx.msg.sender if ctx.is_group else "" - ctx.send_text("请告诉我你想搜索什么内容", at_list) - return True - - # 获取Perplexity实例 - perplexity_instance = getattr(ctx.robot, 'perplexity', None) - if not perplexity_instance: - ctx.send_text("❌ Perplexity搜索功能当前不可用") - return True - - # 调用Perplexity处理 - content_for_perplexity = f"ask {params}" - chat_id = ctx.get_receiver() - sender_wxid = ctx.msg.sender - room_id = ctx.msg.roomid if ctx.is_group else None - is_group = ctx.is_group - - was_handled, fallback_prompt = perplexity_instance.process_message( - content=content_for_perplexity, - chat_id=chat_id, - sender=sender_wxid, - roomid=room_id, - from_group=is_group, - send_text_func=ctx.send_text - ) - - # 如果Perplexity无法处理,使用默认AI - if not was_handled and fallback_prompt: - chat_model = getattr(ctx, 'chat', None) or (getattr(ctx.robot, 'chat', None) if ctx.robot else None) - if chat_model: - try: - import time - current_time = time.strftime("%H:%M", time.localtime()) - q_with_info = f"[{current_time}] {ctx.sender_name}: {params}" - - rsp = chat_model.get_answer( - question=q_with_info, - wxid=ctx.get_receiver(), - system_prompt_override=fallback_prompt - ) - - if rsp: - at_list = ctx.msg.sender if ctx.is_group else "" - ctx.send_text(rsp, at_list) - return True - except Exception as e: - if ctx.logger: - ctx.logger.error(f"默认AI处理失败: {e}") - - return was_handled \ No newline at end of file diff --git a/commands/ai_router.py b/commands/ai_router.py deleted file mode 100644 index 341a81f..0000000 --- a/commands/ai_router.py +++ /dev/null @@ -1,252 +0,0 @@ -import re -import json -import logging -from typing import Dict, Callable, Optional, Any, Tuple -from dataclasses import dataclass, field -from .context import MessageContext - -logger = logging.getLogger(__name__) - -@dataclass -class AIFunction: - """AI可调用的功能定义""" - name: str # 功能唯一标识名 - handler: Callable # 处理函数 - description: str # 功能描述(给AI看的) - examples: list[str] = field(default_factory=list) # 示例用法 - params_description: str = "" # 参数说明 - -class AIRouter: - """AI智能路由器""" - - def __init__(self): - self.functions: Dict[str, AIFunction] = {} - self.logger = logger - - def register(self, name: str, description: str, examples: list[str] = None, params_description: str = ""): - """ - 装饰器:注册一个功能到AI路由器 - - @ai_router.register( - name="weather_query", - description="查询指定城市的天气预报", - examples=["北京天气怎么样", "查一下上海的天气", "明天深圳会下雨吗"], - params_description="城市名称" - ) - def handle_weather(ctx: MessageContext, params: str) -> bool: - # 实现天气查询逻辑 - pass - """ - def decorator(func: Callable) -> Callable: - ai_func = AIFunction( - name=name, - handler=func, - description=description, - examples=examples or [], - params_description=params_description - ) - self.functions[name] = ai_func - self.logger.info(f"AI路由器注册功能: {name} - {description}") - return func - - return decorator - - def _build_ai_prompt(self) -> str: - """构建给AI的系统提示词,包含所有可用功能的信息""" - prompt = """你是一个智能路由助手。根据用户的输入,判断用户的意图并返回JSON格式的响应。 - - ### 注意: - 1. 你需要优先判断自己是否可以直接回答用户的问题,如果你可以直接回答,则返回 "chat",无需返回 "function" - 2. 如果用户输入中包含多个功能,请优先匹配最符合用户意图的功能。如果无法判断,则返回 "chat"。 - 3. 优先考虑使用 chat 处理,需要外部资料或其他功能逻辑时,再返回 "function"。 - - ### 可用的功能列表: - """ - for name, func in self.functions.items(): - prompt += f"\n- {name}: {func.description}" - if func.params_description: - prompt += f"\n 参数: {func.params_description}" - if func.examples: - prompt += f"\n 示例: {', '.join(func.examples[:3])}" - prompt += "\n" - - prompt += """ - 请你分析用户输入,严格按照以下格式返回JSON: - - ### 返回格式: - - 1. 如果用户只是聊天或者不匹配任何功能,返回: - { - "action_type": "chat" - } - - 2.如果用户需要使用上述功能之一,返回: - { - "action_type": "function", - "function_name": "上述功能列表中的功能名", - "params": "从用户输入中提取的参数" - } - - #### 示例: - - 用户输入"北京天气怎么样" -> {"action_type": "function", "function_name": "weather_query", "params": "北京"} - - 用户输入"看看新闻" -> {"action_type": "function", "function_name": "news_query", "params": ""} - - 用户输入"你好" -> {"action_type": "chat"} - - 用户输入"查一下Python教程" -> {"action_type": "function", "function_name": "perplexity_search", "params": "Python教程"} - - #### 格式注意事项: - 1. action_type 只能是 "function" 或 "chat" - 2. 只返回JSON,无需其他解释 - 3. function_name 必须完全匹配上述功能列表中的名称 - """ - return prompt - - def route(self, ctx: MessageContext) -> Tuple[bool, Optional[Dict[str, Any]]]: - """ - AI路由决策 - - 返回: (是否处理成功, AI决策结果) - """ - print(f"[AI路由器] route方法被调用") - - if not ctx.text: - print("[AI路由器] ctx.text为空,返回False") - return False, None - - # 获取AI模型 - chat_model = getattr(ctx, 'chat', None) - if not chat_model: - chat_model = getattr(ctx.robot, 'chat', None) if ctx.robot else None - - if not chat_model: - print("[AI路由器] 无可用的AI模型") - self.logger.error("AI路由器:无可用的AI模型") - return False, None - - print(f"[AI路由器] 找到AI模型: {type(chat_model)}") - - try: - # 构建系统提示词 - system_prompt = self._build_ai_prompt() - print(f"[AI路由器] 已构建系统提示词,长度: {len(system_prompt)}") - - # 让AI分析用户意图 - user_input = f"用户输入:{ctx.text}" - print(f"[AI路由器] 准备调用AI分析意图: {user_input}") - - ai_response = chat_model.get_answer( - user_input, - wxid=ctx.get_receiver(), - system_prompt_override=system_prompt - ) - - print(f"[AI路由器] AI响应: {ai_response}") - - # 解析AI返回的JSON - json_match = re.search(r'\{.*\}', ai_response, re.DOTALL) - if not json_match: - self.logger.warning(f"AI路由器:无法从AI响应中提取JSON - {ai_response}") - return False, None - - decision = json.loads(json_match.group(0)) - - # 验证决策格式 - action_type = decision.get("action_type") - if action_type not in ["chat", "function"]: - self.logger.warning(f"AI路由器:未知的action_type - {action_type}") - return False, None - - # 如果是功能调用,验证功能名 - if action_type == "function": - function_name = decision.get("function_name") - if function_name not in self.functions: - self.logger.warning(f"AI路由器:未知的功能名 - {function_name}") - return False, None - - self.logger.info(f"AI路由决策: {decision}") - return True, decision - - except json.JSONDecodeError as e: - self.logger.error(f"AI路由器:解析JSON失败 - {e}") - return False, None - except Exception as e: - self.logger.error(f"AI路由器:处理异常 - {e}") - return False, None - - def _check_permission(self, ctx: MessageContext) -> bool: - """ - 检查是否有权限使用AI路由功能 - - :param ctx: 消息上下文 - :return: 是否有权限 - """ - # 检查是否启用AI路由 - ai_router_config = getattr(ctx.config, 'AI_ROUTER', {}) - if not ai_router_config.get('enable', True): - self.logger.info("AI路由功能已禁用") - return False - - # 私聊始终允许 - if not ctx.is_group: - return True - - # 群聊需要检查白名单 - allowed_groups = ai_router_config.get('allowed_groups', []) - current_group = ctx.get_receiver() - - if current_group in allowed_groups: - self.logger.info(f"群聊 {current_group} 在AI路由白名单中,允许使用") - return True - else: - self.logger.info(f"群聊 {current_group} 不在AI路由白名单中,禁止使用") - return False - - def dispatch(self, ctx: MessageContext) -> bool: - """ - 执行AI路由分发 - - 返回: 是否成功处理 - """ - print(f"[AI路由器] dispatch被调用,消息内容: {ctx.text}") - - # 检查权限 - if not self._check_permission(ctx): - print("[AI路由器] 权限检查失败,返回False") - return False - - # 获取AI路由决策 - success, decision = self.route(ctx) - print(f"[AI路由器] route返回 - success: {success}, decision: {decision}") - - if not success or not decision: - print("[AI路由器] route失败或无决策,返回False") - return False - - action_type = decision.get("action_type") - - # 如果是聊天,返回False让后续处理器处理 - if action_type == "chat": - self.logger.info("AI路由器:识别为聊天意图,交给聊天处理器") - return False - - # 如果是功能调用 - if action_type == "function": - function_name = decision.get("function_name") - params = decision.get("params", "") - - func = self.functions.get(function_name) - if not func: - self.logger.error(f"AI路由器:功能 {function_name} 未找到") - return False - - try: - self.logger.info(f"AI路由器:调用功能 {function_name},参数: {params}") - result = func.handler(ctx, params) - return result - except Exception as e: - self.logger.error(f"AI路由器:执行功能 {function_name} 出错 - {e}") - return False - - return False - -# 创建全局AI路由器实例 -ai_router = AIRouter() \ No newline at end of file diff --git a/commands/handlers.py b/commands/handlers.py index 64938b2..ca5a0ef 100644 --- a/commands/handlers.py +++ b/commands/handlers.py @@ -49,43 +49,6 @@ def handle_help(ctx: 'MessageContext', match: Optional[Match]) -> bool: # 发送消息 return ctx.send_text(help_text) -def handle_check_equipment(ctx: 'MessageContext', match: Optional[Match]) -> bool: - """ - 处理 "查看装备" 命令 - - 匹配: 我的装备/查看装备 - """ - if not ctx.is_group: - ctx.send_text("❌ 装备查看功能只支持群聊") - return True - - try: - from function.func_duel import DuelRankSystem - - player_name = ctx.sender_name - rank_system = DuelRankSystem(ctx.msg.roomid) - player_data = rank_system.get_player_data(player_name) - - if not player_data: - ctx.send_text(f"⚠️ 没有找到 {player_name} 的数据") - return True - - items = player_data.get("items", {"elder_wand": 0, "magic_stone": 0, "invisibility_cloak": 0}) - result = [ - f"🧙‍♂️ {player_name} 的魔法装备:", - f"🪄 老魔杖: {items.get('elder_wand', 0)}次 ", - f"💎 魔法石: {items.get('magic_stone', 0)}次", - f"🧥 隐身衣: {items.get('invisibility_cloak', 0)}次 " - ] - - ctx.send_text("\n".join(result)) - - return True - except Exception as e: - if ctx.logger: - ctx.logger.error(f"查看装备出错: {e}") - ctx.send_text("⚠️ 查看装备失败") - return False def handle_summary(ctx: 'MessageContext', match: Optional[Match]) -> bool: """ diff --git a/commands/models.py b/commands/models.py deleted file mode 100644 index 4034e3f..0000000 --- a/commands/models.py +++ /dev/null @@ -1,38 +0,0 @@ -import re -from dataclasses import dataclass -from typing import Pattern, Callable, Literal, Optional, Any, Union, Match - -# 导入 MessageContext,使用前向引用避免循环导入 -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from .context import MessageContext - - -@dataclass -class Command: - """ - 命令定义类,封装命令的匹配条件和处理函数 - """ - name: str # 命令名称,用于日志和调试 - pattern: Union[Pattern, Callable[['MessageContext'], Optional[Match]]] # 匹配规则:正则表达式或自定义匹配函数 - scope: Literal["group", "private", "both"] # 生效范围: "group"-仅群聊, "private"-仅私聊, "both"-两者都可 - handler: Callable[['MessageContext', Optional[Match]], bool] # 处理函数 - need_at: bool = False # 在群聊中是否必须@机器人才能触发 - priority: int = 100 # 优先级,数字越小越先匹配 - description: str = "" # 命令的描述,用于生成帮助信息 - - def __post_init__(self): - """验证命令配置的有效性""" - if self.scope not in ["group", "private", "both"]: - raise ValueError(f"无效的作用域: {self.scope},必须是 'group', 'private' 或 'both'") - - # 检查pattern是否为正则表达式或可调用对象 - if not isinstance(self.pattern, (Pattern, Callable)): - # 如果是字符串,尝试转换为正则表达式 - if isinstance(self.pattern, str): - try: - self.pattern = re.compile(self.pattern) - except re.error: - raise ValueError(f"无效的正则表达式: {self.pattern}") - else: - raise TypeError(f"pattern 必须是正则表达式或可调用对象,而不是 {type(self.pattern)}") \ No newline at end of file diff --git a/commands/registry.py b/commands/registry.py deleted file mode 100644 index 6b6bcd6..0000000 --- a/commands/registry.py +++ /dev/null @@ -1,136 +0,0 @@ -import re -from .models import Command -from .handlers import ( - handle_help, - # handle_duel, handle_sneak_attack, handle_duel_rank, - # handle_duel_stats, handle_check_equipment, handle_rename, - handle_summary, handle_clear_messages, handle_news_request, - handle_chitchat, handle_insult, - handle_perplexity_ask, handle_reminder, handle_list_reminders, handle_delete_reminder, - handle_weather_forecast -) - -# 命令列表,按优先级排序 -# 优先级越小越先匹配 -COMMANDS = [ - # ======== 基础系统命令 ======== - Command( - name="help", - pattern=re.compile(r"^(info|帮助|指令)$", re.IGNORECASE), - scope="both", # 群聊和私聊都支持 - need_at=False, # 不需要@机器人 - priority=10, # 优先级较高 - handler=handle_help, - description="显示机器人的帮助信息" - ), - - # ======== Perplexity AI 命令 ======== - Command( - name="perplexity_ask", - pattern=re.compile(r"^ask\s*(.+)", re.IGNORECASE | re.DOTALL), - scope="both", # 群聊和私聊都支持 - need_at=True, # 需要@机器人 - priority=25, # 较高优先级,确保在闲聊之前处理 - handler=handle_perplexity_ask, - description="使用 Perplexity AI 进行深度查询" - ), - - # ======== 消息管理命令 ======== - Command( - name="summary", - pattern=re.compile(r"^(summary|总结)$", re.IGNORECASE), - scope="group", # 仅群聊支持 - need_at=True, # 需要@机器人 - priority=30, # 优先级一般 - handler=handle_summary, - description="总结群聊最近的消息" - ), - - Command( - name="clear_messages", - pattern=re.compile(r"^(clearmessages|清除历史)$", re.IGNORECASE), - scope="group", # 仅群聊支持 - need_at=True, # 需要@机器人 - priority=31, # 优先级一般 - handler=handle_clear_messages, - description="从数据库中清除群聊的历史消息记录" - ), - - # ======== 提醒功能 ======== - Command( - name="reminder", - pattern=re.compile(r"提醒我", re.IGNORECASE), - scope="both", # 支持群聊和私聊 - need_at=True, # 在群聊中需要@机器人 - priority=35, # 优先级适中,在基础命令后,复杂功能或闲聊前 - handler=handle_reminder, - description="设置一个提醒 (包含 '提醒我' 关键字即可, 例如:提醒我明天下午3点开会)" - ), - - Command( - name="list_reminders", - pattern=re.compile(r"^(查看提醒|我的提醒|提醒列表)$", re.IGNORECASE), - scope="both", # 支持群聊和私聊 - need_at=True, # 在群聊中需要@机器人 - priority=36, # 优先级略低于设置提醒 - handler=handle_list_reminders, - description="查看您设置的所有提醒" - ), - - Command( - name="delete_reminder", - # 修改为只匹配包含"删"、"删除"或"取消"的消息,不再要求特定格式 - pattern=re.compile(r"(?:删|删除|取消)", re.IGNORECASE), - scope="both", # 支持群聊和私聊 - need_at=True, # 在群聊中需要@机器人 - priority=37, - handler=handle_delete_reminder, - description="删除提醒 (包含'删'和'提醒'关键字即可,如: 把开会的提醒删了)" - ), - - # ======== 新闻和实用工具 ======== - Command( - name="weather_forecast", - pattern=re.compile(r"^(?:天气预报|天气)\s+(.+)$"), # 匹配 天气预报/预报 城市名 - scope="both", # 群聊和私聊都支持 - need_at=True, # 需要@机器人 - priority=38, # 优先级比天气高一点 - handler=handle_weather_forecast, - description="查询指定城市未来几天的天气预报 (例如:天气预报 北京)" - ), - - Command( - name="news", - pattern=re.compile(r"^新闻$"), - scope="both", # 群聊和私聊都支持 - need_at=True, # 需要@机器人 - priority=40, # 优先级一般 - handler=handle_news_request, - description="获取最新新闻" - ), - - # ======== 骂人命令 ======== - Command( - name="insult", - pattern=re.compile(r"骂一下\s*@([^\s@]+)"), - scope="group", # 仅群聊支持 - need_at=True, # 需要@机器人 - priority=100, # 优先级较高 - handler=handle_insult, - description="骂指定用户" - ), - -] - -# 可以添加一个函数,获取命令列表的简单描述 -def get_commands_info(): - """获取所有命令的简要信息,用于调试""" - info = [] - for i, cmd in enumerate(COMMANDS): - scope_str = {"group": "仅群聊", "private": "仅私聊", "both": "群聊私聊"}[cmd.scope] - at_str = "需要@" if cmd.need_at else "不需@" - info.append(f"{i+1}. [{cmd.priority}] {cmd.name} ({scope_str},{at_str}) - {cmd.description or '无描述'}") - return "\n".join(info) - -# 导出所有命令 -__all__ = ["COMMANDS", "get_commands_info"] \ No newline at end of file diff --git a/commands/router.py b/commands/router.py deleted file mode 100644 index 8fd769d..0000000 --- a/commands/router.py +++ /dev/null @@ -1,117 +0,0 @@ -import re -import logging -from typing import List, Optional, Any, Dict, Match -import traceback - -from .models import Command -from .context import MessageContext - -# 获取模块级 logger -logger = logging.getLogger(__name__) - - -class CommandRouter: - """ - 命令路由器,负责将消息路由到对应的命令处理函数 - """ - def __init__(self, commands: List[Command], robot_instance: Optional[Any] = None): - # 按优先级排序命令列表,数字越小优先级越高 - self.commands = sorted(commands, key=lambda cmd: cmd.priority) - self.robot_instance = robot_instance - - # 分析并输出命令注册信息,便于调试 - scope_count = {"group": 0, "private": 0, "both": 0} - for cmd in commands: - scope_count[cmd.scope] += 1 - - logger.info(f"命令路由器初始化成功,共加载 {len(commands)} 个命令") - logger.info(f"命令作用域分布: 仅群聊 {scope_count['group']},仅私聊 {scope_count['private']},两者均可 {scope_count['both']}") - - # 按优先级输出命令信息 - for i, cmd in enumerate(self.commands[:10]): # 只输出前10个 - logger.info(f"{i+1}. [{cmd.priority}] {cmd.name} - {cmd.description or '无描述'}") - if len(self.commands) > 10: - logger.info(f"... 共 {len(self.commands)} 个命令") - - def dispatch(self, ctx: MessageContext) -> bool: - """ - 根据消息上下文分发命令 - :param ctx: 消息上下文对象 - :return: 是否有命令成功处理 - """ - # 确保context可以访问到robot实例 - if self.robot_instance and not ctx.robot: - ctx.robot = self.robot_instance - # 如果robot有logger属性且ctx没有logger,则使用robot的logger - if hasattr(self.robot_instance, 'LOG') and not ctx.logger: - ctx.logger = self.robot_instance.LOG - - # 记录日志,便于调试 - if ctx.logger: - ctx.logger.debug(f"开始路由消息: '{ctx.text}', 来自: {ctx.sender_name}, 群聊: {ctx.is_group}, @机器人: {ctx.is_at_bot}") - - # 遍历命令列表,按优先级顺序匹配 - for cmd in self.commands: - # 1. 检查作用域 (scope) - if cmd.scope != "both": - if (cmd.scope == "group" and not ctx.is_group) or \ - (cmd.scope == "private" and ctx.is_group): - continue # 作用域不匹配,跳过 - - # 2. 检查是否需要 @ (need_at) - 仅在群聊中有效 - if ctx.is_group and cmd.need_at and not ctx.is_at_bot: - continue # 需要@机器人但未被@,跳过 - - # 3. 执行匹配逻辑 - match_result = None - try: - # 根据pattern类型执行匹配 - if callable(cmd.pattern): - # 自定义匹配函数 - match_result = cmd.pattern(ctx) - else: - # 正则表达式匹配 - match_obj = cmd.pattern.search(ctx.text) - match_result = match_obj - - # 匹配失败,尝试下一个命令 - if match_result is None: - continue - - # 匹配成功,记录日志 - if ctx.logger: - ctx.logger.info(f"命令 '{cmd.name}' 匹配成功,准备处理") - - # 4. 执行命令处理函数 - try: - result = cmd.handler(ctx, match_result) - if result: - if ctx.logger: - ctx.logger.info(f"命令 '{cmd.name}' 处理成功") - return True - else: - if ctx.logger: - ctx.logger.warning(f"命令 '{cmd.name}' 处理返回False,尝试下一个命令") - except Exception as e: - if ctx.logger: - ctx.logger.error(f"执行命令 '{cmd.name}' 处理函数时出错: {e}") - ctx.logger.error(traceback.format_exc()) - else: - logger.error(f"执行命令 '{cmd.name}' 处理函数时出错: {e}", exc_info=True) - # 出错后继续尝试下一个命令 - except Exception as e: - # 匹配过程出错,记录并继续 - if ctx.logger: - ctx.logger.error(f"匹配命令 '{cmd.name}' 时出错: {e}") - else: - logger.error(f"匹配命令 '{cmd.name}' 时出错: {e}", exc_info=True) - continue - - # 所有命令都未匹配或处理失败 - if ctx.logger: - ctx.logger.debug("所有命令匹配失败或处理失败") - return False - - def get_command_descriptions(self) -> Dict[str, str]: - """获取所有命令的描述,用于生成帮助信息""" - return {cmd.name: cmd.description for cmd in self.commands if cmd.description} \ No newline at end of file diff --git a/config.yaml.template b/config.yaml.template index 3f440b3..ed6005b 100644 --- a/config.yaml.template +++ b/config.yaml.template @@ -144,6 +144,6 @@ perplexity: # -----perplexity配置这行不填----- allowed_groups: [] # 允许使用Perplexity的群聊ID列表,例如:["123456789@chatroom", "123456789@chatroom"] allowed_users: [] # 允许使用Perplexity的用户ID列表,例如:["wxid_123456789", "filehelper"] -ai_router: # -----AI路由器配置----- - enable: true # 是否启用AI路由功能 - allowed_groups: [] # 允许使用AI路由的群聊ID列表,例如:["123456789@chatroom", "123456789@chatroom"] \ No newline at end of file +function_call_router: # -----Function Call路由器配置----- + enable: true # 是否启用新的Function Call路由功能 + debug: false # 是否启用调试日志 diff --git a/configuration.py b/configuration.py index fdcfd95..7e7e9cd 100644 --- a/configuration.py +++ b/configuration.py @@ -42,6 +42,5 @@ class Config(object): self.ALIYUN_IMAGE = yconfig.get("aliyun_image", {}) self.GEMINI_IMAGE = yconfig.get("gemini_image", {}) self.GEMINI = yconfig.get("gemini", {}) - self.AI_ROUTER = yconfig.get("ai_router", {"enable": True, "allowed_groups": []}) self.MAX_HISTORY = yconfig.get("MAX_HISTORY", 300) self.SEND_RATE_LIMIT = yconfig.get("send_rate_limit", 0) diff --git a/function_call_migration.md b/function_call_migration.md new file mode 100644 index 0000000..ac0dd23 --- /dev/null +++ b/function_call_migration.md @@ -0,0 +1,234 @@ +# 从正则/路由体系迁移到标准 Function Call 的实施手册 + +## 1. 改造目标与约束 +- 完全淘汰 `commands` 目录下的正则匹配路由 (`CommandRouter`) 与 `commands/ai_router.py` 中的自定义决策逻辑,统一改为标准化的 Function Call 协议。 +- 让所有机器人工具能力都以“结构化函数定义 + 参数 JSON schema”的形式暴露,既能让 LLM 函数调用,也能被程序直接调用。 +- 保持现有业务能力完整可用(天气、新闻、提醒、Perplexity 搜索、群管理等),迁移过程中不影响线上稳定性。 +- 兼容现有上下文对象 `MessageContext`,并保留与微信客户端交互所需的最小耦合。 + +## 2. 现有架构梳理 +### 2.1 指令流 +1. `robot.py` 中 `Robot.processMsg` 获取消息后构造 `MessageContext`,先交给 `CommandRouter.dispatch`(见 `commands/router.py:13`)。 +2. `CommandRouter` 遍历 `COMMANDS` 列表(`commands/registry.py:15` 起),用正则匹配执行对应 handler,例如 `handle_reminder`(`commands/handlers.py`)。 +3. Handler 内部通常继续调用 `function/` 下的模块完成业务。 + +### 2.2 AI 路由流 +1. `commands/ai_router.py` 提供 `AIRouter`,通过 `_build_ai_prompt` 把功能描述写进提示词。 +2. `route` 调用聊天模型的 `get_answer`,要求模型返回 `{"action_type": "function", ...}` 格式的 JSON,再根据返回字符串里的 params 调 handler。 +3. 该流程依旧依赖字符串解析和弱结构的参数传递(例如 `params` 直接拼在 handler 里处理)。 + +### 2.3 问题痛点 +- 功能元数据散落:正则、示例、参数说明分布在多个文件,新增能力需要多处编辑。 +- 参数结构模糊:当前 `params` 仅是字符串,handler 内自行拆分,容易出错。 +- 与 LLM 的交互不标准:靠提示词提醒模型返回 JSON,缺乏 schema 约束,易产生格式错误。 +- 双路由并存:命令路由与 AI 路由行为不一致,重复注册、维护成本高。 + +## 3. 目标架构设想 +``` +WxMsg -> MessageContext -> FunctionCallRouter + |-- Registry (FunctionSpec, schema, handler) + |-- FunctionCallLLM (统一 function call API) + '-- Local invoker / fallback (无 LLM) +``` +- **FunctionSpec**:定义函数名、描述、参数 JSON schema、返回结构、权限等元数据。 +- **FunctionCallRouter**:单一入口,负责: + 1. 根据上下文(是否命令关键字、是否@)决定是否直接调用或交给 LLM 选函数。 + 2. 如果由 LLM 决定,则调用支持 function call 的接口(OpenAI / DeepSeek / 自建),拿到带函数名与参数 JSON 的结构化响应。 + 3. 校验参数,调用真实 handler,统一处理返回。 +- **Handlers**:全部改造成签名规范的函数(例如接收 `ctx: MessageContext, args: TypedModel`),禁止在 handler 内再解析自然语言。 + +## 4. 迁移阶段概览 +| 阶段 | 目标 | 关键输出 | 风险控制 | +| ---- | ---- | -------- | -------- | +| P0 现状盘点 | 梳理所有功能与依赖 | 功能清单、调用图、可迁移性评估 | 标注遗留 / 暂缓功能 | +| P1 构建 Function Spec | 落地函数描述模型与注册中心 | `function_calls/spec.py`、`registry.py` | 先只收录已实现能力 | +| P2 新路由内核 | 新的 `FunctionCallRouter` 与 LLM 适配层 | `function_calls/router.py`、`llm.py` | 与老路由并行跑灰度 | +| P3 Handler 适配 | 将现有 handler 改为结构化参数 | 类型化参数模型、转换器 | 保留回退入口、渐进式替换 | +| P4 切换与清理 | 替换 `Robot.processMsg` 流程,删除旧代码 | 配置开关、文档 | 全量回归测试 | + +## 5. 各阶段详细操作 +### 阶段 P0:能力盘点 & 前置准备 +1. **提取功能列表**: + - 从 `commands/registry.py` 抽出每个 `Command` 的 `name/description/pattern`。 + - 从 `commands/ai_functions.py` 抽出 `@ai_router.register` 的功能信息。 +2. **梳理依赖**:确认每个 handler 调用的模块,如 `function/func_weather.py`、数据库访问、外部 API。 +3. **分类能力**:区分“纯文本问答”、“需要结构化参数的工具调用”、“需要调度/持久化的事务型能力”。 +4. **定义统一字段**:初步罗列每个功能需要的字段(例如天气需要 `city`,提醒需要 `time_spec` + `content`)。 +5. **技术选型**:确定使用的 function call 接口: + - 若沿用 OpenAI/DeepSeek/gpt-4o 等需确认其 function call JSON schema 支持。 + - 若需自定义,可在 `ai_providers` 中新增 `call_with_functions` 方法。 + +### 阶段 P1:定义 FunctionSpec 与注册中心 +1. **创建模块结构**:建议新增 `function_calls/` 目录,包含: + - `spec.py`:定义核心数据结构。 + - `registry.py`:集中注册所有函数。 + - `llm.py`:统一封装 LLM 函数调用接口。 +2. **定义数据结构**(示例): + ```python + # function_calls/spec.py + from dataclasses import dataclass + from typing import Callable, Any, Dict + + @dataclass + class FunctionSpec: + name: str + description: str + parameters_schema: Dict[str, Any] + handler: Callable[[MessageContext, Dict[str, Any]], bool] + examples: list[str] = None + scope: str = "both" # group / private / both + require_at: bool = False + auth: str | None = None # 权限标签(可选) + ``` +3. **写注册器**:用装饰器或显式方法统一注册: + ```python + # function_calls/registry.py + FUNCTION_REGISTRY: dict[str, FunctionSpec] = {} + + def register_function(spec: FunctionSpec) -> None: + if spec.name in FUNCTION_REGISTRY: + raise ValueError(f"duplicate function: {spec.name}") + FUNCTION_REGISTRY[spec.name] = spec + ``` +4. **构建 JSON schema**: + - 使用标准 Draft-07 schema,字段包括 `type`, `properties`, `required`。 + - 设计工具函数,将 Pydantic/自定义 dataclass 自动转 schema(便于 handler 书写类型定义)。 +5. **迁移功能描述**:P0 中梳理的功能,逐一写成 `FunctionSpec`,暂时把 handler 指向旧 handler 的包装函数(下一阶段重写)。 + +### 阶段 P2:实现 FunctionCallRouter 与 LLM 适配 +1. **Router 结构**: + - 在 `function_calls/router.py` 新建 `FunctionCallRouter`,替代旧 `CommandRouter` 和 `AIRouter`。 + - 公开 `dispatch(ctx: MessageContext) -> bool` 接口,供 `Robot.processMsg` 调用。 +2. **决策流程**: + - 如果消息符合“显式命令”格式,可以在本地直接确定函数(例如以 `/` 开头、或命中关键字表),避免调用 LLM。 + - 否则调用 LLM 函数选择:统一走 `FunctionCallLLM.select_function(ctx, registry)`。 +3. **LLM 适配**: + - 在 `llm.py` 内封装: + 1. 将 `FunctionSpec` 列表转换成 OpenAI 函数调用所需的 `functions` 参数(包含 `name`, `description`, `parameters` schema)。 + 2. 调用具体模型(例如 `chat_model.call_with_functions(...)`)。如当前模型类没有,需在 `ai_providers` 对应文件内加包装。 + - 处理返回: + ```python + response = chat_model.call_with_functions([...]) + function_name = response.choices[0].message.tool_calls[0].function.name + arguments = json.loads(response.choices[0].message.tool_calls[0].function.arguments) + ``` + - 若模型不支持函数调用,退化到 prompt + JSON parsing,但要封装在适配层可替换。 +4. **参数校验**: + - 在 router 中对 `arguments` 做 schema 验证(使用 `jsonschema` / `pydantic`)。失败时给出可读错误并返回聊天 fallback。 +5. **并行运行策略**: + - 在 `Robot` 里保留旧路由开关,例如 `ENABLE_FUNCTION_ROUTER`。 + - 灰度期间可先调用新 router,如失败再回退旧 `CommandRouter.dispatch`。 +6. **日志与追踪**:统一记录:选择的函数、输入参数、执行耗时、是否成功,方便对比新旧行为。 + +### 阶段 P3:Handler 结构化改造 +1. **参数模型化**:为每个功能定义数据模型(使用 `pydantic.BaseModel` 或 dataclass): + ```python + class WeatherArgs(BaseModel): + city: str + ``` +2. **重写 handler 签名**: + - 新 handler 统一为 `def handle(ctx: MessageContext, args: WeatherArgs) -> FunctionResult`。 + - `FunctionResult` 可包含 `handled: bool`, `reply: str | None`, `attachments: list[...]` 等,便于拓展。 +3. **包装旧逻辑**:将 `commands/handlers.py` 中的旧函数迁到新目录或拆分: + - 对于仍然有效的业务代码,提取核心逻辑到 `services/` 或 `function/` 保留位置,减少重复。 + - Handler 仅负责:记录日志 → 调用 service → 发送回复 → 返回结果。 +4. **删除自然语言解析**:所有参数应由 LLM 生成的 JSON 直接提供,handler 不再解析中文描述。 +5. **权限 & 场景**:在 `FunctionSpec` 中配置 `scope`/`require_at` 等字段,在 router 层校验,handler 内不再判断。 + +### 阶段 P4:切换入口与清理遗留 +1. **替换 `Robot.processMsg` 流程**: + - 将调用链切换为 `FunctionCallRouter.dispatch(ctx)`。 + - 如果返回 `False` 且 `ctx.chat` 存在,则调用默认聊天模型兜底(原 `handle_chitchat`)。 +2. **移除旧模块**: + - 删除 `commands/router.py`、`commands/models.py`、`commands/registry.py`、`commands/ai_router.py`、`commands/ai_functions.py`。 + - 将保留的业务 handler 根据需要移动到 `function_calls/handlers/` 或 `services/`。 +3. **配置与文档更新**:同步更新 `README.MD`、配置项示例,说明如何新增函数、如何控制启用状态。 + +## 6. 关键实现细节建议 +### 6.1 函数清单与元数据 +- 建议维护清单表格(CSV/Notion/markdown),列出:函数名、描述、输入字段、输出、依赖模块、是否对群开放、是否需要异步调度。 +- 对提醒类功能,注明需要访问数据库(`function/func_reminder.py`),关注事务边界。 + +### 6.2 Schema 构建工具链 +- 提供装饰器,自动从参数模型生成 `FunctionSpec`: + ```python + def tool_function(name: str, description: str, examples: list[str] = None, **meta): + def wrapper(func): + schema = build_schema_from_model(func.__annotations__['args']) + register_function(FunctionSpec( + name=name, + description=description, + parameters_schema=schema, + handler=func, + examples=examples or [], + **meta, + )) + return func + return wrapper + ``` +- `build_schema_from_model` 可以基于 `pydantic` 的 `model_json_schema()` 实现。 + +### 6.3 FunctionResult 规范 +- 统一约定 handler 返回内容: + ```python + class FunctionResult(BaseModel): + handled: bool + messages: list[str] = [] + at_list: list[str] = [] + metadata: dict[str, Any] = {} + ``` +- Router 根据返回决定是否向微信发送消息、是否继续 fallback。 + +### 6.4 兼容旧入参场景 +- 对于仍由系统内部触发(非用户输入)的调用(例如定时提醒触发),也复用新的 handler,确保所有入口一致。 +- 若暂时无法结构化,可定义 `raw_text: str` 字段,作为临时措施;在后续迭代中逐步替换。 + +### 6.5 日志与观测 +- 在 router 层记录: + - LLM 请求/响应 ID、耗时。 + - 选中的函数名、参数、handler 执行耗时。 + - 异常统一捕获并落盘。 +- 可在 `logs/` 目录新建 function-call 专属日志,方便分析差异。 + +## 7. Prompt 与 LLM 策略 +1. **系统提示词**:基于 `FunctionSpec` 自动生成。如目标模型支持原生 function call,可省略大量提示词,改用 `functions` 参数。 +2. **多轮策略**:对不确定的响应,可以: + - 若模型返回 `none` 或 `insufficient_arguments`,让 router 回退到聊天或引导用户补全。 + - 对重要函数设置 `confirmation_prompt`,在参数缺失时自动追问。 +3. **上下文拼接**:继续使用 `MessageContext` 中的群聊消息、时间等信息,作为 LLM 输入的一部分。 +4. **安全校验**:对高风险函数(如“骂人”类)可增加 LLM 分类或黑名单过滤。 + +## 8. 测试计划 +### 8.1 单元测试 +- 为每个 handler 编写结构化入参测试,确保直接调用函数即能得到正确输出。 +- 为 schema 生成器写测试,保证 JSON schema 与模型字段同步。 + +### 8.2 集成测试 +- 对 `FunctionCallRouter` 构建伪造的 `MessageContext`,模拟关键场景:天气、提醒、新闻等。 +- Mock LLM 返回特定函数名和参数,验证 Router 行为正确。 +- 针对权限/Scope/need_at 校验写覆盖测试。 + +### 8.3 回归测试 +- 梳理历史日志,挑选典型输入,构建回归用例。 +- 增加脚本:读取样本输入 → 调用 router(跳过真实 LLM,直接指定函数)→ 核对输出。 + +### 8.4 线上灰度验证 +- 启用双写模式:新 router 实际处理,旧 router 记录判定结果但不执行,用于对比。 +- 制作监控面板(成功率、异常率、响应时间)。 + +## 9. 发布与回滚策略 +- 配置化开关(例如 `config.AI_ROUTER["enable_function_call"]`)。上线时默认灰度群聊,逐步扩大。 +- 保留旧命令表与 handler 至至少一个版本周期,确认无回滚需求后再彻底移除。 +- 出现问题时,关闭新开关,恢复 `CommandRouter` 行为,确保稳定。 + +## 10. 验收清单 +- [ ] 所有功能均在 `FUNCTION_REGISTRY` 中有唯一条目。 +- [ ] 每个函数的参数 schema 通过 `jsonschema.validate` 校验。 +- [ ] Handler 不再包含自然语言解析逻辑。 +- [ ] LLM 响应处理支持至少一种原生 function call 协议。 +- [ ] 所有单测、集测通过,回归样本验证通过。 +- [ ] 文档更新:新增功能如何注册、如何编写参数模型、如何调试。 + +--- + +> 按上述阶段实施,可在保持现有业务能力的前提下,将整个机器人指令体系迁移到统一的 Function Call 架构,实现更易维护、更稳定的工具调用体系。 diff --git a/function_call_next_steps.md b/function_call_next_steps.md new file mode 100644 index 0000000..4ff632c --- /dev/null +++ b/function_call_next_steps.md @@ -0,0 +1,61 @@ +# Function Call 架构改进指南 + +## 1. 架构一致性如何提升? +- **现状诊断** + - `function_calls/handlers.py:147` 等 handler 仍依赖 `commands/handlers.py` 中的正则/自然语言逻辑,导致新旧两套体系并存,增大耦合与维护成本。 + - `function_calls/router.py:41` 的直接匹配逻辑与 LLM 选择逻辑混在一起,职责边界不清晰;而参数校验仅在 LLM 分支执行(`function_calls/router.py:150`)。 + - 业务逻辑散落:例如新闻查询同时存在 `_get_news_info` 与命令 handler 版本,缺少统一 service 层。 +- **建议的架构骨架** + 1. **分层组织代码**: + - `function_calls/spec.py`:仅保留数据结构。 + - `function_calls/registry.py`:集中注册所有函数。 + - `function_calls/router.py`:只负责入口路由、模型互操作和参数校验。 + - `function_calls/services/`(新增目录):存放与业务相关的纯函数(例如天气、提醒、骂人等),对外只接受结构化参数。 + - `function_calls/handlers/`(可拆分模块):每个 handler 只做 (ctx, args) → 调 service → 组装 FunctionResult。 + 2. **统一入口**:`robot.py` 只初始化 `FunctionCallRouter`,其余旧路由移除,避免双写状态。 + 3. **约束约定**:所有 handler 必须声明 `args` 的 Pydantic 模型;禁止 handler 内再次解析自然语言或修改 `ctx.msg.content`。 + 4. **配置与日志**:为 `FunctionCallRouter` 添加统一的日志上下文(如 request_id),方便追踪函数调用链路。 + +## 2. 如何让 function 直接填参数、避免重复 LLM 解析? +- **目标**:路由阶段完成所有参数解析与校验,handler 全部消费结构化 `args`,不再依赖二次自然语言处理。 +- **改造步骤**: + 1. **强制类型转换**:在 `_create_args_instance` 后无论 direct/LLM 均执行 `args_type.model_validate`,并捕获校验错误,向用户返回提示。 + 2. **拆分提醒业务逻辑**: + - 编写 `function_calls/services/reminder.py`,内含 `create_reminder(ctx, ReminderArgs)` 等函数,直接调用 `ctx.robot.reminder_manager`。 + - 调整 `ReminderArgs` 为真正的结构字段(例如 `schedule: ScheduleArgs`),由 FunctionCallRouter/LLM 负责生成 JSON。 + - 对 direct 命令的需求,若要保留,可做一个轻量的自然语言 → `ReminderArgs` 的解析器,独立成 util,避免回写 `ctx.msg.content`。 + 3. **其他 handler 同理**: + - `insult`:直接调用封装好的 `generate_random_insult(target_user)`,不要构造 fake match(见 `function_calls/handlers.py:216`)。 + - `perplexity_search`:把 `query` 直接传给 service,service 负责与 `ctx.robot.perplexity` 交互。 + 4. **落地校验**:在 `function_calls/llm.py:123` 的决策结果里,若 schema 校验不通过,构建友好错误提示并返回 `FunctionResult`。 + 5. **测试**:为每个 handler 编写参数驱动的单测,例如 `test_reminder_set_structured_args` → 传 `ReminderArgs(time_spec="2024-05-01 15:00", content="开会")`,验证生成的提醒记录。 + +## 3. 目前改造是否达标?还需修什么? +- **仍存在的问题**: + - Handler 依赖旧命令逻辑(`function_calls/handlers.py:147`, `:189`, `:216`),说明“function 直接消费参数”的目标尚未实现。 + - 直接命令分支跳过 `validate_arguments`,导致 `ReminderArgs` 这类模型的必填字段不会被校验(`function_calls/router.py:118`)。 + - `FunctionResult.at_list` 虽已在 `_execute_function` 中 join,但数据类仍声明为 `list[str]`。如果后续有人直接调用 `ctx.send_text(message, result.at_list)` 将再次踩坑。建议统一改成 `str` 或封装发送逻辑。 + - 仍缺少针对新路由的自动化测试,仅有打印式脚本(`test_function_calls.py`)。建议补充 pytest 单测或集成测试。 +- **建议修复顺序**: + 1. 清理 handler 对旧命令的依赖,迁移业务逻辑到 service 层。 + 2. 在 router 中统一调用 `validate_arguments` → `_create_args_instance` → handler。 + 3. 更新 `FunctionResult` 类型定义,提供 helper 方法如 `result.send_via(ctx)` 集中处理消息发送。 + 4. 编写覆盖天气/新闻/提醒/骂人等核心流程的单测,确保 Function Call 路径稳定。 + +## 4. 只保留 Function Call,旧路由是否移除到位? +- **现状**:`robot.py:172-272` 仍初始化并调用 `CommandRouter`、`ai_router`,函数回退逻辑依旧存在。 +- **移除建议**: + 1. 删除 `self.command_router = CommandRouter(...)` 及相关 import;同时移除 `CommandRouter.dispatch` 调用与辅助日志。 + 2. 移除 `ai_router` 回退逻辑和配置项 `FUNCTION_CALL_ROUTER.fallback_to_legacy`。确保配置文件同步更新(`config.yaml.template:151`)。 + 3. 将闲聊 fallback 改为:当 `FunctionCallRouter` 返回 `False` 时直接走 `handle_chitchat`,并记录原因日志。 + 4. 清理不再使用的命令注册表与正则代码(`commands/registry.py`、`commands/router.py` 等),确认没有别的模块引用后可删。 + 5. 回归测试:运行原有功能用例,确保删除旧路由不会影响提醒、天气等功能;同时观察日志,确认不再出现“命令路由器”相关输出。 + +## 5. 推荐行动清单(按优先级) +1. **剥离 handler 对旧命令体系的依赖**:完成 service 层拆分,更新所有 handler 为结构化实现。 +2. **统一参数校验与错误返回**:调整 router 逻辑,新增校验失败提示,并完善 `FunctionResult` 类型。 +3. **移除旧路由与配置**:清理 `robot.py` 中的命令/AI 路由初始化与 fallback,更新配置模板。 +4. **补全测试**:为 Function Call 核心流程编写 pytest 单元与集成测试,覆盖 direct/LLM 两条路径。 +5. **整理文档**:更新开发文档,说明如何新增 function、如何编写参数模型与 service,确保团队成员按统一规范扩展功能。 + +执行完以上步骤后,你将拥有一套纯 Function Call、结构化且易维护的机器人指令体系,满足题述的四个目标。 diff --git a/function_call_review.md b/function_call_review.md new file mode 100644 index 0000000..cf45820 --- /dev/null +++ b/function_call_review.md @@ -0,0 +1,32 @@ +# Function Call 改造代码审核 + +## 阻塞问题(必须修复) +- **[已完成] FunctionResult 结果封装统一** + - 处理位置:`function_calls/spec.py:9`、`function_calls/router.py:195`、`commands/context.py:66`、`robot.py:296` + - 现状:结果模型已改为 `at: str` 并提供 `dispatch` 方法,路由器不再手动拼接 `@` 列表。`python3 -m compileall function_calls` 验证通过,群聊场景使用新的 `at` 字段。 + +- **[已完成] 提醒功能使用结构化参数** + - 处理位置:`function_calls/models.py:19`、`function_calls/services/reminder.py:36`、`function_calls/handlers.py:70`、`function_calls/router.py:70` + - 将 `ReminderArgs` 改为 `type/time/content/weekday` 等字段,移除了对旧 `commands.handlers` 的依赖,并删除路由层对提醒的直接自然语言拼装。 + +## 重要改进项 +- **[已完成] Handler 逻辑脱离旧命令体系** + - 现状:所有 handler 均迁移到 `function_calls/services`(天气、新闻、提醒、Perplexity、骂人、群工具等),不再篡改 `ctx.msg.content` 或调用旧命令模块。 + +- **[已完成] 直接命令路径参数校验** + - 现状:`function_calls/router.py:102-119` 对直接匹配的函数调用 `validate_arguments`,与 LLM 分支保持一致。 + +- **[已完成] FunctionSpec 类型标注同步** + - 现状:`function_calls/spec.py:27-34` 中的 `handler` 类型现为 `Callable[[MessageContext, Any], FunctionResult]`。 + +## 架构一致性评估 +- 当前所有功能均通过 Function Call 服务层完成,提醒/骂人/搜索等不再依赖自然语言解析。 +- LLM 适配层维持兼容,必要时可扩展 jsonschema 校验和重试策略。 +- `robot.py:163-231` 仅初始化和调用 `FunctionCallRouter`,旧的命令/AI 路由器已移除,配置项也同步精简。 + +## 建议的下一步 +1. 扩充 `function_calls/services` 层的单元测试(例如提醒设置、Perplexity fallback 等),确保服务纯函数行为稳定。 +2. 若后续新增工具函数,遵循 `FunctionResult` + service 的模式,并及时更新 `FUNCTION_CALL_USAGE.md`。 +3. 观察线上日志,确认精简后的路由无遗漏场景;如需更多指令,优先在 direct-match 表中补充结构化参数生成逻辑。 + +如按以上步骤推进,可逐步达到“标准 Function Call 模式”预期:所有工具能力通过结构化 schema 暴露,handler 仅消费结构化参数,无需再回退自然语言解析。 diff --git a/function_calls/__init__.py b/function_calls/__init__.py new file mode 100644 index 0000000..9cab715 --- /dev/null +++ b/function_calls/__init__.py @@ -0,0 +1,3 @@ +""" +Function Call 系统核心模块 +""" \ No newline at end of file diff --git a/function_calls/handlers.py b/function_calls/handlers.py new file mode 100644 index 0000000..6f314ed --- /dev/null +++ b/function_calls/handlers.py @@ -0,0 +1,180 @@ +"""Function Call handlers built on top of structured services.""" +from __future__ import annotations + +import logging +from commands.context import MessageContext + +from .models import ( + WeatherArgs, + NewsArgs, + ReminderArgs, + ReminderListArgs, + ReminderDeleteArgs, + PerplexityArgs, + HelpArgs, + SummaryArgs, + ClearMessagesArgs, + InsultArgs, +) +from .registry import tool_function +from .spec import FunctionResult +from .services import ( + build_help_text, + build_insult, + clear_group_messages, + create_reminder, + delete_reminder, + get_news_digest, + get_weather_report, + list_reminders, + run_perplexity, + summarize_messages, +) + +logger = logging.getLogger(__name__) + + +@tool_function( + name="weather_query", + description="查询城市天气预报", + examples=["北京天气怎么样", "上海天气", "深圳明天会下雨吗"], + scope="both", + require_at=True, +) +def handle_weather(ctx: MessageContext, args: WeatherArgs) -> FunctionResult: + result = get_weather_report(args.city) + at = ctx.msg.sender if ctx.is_group else "" + return FunctionResult(handled=True, messages=[result.message], at=at if at else "") + + +@tool_function( + name="news_query", + description="获取今日新闻", + examples=["看看今天的新闻", "今日要闻", "新闻"], + scope="both", + require_at=True, +) +def handle_news(ctx: MessageContext, args: NewsArgs) -> FunctionResult: + result = get_news_digest() + at = ctx.msg.sender if ctx.is_group else "" + return FunctionResult(handled=True, messages=[result.message], at=at if at else "") + + +@tool_function( + name="reminder_set", + description="设置提醒", + examples=["提醒我明天下午3点开会", "每天早上8点提醒我吃早餐"], + scope="both", + require_at=True, +) +def handle_reminder_set(ctx: MessageContext, args: ReminderArgs) -> FunctionResult: + manager = getattr(ctx.robot, 'reminder_manager', None) + at = ctx.msg.sender if ctx.is_group else "" + if not manager: + return FunctionResult(handled=True, messages=["❌ 内部错误:提醒管理器未初始化。"], at=at) + + service_result = create_reminder( + manager=manager, + sender_wxid=ctx.msg.sender, + data=args.model_dump(), + roomid=ctx.msg.roomid if ctx.is_group else None, + ) + return FunctionResult(handled=True, messages=service_result.messages, at=at if at else "") + + +@tool_function( + name="reminder_list", + description="查看所有提醒", + examples=["查看我的提醒", "我有哪些提醒", "提醒列表"], + scope="both", + require_at=True, +) +def handle_reminder_list(ctx: MessageContext, args: ReminderListArgs) -> FunctionResult: + manager = getattr(ctx.robot, 'reminder_manager', None) + at = ctx.msg.sender if ctx.is_group else "" + if not manager: + return FunctionResult(handled=True, messages=["❌ 内部错误:提醒管理器未初始化。"], at=at) + + service_result = list_reminders(manager, ctx.msg.sender, ctx.all_contacts) + return FunctionResult(handled=True, messages=service_result.messages, at=at if at else "") + + +@tool_function( + name="reminder_delete", + description="删除提醒", + examples=["删除开会的提醒", "取消明天的提醒"], + scope="both", + require_at=True, +) +def handle_reminder_delete(ctx: MessageContext, args: ReminderDeleteArgs) -> FunctionResult: + manager = getattr(ctx.robot, 'reminder_manager', None) + at = ctx.msg.sender if ctx.is_group else "" + if not manager: + return FunctionResult(handled=True, messages=["❌ 内部错误:提醒管理器未初始化。"], at=at) + + service_result = delete_reminder(manager, ctx.msg.sender, args.reminder_id) + return FunctionResult(handled=True, messages=service_result.messages, at=at if at else "") + + +@tool_function( + name="perplexity_search", + description="使用Perplexity进行深度搜索查询", + examples=["搜索Python最新特性", "查查机器学习教程", "ask什么是量子计算"], + scope="both", + require_at=True, +) +def handle_perplexity_search(ctx: MessageContext, args: PerplexityArgs) -> FunctionResult: + service_result = run_perplexity(ctx, args.query) + if service_result.handled_externally: + return FunctionResult(handled=True, messages=[]) + + at = ctx.msg.sender if ctx.is_group else "" + return FunctionResult(handled=True, messages=service_result.messages, at=at if at else "") + + +@tool_function( + name="help", + description="显示机器人帮助信息", + examples=["help", "帮助", "指令"], + scope="both", + require_at=False, +) +def handle_help(ctx: MessageContext, args: HelpArgs) -> FunctionResult: + help_text = build_help_text() + return FunctionResult(handled=True, messages=[help_text]) + + +@tool_function( + name="summary", + description="总结群聊最近的消息", + examples=["summary", "总结"], + scope="group", + require_at=True, +) +def handle_summary(ctx: MessageContext, args: SummaryArgs) -> FunctionResult: + result = summarize_messages(ctx) + return FunctionResult(handled=True, messages=[result.message]) + + +@tool_function( + name="clear_messages", + description="清除群聊历史消息记录", + examples=["clearmessages", "清除历史"], + scope="group", + require_at=True, +) +def handle_clear_messages(ctx: MessageContext, args: ClearMessagesArgs) -> FunctionResult: + result = clear_group_messages(ctx) + return FunctionResult(handled=True, messages=[result.message]) + + +@tool_function( + name="insult", + description="骂指定用户(仅限群聊)", + examples=["骂一下@某人"], + scope="group", + require_at=True, +) +def handle_insult(ctx: MessageContext, args: InsultArgs) -> FunctionResult: + result = build_insult(ctx, args.target_user) + return FunctionResult(handled=True, messages=[result.message]) diff --git a/function_calls/init_handlers.py b/function_calls/init_handlers.py new file mode 100644 index 0000000..2bf61fd --- /dev/null +++ b/function_calls/init_handlers.py @@ -0,0 +1,10 @@ +""" +初始化所有Function Call处理器 +导入这个模块会自动注册所有处理器到全局注册表 +""" +from . import handlers # 导入handlers模块会自动执行所有装饰器注册 + +# 可以在这里添加一些初始化日志 +import logging +logger = logging.getLogger(__name__) +logger.info("Function Call 处理器初始化完成") \ No newline at end of file diff --git a/function_calls/llm.py b/function_calls/llm.py new file mode 100644 index 0000000..35f6036 --- /dev/null +++ b/function_calls/llm.py @@ -0,0 +1,263 @@ +"""LLM function-call orchestration utilities.""" +from __future__ import annotations + +import json +import logging +import re +from dataclasses import dataclass +from typing import Any, Callable, Dict, List, Optional + +from commands.context import MessageContext + +from .spec import FunctionResult, FunctionSpec + +logger = logging.getLogger(__name__) + + +@dataclass +class LLMRunResult: + """Result of the LLM routing pipeline.""" + + handled: bool + final_response: Optional[str] = None + error: Optional[str] = None + + +class FunctionCallLLM: + """Coordinate function-call capable models with router handlers.""" + + def __init__(self, max_function_rounds: int = 5) -> None: + self.logger = logger + self.max_function_rounds = max_function_rounds + + def run( + self, + ctx: MessageContext, + functions: Dict[str, FunctionSpec], + executor: Callable[[FunctionSpec, Dict[str, Any]], FunctionResult], + formatter: Callable[[FunctionResult], str], + ) -> LLMRunResult: + """Execute the function-call loop and return the final assistant response.""" + if not ctx.text: + return LLMRunResult(handled=False) + + chat_model = getattr(ctx, "chat", None) + if not chat_model and ctx.robot: + chat_model = getattr(ctx.robot, "chat", None) + + if not chat_model: + self.logger.error("无可用的AI模型") + return LLMRunResult(handled=False, error="no_model") + + try: + if hasattr(chat_model, "call_with_functions"): + return self._run_native_loop(ctx, chat_model, functions, executor, formatter) + return self._run_prompt_loop(ctx, chat_model, functions, executor) + except Exception as exc: # pragma: no cover - safeguard + self.logger.error(f"LLM 调用失败: {exc}") + return LLMRunResult(handled=False, error=str(exc)) + + # --------------------------------------------------------------------- + # Native function-call workflow + # --------------------------------------------------------------------- + + def _run_native_loop( + self, + ctx: MessageContext, + chat_model: Any, + functions: Dict[str, FunctionSpec], + executor: Callable[[FunctionSpec, Dict[str, Any]], FunctionResult], + formatter: Callable[[FunctionResult], str], + ) -> LLMRunResult: + openai_functions = self._build_functions_for_openai(functions) + messages: List[Dict[str, Any]] = [] + + system_prompt = ( + "You are an assistant that can call tools. " + "When you invoke a function, wait for the tool response before replying to the user. " + "Only deliver a final answer once you have enough information." + ) + messages.append({"role": "system", "content": system_prompt}) + messages.append({"role": "user", "content": ctx.text}) + + for round_index in range(self.max_function_rounds): + response = chat_model.call_with_functions( + messages=messages, + functions=openai_functions, + wxid=ctx.get_receiver(), + ) + + if not getattr(response, "choices", None): + self.logger.warning("函数调用返回空响应") + return LLMRunResult(handled=False) + + message = response.choices[0].message + assistant_entry = self._convert_assistant_message(message) + messages.append(assistant_entry) + + tool_calls = getattr(message, "tool_calls", None) or [] + if tool_calls: + for tool_call in tool_calls: + function_name = tool_call.function.name + if function_name not in functions: + self.logger.warning(f"模型请求未知函数: {function_name}") + tool_content = json.dumps( + { + "handled": False, + "messages": [f"Unknown function: {function_name}"], + "metadata": {"error": "unknown_function"}, + }, + ensure_ascii=False, + ) + else: + try: + arguments = json.loads(tool_call.function.arguments or "{}") + except json.JSONDecodeError: + arguments = {} + spec = functions[function_name] + result = executor(spec, arguments) + tool_content = formatter(result) + messages.append( + { + "role": "tool", + "tool_call_id": tool_call.id, + "content": tool_content, + } + ) + continue + + # 没有工具调用,认为模型给出了最终回答 + final_content = message.content or "" + return LLMRunResult(handled=True, final_response=final_content) + + self.logger.warning("达到最大函数调用轮数,未得到最终回答") + return LLMRunResult(handled=False, error="max_rounds") + + # --------------------------------------------------------------------- + # Prompt-based fallback workflow + # --------------------------------------------------------------------- + + def _run_prompt_loop( + self, + ctx: MessageContext, + chat_model: Any, + functions: Dict[str, FunctionSpec], + executor: Callable[[FunctionSpec, Dict[str, Any]], FunctionResult], + ) -> LLMRunResult: + system_prompt = self._build_prompt_system_text(functions) + user_input = f"用户输入:{ctx.text}" + + ai_response = chat_model.get_answer( + user_input, + wxid=ctx.get_receiver(), + system_prompt_override=system_prompt, + ) + + json_match = re.search(r"\{.*\}", ai_response, re.DOTALL) + if not json_match: + self.logger.warning(f"提示词模式下无法解析JSON: {ai_response}") + return LLMRunResult(handled=False) + + try: + decision = json.loads(json_match.group(0)) + except json.JSONDecodeError as exc: + self.logger.error(f"提示词模式 JSON 解析失败: {exc}") + return LLMRunResult(handled=False) + + action_type = decision.get("action_type") + if action_type == "chat": + # 提示词模式下无法获得模型最终回答,交给上层兜底 + return LLMRunResult(handled=False) + + if action_type != "function": + self.logger.warning(f"未知的action_type: {action_type}") + return LLMRunResult(handled=False) + + function_name = decision.get("function_name") + if function_name not in functions: + self.logger.warning(f"未知的功能名 - {function_name}") + return LLMRunResult(handled=False) + + arguments = decision.get("arguments", {}) + result = executor(functions[function_name], arguments) + if not result.handled: + return LLMRunResult(handled=False) + + return LLMRunResult(handled=True, final_response="\n".join(result.messages)) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @staticmethod + def _convert_assistant_message(message: Any) -> Dict[str, Any]: + entry: Dict[str, Any] = { + "role": "assistant", + "content": message.content, + } + tool_calls = getattr(message, "tool_calls", None) + if tool_calls: + entry["tool_calls"] = [] + for tool_call in tool_calls: + entry["tool_calls"].append( + { + "id": tool_call.id, + "type": "function", + "function": { + "name": tool_call.function.name, + "arguments": tool_call.function.arguments, + }, + } + ) + return entry + + @staticmethod + def _build_functions_for_openai(functions: Dict[str, FunctionSpec]) -> List[Dict[str, Any]]: + openai_functions = [] + for spec in functions.values(): + openai_functions.append( + { + "name": spec.name, + "description": spec.description, + "parameters": spec.parameters_schema, + } + ) + return openai_functions + + @staticmethod + def _build_prompt_system_text(functions: Dict[str, FunctionSpec]) -> str: + prompt = """你是一个智能路由助手。根据用户输入判断是否需要调用以下函数之一。""" + for spec in functions.values(): + prompt += f"\n- {spec.name}: {spec.description}" + prompt += """ +请严格输出JSON:{"action_type": "chat"} 或 {"action_type": "function", "function_name": "...", "arguments": {...}} +""" + return prompt + + def validate_arguments(self, arguments: Dict[str, Any], schema: Dict[str, Any]) -> bool: + try: + required_fields = schema.get("required", []) + properties = schema.get("properties", {}) + + for field in required_fields: + if field not in arguments: + self.logger.warning(f"缺少必需参数: {field}") + return False + + for field, value in arguments.items(): + if field not in properties: + continue + expected_type = properties[field].get("type") + if expected_type == "string" and not isinstance(value, str): + self.logger.warning(f"参数 {field} 类型不正确,期望 string,得到 {type(value)}") + return False + if expected_type == "integer" and not isinstance(value, int): + self.logger.warning(f"参数 {field} 类型不正确,期望 integer,得到 {type(value)}") + return False + if expected_type == "number" and not isinstance(value, (int, float)): + self.logger.warning(f"参数 {field} 类型不正确,期望 number,得到 {type(value)}") + return False + return True + except Exception as exc: + self.logger.error(f"参数验证失败: {exc}") + return False diff --git a/function_calls/models.py b/function_calls/models.py new file mode 100644 index 0000000..87bada2 --- /dev/null +++ b/function_calls/models.py @@ -0,0 +1,61 @@ +""" +Function Call 参数模型定义 +""" +from typing import Literal, Optional + +from pydantic import BaseModel + + +class WeatherArgs(BaseModel): + """天气查询参数""" + city: str + + +class NewsArgs(BaseModel): + """新闻查询参数 - 无需参数""" + pass + + +class ReminderArgs(BaseModel): + """设置提醒参数""" + + type: Literal["once", "daily", "weekly"] + time: str + content: str + weekday: Optional[int] = None + + +class ReminderListArgs(BaseModel): + """查看提醒列表参数 - 无需参数""" + pass + + +class ReminderDeleteArgs(BaseModel): + """删除提醒参数""" + + reminder_id: str + + +class PerplexityArgs(BaseModel): + """Perplexity搜索参数""" + query: str + + +class HelpArgs(BaseModel): + """帮助信息参数 - 无需参数""" + pass + + +class SummaryArgs(BaseModel): + """消息总结参数 - 无需参数""" + pass + + +class ClearMessagesArgs(BaseModel): + """清除消息参数 - 无需参数""" + pass + + +class InsultArgs(BaseModel): + """骂人功能参数""" + target_user: str diff --git a/function_calls/registry.py b/function_calls/registry.py new file mode 100644 index 0000000..7a180d5 --- /dev/null +++ b/function_calls/registry.py @@ -0,0 +1,108 @@ +""" +函数注册中心 +""" +import logging +from typing import Dict, Any, get_type_hints +from pydantic import BaseModel + +from .spec import FunctionSpec + +logger = logging.getLogger(__name__) + +# 全局函数注册表 +FUNCTION_REGISTRY: Dict[str, FunctionSpec] = {} + + +def register_function(spec: FunctionSpec) -> None: + """注册函数到全局注册表""" + if spec.name in FUNCTION_REGISTRY: + raise ValueError(f"重复的函数名: {spec.name}") + FUNCTION_REGISTRY[spec.name] = spec + logger.info(f"注册函数: {spec.name} - {spec.description}") + + +def get_function(name: str) -> FunctionSpec: + """获取指定名称的函数规格""" + if name not in FUNCTION_REGISTRY: + raise ValueError(f"未找到函数: {name}") + return FUNCTION_REGISTRY[name] + + +def list_functions() -> Dict[str, FunctionSpec]: + """获取所有已注册的函数""" + return FUNCTION_REGISTRY.copy() + + +def build_schema_from_model(model_class) -> Dict[str, Any]: + """从 Pydantic 模型构建 JSON Schema""" + if issubclass(model_class, BaseModel): + return model_class.model_json_schema() + else: + # 简单的dataclass或类型注解支持 + hints = get_type_hints(model_class) + properties = {} + required = [] + + for field_name, field_type in hints.items(): + if field_name.startswith('_'): + continue + + properties[field_name] = _type_to_schema(field_type) + required.append(field_name) + + return { + "type": "object", + "properties": properties, + "required": required + } + + +def _type_to_schema(field_type) -> Dict[str, Any]: + """将Python类型转换为JSON Schema""" + if field_type == str: + return {"type": "string"} + elif field_type == int: + return {"type": "integer"} + elif field_type == float: + return {"type": "number"} + elif field_type == bool: + return {"type": "boolean"} + else: + return {"type": "string", "description": f"类型: {field_type}"} + + +def tool_function(name: str, description: str, examples: list[str] = None, **meta): + """ + 装饰器:自动注册函数到Function Call系统 + + @tool_function( + name="weather_query", + description="查询天气", + examples=["北京天气怎么样"] + ) + def handle_weather(ctx: MessageContext, args: WeatherArgs) -> FunctionResult: + pass + """ + def wrapper(func): + # 获取函数参数类型注解 + hints = get_type_hints(func) + args_type = hints.get('args') + + if args_type: + schema = build_schema_from_model(args_type) + else: + schema = {"type": "object", "properties": {}, "required": []} + + spec = FunctionSpec( + name=name, + description=description, + parameters_schema=schema, + handler=func, + examples=examples or [], + **meta + ) + + register_function(spec) + return func + + return wrapper \ No newline at end of file diff --git a/function_calls/router.py b/function_calls/router.py new file mode 100644 index 0000000..c94decd --- /dev/null +++ b/function_calls/router.py @@ -0,0 +1,227 @@ +"""Function Call 路由器""" + +import logging +from typing import Any, Dict, Optional + +from commands.context import MessageContext + +from .spec import FunctionResult, FunctionSpec +from .registry import list_functions +from .llm import FunctionCallLLM + +logger = logging.getLogger(__name__) + + +class FunctionCallRouter: + """函数调用路由器""" + + def __init__(self, robot_instance=None): + self.robot_instance = robot_instance + self.llm = FunctionCallLLM() + self.logger = logger + + def _check_scope_and_permissions(self, ctx: MessageContext, spec: FunctionSpec) -> bool: + """检查作用域和权限""" + # 1. 检查作用域 + if spec.scope != "both": + if (spec.scope == "group" and not ctx.is_group) or \ + (spec.scope == "private" and ctx.is_group): + return False + + # 2. 检查是否需要@机器人(仅在群聊中有效) + if ctx.is_group and spec.require_at and not ctx.is_at_bot: + return False + + # 3. 检查权限(如果有auth字段) + if spec.auth: + # TODO: 实现权限检查逻辑 + pass + + return True + + def _try_direct_command_match(self, ctx: MessageContext) -> Optional[str]: + """ + 尝试直接命令匹配,避免不必要的LLM调用 + + 返回匹配的函数名,如果没有匹配则返回None + """ + text = ctx.text.strip().lower() + + # 定义一些明确的命令关键字映射 + direct_commands = { + "help": "help", + "帮助": "help", + "指令": "help", + "新闻": "news_query", + "summary": "summary", + "总结": "summary", + "clearmessages": "clear_messages", + "清除历史": "clear_messages" + } + + # 检查完全匹配 + if text in direct_commands: + return direct_commands[text] + + # 检查以特定前缀开头的命令 + if text.startswith("ask ") and len(text) > 4: + return "perplexity_search" + + if text.startswith("天气") or text.startswith("天气预报"): + return "weather_query" + + if text in ["查看提醒", "我的提醒", "提醒列表"]: + return "reminder_list" + + if text.startswith("骂一下"): + return "insult" + + return None + + def dispatch(self, ctx: MessageContext) -> bool: + """ + 分发消息到函数处理器 + + 返回: 是否成功处理 + """ + try: + # 确保context可以访问到robot实例 + if self.robot_instance and not ctx.robot: + ctx.robot = self.robot_instance + if hasattr(self.robot_instance, 'LOG') and not ctx.logger: + ctx.logger = self.robot_instance.LOG + + if ctx.logger: + ctx.logger.debug(f"FunctionCallRouter 开始处理: '{ctx.text}', 来自: {ctx.sender_name}") + + # 获取所有可用函数 + functions = list_functions() + if not functions: + self.logger.warning("没有注册任何函数") + return False + + # 第一步:尝试直接命令匹配 + direct_function = self._try_direct_command_match(ctx) + if direct_function and direct_function in functions: + spec = functions[direct_function] + + if not self._check_scope_and_permissions(ctx, spec): + return False + + arguments = self._extract_arguments_for_direct_command(ctx, direct_function) + if not self.llm.validate_arguments(arguments, spec.parameters_schema): + self.logger.warning(f"直接命令 {direct_function} 参数验证失败") + return False + + result = self._invoke_function(ctx, spec, arguments) + if result.handled: + result.dispatch(ctx) + return True + # 如果没有处理成功,继续尝试LLM流程 + + # 第二步:使用LLM执行多轮函数调用 + llm_result = self.llm.run( + ctx, + functions, + lambda spec, args: self._invoke_function(ctx, spec, args), + self._format_tool_response, + ) + + if not llm_result.handled: + return False + + if llm_result.final_response: + at = ctx.msg.sender if ctx.is_group else "" + ctx.send_text(llm_result.final_response, at) + return True + + return True + + except Exception as e: + self.logger.error(f"FunctionCallRouter dispatch 异常: {e}") + return False + + def _extract_arguments_for_direct_command(self, ctx: MessageContext, function_name: str) -> Dict[str, Any]: + """为直接命令提取参数""" + text = ctx.text.strip() + + if function_name == "weather_query": + # 提取城市名 + if text.startswith("天气预报 "): + city = text[4:].strip() + elif text.startswith("天气 "): + city = text[3:].strip() + else: + city = "" + return {"city": city} + + elif function_name == "perplexity_search": + # 提取搜索查询 + if text.startswith("ask "): + query = text[4:].strip() + else: + query = text + return {"query": query} + + elif function_name == "insult": + # 提取要骂的用户 + import re + match = re.search(r"骂一下\s*@([^\s@]+)", text) + target_user = match.group(1) if match else "" + return {"target_user": target_user} + + # 对于不需要参数的函数,返回空字典 + return {} + + def _invoke_function(self, ctx: MessageContext, spec: FunctionSpec, arguments: Dict[str, Any]) -> FunctionResult: + """调用函数处理器,返回结构化结果""" + try: + if ctx.logger: + ctx.logger.info(f"执行函数: {spec.name}, 参数: {arguments}") + + args_instance = self._create_args_instance(spec, arguments) + result = spec.handler(ctx, args_instance) + + if not isinstance(result, FunctionResult): + raise TypeError(f"函数 {spec.name} 返回了非 FunctionResult 类型: {type(result)}") + + if ctx.logger and not result.handled: + ctx.logger.warning(f"函数 {spec.name} 返回未处理状态") + + return result + + except Exception as exc: + self.logger.error(f"执行函数 {spec.name} 异常: {exc}") + return FunctionResult( + handled=False, + messages=[f"函数 {spec.name} 执行失败: {exc}"], + metadata={"error": str(exc)}, + ) + + def _create_args_instance(self, spec: FunctionSpec, arguments: Dict[str, Any]): + """根据函数规格创建参数实例""" + try: + # 获取函数的类型注解 + from typing import get_type_hints + hints = get_type_hints(spec.handler) + args_type = hints.get('args') + + if args_type: + # 如果是Pydantic模型 + if hasattr(args_type, 'model_validate'): + return args_type.model_validate(arguments) + elif hasattr(args_type, '__init__'): + # 普通类 + return args_type(**arguments) + + # 如果没有类型注解,返回参数字典 + return arguments + + except Exception as exc: + self.logger.error(f"创建参数实例失败: {exc}") + raise + + @staticmethod + def _format_tool_response(result: FunctionResult) -> str: + """将 FunctionResult 格式化为供 LLM 读取的 tool 响应""" + return result.to_tool_content() diff --git a/function_calls/services/__init__.py b/function_calls/services/__init__.py new file mode 100644 index 0000000..c340c18 --- /dev/null +++ b/function_calls/services/__init__.py @@ -0,0 +1,22 @@ +"""Service helpers for Function Call handlers.""" + +from .weather import get_weather_report +from .news import get_news_digest +from .reminder import create_reminder, list_reminders, delete_reminder +from .help import build_help_text +from .group_tools import summarize_messages, clear_group_messages +from .perplexity import run_perplexity +from .insult import build_insult + +__all__ = [ + "get_weather_report", + "get_news_digest", + "create_reminder", + "list_reminders", + "delete_reminder", + "build_help_text", + "summarize_messages", + "clear_group_messages", + "run_perplexity", + "build_insult", +] diff --git a/function_calls/services/group_tools.py b/function_calls/services/group_tools.py new file mode 100644 index 0000000..7650391 --- /dev/null +++ b/function_calls/services/group_tools.py @@ -0,0 +1,47 @@ +"""Group related utilities for Function Call handlers.""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +from commands.context import MessageContext + + +@dataclass +class GroupToolResult: + success: bool + message: str + + +def summarize_messages(ctx: MessageContext) -> GroupToolResult: + if not ctx.is_group: + return GroupToolResult(success=True, message="⚠️ 消息总结功能仅支持群聊") + + if not ctx.robot or not hasattr(ctx.robot, "message_summary") or not hasattr(ctx.robot, "chat"): + return GroupToolResult(success=False, message="⚠️ 消息总结功能不可用") + + try: + summary = ctx.robot.message_summary.summarize_messages(ctx.msg.roomid, ctx.robot.chat) + return GroupToolResult(success=True, message=summary) + except Exception as exc: + if ctx.logger: + ctx.logger.error(f"生成消息总结出错: {exc}") + return GroupToolResult(success=False, message="⚠️ 生成消息总结失败") + + +def clear_group_messages(ctx: MessageContext) -> GroupToolResult: + if not ctx.is_group: + return GroupToolResult(success=True, message="⚠️ 消息历史管理功能仅支持群聊") + + if not ctx.robot or not hasattr(ctx.robot, "message_summary"): + return GroupToolResult(success=False, message="⚠️ 消息历史管理功能不可用") + + try: + cleared = ctx.robot.message_summary.clear_message_history(ctx.msg.roomid) + if cleared: + return GroupToolResult(success=True, message="✅ 已清除本群的消息历史记录") + return GroupToolResult(success=True, message="⚠️ 本群没有消息历史记录") + except Exception as exc: + if ctx.logger: + ctx.logger.error(f"清除消息历史出错: {exc}") + return GroupToolResult(success=False, message="⚠️ 清除消息历史失败") diff --git a/function_calls/services/help.py b/function_calls/services/help.py new file mode 100644 index 0000000..3de91db --- /dev/null +++ b/function_calls/services/help.py @@ -0,0 +1,34 @@ +"""Static help text utility.""" +from __future__ import annotations + +HELP_LINES = [ + "🤖 泡泡的指令列表 🤖", + "", + "【实用工具】", + "- 天气/温度 [城市名]", + "- 天气预报/预报 [城市名]", + "- 新闻", + "- ask [问题]", + "", + "【决斗 & 偷袭】", + "- 决斗@XX", + "- 偷袭@XX", + "- 决斗排行/排行榜", + "- 我的战绩/决斗战绩", + "- 我的装备/查看装备", + "- 改名 [旧名] [新名]", + "", + "【提醒】", + "- 提醒xxxxx:一次性、每日、每周", + "- 查看提醒/我的提醒/提醒列表", + "- 删..提醒..", + "", + "【群聊工具】", + "- summary/总结", + "- clearmessages/清除历史", +] + + +def build_help_text() -> str: + """Return formatted help text.""" + return "\n".join(HELP_LINES) diff --git a/function_calls/services/insult.py b/function_calls/services/insult.py new file mode 100644 index 0000000..ff2b4da --- /dev/null +++ b/function_calls/services/insult.py @@ -0,0 +1,50 @@ +"""Group insult helper utilities.""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +from commands.context import MessageContext +from function.func_insult import generate_random_insult + + +@dataclass +class InsultResult: + success: bool + message: str + + +def build_insult(ctx: MessageContext, target_name: str) -> InsultResult: + if not ctx.is_group: + return InsultResult(success=True, message="❌ 骂人功能只支持群聊哦~") + + cleaned = target_name.strip() + if not cleaned: + return InsultResult(success=False, message="❌ 需要提供要骂的对象") + + actual_target = cleaned + target_wxid: Optional[str] = None + + try: + members = ctx.room_members + if members: + for wxid, name in members.items(): + if cleaned == name: + target_wxid = wxid + actual_target = name + break + if target_wxid is None: + for wxid, name in members.items(): + if cleaned in name and wxid != ctx.robot_wxid: + target_wxid = wxid + actual_target = name + break + except Exception as exc: + if ctx.logger: + ctx.logger.error(f"查找群成员信息时出错: {exc}") + + if target_wxid and target_wxid == ctx.robot_wxid: + return InsultResult(success=True, message="😅 不行,我不能骂我自己。") + + insult_text = generate_random_insult(actual_target) + return InsultResult(success=True, message=insult_text) diff --git a/function_calls/services/news.py b/function_calls/services/news.py new file mode 100644 index 0000000..bbc0a93 --- /dev/null +++ b/function_calls/services/news.py @@ -0,0 +1,36 @@ +"""News related service helpers for Function Call handlers.""" +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import Optional + +from function.func_news import News + + +logger = logging.getLogger(__name__) + + +@dataclass +class NewsResult: + success: bool + message: str + is_today: Optional[bool] = None + + +def get_news_digest() -> NewsResult: + """Fetch latest news digest.""" + try: + news_instance = News() + is_today, content = news_instance.get_important_news() + if is_today: + message = f"📰 今日要闻来啦:\n{content}" + else: + if content: + message = f"ℹ️ 今日新闻暂未发布,为您找到最近的一条新闻:\n{content}" + else: + message = "❌ 获取新闻失败,请稍后重试" + return NewsResult(success=True, message=message, is_today=is_today) + except Exception as exc: + logger.error(f"获取新闻失败: {exc}") + return NewsResult(success=False, message="❌ 获取新闻时发生错误") diff --git a/function_calls/services/perplexity.py b/function_calls/services/perplexity.py new file mode 100644 index 0000000..571bc55 --- /dev/null +++ b/function_calls/services/perplexity.py @@ -0,0 +1,62 @@ +"""Perplexity integration helpers.""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import List + +from commands.context import MessageContext + + +@dataclass +class PerplexityResult: + success: bool + messages: List[str] + handled_externally: bool = False + + +def run_perplexity(ctx: MessageContext, query: str) -> PerplexityResult: + query = query.strip() + if not query: + at = ctx.msg.sender if ctx.is_group else "" + return PerplexityResult(success=True, messages=["请告诉我你想搜索什么内容"], handled_externally=False) + + perplexity_instance = getattr(ctx.robot, 'perplexity', None) + if not perplexity_instance: + return PerplexityResult(success=True, messages=["❌ Perplexity搜索功能当前不可用"], handled_externally=False) + + content_for_perplexity = f"ask {query}" + chat_id = ctx.get_receiver() + sender_wxid = ctx.msg.sender + room_id = ctx.msg.roomid if ctx.is_group else None + + was_handled, fallback_prompt = perplexity_instance.process_message( + content=content_for_perplexity, + chat_id=chat_id, + sender=sender_wxid, + roomid=room_id, + from_group=ctx.is_group, + send_text_func=ctx.send_text + ) + + if was_handled: + return PerplexityResult(success=True, messages=[], handled_externally=True) + + if fallback_prompt: + chat_model = getattr(ctx, 'chat', None) or (getattr(ctx.robot, 'chat', None) if ctx.robot else None) + if chat_model: + try: + import time + current_time = time.strftime("%H:%M", time.localtime()) + formatted_question = f"[{current_time}] {ctx.sender_name}: {query}" + answer = chat_model.get_answer( + question=formatted_question, + wxid=ctx.get_receiver(), + system_prompt_override=fallback_prompt + ) + if answer: + return PerplexityResult(success=True, messages=[answer], handled_externally=False) + except Exception as exc: + if ctx.logger: + ctx.logger.error(f"默认AI处理失败: {exc}") + + return PerplexityResult(success=True, messages=["❌ Perplexity搜索时发生错误"], handled_externally=False) diff --git a/function_calls/services/reminder.py b/function_calls/services/reminder.py new file mode 100644 index 0000000..0067ef0 --- /dev/null +++ b/function_calls/services/reminder.py @@ -0,0 +1,101 @@ +"""Reminder related services for Function Call handlers.""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +from function.func_reminder import ReminderManager + + +@dataclass +class ReminderServiceResult: + success: bool + messages: List[str] + + +_WEEKDAY_LABELS = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] +_TYPE_LABELS = {"once": "一次性", "daily": "每日", "weekly": "每周"} + + +def _format_schedule(data: Dict[str, Any]) -> str: + reminder_type = data.get("type", "once") + time_str = data.get("time", "?") + + if reminder_type == "once": + return f"{time_str} (一次性)" + if reminder_type == "daily": + return f"每天 {time_str}" + if reminder_type == "weekly": + weekday = data.get("weekday") + if isinstance(weekday, int) and 0 <= weekday < len(_WEEKDAY_LABELS): + return f"每周{_WEEKDAY_LABELS[weekday]} {time_str}" + return f"每周 {time_str}" + return f"{time_str}" + + +def create_reminder( + manager: ReminderManager, + sender_wxid: str, + data: Dict[str, Any], + roomid: Optional[str] +) -> ReminderServiceResult: + payload = { + "type": data["type"], + "time": data["time"], + "content": data["content"], + } + if data.get("weekday") is not None: + payload["weekday"] = data["weekday"] + + success, info = manager.add_reminder(sender_wxid, payload, roomid=roomid) + if not success: + return ReminderServiceResult(success=False, messages=[f"❌ 设置提醒失败:{info}"]) + + schedule = payload.copy() + message = ( + "✅ 已为您设置{type_label}提醒\n" + "时间: {schedule}\n" + "内容: {content}" + ).format( + type_label=_TYPE_LABELS.get(payload["type"], ""), + schedule=_format_schedule(payload), + content=payload["content"], + ) + return ReminderServiceResult(success=True, messages=[message]) + + +def list_reminders( + manager: ReminderManager, + sender_wxid: str, + contacts: Dict[str, str] +) -> ReminderServiceResult: + reminders = manager.list_reminders(sender_wxid) + if not reminders: + return ReminderServiceResult(success=True, messages=["您还没有设置任何提醒。"]) + + lines: List[str] = ["📝 您设置的提醒列表(包括私聊和群聊):"] + for idx, reminder in enumerate(reminders, start=1): + schedule_display = _format_schedule({ + "type": reminder.get("type"), + "time": reminder.get("time_str"), + "weekday": reminder.get("weekday"), + }) + if reminder.get("type") == "once": + schedule_display = reminder.get("time_str", "?") + scope = "[私聊]" + roomid = reminder.get("roomid") + if roomid: + room_name = contacts.get(roomid) or roomid[:8] + scope = f"[群:{room_name}]" + lines.append( + f"{idx}. [ID: {reminder.get('id', '')[:6]}] {scope}{schedule_display}: {reminder.get('content', '')}" + ) + + return ReminderServiceResult(success=True, messages=["\n".join(lines)]) + + +def delete_reminder(manager: ReminderManager, sender_wxid: str, reminder_id: str) -> ReminderServiceResult: + success, info = manager.delete_reminder(sender_wxid, reminder_id) + if success: + return ReminderServiceResult(success=True, messages=[f"✅ {info}"]) + return ReminderServiceResult(success=False, messages=[f"❌ {info}"]) diff --git a/function_calls/services/weather.py b/function_calls/services/weather.py new file mode 100644 index 0000000..8fc238a --- /dev/null +++ b/function_calls/services/weather.py @@ -0,0 +1,65 @@ +"""Weather related service helpers for Function Call handlers.""" +from __future__ import annotations + +import json +import logging +import os +from dataclasses import dataclass +from typing import Optional + +from function.func_weather import Weather + + +logger = logging.getLogger(__name__) + + +@dataclass +class WeatherResult: + success: bool + message: str + city: Optional[str] = None + + +def _load_city_codes() -> dict[str, str]: + """Load mapping between city names and weather codes.""" + city_code_path = os.path.join(os.path.dirname(__file__), '..', '..', 'function', 'main_city.json') + with open(city_code_path, 'r', encoding='utf-8') as f: + return json.load(f) + + +def get_weather_report(city_name: str) -> WeatherResult: + """Return a weather report for a given city. + + Args: + city_name: City provided by the user. + + Returns: + WeatherResult containing success status and message. + """ + city = city_name.strip() + if not city: + return WeatherResult(success=False, message="🤔 请告诉我你想查询哪个城市的天气") + + try: + city_codes = _load_city_codes() + except Exception as exc: # pragma: no cover - IO failure is rare + logger.error(f"加载城市代码失败: {exc}") + return WeatherResult(success=False, message="⚠️ 抱歉,天气功能暂时不可用") + + code = city_codes.get(city) + if not code: + for name, value in city_codes.items(): + if city in name: + code = value + city = name + break + + if not code: + return WeatherResult(success=False, message=f"😕 找不到城市 '{city_name}' 的天气信息") + + try: + weather_text = Weather(code).get_weather(include_forecast=True) + return WeatherResult(success=True, message=weather_text, city=city) + except Exception as exc: # pragma: no cover - upstream API failure + logger.error(f"获取天气数据失败: {exc}") + return WeatherResult(success=False, message=f"😥 获取 {city} 天气时遇到问题") diff --git a/function_calls/spec.py b/function_calls/spec.py new file mode 100644 index 0000000..470930a --- /dev/null +++ b/function_calls/spec.py @@ -0,0 +1,50 @@ +"""函数规格定义与相关数据结构""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, Optional + +from commands.context import MessageContext + + +@dataclass +class FunctionResult: + """Standardized execution result returned by handlers.""" + + handled: bool + messages: list[str] = field(default_factory=list) + at: str = "" + metadata: dict[str, Any] = field(default_factory=dict) + + def dispatch(self, ctx: MessageContext) -> None: + """Send messages through the context when handled successfully.""" + if not self.handled: + return + + for message in self.messages: + ctx.send_text(message, self.at) + + def to_tool_content(self) -> str: + """Serialize result for LLM tool messages.""" + payload = { + "handled": self.handled, + "messages": self.messages, + "metadata": self.metadata or {}, + } + return json.dumps(payload, ensure_ascii=False) + + +@dataclass +class FunctionSpec: + """函数规格定义""" + + name: str + description: str + parameters_schema: Dict[str, Any] + handler: Callable[[MessageContext, Any], FunctionResult] + examples: list[str] = field(default_factory=list) + scope: str = "both" # group / private / both + require_at: bool = False + auth: Optional[str] = None # 权限标签(可选) diff --git a/main.py b/main.py index dffe2a4..b2b0c32 100644 --- a/main.py +++ b/main.py @@ -32,8 +32,6 @@ logging.getLogger("httpx").setLevel(logging.ERROR) # 提高为 ERROR logging.getLogger("Weather").setLevel(logging.WARNING) logging.getLogger("ai_providers").setLevel(logging.WARNING) logging.getLogger("commands").setLevel(logging.WARNING) -# 临时调试:为AI路由器设置更详细的日志级别 -logging.getLogger("commands.ai_router").setLevel(logging.INFO) from function.func_report_reminder import ReportReminder from configuration import Config diff --git a/robot.py b/robot.py index f452b00..20ee095 100644 --- a/robot.py +++ b/robot.py @@ -28,15 +28,13 @@ from constants import ChatType from job_mgmt import Job from function.func_xml_process import XmlProcessor -# 导入命令路由系统 +# 导入命令上下文与闲聊处理 from commands.context import MessageContext -from commands.router import CommandRouter -from commands.registry import COMMANDS, get_commands_info from commands.handlers import handle_chitchat # 导入闲聊处理函数 -# 导入AI路由系统 -from commands.ai_router import ai_router -import commands.ai_functions # 导入以注册所有AI功能 +# 导入新的Function Call系统 +from function_calls.router import FunctionCallRouter +import function_calls.init_handlers # 导入以注册所有Function Call处理器 __version__ = "39.2.4.0" @@ -165,12 +163,11 @@ class Robot(Job): # 初始化图像生成管理器 self.image_manager = ImageGenerationManager(self.config, self.wcf, self.LOG, self.sendTextMsg) - # 初始化命令路由器 - self.command_router = CommandRouter(COMMANDS, robot_instance=self) - self.LOG.info(f"命令路由系统初始化完成,共加载 {len(COMMANDS)} 条命令") - - # 初始化AI路由器 - self.LOG.info(f"AI路由系统初始化完成,共加载 {len(ai_router.functions)} 个AI功能") + # 初始化Function Call路由器 + self.function_call_router = FunctionCallRouter(robot_instance=self) + from function_calls.registry import list_functions + functions = list_functions() + self.LOG.info(f"Function Call路由系统初始化完成,共注册 {len(functions)} 个函数") # 初始化提醒管理器 try: @@ -212,23 +209,27 @@ class Robot(Job): setattr(ctx, 'chat', self.chat) setattr(ctx, 'specific_max_history', specific_limit) - # 5. 使用命令路由器分发处理消息 - handled = self.command_router.dispatch(ctx) - - # 6. 如果正则路由器没有处理,尝试AI路由器 - if not handled: - # 只在被@或私聊时才使用AI路由 - if (msg.from_group() and msg.is_at(self.wxid)) or not msg.from_group(): - print(f"[AI路由调试] 准备调用AI路由器处理消息: {msg.content}") - ai_handled = ai_router.dispatch(ctx) - print(f"[AI路由调试] AI路由器处理结果: {ai_handled}") - if ai_handled: - self.LOG.info("消息已由AI路由器处理") - print("[AI路由调试] 消息已成功由AI路由器处理") - return - else: - print("[AI路由调试] AI路由器未处理该消息") - + # 5. 根据配置选择路由系统 + handled = False + + function_call_config = getattr(self.config, 'FUNCTION_CALL_ROUTER', {}) + use_function_call = function_call_config.get('enable', True) + debug_function_call = function_call_config.get('debug', False) + + if use_function_call: + try: + if debug_function_call: + self.LOG.debug(f"[Function Call] 开始处理消息: {msg.content}") + handled = self.function_call_router.dispatch(ctx) + if debug_function_call: + self.LOG.debug(f"[Function Call] 处理结果: {handled}") + except Exception as e: + self.LOG.error(f"Function Call路由器处理异常: {e}") + + if handled: + self.LOG.info("消息已由Function Call路由器处理") + return + # 7. 如果没有命令处理器处理,则进行特殊逻辑处理 if not handled: # 7.1 好友请求自动处理 @@ -672,4 +673,3 @@ class Robot(Job): self.LOG.debug(f"预处理消息: text='{ctx.text}', is_group={ctx.is_group}, is_at_bot={ctx.is_at_bot}, sender='{ctx.sender_name}', is_quoted_image={is_quoted_image}") return ctx -