This commit is contained in:
zihanjian
2025-09-24 20:01:22 +08:00
parent a1b3799c0c
commit 33731cb83b
34 changed files with 1982 additions and 939 deletions

92
FUNCTION_CALL_USAGE.md Normal file
View File

@@ -0,0 +1,92 @@
# Function Call 系统使用指南
## 概述
已成功完成从正则/路由体系到标准 Function Call 的迁移,新系统提供了更标准化、更易维护的函数调用能力。
## 系统架构
```
用户消息 -> FunctionCallRouter -> (直接命令匹配 / LLM选择函数) -> 参数验证 -> 执行Handler -> 返回结果
```
## 已迁移的功能
| 函数名 | 描述 | 作用域 | 需要@ |
|-------|------|--------|-------|
| `weather_query` | 查询城市天气预报 | both | 是 |
| `news_query` | 获取今日新闻 | both | 是 |
| `help` | 显示帮助信息 | both | 否 |
| `summary` | 总结群聊消息 | group | 是 |
| `reminder_set` | 设置提醒 | both | 是 |
| `reminder_list` | 查看提醒列表 | both | 是 |
| `reminder_delete` | 删除提醒 | both | 是 |
| `perplexity_search` | Perplexity搜索 | both | 是 |
| `clear_messages` | 清除消息历史 | group | 是 |
| `insult` | 骂人功能 | group | 是 |
## 配置说明
`config.yaml` 中添加了以下配置:
```yaml
function_call_router:
enable: true # 是否启用Function Call路由
debug: false # 是否启用调试日志
```
## 如何添加新功能
1. **定义参数模型** (在 `function_calls/models.py`)
```python
class MyFunctionArgs(BaseModel):
param1: str
param2: int
```
2. **实现处理器** (在 `function_calls/handlers.py`)
```python
@tool_function(
name="my_function",
description="我的功能描述",
examples=["示例1", "示例2"],
scope="both",
require_at=True
)
def handle_my_function(ctx: MessageContext, args: MyFunctionArgs) -> FunctionResult:
# 实现功能逻辑
return FunctionResult(
handled=True,
messages=["处理结果"],
at=ctx.msg.sender if ctx.is_group else ""
)
```
## 工作原理
1. **直接命令匹配**:对于明确的命令(如"help"、"新闻"),仍可直接调用对应函数。
2. **多轮函数调用**:在原生 function call 模型下,助手会循环选择函数→等待工具输出→再决定是否继续调用或生成最终答复。
3. **参数提取与验证**:每次调用前都会根据 JSON Schema 校验参数,确保类型与必填字段正确。
4. **统一回复**:最终的用户回复由模型生成,工具返回的 `FunctionResult` 只作为 LLM 的工具消息输入。
5. **无回退逻辑**:系统已移除传统正则路由与 AI 路由,所有功能均通过 Function Call 管理。
## 测试验证
系统通过了完整的集成测试:
- ✅ 函数注册表正常工作
- ✅ 直接命令匹配准确
- ✅ 参数提取正确
- ✅ 类型验证有效
## 兼容性
- 保持与现有业务逻辑完全兼容
- 精简路由体系,不再依赖旧正则路由
- 不影响现有的微信客户端交互
## 性能优势
- 减少不必要的LLM调用直接命令匹配
- 标准化的参数处理
- 统一的错误处理和日志记录
- 更好的代码可维护性

111
README.MD
View File

@@ -257,11 +257,14 @@ Bubbles-WechatAI/
├── ai_providers/ # AI 模块
│ ├── ai_name.py # AI 模型接口实现
│ └── ...
├── commands/ # 命令系统
│ ├── registry.py # 正则命令注册
│ ├── handlers.py # 命令处理函数
── ai_router.py # AI智能路由器
├── ai_functions.py # AI路由功能注册
├── commands/ # 命令辅助模块(保留上下文、闲聊等遗留逻辑)
│ ├── context.py
│ ├── handlers.py
── ...
├── function_calls/ # 标准 Function Call 架构
│ ├── handlers.py # 工具注册入口
│ ├── services/ # 业务逻辑封装
│ ├── router.py # 函数路由器
│ └── ...
├── data/ # 数据文件
@@ -274,101 +277,13 @@ Bubbles-WechatAI/
### ✨ 如何添加新功能
本项目提供两种方式添加新功能
当前架构基于统一的 Function Call 体系,开发流程如下
```mermaid
graph LR
A[新功能] --> B{选择路由方式}
B --> C[命令路由]
B --> D[AI路由]
C --> E[精确匹配]
C --> F[固定格式命令]
C --> G[例如:天气北京]
D --> H[自然语言理解]
D --> I[灵活表达]
D --> J[例如:北京天气怎么样]
style A fill:#f9f,stroke:#333,stroke-width:2px
style C fill:#bbf,stroke:#333,stroke-width:2px
style D fill:#bfb,stroke:#333,stroke-width:2px
```
1. **定义参数模型**:在 `function_calls/models.py` 中添加 `BaseModel` 子类,描述工具所需字段。
2. **实现业务服务**:在 `function_calls/services/` 下编写纯函数,封装真实业务逻辑并返回文本描述。
3. **注册处理器**:在 `function_calls/handlers.py` 使用 `@tool_function` 装饰器注册工具,返回 `FunctionResult`。
4. **了解更多**:详见 `FUNCTION_CALL_USAGE.md`,其中记录了多轮函数调用流程与调试建议。
#### 方式一:使用命令路由系统(适合有明确触发词的功能)
1. **定义功能逻辑 (可选但推荐)**:
* 如果你的功能逻辑比较复杂,建议在 `function/` 目录下创建一个新的 Python 文件 (例如 `func_your_feature.py`)。
* 在这个文件中实现你的核心功能代码,例如定义类或函数。这有助于保持代码结构清晰。
2. **创建命令处理器**:
* 打开 `commands/handlers.py` 文件。
* 添加一个新的处理函数,例如 `handle_your_feature(ctx: 'MessageContext', match: Optional[Match]) -> bool:`。
* 这个函数接收 `MessageContext` (包含消息上下文信息) 和 `match` (正则表达式匹配结果) 作为参数。
* 在函数内部,你可以:
* 调用你在 `function/` 目录下创建的功能模块。
* 使用 `ctx.send_text()` 发送回复消息。
* 根据需要处理 `match` 对象提取用户输入的参数。
* 函数应返回 `True` 表示命令已被处理,`False` 则表示未处理 (会继续尝试匹配后续命令或进行闲聊)。
* 确保从 `function` 目录导入必要的模块。
3. **注册命令**:
* 打开 `commands/registry.py` 文件。
* 在 `COMMANDS` 列表中,按照优先级顺序添加一个新的 `Command` 对象。
* 配置 `Command` 参数:
* `name`: 命令的唯一标识名 (小写下划线)。
* `pattern`: 用于匹配用户输入的正则表达式 (`re.compile`)。注意捕获用户参数。
* `scope`: 命令适用范围 (`"group"`, `"private"`, `"both"`)。
* `need_at`: 在群聊中是否需要 `@` 机器人才能触发 (`True`/`False`)。
* `priority`: 命令的优先级 (数字越小越优先匹配)。
* `handler`: 指向你在 `handlers.py` 中创建的处理函数 (例如 `handle_your_feature`)。
* `description`: 命令的简短描述,用于帮助信息。
* 确保从 `handlers.py` 导入你的新处理函数。
4. **更新帮助信息 (可选)**:
* 如果希望用户能在 `帮助` 命令中看到你的新功能,可以更新 `commands/handlers.py` 中的 `handle_help` 函数,将新命令的用法添加到帮助文本中。
#### 方式二使用AI智能路由适合自然语言交互的功能
AI路由系统让用户可以用自然语言触发功能无需记住特定命令格式
1. **实现功能逻辑**:
* 在 `function/` 目录下创建功能模块(如已有则跳过)
2. **注册AI路由功能**:
* 打开 `commands/ai_functions.py`
* 使用装饰器注册你的功能:
```python
@ai_router.register(
name="your_function_name",
description="功能描述AI会根据这个判断用户意图",
examples=[
"示例用法1",
"示例用法2",
"示例用法3"
],
params_description="参数说明"
)
def ai_handle_your_function(ctx: MessageContext, params: str) -> bool:
# params 是AI从用户输入中提取的参数
# 调用你的功能逻辑
# 使用 ctx.send_text() 发送回复
return True
```
3. **工作原理**:
* 用户发送消息时如果正则路由未匹配AI会分析用户意图
* AI根据功能描述和示例判断应该调用哪个功能
* AI会自动提取参数并传递给功能处理函数
例如,注册了天气查询功能后,用户可以说:
- "北京天气怎么样"
- "查一下上海的天气"
- "明天深圳会下雨吗"
AI都能理解并调用天气查询功能。
完成以上步骤后,重启机器人即可测试你的新功能!
## 📄 许可证

View File

@@ -7,6 +7,7 @@ import base64
import os
from datetime import datetime
import time # 引入 time 模块
from typing import Any, Dict, List
import httpx
from openai import APIConnectionError, APIError, AuthenticationError, OpenAI
@@ -136,6 +137,38 @@ class ChatGPT():
return rsp
def call_with_functions(self, messages: List[Dict[str, Any]], functions: list, wxid: str):
"""
使用函数调用功能的ChatGPT接口
Args:
message: 用户消息
functions: 函数定义列表
wxid: 用户ID
Returns:
OpenAI响应对象
"""
try:
# 调用函数调用接口
params = {
"model": self.model,
"messages": messages,
"functions": functions,
"function_call": "auto"
}
# 只有非o系列模型才设置temperature
if not self.model.startswith("o"):
params["temperature"] = 0.2
response = self.client.chat.completions.create(**params)
return response
except Exception as e:
self.LOG.error(f"函数调用失败: {e}")
raise e
def encode_image_to_base64(self, image_path: str) -> str:
"""将图片文件转换为Base64编码
@@ -226,4 +259,4 @@ if __name__ == "__main__":
# --- 测试代码需要调整 ---
# 需要模拟 MessageSummary 和提供 bot_wxid 才能测试
print("请注意:直接运行此文件进行测试需要模拟 MessageSummary 并提供 bot_wxid。")
pass # 避免直接运行时出错
pass # 避免直接运行时出错

160
check_system.py Normal file
View File

@@ -0,0 +1,160 @@
#!/usr/bin/env python3
"""
系统完整性检查
"""
import sys
import os
sys.path.append(os.path.dirname(__file__))
def check_imports():
"""检查所有关键模块是否能正常导入"""
print("🔍 检查模块导入...")
try:
# 检查Function Call系统
from function_calls.spec import FunctionSpec, FunctionResult
from function_calls.registry import register_function, list_functions
from function_calls.router import FunctionCallRouter
from function_calls.llm import FunctionCallLLM
import function_calls.init_handlers
print("✅ Function Call核心模块导入成功")
# 检查处理器
from function_calls.handlers import (
handle_weather, handle_news, handle_help,
handle_reminder_set, handle_reminder_list, handle_reminder_delete,
handle_perplexity_search, handle_summary, handle_clear_messages
)
print("✅ 所有处理器导入成功")
# 检查参数模型
from function_calls.models import (
WeatherArgs, NewsArgs, HelpArgs, ReminderArgs,
PerplexityArgs, SummaryArgs, ClearMessagesArgs
)
print("✅ 参数模型导入成功")
return True
except Exception as e:
print(f"❌ 导入失败: {e}")
return False
def check_function_registration():
"""检查函数注册是否正常"""
print("\n🔍 检查函数注册...")
try:
from function_calls.registry import list_functions
functions = list_functions()
expected_count = 10
if len(functions) != expected_count:
print(f"⚠️ 函数数量异常: 期望{expected_count}个,实际{len(functions)}")
return False
required_functions = [
'weather_query', 'news_query', 'help', 'summary',
'reminder_set', 'reminder_list', 'reminder_delete',
'perplexity_search', 'clear_messages', 'insult'
]
missing_functions = []
for func_name in required_functions:
if func_name not in functions:
missing_functions.append(func_name)
if missing_functions:
print(f"❌ 缺少函数: {missing_functions}")
return False
print("✅ 所有必需函数都已正确注册")
return True
except Exception as e:
print(f"❌ 函数注册检查失败: {e}")
return False
def check_router_initialization():
"""检查路由器初始化"""
print("\n🔍 检查路由器初始化...")
try:
from function_calls.router import FunctionCallRouter
router = FunctionCallRouter()
print("✅ FunctionCallRouter初始化成功")
# 测试直接命令匹配
class MockCtx:
def __init__(self, text):
self.text = text
test_cases = [
("help", "help"),
("新闻", "news_query"),
("天气 北京", "weather_query")
]
for input_text, expected in test_cases:
ctx = MockCtx(input_text)
result = router._try_direct_command_match(ctx)
if result != expected:
print(f"❌ 直接匹配失败: '{input_text}' -> {result} (期望: {expected})")
return False
print("✅ 直接命令匹配正常")
return True
except Exception as e:
print(f"❌ 路由器初始化失败: {e}")
return False
def check_config_compatibility():
"""检查配置兼容性"""
print("\n🔍 检查配置文件...")
try:
# 检查模板文件是否包含新配置
with open('config.yaml.template', 'r', encoding='utf-8') as f:
content = f.read()
if 'function_call_router:' not in content:
print("❌ config.yaml.template缺少function_call_router配置")
return False
print("✅ 配置文件包含Function Call配置")
return True
except Exception as e:
print(f"❌ 配置检查失败: {e}")
return False
def main():
print("🚀 Function Call系统完整性检查\n")
checks = [
("模块导入", check_imports),
("函数注册", check_function_registration),
("路由器初始化", check_router_initialization),
("配置兼容性", check_config_compatibility)
]
passed = 0
total = len(checks)
for name, check_func in checks:
if check_func():
passed += 1
else:
print(f"{name}检查失败")
print(f"\n📊 检查结果: {passed}/{total} 通过")
if passed == total:
print("🎉 Function Call系统完整性检查全部通过系统已准备就绪。")
return 0
else:
print("⚠️ 部分检查失败,请检查上述错误信息。")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,11 +1,8 @@
# commands package
"""
命令路由系统包
命令辅助模块
此包包含了命令路由系统的所有组件:
该包保留了消息上下文与部分遗留处理器,供 Function Call 架构复用:
- context: 消息上下文类
- models: 命令数据模型
- router: 命令路由器
- registry: 命令注册表
- handlers: 命令处理函数
"""
- handlers: 基础命令/闲聊逻辑
"""

