diff --git a/README.MD b/README.MD index 82a2529..98da8bf 100644 --- a/README.MD +++ b/README.MD @@ -33,7 +33,12 @@ Bubbles 是一个功能丰富的微信机器人框架,基于 [wcferry](https://github.com/lich0821/wcferry) 和 [WechatRobot](https://github.com/lich0821/wechatrobot) 开发,支持接入多种LLM,提供丰富的交互功能和定时任务。该项目旨在将微信客户端转变为一个智能的个人助手,可以执行多种实用功能,带来便捷的用户体验。 -和一般机器人框架不同的是,Bubbles 设计了一套简单的 [**命令路由系统**](https://github.com/Zippland/Bubbles/blob/main/commands/registry.py) ,让添加新功能变得容易,且不用改动任何原有的代码 —— 相当于给一个主线 Hub 添加一个插件,只用通过简单正则逻辑,即可将外部服务进行注册,将海量的、不同种类的工具集成到 AI 里。具体操作详见 **如何添加新功能(命令路由系统)** 章节。 +和一般机器人框架不同的是,Bubbles 设计了两套灵活的路由系统: + +1. **命令路由系统** - 基于正则表达式的精确命令匹配,适合有明确触发词的功能 +2. **AI智能路由系统** - 基于AI的自然语言理解,自动识别用户意图并调用相应功能 + +通过这两套路由系统,添加新功能变得极其简单,且不需要改动原有代码。相当于给一个主线 Hub 添加插件,让海量的、不同种类的工具都能集成到 AI 里。具体操作详见 **如何添加新功能** 章节。 #### 案例演示其一:使用自然语言设置提醒 @@ -54,8 +59,9 @@ Bubbles 是一个功能丰富的微信机器人框架,基于 [wcferry](https:/ - DeepSeek - Perplexity -#### 🛠️ 丰富的命令系统 -- 强大的命令路由系统,让功能新增无比简单 +#### 🛠️ 双重路由系统 +- **命令路由系统**:基于正则表达式的精确匹配,高效处理特定命令 +- **AI智能路由**:自然语言理解,无需记住特定命令格式 - 支持自定义命令及参数 - 预设 [多种实用和娱乐命令](#可用命令) @@ -213,8 +219,10 @@ Bubbles-WechatAI/ │ ├── ai_name.py # AI 模型接口实现 │ └── ... ├── commands/ # 命令系统 -│ ├── registry.py # 命令注册 -│ ├── handlers.py # 实现功能调用的函数 +│ ├── registry.py # 正则命令注册 +│ ├── handlers.py # 命令处理函数 +│ ├── ai_router.py # AI智能路由器 +│ ├── ai_functions.py # AI路由功能注册 │ └── ... ├── data/ # 数据文件 │ @@ -225,9 +233,11 @@ Bubbles-WechatAI/ └── ... ``` -### ✨ 如何添加新功能(命令路由系统) +### ✨ 如何添加新功能 -本项目设计了一套简单的命令路由系统,让添加新功能变得容易。主要流程如下: +本项目提供两种方式添加新功能: + +#### 方式一:使用命令路由系统(适合有明确触发词的功能) 1. **定义功能逻辑 (可选但推荐)**: * 如果你的功能逻辑比较复杂,建议在 `function/` 目录下创建一个新的 Python 文件 (例如 `func_your_feature.py`)。 @@ -260,6 +270,46 @@ Bubbles-WechatAI/ 4. **更新帮助信息 (可选)**: * 如果希望用户能在 `帮助` 命令中看到你的新功能,可以更新 `commands/handlers.py` 中的 `handle_help` 函数,将新命令的用法添加到帮助文本中。 +#### 方式二:使用AI智能路由(适合自然语言交互的功能) + +AI路由系统让用户可以用自然语言触发功能,无需记住特定命令格式: + +1. **实现功能逻辑**: + * 在 `function/` 目录下创建功能模块(如已有则跳过) + +2. **注册AI路由功能**: + * 打开 `commands/ai_functions.py` + * 使用装饰器注册你的功能: + ```python + @ai_router.register( + name="your_function_name", + description="功能描述(AI会根据这个判断用户意图)", + examples=[ + "示例用法1", + "示例用法2", + "示例用法3" + ], + params_description="参数说明" + ) + def ai_handle_your_function(ctx: MessageContext, params: str) -> bool: + # params 是AI从用户输入中提取的参数 + # 调用你的功能逻辑 + # 使用 ctx.send_text() 发送回复 + return True + ``` + +3. **工作原理**: + * 用户发送消息时,如果正则路由未匹配,AI会分析用户意图 + * AI根据功能描述和示例,判断应该调用哪个功能 + * AI会自动提取参数并传递给功能处理函数 + +例如,注册了天气查询功能后,用户可以说: +- "北京天气怎么样" +- "查一下上海的天气" +- "明天深圳会下雨吗" + +AI都能理解并调用天气查询功能。 + 完成以上步骤后,重启机器人即可测试你的新功能! ## 📄 许可证 diff --git a/commands/ai_functions.py b/commands/ai_functions.py new file mode 100644 index 0000000..e4da47a --- /dev/null +++ b/commands/ai_functions.py @@ -0,0 +1,260 @@ +""" +AI路由功能注册 +将需要通过AI路由的功能在这里注册 +""" +import re +import json +import os +from typing import Optional, Match +from datetime import datetime + +from .ai_router import ai_router +from .context import MessageContext + +# ======== 天气功能 ======== +@ai_router.register( + name="weather_query", + description="查询指定城市的天气情况和天气预报", + examples=[ + "北京天气怎么样", + "查一下上海的天气", + "明天深圳会下雨吗", + "杭州天气预报", + "广州未来几天的天气" + ], + params_description="城市名称" +) +def ai_handle_weather(ctx: MessageContext, params: str) -> bool: + """AI路由的天气查询处理""" + city_name = params.strip() + if not city_name: + ctx.send_text("🤔 请告诉我你想查询哪个城市的天气") + return True + + # 加载城市代码 + city_codes = {} + city_code_path = os.path.join(os.path.dirname(__file__), '..', 'function', 'main_city.json') + try: + with open(city_code_path, 'r', encoding='utf-8') as f: + city_codes = json.load(f) + except Exception as e: + if ctx.logger: + ctx.logger.error(f"加载城市代码文件失败: {e}") + ctx.send_text("⚠️ 抱歉,天气功能暂时不可用") + return True + + # 查找城市代码 + city_code = city_codes.get(city_name) + if not city_code: + # 尝试模糊匹配 + for name, code in city_codes.items(): + if city_name in name: + city_code = code + city_name = name + break + + if not city_code: + ctx.send_text(f"😕 找不到城市 '{city_name}' 的天气信息") + return True + + # 获取天气信息 + try: + from function.func_weather import Weather + weather_info = Weather(city_code).get_weather(include_forecast=True) + ctx.send_text(weather_info) + return True + except Exception as e: + if ctx.logger: + ctx.logger.error(f"获取天气信息失败: {e}") + ctx.send_text(f"😥 获取 {city_name} 天气时遇到问题") + return True + +# ======== 新闻功能 ======== +@ai_router.register( + name="news_query", + description="获取当日新闻资讯(很长的流水账,如果用户要精简的新闻则不用)", + examples=[ + "看看今天的新闻", + "有什么新闻吗", + "最近发生了什么事", + "今日要闻", + "给我看看新闻" + ], + params_description="无需参数" +) +def ai_handle_news(ctx: MessageContext, params: str) -> bool: + """AI路由的新闻查询处理""" + try: + from function.func_news import News + news_instance = News() + is_today, news_content = news_instance.get_important_news() + + if is_today: + ctx.send_text(f"📰 今日要闻来啦:\n{news_content}") + else: + if news_content: + ctx.send_text(f"ℹ️ 今日新闻暂未发布,为您找到最近的一条新闻:\n{news_content}") + else: + ctx.send_text("❌ 获取新闻失败,请稍后重试") + + return True + except Exception as e: + if ctx.logger: + ctx.logger.error(f"获取新闻失败: {e}") + ctx.send_text("❌ 获取新闻时发生错误") + return True + +# ======== 提醒功能 ======== +@ai_router.register( + name="reminder_set", + description="设置提醒,支持一次性提醒、每日提醒、每周提醒", + examples=[ + "提醒我明天下午3点开会", + "每天早上8点提醒我吃早餐", + "每周一提醒我周会", + "下午5点提醒我下班", + "设置一个提醒:周五下午检查周报" + ], + params_description="提醒的时间和内容描述" +) +def ai_handle_reminder_set(ctx: MessageContext, params: str) -> bool: + """AI路由的提醒设置处理""" + if not params.strip(): + at_list = ctx.msg.sender if ctx.is_group else "" + ctx.send_text("请告诉我需要提醒什么内容和时间呀~", at_list) + return True + + # 调用原有的提醒处理逻辑 + from .handlers import handle_reminder + # 构造一个假的match对象,因为AI路由不使用正则匹配 + class FakeMatch: + def group(self, n): + return params + + # 临时修改消息内容以适配原有处理器 + original_content = ctx.msg.content + ctx.msg.content = f"提醒我{params}" + + result = handle_reminder(ctx, FakeMatch()) + + # 恢复原始内容 + ctx.msg.content = original_content + + return result + +@ai_router.register( + name="reminder_list", + description="查看已设置的所有提醒", + examples=[ + "查看我的提醒", + "我有哪些提醒", + "显示提醒列表", + "我设置了什么提醒", + "看看我的提醒" + ], + params_description="无需参数" +) +def ai_handle_reminder_list(ctx: MessageContext, params: str) -> bool: + """AI路由的提醒列表查看处理""" + from .handlers import handle_list_reminders + return handle_list_reminders(ctx, None) + +@ai_router.register( + name="reminder_delete", + description="删除已设置的提醒", + examples=[ + "删除开会的提醒", + "取消明天的提醒", + "把早餐提醒删了", + "删除所有提醒", + "取消周会提醒" + ], + params_description="要删除的提醒描述或ID" +) +def ai_handle_reminder_delete(ctx: MessageContext, params: str) -> bool: + """AI路由的提醒删除处理""" + # 调用原有的删除提醒逻辑 + from .handlers import handle_delete_reminder + + # 临时修改消息内容 + original_content = ctx.msg.content + ctx.msg.content = f"删除提醒 {params}" + + # 构造假的match对象 + class FakeMatch: + def group(self, n): + return params + + result = handle_delete_reminder(ctx, FakeMatch()) + + # 恢复原始内容 + ctx.msg.content = original_content + + return result + +# ======== Perplexity搜索功能 ======== +@ai_router.register( + name="perplexity_search", + description="使用Perplexity AI进行深度搜索和问答", + examples=[ + "搜索一下Python最新版本的特性", + "帮我查查如何学习机器学习", + "查找关于量子计算的最新进展", + "搜索健康饮食的建议", + "了解一下区块链技术" + ], + params_description="搜索查询内容" +) +def ai_handle_perplexity(ctx: MessageContext, params: str) -> bool: + """AI路由的Perplexity搜索处理""" + if not params.strip(): + at_list = ctx.msg.sender if ctx.is_group else "" + ctx.send_text("请告诉我你想搜索什么内容", at_list) + return True + + # 获取Perplexity实例 + perplexity_instance = getattr(ctx.robot, 'perplexity', None) + if not perplexity_instance: + ctx.send_text("❌ Perplexity搜索功能当前不可用") + return True + + # 调用Perplexity处理 + content_for_perplexity = f"ask {params}" + chat_id = ctx.get_receiver() + sender_wxid = ctx.msg.sender + room_id = ctx.msg.roomid if ctx.is_group else None + is_group = ctx.is_group + + was_handled, fallback_prompt = perplexity_instance.process_message( + content=content_for_perplexity, + chat_id=chat_id, + sender=sender_wxid, + roomid=room_id, + from_group=is_group, + send_text_func=ctx.send_text + ) + + # 如果Perplexity无法处理,使用默认AI + if not was_handled and fallback_prompt: + chat_model = getattr(ctx, 'chat', None) or (getattr(ctx.robot, 'chat', None) if ctx.robot else None) + if chat_model: + try: + import time + current_time = time.strftime("%H:%M", time.localtime()) + q_with_info = f"[{current_time}] {ctx.sender_name}: {params}" + + rsp = chat_model.get_answer( + question=q_with_info, + wxid=ctx.get_receiver(), + system_prompt_override=fallback_prompt + ) + + if rsp: + at_list = ctx.msg.sender if ctx.is_group else "" + ctx.send_text(rsp, at_list) + return True + except Exception as e: + if ctx.logger: + ctx.logger.error(f"默认AI处理失败: {e}") + + return was_handled \ No newline at end of file diff --git a/commands/ai_router.py b/commands/ai_router.py new file mode 100644 index 0000000..c281d0c --- /dev/null +++ b/commands/ai_router.py @@ -0,0 +1,191 @@ +import re +import json +import logging +from typing import Dict, Callable, Optional, Any, Tuple +from dataclasses import dataclass, field +from .context import MessageContext + +logger = logging.getLogger(__name__) + +@dataclass +class AIFunction: + """AI可调用的功能定义""" + name: str # 功能唯一标识名 + handler: Callable # 处理函数 + description: str # 功能描述(给AI看的) + examples: list[str] = field(default_factory=list) # 示例用法 + params_description: str = "" # 参数说明 + +class AIRouter: + """AI智能路由器""" + + def __init__(self): + self.functions: Dict[str, AIFunction] = {} + self.logger = logger + + def register(self, name: str, description: str, examples: list[str] = None, params_description: str = ""): + """ + 装饰器:注册一个功能到AI路由器 + + @ai_router.register( + name="weather_query", + description="查询指定城市的天气预报", + examples=["北京天气怎么样", "查一下上海的天气", "明天深圳会下雨吗"], + params_description="城市名称" + ) + def handle_weather(ctx: MessageContext, params: str) -> bool: + # 实现天气查询逻辑 + pass + """ + def decorator(func: Callable) -> Callable: + ai_func = AIFunction( + name=name, + handler=func, + description=description, + examples=examples or [], + params_description=params_description + ) + self.functions[name] = ai_func + self.logger.info(f"AI路由器注册功能: {name} - {description}") + return func + + return decorator + + def _build_ai_prompt(self) -> str: + """构建给AI的系统提示词,包含所有可用功能的信息""" + prompt = """你是一个智能路由助手。根据用户的输入,判断用户的意图并返回JSON格式的响应。 + +可用的功能列表: +""" + for name, func in self.functions.items(): + prompt += f"\n{name}: {func.description}" + if func.params_description: + prompt += f"\n 参数: {func.params_description}" + if func.examples: + prompt += f"\n 示例: {', '.join(func.examples[:3])}" + prompt += "\n" + + prompt += """ +请分析用户输入,返回以下JSON格式之一: + +1. 如果用户需要执行某个功能: +{ + "action_type": "function", + "function_name": "功能名称", + "params": "提取并整理的参数" +} + +2. 如果用户只是想聊天对话: +{ + "action_type": "chat" +} + +重要提示: +- 只返回JSON,不要有其他文字 +- function_name必须是上述列表中的功能名之一 +- params是你从用户输入中提取和整理的参数字符串 +- 如果无法确定用户意图,默认返回chat +""" + return prompt + + def route(self, ctx: MessageContext) -> Tuple[bool, Optional[Dict[str, Any]]]: + """ + AI路由决策 + + 返回: (是否处理成功, AI决策结果) + """ + if not ctx.text: + return False, None + + # 获取AI模型 + chat_model = getattr(ctx, 'chat', None) + if not chat_model: + chat_model = getattr(ctx.robot, 'chat', None) if ctx.robot else None + + if not chat_model: + self.logger.error("AI路由器:无可用的AI模型") + return False, None + + try: + # 构建系统提示词 + system_prompt = self._build_ai_prompt() + + # 让AI分析用户意图 + user_input = f"用户输入:{ctx.text}" + ai_response = chat_model.get_answer( + user_input, + wxid=ctx.get_receiver(), + system_prompt_override=system_prompt + ) + + # 解析AI返回的JSON + json_match = re.search(r'\{.*\}', ai_response, re.DOTALL) + if not json_match: + self.logger.warning(f"AI路由器:无法从AI响应中提取JSON - {ai_response}") + return False, None + + decision = json.loads(json_match.group(0)) + + # 验证决策格式 + action_type = decision.get("action_type") + if action_type not in ["chat", "function"]: + self.logger.warning(f"AI路由器:未知的action_type - {action_type}") + return False, None + + # 如果是功能调用,验证功能名 + if action_type == "function": + function_name = decision.get("function_name") + if function_name not in self.functions: + self.logger.warning(f"AI路由器:未知的功能名 - {function_name}") + return False, None + + self.logger.info(f"AI路由决策: {decision}") + return True, decision + + except json.JSONDecodeError as e: + self.logger.error(f"AI路由器:解析JSON失败 - {e}") + return False, None + except Exception as e: + self.logger.error(f"AI路由器:处理异常 - {e}") + return False, None + + def dispatch(self, ctx: MessageContext) -> bool: + """ + 执行AI路由分发 + + 返回: 是否成功处理 + """ + # 获取AI路由决策 + success, decision = self.route(ctx) + if not success or not decision: + return False + + action_type = decision.get("action_type") + + # 如果是聊天,返回False让后续处理器处理 + if action_type == "chat": + self.logger.info("AI路由器:识别为聊天意图,交给聊天处理器") + return False + + # 如果是功能调用 + if action_type == "function": + function_name = decision.get("function_name") + params = decision.get("params", "") + + func = self.functions.get(function_name) + if not func: + self.logger.error(f"AI路由器:功能 {function_name} 未找到") + return False + + try: + self.logger.info(f"AI路由器:调用功能 {function_name},参数: {params}") + result = func.handler(ctx, params) + return result + except Exception as e: + self.logger.error(f"AI路由器:执行功能 {function_name} 出错 - {e}") + return False + + return False + +# 创建全局AI路由器实例 +ai_router = AIRouter() \ No newline at end of file diff --git a/robot.py b/robot.py index 8d91015..53e516a 100644 --- a/robot.py +++ b/robot.py @@ -34,6 +34,10 @@ from commands.router import CommandRouter from commands.registry import COMMANDS, get_commands_info from commands.handlers import handle_chitchat # 导入闲聊处理函数 +# 导入AI路由系统 +from commands.ai_router import ai_router +import commands.ai_functions # 导入以注册所有AI功能 + __version__ = "39.2.4.0" @@ -165,6 +169,9 @@ class Robot(Job): self.command_router = CommandRouter(COMMANDS, robot_instance=self) self.LOG.info(f"命令路由系统初始化完成,共加载 {len(COMMANDS)} 条命令") + # 初始化AI路由器 + self.LOG.info(f"AI路由系统初始化完成,共加载 {len(ai_router.functions)} 个AI功能") + # 初始化提醒管理器 try: # 使用与MessageSummary相同的数据库路径 @@ -208,16 +215,25 @@ class Robot(Job): # 5. 使用命令路由器分发处理消息 handled = self.command_router.dispatch(ctx) - # 6. 如果没有命令处理器处理,则进行特殊逻辑处理 + # 6. 如果正则路由器没有处理,尝试AI路由器 if not handled: - # 6.1 好友请求自动处理 + # 只在被@或私聊时才使用AI路由 + if (msg.from_group() and msg.is_at(self.wxid)) or not msg.from_group(): + ai_handled = ai_router.dispatch(ctx) + if ai_handled: + self.LOG.info("消息已由AI路由器处理") + return + + # 7. 如果没有命令处理器处理,则进行特殊逻辑处理 + if not handled: + # 7.1 好友请求自动处理 if msg.type == 37: # 好友请求 self.autoAcceptFriendRequest(msg) return - # 6.2 系统消息处理 + # 7.2 系统消息处理 elif msg.type == 10000: - # 6.2.1 处理新成员入群 + # 7.2.1 处理新成员入群 if "加入了群聊" in msg.content and msg.from_group(): new_member_match = re.search(r'"(.+?)"邀请"(.+?)"加入了群聊', msg.content) if new_member_match: @@ -228,12 +244,12 @@ class Robot(Job): self.sendTextMsg(welcome_msg, msg.roomid) self.LOG.info(f"已发送欢迎消息给新成员 {new_member} 在群 {msg.roomid}") return - # 6.2.2 处理新好友添加 + # 7.2.2 处理新好友添加 elif "你已添加了" in msg.content: self.sayHiToNewFriend(msg) return - # 6.3 群聊消息,且配置了响应该群 + # 7.3 群聊消息,且配置了响应该群 if msg.from_group() and msg.roomid in self.config.GROUPS: # 如果在群里被@了,但命令路由器没有处理,则进行闲聊 if msg.is_at(self.wxid): @@ -242,7 +258,7 @@ class Robot(Job): else: pass - # 6.4 私聊消息,未被命令处理,进行闲聊 + # 7.4 私聊消息,未被命令处理,进行闲聊 elif not msg.from_group() and not msg.from_self(): # 检查是否是文本消息(type 1)或者是包含用户输入的类型49消息 if msg.type == 1 or (msg.type == 49 and ctx.text):