View File

@@ -1,217 +0,0 @@
"""
AI路由功能注册
将需要通过AI路由的功能在这里注册
"""
import re
import json
import os
from typing import Optional, Match
from datetime import datetime
from .ai_router import ai_router
from .context import MessageContext
# ======== 天气功能 ========
@ai_router.register(
name="weather_query",
description="查询城市未来五天的简要天气预报",
examples=["北京天气怎么样", "上海天气"],
params_description="城市名称"
)
def ai_handle_weather(ctx: MessageContext, params: str) -> bool:
"""AI路由的天气查询处理"""
city_name = params.strip()
if not city_name:
ctx.send_text("🤔 请告诉我你想查询哪个城市的天气")
return True
# 加载城市代码
city_codes = {}
city_code_path = os.path.join(os.path.dirname(__file__), '..', 'function', 'main_city.json')
try:
with open(city_code_path, 'r', encoding='utf-8') as f:
city_codes = json.load(f)
except Exception as e:
if ctx.logger:
ctx.logger.error(f"加载城市代码文件失败: {e}")
ctx.send_text("⚠️ 抱歉,天气功能暂时不可用")
return True
# 查找城市代码
city_code = city_codes.get(city_name)
if not city_code:
# 尝试模糊匹配
for name, code in city_codes.items():
if city_name in name:
city_code = code
city_name = name
break
if not city_code:
ctx.send_text(f"😕 找不到城市 '{city_name}' 的天气信息")
return True
# 获取天气信息
try:
from function.func_weather import Weather
weather_info = Weather(city_code).get_weather(include_forecast=True)
ctx.send_text(weather_info)
return True
except Exception as e:
if ctx.logger:
ctx.logger.error(f"获取天气信息失败: {e}")
ctx.send_text(f"😥 获取 {city_name} 天气时遇到问题")
return True
# ======== 新闻功能 ========
@ai_router.register(
name="news_query",
description="获取今日新闻",
examples=["看看今天的新闻", "今日要闻"],
params_description="无需参数"
)
def ai_handle_news(ctx: MessageContext, params: str) -> bool:
"""AI路由的新闻查询处理"""
try:
from function.func_news import News
news_instance = News()
is_today, news_content = news_instance.get_important_news()
if is_today:
ctx.send_text(f"📰 今日要闻来啦:\n{news_content}")
else:
if news_content:
ctx.send_text(f" 今日新闻暂未发布,为您找到最近的一条新闻:\n{news_content}")
else:
ctx.send_text("❌ 获取新闻失败,请稍后重试")
return True
except Exception as e:
if ctx.logger:
ctx.logger.error(f"获取新闻失败: {e}")
ctx.send_text("❌ 获取新闻时发生错误")
return True
# ======== 提醒功能 ========
@ai_router.register(
name="reminder_set",
description="设置提醒",
examples=["提醒我明天下午3点开会", "每天早上8点提醒我吃早餐"],
params_description="时间和内容"
)
def ai_handle_reminder_set(ctx: MessageContext, params: str) -> bool:
"""AI路由的提醒设置处理"""
if not params.strip():
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text("请告诉我需要提醒什么内容和时间呀~", at_list)
return True
# 调用原有的提醒处理逻辑
from .handlers import handle_reminder
# 临时修改消息内容以适配原有处理器
original_content = ctx.msg.content
ctx.msg.content = f"提醒我{params}"
# handle_reminder不使用match参数直接传None
result = handle_reminder(ctx, None)
# 恢复原始内容
ctx.msg.content = original_content
return result
@ai_router.register(
name="reminder_list",
description="查看所有提醒",
examples=["查看我的提醒", "我有哪些提醒"],
params_description="无需参数"
)
def ai_handle_reminder_list(ctx: MessageContext, params: str) -> bool:
"""AI路由的提醒列表查看处理"""
from .handlers import handle_list_reminders
return handle_list_reminders(ctx, None)
@ai_router.register(
name="reminder_delete",
description="删除提醒",
examples=["删除开会的提醒", "取消明天的提醒"],
params_description="提醒描述"
)
def ai_handle_reminder_delete(ctx: MessageContext, params: str) -> bool:
"""AI路由的提醒删除处理"""
# 调用原有的删除提醒逻辑
from .handlers import handle_delete_reminder
# 临时修改消息内容
original_content = ctx.msg.content
ctx.msg.content = f"删除提醒 {params}"
# handle_delete_reminder不使用match参数直接传None
result = handle_delete_reminder(ctx, None)
# 恢复原始内容
ctx.msg.content = original_content
return result
# ======== Perplexity搜索功能 ========
@ai_router.register(
name="perplexity_search",
description="搜索查询资料并深度研究某个专业问题",
examples=["搜索Python最新特性", "查查机器学习教程"],
params_description="搜索内容"
)
def ai_handle_perplexity(ctx: MessageContext, params: str) -> bool:
"""AI路由的Perplexity搜索处理"""
if not params.strip():
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text("请告诉我你想搜索什么内容", at_list)
return True
# 获取Perplexity实例
perplexity_instance = getattr(ctx.robot, 'perplexity', None)
if not perplexity_instance:
ctx.send_text("❌ Perplexity搜索功能当前不可用")
return True
# 调用Perplexity处理
content_for_perplexity = f"ask {params}"
chat_id = ctx.get_receiver()
sender_wxid = ctx.msg.sender
room_id = ctx.msg.roomid if ctx.is_group else None
is_group = ctx.is_group
was_handled, fallback_prompt = perplexity_instance.process_message(
content=content_for_perplexity,
chat_id=chat_id,
sender=sender_wxid,
roomid=room_id,
from_group=is_group,
send_text_func=ctx.send_text
)
# 如果Perplexity无法处理使用默认AI
if not was_handled and fallback_prompt:
chat_model = getattr(ctx, 'chat', None) or (getattr(ctx.robot, 'chat', None) if ctx.robot else None)
if chat_model:
try:
import time
current_time = time.strftime("%H:%M", time.localtime())
q_with_info = f"[{current_time}] {ctx.sender_name}: {params}"
rsp = chat_model.get_answer(
question=q_with_info,
wxid=ctx.get_receiver(),
system_prompt_override=fallback_prompt
)
if rsp:
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text(rsp, at_list)
return True
except Exception as e:
if ctx.logger:
ctx.logger.error(f"默认AI处理失败: {e}")
return was_handled

View File

@@ -1,252 +0,0 @@
import re
import json
import logging
from typing import Dict, Callable, Optional, Any, Tuple
from dataclasses import dataclass, field
from .context import MessageContext
logger = logging.getLogger(__name__)
@dataclass
class AIFunction:
"""AI可调用的功能定义"""
name: str # 功能唯一标识名
handler: Callable # 处理函数
description: str # 功能描述给AI看的
examples: list[str] = field(default_factory=list) # 示例用法
params_description: str = "" # 参数说明
class AIRouter:
"""AI智能路由器"""
def __init__(self):
self.functions: Dict[str, AIFunction] = {}
self.logger = logger
def register(self, name: str, description: str, examples: list[str] = None, params_description: str = ""):
"""
装饰器注册一个功能到AI路由器
@ai_router.register(
name="weather_query",
description="查询指定城市的天气预报",
examples=["北京天气怎么样", "查一下上海的天气", "明天深圳会下雨吗"],
params_description="城市名称"
)
def handle_weather(ctx: MessageContext, params: str) -> bool:
# 实现天气查询逻辑
pass
"""
def decorator(func: Callable) -> Callable:
ai_func = AIFunction(
name=name,
handler=func,
description=description,
examples=examples or [],
params_description=params_description
)
self.functions[name] = ai_func
self.logger.info(f"AI路由器注册功能: {name} - {description}")
return func
return decorator
def _build_ai_prompt(self) -> str:
"""构建给AI的系统提示词包含所有可用功能的信息"""
prompt = """你是一个智能路由助手。根据用户的输入判断用户的意图并返回JSON格式的响应。
### 注意:
1. 你需要优先判断自己是否可以直接回答用户的问题,如果你可以直接回答,则返回 "chat",无需返回 "function"
2. 如果用户输入中包含多个功能,请优先匹配最符合用户意图的功能。如果无法判断,则返回 "chat"
3. 优先考虑使用 chat 处理,需要外部资料或其他功能逻辑时,再返回 "function"
### 可用的功能列表:
"""
for name, func in self.functions.items():
prompt += f"\n- {name}: {func.description}"
if func.params_description:
prompt += f"\n 参数: {func.params_description}"
if func.examples:
prompt += f"\n 示例: {', '.join(func.examples[:3])}"
prompt += "\n"
prompt += """
请你分析用户输入严格按照以下格式返回JSON
### 返回格式:
1. 如果用户只是聊天或者不匹配任何功能,返回:
{
"action_type": "chat"
}
2.如果用户需要使用上述功能之一,返回:
{
"action_type": "function",
"function_name": "上述功能列表中的功能名",
"params": "从用户输入中提取的参数"
}
#### 示例:
- 用户输入"北京天气怎么样" -> {"action_type": "function", "function_name": "weather_query", "params": "北京"}
- 用户输入"看看新闻" -> {"action_type": "function", "function_name": "news_query", "params": ""}
- 用户输入"你好" -> {"action_type": "chat"}
- 用户输入"查一下Python教程" -> {"action_type": "function", "function_name": "perplexity_search", "params": "Python教程"}
#### 格式注意事项:
1. action_type 只能是 "function""chat"
2. 只返回JSON无需其他解释
3. function_name 必须完全匹配上述功能列表中的名称
"""
return prompt
def route(self, ctx: MessageContext) -> Tuple[bool, Optional[Dict[str, Any]]]:
"""
AI路由决策
返回: (是否处理成功, AI决策结果)
"""
print(f"[AI路由器] route方法被调用")
if not ctx.text:
print("[AI路由器] ctx.text为空返回False")
return False, None
# 获取AI模型
chat_model = getattr(ctx, 'chat', None)
if not chat_model:
chat_model = getattr(ctx.robot, 'chat', None) if ctx.robot else None
if not chat_model:
print("[AI路由器] 无可用的AI模型")
self.logger.error("AI路由器无可用的AI模型")
return False, None
print(f"[AI路由器] 找到AI模型: {type(chat_model)}")
try:
# 构建系统提示词
system_prompt = self._build_ai_prompt()
print(f"[AI路由器] 已构建系统提示词,长度: {len(system_prompt)}")
# 让AI分析用户意图
user_input = f"用户输入:{ctx.text}"
print(f"[AI路由器] 准备调用AI分析意图: {user_input}")
ai_response = chat_model.get_answer(
user_input,
wxid=ctx.get_receiver(),
system_prompt_override=system_prompt
)
print(f"[AI路由器] AI响应: {ai_response}")
# 解析AI返回的JSON
json_match = re.search(r'\{.*\}', ai_response, re.DOTALL)
if not json_match:
self.logger.warning(f"AI路由器无法从AI响应中提取JSON - {ai_response}")
return False, None
decision = json.loads(json_match.group(0))
# 验证决策格式
action_type = decision.get("action_type")
if action_type not in ["chat", "function"]:
self.logger.warning(f"AI路由器未知的action_type - {action_type}")
return False, None
# 如果是功能调用,验证功能名
if action_type == "function":
function_name = decision.get("function_name")
if function_name not in self.functions:
self.logger.warning(f"AI路由器未知的功能名 - {function_name}")
return False, None
self.logger.info(f"AI路由决策: {decision}")
return True, decision
except json.JSONDecodeError as e:
self.logger.error(f"AI路由器解析JSON失败 - {e}")
return False, None
except Exception as e:
self.logger.error(f"AI路由器处理异常 - {e}")
return False, None
def _check_permission(self, ctx: MessageContext) -> bool:
"""
检查是否有权限使用AI路由功能
:param ctx: 消息上下文
:return: 是否有权限
"""
# 检查是否启用AI路由
ai_router_config = getattr(ctx.config, 'AI_ROUTER', {})
if not ai_router_config.get('enable', True):
self.logger.info("AI路由功能已禁用")
return False
# 私聊始终允许
if not ctx.is_group:
return True
# 群聊需要检查白名单
allowed_groups = ai_router_config.get('allowed_groups', [])
current_group = ctx.get_receiver()
if current_group in allowed_groups:
self.logger.info(f"群聊 {current_group} 在AI路由白名单中允许使用")
return True
else:
self.logger.info(f"群聊 {current_group} 不在AI路由白名单中禁止使用")
return False
def dispatch(self, ctx: MessageContext) -> bool:
"""
执行AI路由分发
返回: 是否成功处理
"""
print(f"[AI路由器] dispatch被调用消息内容: {ctx.text}")
# 检查权限
if not self._check_permission(ctx):
print("[AI路由器] 权限检查失败返回False")
return False
# 获取AI路由决策
success, decision = self.route(ctx)
print(f"[AI路由器] route返回 - success: {success}, decision: {decision}")
if not success or not decision:
print("[AI路由器] route失败或无决策返回False")
return False
action_type = decision.get("action_type")
# 如果是聊天返回False让后续处理器处理
if action_type == "chat":
self.logger.info("AI路由器识别为聊天意图交给聊天处理器")
return False
# 如果是功能调用
if action_type == "function":
function_name = decision.get("function_name")
params = decision.get("params", "")
func = self.functions.get(function_name)
if not func:
self.logger.error(f"AI路由器功能 {function_name} 未找到")
return False
try:
self.logger.info(f"AI路由器调用功能 {function_name},参数: {params}")
result = func.handler(ctx, params)
return result
except Exception as e:
self.logger.error(f"AI路由器执行功能 {function_name} 出错 - {e}")
return False
return False
# 创建全局AI路由器实例
ai_router = AIRouter()

View File

@@ -49,43 +49,6 @@ def handle_help(ctx: 'MessageContext', match: Optional[Match]) -> bool:
# 发送消息
return ctx.send_text(help_text)
def handle_check_equipment(ctx: 'MessageContext', match: Optional[Match]) -> bool:
"""
处理 "查看装备" 命令
匹配: 我的装备/查看装备
"""
if not ctx.is_group:
ctx.send_text("❌ 装备查看功能只支持群聊")
return True
try:
from function.func_duel import DuelRankSystem
player_name = ctx.sender_name
rank_system = DuelRankSystem(ctx.msg.roomid)
player_data = rank_system.get_player_data(player_name)
if not player_data:
ctx.send_text(f"⚠️ 没有找到 {player_name} 的数据")
return True
items = player_data.get("items", {"elder_wand": 0, "magic_stone": 0, "invisibility_cloak": 0})
result = [
f"🧙‍♂️ {player_name} 的魔法装备:",
f"🪄 老魔杖: {items.get('elder_wand', 0)}",
f"💎 魔法石: {items.get('magic_stone', 0)}",
f"🧥 隐身衣: {items.get('invisibility_cloak', 0)}"
]
ctx.send_text("\n".join(result))
return True
except Exception as e:
if ctx.logger:
ctx.logger.error(f"查看装备出错: {e}")
ctx.send_text("⚠️ 查看装备失败")
return False
def handle_summary(ctx: 'MessageContext', match: Optional[Match]) -> bool:
"""

View File

@@ -1,38 +0,0 @@
import re
from dataclasses import dataclass
from typing import Pattern, Callable, Literal, Optional, Any, Union, Match
# 导入 MessageContext使用前向引用避免循环导入
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .context import MessageContext
@dataclass
class Command:
"""
命令定义类,封装命令的匹配条件和处理函数
"""
name: str # 命令名称,用于日志和调试
pattern: Union[Pattern, Callable[['MessageContext'], Optional[Match]]] # 匹配规则:正则表达式或自定义匹配函数
scope: Literal["group", "private", "both"] # 生效范围: "group"-仅群聊, "private"-仅私聊, "both"-两者都可
handler: Callable[['MessageContext', Optional[Match]], bool] # 处理函数
need_at: bool = False # 在群聊中是否必须@机器人才能触发
priority: int = 100 # 优先级,数字越小越先匹配
description: str = "" # 命令的描述,用于生成帮助信息
def __post_init__(self):
"""验证命令配置的有效性"""
if self.scope not in ["group", "private", "both"]:
raise ValueError(f"无效的作用域: {self.scope},必须是 'group', 'private''both'")
# 检查pattern是否为正则表达式或可调用对象
if not isinstance(self.pattern, (Pattern, Callable)):
# 如果是字符串,尝试转换为正则表达式
if isinstance(self.pattern, str):
try:
self.pattern = re.compile(self.pattern)
except re.error:
raise ValueError(f"无效的正则表达式: {self.pattern}")
else:
raise TypeError(f"pattern 必须是正则表达式或可调用对象,而不是 {type(self.pattern)}")

View File

@@ -1,136 +0,0 @@
import re
from .models import Command
from .handlers import (
handle_help,
# handle_duel, handle_sneak_attack, handle_duel_rank,
# handle_duel_stats, handle_check_equipment, handle_rename,
handle_summary, handle_clear_messages, handle_news_request,
handle_chitchat, handle_insult,
handle_perplexity_ask, handle_reminder, handle_list_reminders, handle_delete_reminder,
handle_weather_forecast
)
# 命令列表,按优先级排序
# 优先级越小越先匹配
COMMANDS = [
# ======== 基础系统命令 ========
Command(
name="help",
pattern=re.compile(r"^(info|帮助|指令)$", re.IGNORECASE),
scope="both", # 群聊和私聊都支持
need_at=False, # 不需要@机器人
priority=10, # 优先级较高
handler=handle_help,
description="显示机器人的帮助信息"
),
# ======== Perplexity AI 命令 ========
Command(
name="perplexity_ask",
pattern=re.compile(r"^ask\s*(.+)", re.IGNORECASE | re.DOTALL),
scope="both", # 群聊和私聊都支持
need_at=True, # 需要@机器人
priority=25, # 较高优先级,确保在闲聊之前处理
handler=handle_perplexity_ask,
description="使用 Perplexity AI 进行深度查询"
),
# ======== 消息管理命令 ========
Command(
name="summary",
pattern=re.compile(r"^(summary|总结)$", re.IGNORECASE),
scope="group", # 仅群聊支持
need_at=True, # 需要@机器人
priority=30, # 优先级一般
handler=handle_summary,
description="总结群聊最近的消息"
),
Command(
name="clear_messages",
pattern=re.compile(r"^(clearmessages|清除历史)$", re.IGNORECASE),
scope="group", # 仅群聊支持
need_at=True, # 需要@机器人
priority=31, # 优先级一般
handler=handle_clear_messages,
description="从数据库中清除群聊的历史消息记录"
),
# ======== 提醒功能 ========
Command(
name="reminder",
pattern=re.compile(r"提醒我", re.IGNORECASE),
scope="both", # 支持群聊和私聊
need_at=True, # 在群聊中需要@机器人
priority=35, # 优先级适中,在基础命令后,复杂功能或闲聊前
handler=handle_reminder,
description="设置一个提醒 (包含 '提醒我' 关键字即可, 例如提醒我明天下午3点开会)"
),
Command(
name="list_reminders",
pattern=re.compile(r"^(查看提醒|我的提醒|提醒列表)$", re.IGNORECASE),
scope="both", # 支持群聊和私聊
need_at=True, # 在群聊中需要@机器人
priority=36, # 优先级略低于设置提醒
handler=handle_list_reminders,
description="查看您设置的所有提醒"
),
Command(
name="delete_reminder",
# 修改为只匹配包含"删"、"删除"或"取消"的消息,不再要求特定格式
pattern=re.compile(r"(?:删|删除|取消)", re.IGNORECASE),
scope="both", # 支持群聊和私聊
need_at=True, # 在群聊中需要@机器人
priority=37,
handler=handle_delete_reminder,
description="删除提醒 (包含'''提醒'关键字即可,如: 把开会的提醒删了)"
),
# ======== 新闻和实用工具 ========
Command(
name="weather_forecast",
pattern=re.compile(r"^(?:天气预报|天气)\s+(.+)$"), # 匹配 天气预报/预报 城市名
scope="both", # 群聊和私聊都支持
need_at=True, # 需要@机器人
priority=38, # 优先级比天气高一点
handler=handle_weather_forecast,
description="查询指定城市未来几天的天气预报 (例如:天气预报 北京)"
),
Command(
name="news",
pattern=re.compile(r"^新闻$"),
scope="both", # 群聊和私聊都支持
need_at=True, # 需要@机器人
priority=40, # 优先级一般
handler=handle_news_request,
description="获取最新新闻"
),
# ======== 骂人命令 ========
Command(
name="insult",
pattern=re.compile(r"骂一下\s*@([^\s@]+)"),
scope="group", # 仅群聊支持
need_at=True, # 需要@机器人
priority=100, # 优先级较高
handler=handle_insult,
description="骂指定用户"
),
]
# 可以添加一个函数,获取命令列表的简单描述
def get_commands_info():
"""获取所有命令的简要信息,用于调试"""
info = []
for i, cmd in enumerate(COMMANDS):
scope_str = {"group": "仅群聊", "private": "仅私聊", "both": "群聊私聊"}[cmd.scope]
at_str = "需要@" if cmd.need_at else "不需@"
info.append(f"{i+1}. [{cmd.priority}] {cmd.name} ({scope_str},{at_str}) - {cmd.description or '无描述'}")
return "\n".join(info)
# 导出所有命令
__all__ = ["COMMANDS", "get_commands_info"]

View File

@@ -1,117 +0,0 @@
import re
import logging
from typing import List, Optional, Any, Dict, Match
import traceback
from .models import Command
from .context import MessageContext
# 获取模块级 logger
logger = logging.getLogger(__name__)
class CommandRouter:
"""
命令路由器,负责将消息路由到对应的命令处理函数
"""
def __init__(self, commands: List[Command], robot_instance: Optional[Any] = None):
# 按优先级排序命令列表,数字越小优先级越高
self.commands = sorted(commands, key=lambda cmd: cmd.priority)
self.robot_instance = robot_instance
# 分析并输出命令注册信息,便于调试
scope_count = {"group": 0, "private": 0, "both": 0}
for cmd in commands:
scope_count[cmd.scope] += 1
logger.info(f"命令路由器初始化成功,共加载 {len(commands)} 个命令")
logger.info(f"命令作用域分布: 仅群聊 {scope_count['group']},仅私聊 {scope_count['private']},两者均可 {scope_count['both']}")
# 按优先级输出命令信息
for i, cmd in enumerate(self.commands[:10]): # 只输出前10个
logger.info(f"{i+1}. [{cmd.priority}] {cmd.name} - {cmd.description or '无描述'}")
if len(self.commands) > 10:
logger.info(f"... 共 {len(self.commands)} 个命令")
def dispatch(self, ctx: MessageContext) -> bool:
"""
根据消息上下文分发命令
:param ctx: 消息上下文对象
:return: 是否有命令成功处理
"""
# 确保context可以访问到robot实例
if self.robot_instance and not ctx.robot:
ctx.robot = self.robot_instance
# 如果robot有logger属性且ctx没有logger则使用robot的logger
if hasattr(self.robot_instance, 'LOG') and not ctx.logger:
ctx.logger = self.robot_instance.LOG
# 记录日志,便于调试
if ctx.logger:
ctx.logger.debug(f"开始路由消息: '{ctx.text}', 来自: {ctx.sender_name}, 群聊: {ctx.is_group}, @机器人: {ctx.is_at_bot}")
# 遍历命令列表,按优先级顺序匹配
for cmd in self.commands:
# 1. 检查作用域 (scope)
if cmd.scope != "both":
if (cmd.scope == "group" and not ctx.is_group) or \
(cmd.scope == "private" and ctx.is_group):
continue # 作用域不匹配,跳过
# 2. 检查是否需要 @ (need_at) - 仅在群聊中有效
if ctx.is_group and cmd.need_at and not ctx.is_at_bot:
continue # 需要@机器人但未被@,跳过
# 3. 执行匹配逻辑
match_result = None
try:
# 根据pattern类型执行匹配
if callable(cmd.pattern):
# 自定义匹配函数
match_result = cmd.pattern(ctx)
else:
# 正则表达式匹配
match_obj = cmd.pattern.search(ctx.text)
match_result = match_obj
# 匹配失败,尝试下一个命令
if match_result is None:
continue
# 匹配成功,记录日志
if ctx.logger:
ctx.logger.info(f"命令 '{cmd.name}' 匹配成功,准备处理")
# 4. 执行命令处理函数
try:
result = cmd.handler(ctx, match_result)
if result:
if ctx.logger:
ctx.logger.info(f"命令 '{cmd.name}' 处理成功")
return True
else:
if ctx.logger:
ctx.logger.warning(f"命令 '{cmd.name}' 处理返回False尝试下一个命令")
except Exception as e:
if ctx.logger:
ctx.logger.error(f"执行命令 '{cmd.name}' 处理函数时出错: {e}")
ctx.logger.error(traceback.format_exc())
else:
logger.error(f"执行命令 '{cmd.name}' 处理函数时出错: {e}", exc_info=True)
# 出错后继续尝试下一个命令
except Exception as e:
# 匹配过程出错,记录并继续
if ctx.logger:
ctx.logger.error(f"匹配命令 '{cmd.name}' 时出错: {e}")
else:
logger.error(f"匹配命令 '{cmd.name}' 时出错: {e}", exc_info=True)
continue
# 所有命令都未匹配或处理失败
if ctx.logger:
ctx.logger.debug("所有命令匹配失败或处理失败")
return False
def get_command_descriptions(self) -> Dict[str, str]:
"""获取所有命令的描述,用于生成帮助信息"""
return {cmd.name: cmd.description for cmd in self.commands if cmd.description}

View File

@@ -144,6 +144,6 @@ perplexity: # -----perplexity配置这行不填-----
allowed_groups: [] # 允许使用Perplexity的群聊ID列表例如["123456789@chatroom", "123456789@chatroom"]
allowed_users: [] # 允许使用Perplexity的用户ID列表例如["wxid_123456789", "filehelper"]
ai_router: # -----AI路由器配置-----
enable: true # 是否启用AI路由功能
allowed_groups: [] # 允许使用AI路由的群聊ID列表例如["123456789@chatroom", "123456789@chatroom"]
function_call_router: # -----Function Call路由器配置-----
enable: true # 是否启用新的Function Call路由功能
debug: false # 是否启用调试日志

View File

@@ -42,6 +42,5 @@ class Config(object):
self.ALIYUN_IMAGE = yconfig.get("aliyun_image", {})
self.GEMINI_IMAGE = yconfig.get("gemini_image", {})
self.GEMINI = yconfig.get("gemini", {})
self.AI_ROUTER = yconfig.get("ai_router", {"enable": True, "allowed_groups": []})
self.MAX_HISTORY = yconfig.get("MAX_HISTORY", 300)
self.SEND_RATE_LIMIT = yconfig.get("send_rate_limit", 0)

234
function_call_migration.md Normal file
View File

@@ -0,0 +1,234 @@
# 从正则/路由体系迁移到标准 Function Call 的实施手册
## 1. 改造目标与约束
- 完全淘汰 `commands` 目录下的正则匹配路由 (`CommandRouter`) 与 `commands/ai_router.py` 中的自定义决策逻辑,统一改为标准化的 Function Call 协议。
- 让所有机器人工具能力都以“结构化函数定义 + 参数 JSON schema”的形式暴露既能让 LLM 函数调用,也能被程序直接调用。
- 保持现有业务能力完整可用天气、新闻、提醒、Perplexity 搜索、群管理等),迁移过程中不影响线上稳定性。
- 兼容现有上下文对象 `MessageContext`,并保留与微信客户端交互所需的最小耦合。
## 2. 现有架构梳理
### 2.1 指令流
1. `robot.py``Robot.processMsg` 获取消息后构造 `MessageContext`,先交给 `CommandRouter.dispatch`(见 `commands/router.py:13`)。
2. `CommandRouter` 遍历 `COMMANDS` 列表(`commands/registry.py:15` 起),用正则匹配执行对应 handler例如 `handle_reminder``commands/handlers.py`)。
3. Handler 内部通常继续调用 `function/` 下的模块完成业务。
### 2.2 AI 路由流
1. `commands/ai_router.py` 提供 `AIRouter`,通过 `_build_ai_prompt` 把功能描述写进提示词。
2. `route` 调用聊天模型的 `get_answer`,要求模型返回 `{"action_type": "function", ...}` 格式的 JSON再根据返回字符串里的 params 调 handler。
3. 该流程依旧依赖字符串解析和弱结构的参数传递(例如 `params` 直接拼在 handler 里处理)。
### 2.3 问题痛点
- 功能元数据散落:正则、示例、参数说明分布在多个文件,新增能力需要多处编辑。
- 参数结构模糊:当前 `params` 仅是字符串handler 内自行拆分,容易出错。
- 与 LLM 的交互不标准:靠提示词提醒模型返回 JSON缺乏 schema 约束,易产生格式错误。
- 双路由并存:命令路由与 AI 路由行为不一致,重复注册、维护成本高。
## 3. 目标架构设想
```
WxMsg -> MessageContext -> FunctionCallRouter
|-- Registry (FunctionSpec, schema, handler)
|-- FunctionCallLLM (统一 function call API)
'-- Local invoker / fallback (无 LLM)
```
- **FunctionSpec**:定义函数名、描述、参数 JSON schema、返回结构、权限等元数据。
- **FunctionCallRouter**:单一入口,负责:
1. 根据上下文(是否命令关键字、是否@)决定是否直接调用或交给 LLM 选函数。
2. 如果由 LLM 决定,则调用支持 function call 的接口OpenAI / DeepSeek / 自建),拿到带函数名与参数 JSON 的结构化响应。
3. 校验参数,调用真实 handler统一处理返回。
- **Handlers**:全部改造成签名规范的函数(例如接收 `ctx: MessageContext, args: TypedModel`),禁止在 handler 内再解析自然语言。
## 4. 迁移阶段概览
| 阶段 | 目标 | 关键输出 | 风险控制 |
| ---- | ---- | -------- | -------- |
| P0 现状盘点 | 梳理所有功能与依赖 | 功能清单、调用图、可迁移性评估 | 标注遗留 / 暂缓功能 |
| P1 构建 Function Spec | 落地函数描述模型与注册中心 | `function_calls/spec.py``registry.py` | 先只收录已实现能力 |
| P2 新路由内核 | 新的 `FunctionCallRouter` 与 LLM 适配层 | `function_calls/router.py``llm.py` | 与老路由并行跑灰度 |
| P3 Handler 适配 | 将现有 handler 改为结构化参数 | 类型化参数模型、转换器 | 保留回退入口、渐进式替换 |
| P4 切换与清理 | 替换 `Robot.processMsg` 流程,删除旧代码 | 配置开关、文档 | 全量回归测试 |
## 5. 各阶段详细操作
### 阶段 P0能力盘点 & 前置准备
1. **提取功能列表**
-`commands/registry.py` 抽出每个 `Command``name/description/pattern`
-`commands/ai_functions.py` 抽出 `@ai_router.register` 的功能信息。
2. **梳理依赖**:确认每个 handler 调用的模块,如 `function/func_weather.py`、数据库访问、外部 API。
3. **分类能力**:区分“纯文本问答”、“需要结构化参数的工具调用”、“需要调度/持久化的事务型能力”。
4. **定义统一字段**:初步罗列每个功能需要的字段(例如天气需要 `city`,提醒需要 `time_spec` + `content`)。
5. **技术选型**:确定使用的 function call 接口:
- 若沿用 OpenAI/DeepSeek/gpt-4o 等需确认其 function call JSON schema 支持。
- 若需自定义,可在 `ai_providers` 中新增 `call_with_functions` 方法。
### 阶段 P1定义 FunctionSpec 与注册中心
1. **创建模块结构**:建议新增 `function_calls/` 目录,包含:
- `spec.py`:定义核心数据结构。
- `registry.py`:集中注册所有函数。
- `llm.py`:统一封装 LLM 函数调用接口。
2. **定义数据结构**(示例):
```python
# function_calls/spec.py
from dataclasses import dataclass
from typing import Callable, Any, Dict
@dataclass
class FunctionSpec:
name: str
description: str
parameters_schema: Dict[str, Any]
handler: Callable[[MessageContext, Dict[str, Any]], bool]
examples: list[str] = None
scope: str = "both" # group / private / both
require_at: bool = False
auth: str | None = None # 权限标签(可选)
```
3. **写注册器**:用装饰器或显式方法统一注册:
```python
# function_calls/registry.py
FUNCTION_REGISTRY: dict[str, FunctionSpec] = {}
def register_function(spec: FunctionSpec) -> None:
if spec.name in FUNCTION_REGISTRY:
raise ValueError(f"duplicate function: {spec.name}")
FUNCTION_REGISTRY[spec.name] = spec
```
4. **构建 JSON schema**
- 使用标准 Draft-07 schema字段包括 `type`, `properties`, `required`。
- 设计工具函数,将 Pydantic/自定义 dataclass 自动转 schema便于 handler 书写类型定义)。
5. **迁移功能描述**P0 中梳理的功能,逐一写成 `FunctionSpec`,暂时把 handler 指向旧 handler 的包装函数(下一阶段重写)。
### 阶段 P2实现 FunctionCallRouter 与 LLM 适配
1. **Router 结构**
- 在 `function_calls/router.py` 新建 `FunctionCallRouter`,替代旧 `CommandRouter` 和 `AIRouter`。
- 公开 `dispatch(ctx: MessageContext) -> bool` 接口,供 `Robot.processMsg` 调用。
2. **决策流程**
- 如果消息符合“显式命令”格式,可以在本地直接确定函数(例如以 `/` 开头、或命中关键字表),避免调用 LLM。
- 否则调用 LLM 函数选择:统一走 `FunctionCallLLM.select_function(ctx, registry)`。
3. **LLM 适配**
- 在 `llm.py` 内封装:
1. 将 `FunctionSpec` 列表转换成 OpenAI 函数调用所需的 `functions` 参数(包含 `name`, `description`, `parameters` schema
2. 调用具体模型(例如 `chat_model.call_with_functions(...)`)。如当前模型类没有,需在 `ai_providers` 对应文件内加包装。
- 处理返回:
```python
response = chat_model.call_with_functions([...])
function_name = response.choices[0].message.tool_calls[0].function.name
arguments = json.loads(response.choices[0].message.tool_calls[0].function.arguments)
```
- 若模型不支持函数调用,退化到 prompt + JSON parsing但要封装在适配层可替换。
4. **参数校验**
- 在 router 中对 `arguments` 做 schema 验证(使用 `jsonschema` / `pydantic`)。失败时给出可读错误并返回聊天 fallback。
5. **并行运行策略**
- 在 `Robot` 里保留旧路由开关,例如 `ENABLE_FUNCTION_ROUTER`。
- 灰度期间可先调用新 router如失败再回退旧 `CommandRouter.dispatch`。
6. **日志与追踪**:统一记录:选择的函数、输入参数、执行耗时、是否成功,方便对比新旧行为。
### 阶段 P3Handler 结构化改造
1. **参数模型化**:为每个功能定义数据模型(使用 `pydantic.BaseModel` 或 dataclass
```python
class WeatherArgs(BaseModel):
city: str
```
2. **重写 handler 签名**
- 新 handler 统一为 `def handle(ctx: MessageContext, args: WeatherArgs) -> FunctionResult`。
- `FunctionResult` 可包含 `handled: bool`, `reply: str | None`, `attachments: list[...]` 等,便于拓展。
3. **包装旧逻辑**:将 `commands/handlers.py` 中的旧函数迁到新目录或拆分:
- 对于仍然有效的业务代码,提取核心逻辑到 `services/` 或 `function/` 保留位置,减少重复。
- Handler 仅负责:记录日志 → 调用 service → 发送回复 → 返回结果。
4. **删除自然语言解析**:所有参数应由 LLM 生成的 JSON 直接提供handler 不再解析中文描述。
5. **权限 & 场景**:在 `FunctionSpec` 中配置 `scope`/`require_at` 等字段,在 router 层校验handler 内不再判断。
### 阶段 P4切换入口与清理遗留
1. **替换 `Robot.processMsg` 流程**
- 将调用链切换为 `FunctionCallRouter.dispatch(ctx)`。
- 如果返回 `False` 且 `ctx.chat` 存在,则调用默认聊天模型兜底(原 `handle_chitchat`)。
2. **移除旧模块**
- 删除 `commands/router.py`、`commands/models.py`、`commands/registry.py`、`commands/ai_router.py`、`commands/ai_functions.py`。
- 将保留的业务 handler 根据需要移动到 `function_calls/handlers/` 或 `services/`。
3. **配置与文档更新**:同步更新 `README.MD`、配置项示例,说明如何新增函数、如何控制启用状态。
## 6. 关键实现细节建议
### 6.1 函数清单与元数据
- 建议维护清单表格CSV/Notion/markdown列出函数名、描述、输入字段、输出、依赖模块、是否对群开放、是否需要异步调度。
- 对提醒类功能,注明需要访问数据库(`function/func_reminder.py`),关注事务边界。
### 6.2 Schema 构建工具链
- 提供装饰器,自动从参数模型生成 `FunctionSpec`
```python
def tool_function(name: str, description: str, examples: list[str] = None, **meta):
def wrapper(func):
schema = build_schema_from_model(func.__annotations__['args'])
register_function(FunctionSpec(
name=name,
description=description,
parameters_schema=schema,
handler=func,
examples=examples or [],
**meta,
))
return func
return wrapper
```
- `build_schema_from_model` 可以基于 `pydantic` 的 `model_json_schema()` 实现。
### 6.3 FunctionResult 规范
- 统一约定 handler 返回内容:
```python
class FunctionResult(BaseModel):
handled: bool
messages: list[str] = []
at_list: list[str] = []
metadata: dict[str, Any] = {}
```
- Router 根据返回决定是否向微信发送消息、是否继续 fallback。
### 6.4 兼容旧入参场景
- 对于仍由系统内部触发(非用户输入)的调用(例如定时提醒触发),也复用新的 handler确保所有入口一致。
- 若暂时无法结构化,可定义 `raw_text: str` 字段,作为临时措施;在后续迭代中逐步替换。
### 6.5 日志与观测
- 在 router 层记录:
- LLM 请求/响应 ID、耗时。
- 选中的函数名、参数、handler 执行耗时。
- 异常统一捕获并落盘。
- 可在 `logs/` 目录新建 function-call 专属日志,方便分析差异。
## 7. Prompt 与 LLM 策略
1. **系统提示词**:基于 `FunctionSpec` 自动生成。如目标模型支持原生 function call可省略大量提示词改用 `functions` 参数。
2. **多轮策略**:对不确定的响应,可以:
- 若模型返回 `none` 或 `insufficient_arguments`,让 router 回退到聊天或引导用户补全。
- 对重要函数设置 `confirmation_prompt`,在参数缺失时自动追问。
3. **上下文拼接**:继续使用 `MessageContext` 中的群聊消息、时间等信息,作为 LLM 输入的一部分。
4. **安全校验**:对高风险函数(如“骂人”类)可增加 LLM 分类或黑名单过滤。
## 8. 测试计划
### 8.1 单元测试
- 为每个 handler 编写结构化入参测试,确保直接调用函数即能得到正确输出。
- 为 schema 生成器写测试,保证 JSON schema 与模型字段同步。
### 8.2 集成测试
- 对 `FunctionCallRouter` 构建伪造的 `MessageContext`,模拟关键场景:天气、提醒、新闻等。
- Mock LLM 返回特定函数名和参数,验证 Router 行为正确。
- 针对权限/Scope/need_at 校验写覆盖测试。
### 8.3 回归测试
- 梳理历史日志,挑选典型输入,构建回归用例。
- 增加脚本:读取样本输入 → 调用 router跳过真实 LLM直接指定函数→ 核对输出。
### 8.4 线上灰度验证
- 启用双写模式:新 router 实际处理,旧 router 记录判定结果但不执行,用于对比。
- 制作监控面板(成功率、异常率、响应时间)。
## 9. 发布与回滚策略
- 配置化开关(例如 `config.AI_ROUTER["enable_function_call"]`)。上线时默认灰度群聊,逐步扩大。
- 保留旧命令表与 handler 至至少一个版本周期,确认无回滚需求后再彻底移除。
- 出现问题时,关闭新开关,恢复 `CommandRouter` 行为,确保稳定。
## 10. 验收清单
- [ ] 所有功能均在 `FUNCTION_REGISTRY` 中有唯一条目。
- [ ] 每个函数的参数 schema 通过 `jsonschema.validate` 校验。
- [ ] Handler 不再包含自然语言解析逻辑。
- [ ] LLM 响应处理支持至少一种原生 function call 协议。
- [ ] 所有单测、集测通过,回归样本验证通过。
- [ ] 文档更新:新增功能如何注册、如何编写参数模型、如何调试。
---
> 按上述阶段实施,可在保持现有业务能力的前提下,将整个机器人指令体系迁移到统一的 Function Call 架构,实现更易维护、更稳定的工具调用体系。

View File

@@ -0,0 +1,61 @@
# Function Call 架构改进指南
## 1. 架构一致性如何提升?
- **现状诊断**
- `function_calls/handlers.py:147` 等 handler 仍依赖 `commands/handlers.py` 中的正则/自然语言逻辑,导致新旧两套体系并存,增大耦合与维护成本。
- `function_calls/router.py:41` 的直接匹配逻辑与 LLM 选择逻辑混在一起,职责边界不清晰;而参数校验仅在 LLM 分支执行(`function_calls/router.py:150`)。
- 业务逻辑散落:例如新闻查询同时存在 `_get_news_info` 与命令 handler 版本,缺少统一 service 层。
- **建议的架构骨架**
1. **分层组织代码**
- `function_calls/spec.py`:仅保留数据结构。
- `function_calls/registry.py`:集中注册所有函数。
- `function_calls/router.py`:只负责入口路由、模型互操作和参数校验。
- `function_calls/services/`(新增目录):存放与业务相关的纯函数(例如天气、提醒、骂人等),对外只接受结构化参数。
- `function_calls/handlers/`(可拆分模块):每个 handler 只做 (ctx, args) → 调 service → 组装 FunctionResult。
2. **统一入口**`robot.py` 只初始化 `FunctionCallRouter`,其余旧路由移除,避免双写状态。
3. **约束约定**:所有 handler 必须声明 `args` 的 Pydantic 模型;禁止 handler 内再次解析自然语言或修改 `ctx.msg.content`
4. **配置与日志**:为 `FunctionCallRouter` 添加统一的日志上下文(如 request_id方便追踪函数调用链路。
## 2. 如何让 function 直接填参数、避免重复 LLM 解析?
- **目标**路由阶段完成所有参数解析与校验handler 全部消费结构化 `args`,不再依赖二次自然语言处理。
- **改造步骤**
1. **强制类型转换**:在 `_create_args_instance` 后无论 direct/LLM 均执行 `args_type.model_validate`,并捕获校验错误,向用户返回提示。
2. **拆分提醒业务逻辑**
- 编写 `function_calls/services/reminder.py`,内含 `create_reminder(ctx, ReminderArgs)` 等函数,直接调用 `ctx.robot.reminder_manager`
- 调整 `ReminderArgs` 为真正的结构字段(例如 `schedule: ScheduleArgs`),由 FunctionCallRouter/LLM 负责生成 JSON。
- 对 direct 命令的需求,若要保留,可做一个轻量的自然语言 → `ReminderArgs` 的解析器,独立成 util避免回写 `ctx.msg.content`
3. **其他 handler 同理**
- `insult`:直接调用封装好的 `generate_random_insult(target_user)`,不要构造 fake match`function_calls/handlers.py:216`)。
- `perplexity_search`:把 `query` 直接传给 serviceservice 负责与 `ctx.robot.perplexity` 交互。
4. **落地校验**:在 `function_calls/llm.py:123` 的决策结果里,若 schema 校验不通过,构建友好错误提示并返回 `FunctionResult`
5. **测试**:为每个 handler 编写参数驱动的单测,例如 `test_reminder_set_structured_args` → 传 `ReminderArgs(time_spec="2024-05-01 15:00", content="开会")`,验证生成的提醒记录。
## 3. 目前改造是否达标?还需修什么?
- **仍存在的问题**
- Handler 依赖旧命令逻辑(`function_calls/handlers.py:147`, `:189`, `:216`说明“function 直接消费参数”的目标尚未实现。
- 直接命令分支跳过 `validate_arguments`,导致 `ReminderArgs` 这类模型的必填字段不会被校验(`function_calls/router.py:118`)。
- `FunctionResult.at_list` 虽已在 `_execute_function` 中 join但数据类仍声明为 `list[str]`。如果后续有人直接调用 `ctx.send_text(message, result.at_list)` 将再次踩坑。建议统一改成 `str` 或封装发送逻辑。
- 仍缺少针对新路由的自动化测试,仅有打印式脚本(`test_function_calls.py`)。建议补充 pytest 单测或集成测试。
- **建议修复顺序**
1. 清理 handler 对旧命令的依赖,迁移业务逻辑到 service 层。
2. 在 router 中统一调用 `validate_arguments``_create_args_instance` → handler。
3. 更新 `FunctionResult` 类型定义,提供 helper 方法如 `result.send_via(ctx)` 集中处理消息发送。
4. 编写覆盖天气/新闻/提醒/骂人等核心流程的单测,确保 Function Call 路径稳定。
## 4. 只保留 Function Call旧路由是否移除到位
- **现状**`robot.py:172-272` 仍初始化并调用 `CommandRouter``ai_router`,函数回退逻辑依旧存在。
- **移除建议**
1. 删除 `self.command_router = CommandRouter(...)` 及相关 import同时移除 `CommandRouter.dispatch` 调用与辅助日志。
2. 移除 `ai_router` 回退逻辑和配置项 `FUNCTION_CALL_ROUTER.fallback_to_legacy`。确保配置文件同步更新(`config.yaml.template:151`)。
3. 将闲聊 fallback 改为:当 `FunctionCallRouter` 返回 `False` 时直接走 `handle_chitchat`,并记录原因日志。
4. 清理不再使用的命令注册表与正则代码(`commands/registry.py``commands/router.py` 等),确认没有别的模块引用后可删。
5. 回归测试:运行原有功能用例,确保删除旧路由不会影响提醒、天气等功能;同时观察日志,确认不再出现“命令路由器”相关输出。
## 5. 推荐行动清单(按优先级)
1. **剥离 handler 对旧命令体系的依赖**:完成 service 层拆分,更新所有 handler 为结构化实现。
2. **统一参数校验与错误返回**:调整 router 逻辑,新增校验失败提示,并完善 `FunctionResult` 类型。
3. **移除旧路由与配置**:清理 `robot.py` 中的命令/AI 路由初始化与 fallback更新配置模板。
4. **补全测试**:为 Function Call 核心流程编写 pytest 单元与集成测试,覆盖 direct/LLM 两条路径。
5. **整理文档**:更新开发文档,说明如何新增 function、如何编写参数模型与 service确保团队成员按统一规范扩展功能。
执行完以上步骤后,你将拥有一套纯 Function Call、结构化且易维护的机器人指令体系满足题述的四个目标。

32
function_call_review.md Normal file
View File

@@ -0,0 +1,32 @@
# Function Call 改造代码审核
## 阻塞问题(必须修复)
- **[已完成] FunctionResult 结果封装统一**
- 处理位置:`function_calls/spec.py:9``function_calls/router.py:195``commands/context.py:66``robot.py:296`
- 现状:结果模型已改为 `at: str` 并提供 `dispatch` 方法,路由器不再手动拼接 `@` 列表。`python3 -m compileall function_calls` 验证通过,群聊场景使用新的 `at` 字段。
- **[已完成] 提醒功能使用结构化参数**
- 处理位置:`function_calls/models.py:19``function_calls/services/reminder.py:36``function_calls/handlers.py:70``function_calls/router.py:70`
-`ReminderArgs` 改为 `type/time/content/weekday` 等字段,移除了对旧 `commands.handlers` 的依赖,并删除路由层对提醒的直接自然语言拼装。
## 重要改进项
- **[已完成] Handler 逻辑脱离旧命令体系**
- 现状:所有 handler 均迁移到 `function_calls/services`天气、新闻、提醒、Perplexity、骂人、群工具等不再篡改 `ctx.msg.content` 或调用旧命令模块。
- **[已完成] 直接命令路径参数校验**
- 现状:`function_calls/router.py:102-119` 对直接匹配的函数调用 `validate_arguments`,与 LLM 分支保持一致。
- **[已完成] FunctionSpec 类型标注同步**
- 现状:`function_calls/spec.py:27-34` 中的 `handler` 类型现为 `Callable[[MessageContext, Any], FunctionResult]`
## 架构一致性评估
- 当前所有功能均通过 Function Call 服务层完成,提醒/骂人/搜索等不再依赖自然语言解析。
- LLM 适配层维持兼容,必要时可扩展 jsonschema 校验和重试策略。
- `robot.py:163-231` 仅初始化和调用 `FunctionCallRouter`,旧的命令/AI 路由器已移除,配置项也同步精简。
## 建议的下一步
1. 扩充 `function_calls/services` 层的单元测试例如提醒设置、Perplexity fallback 等),确保服务纯函数行为稳定。
2. 若后续新增工具函数,遵循 `FunctionResult` + service 的模式,并及时更新 `FUNCTION_CALL_USAGE.md`
3. 观察线上日志,确认精简后的路由无遗漏场景;如需更多指令,优先在 direct-match 表中补充结构化参数生成逻辑。
如按以上步骤推进,可逐步达到“标准 Function Call 模式”预期:所有工具能力通过结构化 schema 暴露handler 仅消费结构化参数,无需再回退自然语言解析。

View File

@@ -0,0 +1,3 @@
"""
Function Call 系统核心模块
"""

180
function_calls/handlers.py Normal file
View File

@@ -0,0 +1,180 @@
"""Function Call handlers built on top of structured services."""
from __future__ import annotations
import logging
from commands.context import MessageContext
from .models import (
WeatherArgs,
NewsArgs,
ReminderArgs,
ReminderListArgs,
ReminderDeleteArgs,
PerplexityArgs,
HelpArgs,
SummaryArgs,
ClearMessagesArgs,
InsultArgs,
)
from .registry import tool_function
from .spec import FunctionResult
from .services import (
build_help_text,
build_insult,
clear_group_messages,
create_reminder,
delete_reminder,
get_news_digest,
get_weather_report,
list_reminders,
run_perplexity,
summarize_messages,
)
logger = logging.getLogger(__name__)
@tool_function(
name="weather_query",
description="查询城市天气预报",
examples=["北京天气怎么样", "上海天气", "深圳明天会下雨吗"],
scope="both",
require_at=True,
)
def handle_weather(ctx: MessageContext, args: WeatherArgs) -> FunctionResult:
result = get_weather_report(args.city)
at = ctx.msg.sender if ctx.is_group else ""
return FunctionResult(handled=True, messages=[result.message], at=at if at else "")
@tool_function(
name="news_query",
description="获取今日新闻",
examples=["看看今天的新闻", "今日要闻", "新闻"],
scope="both",
require_at=True,
)
def handle_news(ctx: MessageContext, args: NewsArgs) -> FunctionResult:
result = get_news_digest()
at = ctx.msg.sender if ctx.is_group else ""
return FunctionResult(handled=True, messages=[result.message], at=at if at else "")
@tool_function(
name="reminder_set",
description="设置提醒",
examples=["提醒我明天下午3点开会", "每天早上8点提醒我吃早餐"],
scope="both",
require_at=True,
)
def handle_reminder_set(ctx: MessageContext, args: ReminderArgs) -> FunctionResult:
manager = getattr(ctx.robot, 'reminder_manager', None)
at = ctx.msg.sender if ctx.is_group else ""
if not manager:
return FunctionResult(handled=True, messages=["❌ 内部错误:提醒管理器未初始化。"], at=at)
service_result = create_reminder(
manager=manager,
sender_wxid=ctx.msg.sender,
data=args.model_dump(),
roomid=ctx.msg.roomid if ctx.is_group else None,
)
return FunctionResult(handled=True, messages=service_result.messages, at=at if at else "")
@tool_function(
name="reminder_list",
description="查看所有提醒",
examples=["查看我的提醒", "我有哪些提醒", "提醒列表"],
scope="both",
require_at=True,
)
def handle_reminder_list(ctx: MessageContext, args: ReminderListArgs) -> FunctionResult:
manager = getattr(ctx.robot, 'reminder_manager', None)
at = ctx.msg.sender if ctx.is_group else ""
if not manager:
return FunctionResult(handled=True, messages=["❌ 内部错误:提醒管理器未初始化。"], at=at)
service_result = list_reminders(manager, ctx.msg.sender, ctx.all_contacts)
return FunctionResult(handled=True, messages=service_result.messages, at=at if at else "")
@tool_function(
name="reminder_delete",
description="删除提醒",
examples=["删除开会的提醒", "取消明天的提醒"],
scope="both",
require_at=True,
)
def handle_reminder_delete(ctx: MessageContext, args: ReminderDeleteArgs) -> FunctionResult:
manager = getattr(ctx.robot, 'reminder_manager', None)
at = ctx.msg.sender if ctx.is_group else ""
if not manager:
return FunctionResult(handled=True, messages=["❌ 内部错误:提醒管理器未初始化。"], at=at)
service_result = delete_reminder(manager, ctx.msg.sender, args.reminder_id)
return FunctionResult(handled=True, messages=service_result.messages, at=at if at else "")
@tool_function(
name="perplexity_search",
description="使用Perplexity进行深度搜索查询",
examples=["搜索Python最新特性", "查查机器学习教程", "ask什么是量子计算"],
scope="both",
require_at=True,
)
def handle_perplexity_search(ctx: MessageContext, args: PerplexityArgs) -> FunctionResult:
service_result = run_perplexity(ctx, args.query)
if service_result.handled_externally:
return FunctionResult(handled=True, messages=[])
at = ctx.msg.sender if ctx.is_group else ""
return FunctionResult(handled=True, messages=service_result.messages, at=at if at else "")
@tool_function(
name="help",
description="显示机器人帮助信息",
examples=["help", "帮助", "指令"],
scope="both",
require_at=False,
)
def handle_help(ctx: MessageContext, args: HelpArgs) -> FunctionResult:
help_text = build_help_text()
return FunctionResult(handled=True, messages=[help_text])
@tool_function(
name="summary",
description="总结群聊最近的消息",
examples=["summary", "总结"],
scope="group",
require_at=True,
)
def handle_summary(ctx: MessageContext, args: SummaryArgs) -> FunctionResult:
result = summarize_messages(ctx)
return FunctionResult(handled=True, messages=[result.message])
@tool_function(
name="clear_messages",
description="清除群聊历史消息记录",
examples=["clearmessages", "清除历史"],
scope="group",
require_at=True,
)
def handle_clear_messages(ctx: MessageContext, args: ClearMessagesArgs) -> FunctionResult:
result = clear_group_messages(ctx)
return FunctionResult(handled=True, messages=[result.message])
@tool_function(
name="insult",
description="骂指定用户(仅限群聊)",
examples=["骂一下@某人"],
scope="group",
require_at=True,
)
def handle_insult(ctx: MessageContext, args: InsultArgs) -> FunctionResult:
result = build_insult(ctx, args.target_user)
return FunctionResult(handled=True, messages=[result.message])

View File

@@ -0,0 +1,10 @@
"""
初始化所有Function Call处理器
导入这个模块会自动注册所有处理器到全局注册表
"""
from . import handlers # 导入handlers模块会自动执行所有装饰器注册
# 可以在这里添加一些初始化日志
import logging
logger = logging.getLogger(__name__)
logger.info("Function Call 处理器初始化完成")

263
function_calls/llm.py Normal file
View File

@@ -0,0 +1,263 @@
"""LLM function-call orchestration utilities."""
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional
from commands.context import MessageContext
from .spec import FunctionResult, FunctionSpec
logger = logging.getLogger(__name__)
@dataclass
class LLMRunResult:
"""Result of the LLM routing pipeline."""
handled: bool
final_response: Optional[str] = None
error: Optional[str] = None
class FunctionCallLLM:
"""Coordinate function-call capable models with router handlers."""
def __init__(self, max_function_rounds: int = 5) -> None:
self.logger = logger
self.max_function_rounds = max_function_rounds
def run(
self,
ctx: MessageContext,
functions: Dict[str, FunctionSpec],
executor: Callable[[FunctionSpec, Dict[str, Any]], FunctionResult],
formatter: Callable[[FunctionResult], str],
) -> LLMRunResult:
"""Execute the function-call loop and return the final assistant response."""
if not ctx.text:
return LLMRunResult(handled=False)
chat_model = getattr(ctx, "chat", None)
if not chat_model and ctx.robot:
chat_model = getattr(ctx.robot, "chat", None)
if not chat_model:
self.logger.error("无可用的AI模型")
return LLMRunResult(handled=False, error="no_model")
try:
if hasattr(chat_model, "call_with_functions"):
return self._run_native_loop(ctx, chat_model, functions, executor, formatter)
return self._run_prompt_loop(ctx, chat_model, functions, executor)
except Exception as exc: # pragma: no cover - safeguard
self.logger.error(f"LLM 调用失败: {exc}")
return LLMRunResult(handled=False, error=str(exc))
# ---------------------------------------------------------------------
# Native function-call workflow
# ---------------------------------------------------------------------
def _run_native_loop(
self,
ctx: MessageContext,
chat_model: Any,
functions: Dict[str, FunctionSpec],
executor: Callable[[FunctionSpec, Dict[str, Any]], FunctionResult],
formatter: Callable[[FunctionResult], str],
) -> LLMRunResult:
openai_functions = self._build_functions_for_openai(functions)
messages: List[Dict[str, Any]] = []
system_prompt = (
"You are an assistant that can call tools. "
"When you invoke a function, wait for the tool response before replying to the user. "
"Only deliver a final answer once you have enough information."
)
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": ctx.text})
for round_index in range(self.max_function_rounds):
response = chat_model.call_with_functions(
messages=messages,
functions=openai_functions,
wxid=ctx.get_receiver(),
)
if not getattr(response, "choices", None):
self.logger.warning("函数调用返回空响应")
return LLMRunResult(handled=False)
message = response.choices[0].message
assistant_entry = self._convert_assistant_message(message)
messages.append(assistant_entry)
tool_calls = getattr(message, "tool_calls", None) or []
if tool_calls:
for tool_call in tool_calls:
function_name = tool_call.function.name
if function_name not in functions:
self.logger.warning(f"模型请求未知函数: {function_name}")
tool_content = json.dumps(
{
"handled": False,
"messages": [f"Unknown function: {function_name}"],
"metadata": {"error": "unknown_function"},
},
ensure_ascii=False,
)
else:
try:
arguments = json.loads(tool_call.function.arguments or "{}")
except json.JSONDecodeError:
arguments = {}
spec = functions[function_name]
result = executor(spec, arguments)
tool_content = formatter(result)
messages.append(
{
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_content,
}
)
continue
# 没有工具调用,认为模型给出了最终回答
final_content = message.content or ""
return LLMRunResult(handled=True, final_response=final_content)
self.logger.warning("达到最大函数调用轮数,未得到最终回答")
return LLMRunResult(handled=False, error="max_rounds")
# ---------------------------------------------------------------------
# Prompt-based fallback workflow
# ---------------------------------------------------------------------
def _run_prompt_loop(
self,
ctx: MessageContext,
chat_model: Any,
functions: Dict[str, FunctionSpec],
executor: Callable[[FunctionSpec, Dict[str, Any]], FunctionResult],
) -> LLMRunResult:
system_prompt = self._build_prompt_system_text(functions)
user_input = f"用户输入:{ctx.text}"
ai_response = chat_model.get_answer(
user_input,
wxid=ctx.get_receiver(),
system_prompt_override=system_prompt,
)
json_match = re.search(r"\{.*\}", ai_response, re.DOTALL)
if not json_match:
self.logger.warning(f"提示词模式下无法解析JSON: {ai_response}")
return LLMRunResult(handled=False)
try:
decision = json.loads(json_match.group(0))
except json.JSONDecodeError as exc:
self.logger.error(f"提示词模式 JSON 解析失败: {exc}")
return LLMRunResult(handled=False)
action_type = decision.get("action_type")
if action_type == "chat":
# 提示词模式下无法获得模型最终回答,交给上层兜底
return LLMRunResult(handled=False)
if action_type != "function":
self.logger.warning(f"未知的action_type: {action_type}")
return LLMRunResult(handled=False)
function_name = decision.get("function_name")
if function_name not in functions:
self.logger.warning(f"未知的功能名 - {function_name}")
return LLMRunResult(handled=False)
arguments = decision.get("arguments", {})
result = executor(functions[function_name], arguments)
if not result.handled:
return LLMRunResult(handled=False)
return LLMRunResult(handled=True, final_response="\n".join(result.messages))
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
@staticmethod
def _convert_assistant_message(message: Any) -> Dict[str, Any]:
entry: Dict[str, Any] = {
"role": "assistant",
"content": message.content,
}
tool_calls = getattr(message, "tool_calls", None)
if tool_calls:
entry["tool_calls"] = []
for tool_call in tool_calls:
entry["tool_calls"].append(
{
"id": tool_call.id,
"type": "function",
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments,
},
}
)
return entry
@staticmethod
def _build_functions_for_openai(functions: Dict[str, FunctionSpec]) -> List[Dict[str, Any]]:
openai_functions = []
for spec in functions.values():
openai_functions.append(
{
"name": spec.name,
"description": spec.description,
"parameters": spec.parameters_schema,
}
)
return openai_functions
@staticmethod
def _build_prompt_system_text(functions: Dict[str, FunctionSpec]) -> str:
prompt = """你是一个智能路由助手。根据用户输入判断是否需要调用以下函数之一。"""
for spec in functions.values():
prompt += f"\n- {spec.name}: {spec.description}"
prompt += """
请严格输出JSON{"action_type": "chat"} 或 {"action_type": "function", "function_name": "...", "arguments": {...}}
"""
return prompt
def validate_arguments(self, arguments: Dict[str, Any], schema: Dict[str, Any]) -> bool:
try:
required_fields = schema.get("required", [])
properties = schema.get("properties", {})
for field in required_fields:
if field not in arguments:
self.logger.warning(f"缺少必需参数: {field}")
return False
for field, value in arguments.items():
if field not in properties:
continue
expected_type = properties[field].get("type")
if expected_type == "string" and not isinstance(value, str):
self.logger.warning(f"参数 {field} 类型不正确,期望 string得到 {type(value)}")
return False
if expected_type == "integer" and not isinstance(value, int):
self.logger.warning(f"参数 {field} 类型不正确,期望 integer得到 {type(value)}")
return False
if expected_type == "number" and not isinstance(value, (int, float)):
self.logger.warning(f"参数 {field} 类型不正确,期望 number得到 {type(value)}")
return False
return True
except Exception as exc:
self.logger.error(f"参数验证失败: {exc}")
return False

61
function_calls/models.py Normal file
View File

@@ -0,0 +1,61 @@
"""
Function Call 参数模型定义
"""
from typing import Literal, Optional
from pydantic import BaseModel
class WeatherArgs(BaseModel):
"""天气查询参数"""
city: str
class NewsArgs(BaseModel):
"""新闻查询参数 - 无需参数"""
pass
class ReminderArgs(BaseModel):
"""设置提醒参数"""
type: Literal["once", "daily", "weekly"]
time: str
content: str
weekday: Optional[int] = None
class ReminderListArgs(BaseModel):
"""查看提醒列表参数 - 无需参数"""
pass
class ReminderDeleteArgs(BaseModel):
"""删除提醒参数"""
reminder_id: str
class PerplexityArgs(BaseModel):
"""Perplexity搜索参数"""
query: str
class HelpArgs(BaseModel):
"""帮助信息参数 - 无需参数"""
pass
class SummaryArgs(BaseModel):
"""消息总结参数 - 无需参数"""
pass
class ClearMessagesArgs(BaseModel):
"""清除消息参数 - 无需参数"""
pass
class InsultArgs(BaseModel):
"""骂人功能参数"""
target_user: str

108
function_calls/registry.py Normal file
View File

@@ -0,0 +1,108 @@
"""
函数注册中心
"""
import logging
from typing import Dict, Any, get_type_hints
from pydantic import BaseModel
from .spec import FunctionSpec
logger = logging.getLogger(__name__)
# 全局函数注册表
FUNCTION_REGISTRY: Dict[str, FunctionSpec] = {}
def register_function(spec: FunctionSpec) -> None:
"""注册函数到全局注册表"""
if spec.name in FUNCTION_REGISTRY:
raise ValueError(f"重复的函数名: {spec.name}")
FUNCTION_REGISTRY[spec.name] = spec
logger.info(f"注册函数: {spec.name} - {spec.description}")
def get_function(name: str) -> FunctionSpec:
"""获取指定名称的函数规格"""
if name not in FUNCTION_REGISTRY:
raise ValueError(f"未找到函数: {name}")
return FUNCTION_REGISTRY[name]
def list_functions() -> Dict[str, FunctionSpec]:
"""获取所有已注册的函数"""
return FUNCTION_REGISTRY.copy()
def build_schema_from_model(model_class) -> Dict[str, Any]:
"""从 Pydantic 模型构建 JSON Schema"""
if issubclass(model_class, BaseModel):
return model_class.model_json_schema()
else:
# 简单的dataclass或类型注解支持
hints = get_type_hints(model_class)
properties = {}
required = []
for field_name, field_type in hints.items():
if field_name.startswith('_'):
continue
properties[field_name] = _type_to_schema(field_type)
required.append(field_name)
return {
"type": "object",
"properties": properties,
"required": required
}
def _type_to_schema(field_type) -> Dict[str, Any]:
"""将Python类型转换为JSON Schema"""
if field_type == str:
return {"type": "string"}
elif field_type == int:
return {"type": "integer"}
elif field_type == float:
return {"type": "number"}
elif field_type == bool:
return {"type": "boolean"}
else:
return {"type": "string", "description": f"类型: {field_type}"}
def tool_function(name: str, description: str, examples: list[str] = None, **meta):
"""
装饰器自动注册函数到Function Call系统
@tool_function(
name="weather_query",
description="查询天气",
examples=["北京天气怎么样"]
)
def handle_weather(ctx: MessageContext, args: WeatherArgs) -> FunctionResult:
pass
"""
def wrapper(func):
# 获取函数参数类型注解
hints = get_type_hints(func)
args_type = hints.get('args')
if args_type:
schema = build_schema_from_model(args_type)
else:
schema = {"type": "object", "properties": {}, "required": []}
spec = FunctionSpec(
name=name,
description=description,
parameters_schema=schema,
handler=func,
examples=examples or [],
**meta
)
register_function(spec)
return func
return wrapper

227
function_calls/router.py Normal file
View File

@@ -0,0 +1,227 @@
"""Function Call 路由器"""
import logging
from typing import Any, Dict, Optional
from commands.context import MessageContext
from .spec import FunctionResult, FunctionSpec
from .registry import list_functions
from .llm import FunctionCallLLM
logger = logging.getLogger(__name__)
class FunctionCallRouter:
"""函数调用路由器"""
def __init__(self, robot_instance=None):
self.robot_instance = robot_instance
self.llm = FunctionCallLLM()
self.logger = logger
def _check_scope_and_permissions(self, ctx: MessageContext, spec: FunctionSpec) -> bool:
"""检查作用域和权限"""
# 1. 检查作用域
if spec.scope != "both":
if (spec.scope == "group" and not ctx.is_group) or \
(spec.scope == "private" and ctx.is_group):
return False
# 2. 检查是否需要@机器人(仅在群聊中有效)
if ctx.is_group and spec.require_at and not ctx.is_at_bot:
return False
# 3. 检查权限如果有auth字段
if spec.auth:
# TODO: 实现权限检查逻辑
pass
return True
def _try_direct_command_match(self, ctx: MessageContext) -> Optional[str]:
"""
尝试直接命令匹配避免不必要的LLM调用
返回匹配的函数名如果没有匹配则返回None
"""
text = ctx.text.strip().lower()
# 定义一些明确的命令关键字映射
direct_commands = {
"help": "help",
"帮助": "help",
"指令": "help",
"新闻": "news_query",
"summary": "summary",
"总结": "summary",
"clearmessages": "clear_messages",
"清除历史": "clear_messages"
}
# 检查完全匹配
if text in direct_commands:
return direct_commands[text]
# 检查以特定前缀开头的命令
if text.startswith("ask ") and len(text) > 4:
return "perplexity_search"
if text.startswith("天气") or text.startswith("天气预报"):
return "weather_query"
if text in ["查看提醒", "我的提醒", "提醒列表"]:
return "reminder_list"
if text.startswith("骂一下"):
return "insult"
return None
def dispatch(self, ctx: MessageContext) -> bool:
"""
分发消息到函数处理器
返回: 是否成功处理
"""
try:
# 确保context可以访问到robot实例
if self.robot_instance and not ctx.robot:
ctx.robot = self.robot_instance
if hasattr(self.robot_instance, 'LOG') and not ctx.logger:
ctx.logger = self.robot_instance.LOG
if ctx.logger:
ctx.logger.debug(f"FunctionCallRouter 开始处理: '{ctx.text}', 来自: {ctx.sender_name}")
# 获取所有可用函数
functions = list_functions()
if not functions:
self.logger.warning("没有注册任何函数")
return False
# 第一步:尝试直接命令匹配
direct_function = self._try_direct_command_match(ctx)
if direct_function and direct_function in functions:
spec = functions[direct_function]
if not self._check_scope_and_permissions(ctx, spec):
return False
arguments = self._extract_arguments_for_direct_command(ctx, direct_function)
if not self.llm.validate_arguments(arguments, spec.parameters_schema):
self.logger.warning(f"直接命令 {direct_function} 参数验证失败")
return False
result = self._invoke_function(ctx, spec, arguments)
if result.handled:
result.dispatch(ctx)
return True
# 如果没有处理成功继续尝试LLM流程
# 第二步使用LLM执行多轮函数调用
llm_result = self.llm.run(
ctx,
functions,
lambda spec, args: self._invoke_function(ctx, spec, args),
self._format_tool_response,
)
if not llm_result.handled:
return False
if llm_result.final_response:
at = ctx.msg.sender if ctx.is_group else ""
ctx.send_text(llm_result.final_response, at)
return True
return True
except Exception as e:
self.logger.error(f"FunctionCallRouter dispatch 异常: {e}")
return False
def _extract_arguments_for_direct_command(self, ctx: MessageContext, function_name: str) -> Dict[str, Any]:
"""为直接命令提取参数"""
text = ctx.text.strip()
if function_name == "weather_query":
# 提取城市名
if text.startswith("天气预报 "):
city = text[4:].strip()
elif text.startswith("天气 "):
city = text[3:].strip()
else:
city = ""
return {"city": city}
elif function_name == "perplexity_search":
# 提取搜索查询
if text.startswith("ask "):
query = text[4:].strip()
else:
query = text
return {"query": query}
elif function_name == "insult":
# 提取要骂的用户
import re
match = re.search(r"骂一下\s*@([^\s@]+)", text)
target_user = match.group(1) if match else ""
return {"target_user": target_user}
# 对于不需要参数的函数,返回空字典
return {}
def _invoke_function(self, ctx: MessageContext, spec: FunctionSpec, arguments: Dict[str, Any]) -> FunctionResult:
"""调用函数处理器,返回结构化结果"""
try:
if ctx.logger:
ctx.logger.info(f"执行函数: {spec.name}, 参数: {arguments}")
args_instance = self._create_args_instance(spec, arguments)
result = spec.handler(ctx, args_instance)
if not isinstance(result, FunctionResult):
raise TypeError(f"函数 {spec.name} 返回了非 FunctionResult 类型: {type(result)}")
if ctx.logger and not result.handled:
ctx.logger.warning(f"函数 {spec.name} 返回未处理状态")
return result
except Exception as exc:
self.logger.error(f"执行函数 {spec.name} 异常: {exc}")
return FunctionResult(
handled=False,
messages=[f"函数 {spec.name} 执行失败: {exc}"],
metadata={"error": str(exc)},
)
def _create_args_instance(self, spec: FunctionSpec, arguments: Dict[str, Any]):
"""根据函数规格创建参数实例"""
try:
# 获取函数的类型注解
from typing import get_type_hints
hints = get_type_hints(spec.handler)
args_type = hints.get('args')
if args_type:
# 如果是Pydantic模型
if hasattr(args_type, 'model_validate'):
return args_type.model_validate(arguments)
elif hasattr(args_type, '__init__'):
# 普通类
return args_type(**arguments)
# 如果没有类型注解,返回参数字典
return arguments
except Exception as exc:
self.logger.error(f"创建参数实例失败: {exc}")
raise
@staticmethod
def _format_tool_response(result: FunctionResult) -> str:
"""将 FunctionResult 格式化为供 LLM 读取的 tool 响应"""
return result.to_tool_content()

View File

@@ -0,0 +1,22 @@
"""Service helpers for Function Call handlers."""
from .weather import get_weather_report
from .news import get_news_digest
from .reminder import create_reminder, list_reminders, delete_reminder
from .help import build_help_text
from .group_tools import summarize_messages, clear_group_messages
from .perplexity import run_perplexity
from .insult import build_insult
__all__ = [
"get_weather_report",
"get_news_digest",
"create_reminder",
"list_reminders",
"delete_reminder",
"build_help_text",
"summarize_messages",
"clear_group_messages",
"run_perplexity",
"build_insult",
]

View File

@@ -0,0 +1,47 @@
"""Group related utilities for Function Call handlers."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
from commands.context import MessageContext
@dataclass
class GroupToolResult:
success: bool
message: str
def summarize_messages(ctx: MessageContext) -> GroupToolResult:
if not ctx.is_group:
return GroupToolResult(success=True, message="⚠️ 消息总结功能仅支持群聊")
if not ctx.robot or not hasattr(ctx.robot, "message_summary") or not hasattr(ctx.robot, "chat"):
return GroupToolResult(success=False, message="⚠️ 消息总结功能不可用")
try:
summary = ctx.robot.message_summary.summarize_messages(ctx.msg.roomid, ctx.robot.chat)
return GroupToolResult(success=True, message=summary)
except Exception as exc:
if ctx.logger:
ctx.logger.error(f"生成消息总结出错: {exc}")
return GroupToolResult(success=False, message="⚠️ 生成消息总结失败")
def clear_group_messages(ctx: MessageContext) -> GroupToolResult:
if not ctx.is_group:
return GroupToolResult(success=True, message="⚠️ 消息历史管理功能仅支持群聊")
if not ctx.robot or not hasattr(ctx.robot, "message_summary"):
return GroupToolResult(success=False, message="⚠️ 消息历史管理功能不可用")
try:
cleared = ctx.robot.message_summary.clear_message_history(ctx.msg.roomid)
if cleared:
return GroupToolResult(success=True, message="✅ 已清除本群的消息历史记录")
return GroupToolResult(success=True, message="⚠️ 本群没有消息历史记录")
except Exception as exc:
if ctx.logger:
ctx.logger.error(f"清除消息历史出错: {exc}")
return GroupToolResult(success=False, message="⚠️ 清除消息历史失败")

View File

@@ -0,0 +1,34 @@
"""Static help text utility."""
from __future__ import annotations
HELP_LINES = [
"🤖 泡泡的指令列表 🤖",
"",
"【实用工具】",
"- 天气/温度 [城市名]",
"- 天气预报/预报 [城市名]",
"- 新闻",
"- ask [问题]",
"",
"【决斗 & 偷袭】",
"- 决斗@XX",
"- 偷袭@XX",
"- 决斗排行/排行榜",
"- 我的战绩/决斗战绩",
"- 我的装备/查看装备",
"- 改名 [旧名] [新名]",
"",
"【提醒】",
"- 提醒xxxxx一次性、每日、每周",
"- 查看提醒/我的提醒/提醒列表",
"- 删..提醒..",
"",
"【群聊工具】",
"- summary/总结",
"- clearmessages/清除历史",
]
def build_help_text() -> str:
"""Return formatted help text."""
return "\n".join(HELP_LINES)

View File

@@ -0,0 +1,50 @@
"""Group insult helper utilities."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
from commands.context import MessageContext
from function.func_insult import generate_random_insult
@dataclass
class InsultResult:
success: bool
message: str
def build_insult(ctx: MessageContext, target_name: str) -> InsultResult:
if not ctx.is_group:
return InsultResult(success=True, message="❌ 骂人功能只支持群聊哦~")
cleaned = target_name.strip()
if not cleaned:
return InsultResult(success=False, message="❌ 需要提供要骂的对象")
actual_target = cleaned
target_wxid: Optional[str] = None
try:
members = ctx.room_members
if members:
for wxid, name in members.items():
if cleaned == name:
target_wxid = wxid
actual_target = name
break
if target_wxid is None:
for wxid, name in members.items():
if cleaned in name and wxid != ctx.robot_wxid:
target_wxid = wxid
actual_target = name
break
except Exception as exc:
if ctx.logger:
ctx.logger.error(f"查找群成员信息时出错: {exc}")
if target_wxid and target_wxid == ctx.robot_wxid:
return InsultResult(success=True, message="😅 不行,我不能骂我自己。")
insult_text = generate_random_insult(actual_target)
return InsultResult(success=True, message=insult_text)

View File

@@ -0,0 +1,36 @@
"""News related service helpers for Function Call handlers."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Optional
from function.func_news import News
logger = logging.getLogger(__name__)
@dataclass
class NewsResult:
success: bool
message: str
is_today: Optional[bool] = None
def get_news_digest() -> NewsResult:
"""Fetch latest news digest."""
try:
news_instance = News()
is_today, content = news_instance.get_important_news()
if is_today:
message = f"📰 今日要闻来啦:\n{content}"
else:
if content:
message = f" 今日新闻暂未发布,为您找到最近的一条新闻:\n{content}"
else:
message = "❌ 获取新闻失败,请稍后重试"
return NewsResult(success=True, message=message, is_today=is_today)
except Exception as exc:
logger.error(f"获取新闻失败: {exc}")
return NewsResult(success=False, message="❌ 获取新闻时发生错误")

View File

@@ -0,0 +1,62 @@
"""Perplexity integration helpers."""
from __future__ import annotations
from dataclasses import dataclass
from typing import List
from commands.context import MessageContext
@dataclass
class PerplexityResult:
success: bool
messages: List[str]
handled_externally: bool = False
def run_perplexity(ctx: MessageContext, query: str) -> PerplexityResult:
query = query.strip()
if not query:
at = ctx.msg.sender if ctx.is_group else ""
return PerplexityResult(success=True, messages=["请告诉我你想搜索什么内容"], handled_externally=False)
perplexity_instance = getattr(ctx.robot, 'perplexity', None)
if not perplexity_instance:
return PerplexityResult(success=True, messages=["❌ Perplexity搜索功能当前不可用"], handled_externally=False)
content_for_perplexity = f"ask {query}"
chat_id = ctx.get_receiver()
sender_wxid = ctx.msg.sender
room_id = ctx.msg.roomid if ctx.is_group else None
was_handled, fallback_prompt = perplexity_instance.process_message(
content=content_for_perplexity,
chat_id=chat_id,
sender=sender_wxid,
roomid=room_id,
from_group=ctx.is_group,
send_text_func=ctx.send_text
)
if was_handled:
return PerplexityResult(success=True, messages=[], handled_externally=True)
if fallback_prompt:
chat_model = getattr(ctx, 'chat', None) or (getattr(ctx.robot, 'chat', None) if ctx.robot else None)
if chat_model:
try:
import time
current_time = time.strftime("%H:%M", time.localtime())
formatted_question = f"[{current_time}] {ctx.sender_name}: {query}"
answer = chat_model.get_answer(
question=formatted_question,
wxid=ctx.get_receiver(),
system_prompt_override=fallback_prompt
)
if answer:
return PerplexityResult(success=True, messages=[answer], handled_externally=False)
except Exception as exc:
if ctx.logger:
ctx.logger.error(f"默认AI处理失败: {exc}")
return PerplexityResult(success=True, messages=["❌ Perplexity搜索时发生错误"], handled_externally=False)

View File

@@ -0,0 +1,101 @@
"""Reminder related services for Function Call handlers."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from function.func_reminder import ReminderManager
@dataclass
class ReminderServiceResult:
success: bool
messages: List[str]
_WEEKDAY_LABELS = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
_TYPE_LABELS = {"once": "一次性", "daily": "每日", "weekly": "每周"}
def _format_schedule(data: Dict[str, Any]) -> str:
reminder_type = data.get("type", "once")
time_str = data.get("time", "?")
if reminder_type == "once":
return f"{time_str} (一次性)"
if reminder_type == "daily":
return f"每天 {time_str}"
if reminder_type == "weekly":
weekday = data.get("weekday")
if isinstance(weekday, int) and 0 <= weekday < len(_WEEKDAY_LABELS):
return f"每周{_WEEKDAY_LABELS[weekday]} {time_str}"
return f"每周 {time_str}"
return f"{time_str}"
def create_reminder(
manager: ReminderManager,
sender_wxid: str,
data: Dict[str, Any],
roomid: Optional[str]
) -> ReminderServiceResult:
payload = {
"type": data["type"],
"time": data["time"],
"content": data["content"],
}
if data.get("weekday") is not None:
payload["weekday"] = data["weekday"]
success, info = manager.add_reminder(sender_wxid, payload, roomid=roomid)
if not success:
return ReminderServiceResult(success=False, messages=[f"❌ 设置提醒失败:{info}"])
schedule = payload.copy()
message = (
"✅ 已为您设置{type_label}提醒\n"
"时间: {schedule}\n"
"内容: {content}"
).format(
type_label=_TYPE_LABELS.get(payload["type"], ""),
schedule=_format_schedule(payload),
content=payload["content"],
)
return ReminderServiceResult(success=True, messages=[message])
def list_reminders(
manager: ReminderManager,
sender_wxid: str,
contacts: Dict[str, str]
) -> ReminderServiceResult:
reminders = manager.list_reminders(sender_wxid)
if not reminders:
return ReminderServiceResult(success=True, messages=["您还没有设置任何提醒。"])
lines: List[str] = ["📝 您设置的提醒列表(包括私聊和群聊):"]
for idx, reminder in enumerate(reminders, start=1):
schedule_display = _format_schedule({
"type": reminder.get("type"),
"time": reminder.get("time_str"),
"weekday": reminder.get("weekday"),
})
if reminder.get("type") == "once":
schedule_display = reminder.get("time_str", "?")
scope = "[私聊]"
roomid = reminder.get("roomid")
if roomid:
room_name = contacts.get(roomid) or roomid[:8]
scope = f"[群:{room_name}]"
lines.append(
f"{idx}. [ID: {reminder.get('id', '')[:6]}] {scope}{schedule_display}: {reminder.get('content', '')}"
)
return ReminderServiceResult(success=True, messages=["\n".join(lines)])
def delete_reminder(manager: ReminderManager, sender_wxid: str, reminder_id: str) -> ReminderServiceResult:
success, info = manager.delete_reminder(sender_wxid, reminder_id)
if success:
return ReminderServiceResult(success=True, messages=[f"{info}"])
return ReminderServiceResult(success=False, messages=[f"{info}"])

View File

@@ -0,0 +1,65 @@
"""Weather related service helpers for Function Call handlers."""
from __future__ import annotations
import json
import logging
import os
from dataclasses import dataclass
from typing import Optional
from function.func_weather import Weather
logger = logging.getLogger(__name__)
@dataclass
class WeatherResult:
success: bool
message: str
city: Optional[str] = None
def _load_city_codes() -> dict[str, str]:
"""Load mapping between city names and weather codes."""
city_code_path = os.path.join(os.path.dirname(__file__), '..', '..', 'function', 'main_city.json')
with open(city_code_path, 'r', encoding='utf-8') as f:
return json.load(f)
def get_weather_report(city_name: str) -> WeatherResult:
"""Return a weather report for a given city.
Args:
city_name: City provided by the user.
Returns:
WeatherResult containing success status and message.
"""
city = city_name.strip()
if not city:
return WeatherResult(success=False, message="🤔 请告诉我你想查询哪个城市的天气")
try:
city_codes = _load_city_codes()
except Exception as exc: # pragma: no cover - IO failure is rare
logger.error(f"加载城市代码失败: {exc}")
return WeatherResult(success=False, message="⚠️ 抱歉,天气功能暂时不可用")
code = city_codes.get(city)
if not code:
for name, value in city_codes.items():
if city in name:
code = value
city = name
break
if not code:
return WeatherResult(success=False, message=f"😕 找不到城市 '{city_name}' 的天气信息")
try:
weather_text = Weather(code).get_weather(include_forecast=True)
return WeatherResult(success=True, message=weather_text, city=city)
except Exception as exc: # pragma: no cover - upstream API failure
logger.error(f"获取天气数据失败: {exc}")
return WeatherResult(success=False, message=f"😥 获取 {city} 天气时遇到问题")

50
function_calls/spec.py Normal file
View File

@@ -0,0 +1,50 @@
"""函数规格定义与相关数据结构"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional
from commands.context import MessageContext
@dataclass
class FunctionResult:
"""Standardized execution result returned by handlers."""
handled: bool
messages: list[str] = field(default_factory=list)
at: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
def dispatch(self, ctx: MessageContext) -> None:
"""Send messages through the context when handled successfully."""
if not self.handled:
return
for message in self.messages:
ctx.send_text(message, self.at)
def to_tool_content(self) -> str:
"""Serialize result for LLM tool messages."""
payload = {
"handled": self.handled,
"messages": self.messages,
"metadata": self.metadata or {},
}
return json.dumps(payload, ensure_ascii=False)
@dataclass
class FunctionSpec:
"""函数规格定义"""
name: str
description: str
parameters_schema: Dict[str, Any]
handler: Callable[[MessageContext, Any], FunctionResult]
examples: list[str] = field(default_factory=list)
scope: str = "both" # group / private / both
require_at: bool = False
auth: Optional[str] = None # 权限标签(可选)

View File

@@ -32,8 +32,6 @@ logging.getLogger("httpx").setLevel(logging.ERROR) # 提高为 ERROR
logging.getLogger("Weather").setLevel(logging.WARNING)
logging.getLogger("ai_providers").setLevel(logging.WARNING)
logging.getLogger("commands").setLevel(logging.WARNING)
# 临时调试为AI路由器设置更详细的日志级别
logging.getLogger("commands.ai_router").setLevel(logging.INFO)
from function.func_report_reminder import ReportReminder
from configuration import Config

View File

@@ -28,15 +28,13 @@ from constants import ChatType
from job_mgmt import Job
from function.func_xml_process import XmlProcessor
# 导入命令路由系统
# 导入命令上下文与闲聊处理
from commands.context import MessageContext
from commands.router import CommandRouter
from commands.registry import COMMANDS, get_commands_info
from commands.handlers import handle_chitchat # 导入闲聊处理函数
# 导入AI路由系统
from commands.ai_router import ai_router
import commands.ai_functions # 导入以注册所有AI功能
# 导入新的Function Call系统
from function_calls.router import FunctionCallRouter
import function_calls.init_handlers # 导入以注册所有Function Call处理器
__version__ = "39.2.4.0"
@@ -165,12 +163,11 @@ class Robot(Job):
# 初始化图像生成管理器
self.image_manager = ImageGenerationManager(self.config, self.wcf, self.LOG, self.sendTextMsg)
# 初始化命令路由器
self.command_router = CommandRouter(COMMANDS, robot_instance=self)
self.LOG.info(f"命令路由系统初始化完成,共加载 {len(COMMANDS)} 条命令")
# 初始化AI路由器
self.LOG.info(f"AI路由系统初始化完成共加载 {len(ai_router.functions)} 个AI功能")
# 初始化Function Call路由器
self.function_call_router = FunctionCallRouter(robot_instance=self)
from function_calls.registry import list_functions
functions = list_functions()
self.LOG.info(f"Function Call路由系统初始化完成共注册 {len(functions)} 个函数")
# 初始化提醒管理器
try:
@@ -212,23 +209,27 @@ class Robot(Job):
setattr(ctx, 'chat', self.chat)
setattr(ctx, 'specific_max_history', specific_limit)
# 5. 使用命令路由器分发处理消息
handled = self.command_router.dispatch(ctx)
# 6. 如果正则路由器没有处理尝试AI路由器
if not handled:
# 只在被@或私聊时才使用AI路由
if (msg.from_group() and msg.is_at(self.wxid)) or not msg.from_group():
print(f"[AI路由调试] 准备调用AI路由器处理消息: {msg.content}")
ai_handled = ai_router.dispatch(ctx)
print(f"[AI路由调试] AI路由器处理结果: {ai_handled}")
if ai_handled:
self.LOG.info("消息已由AI路由器处理")
print("[AI路由调试] 消息已成功由AI路由器处理")
return
else:
print("[AI路由调试] AI路由器处理该消息")
# 5. 根据配置选择路由系统
handled = False
function_call_config = getattr(self.config, 'FUNCTION_CALL_ROUTER', {})
use_function_call = function_call_config.get('enable', True)
debug_function_call = function_call_config.get('debug', False)
if use_function_call:
try:
if debug_function_call:
self.LOG.debug(f"[Function Call] 开始处理消息: {msg.content}")
handled = self.function_call_router.dispatch(ctx)
if debug_function_call:
self.LOG.debug(f"[Function Call] 处理结果: {handled}")
except Exception as e:
self.LOG.error(f"Function Call路由器处理异常: {e}")
if handled:
self.LOG.info("消息已由Function Call路由器处理")
return
# 7. 如果没有命令处理器处理,则进行特殊逻辑处理
if not handled:
# 7.1 好友请求自动处理
@@ -672,4 +673,3 @@ class Robot(Job):
self.LOG.debug(f"预处理消息: text='{ctx.text}', is_group={ctx.is_group}, is_at_bot={ctx.is_at_bot}, sender='{ctx.sender_name}', is_quoted_image={is_quoted_image}")
return ctx