# -*- coding: utf-8 -*- import logging import re import time import xml.etree.ElementTree as ET from queue import Empty from threading import Thread import os import random import shutil from image.img_manager import ImageGenerationManager from wcferry import Wcf, WxMsg from ai_providers.ai_chatgpt import ChatGPT from ai_providers.ai_deepseek import DeepSeek from ai_providers.ai_perplexity import Perplexity from function.func_weather import Weather from function.func_news import News from function.func_summary import MessageSummary # 导入新的MessageSummary类 from function.func_reminder import ReminderManager # 导入ReminderManager类 from configuration import Config from constants import ChatType from job_mgmt import Job from function.func_xml_process import XmlProcessor # 导入命令上下文 from commands.context import MessageContext # 导入新的Function Call系统 from function_calls.router import FunctionCallRouter import function_calls.init_handlers # 导入以注册所有Function Call处理器 from function_calls.services import run_chat_fallback __version__ = "39.2.4.0" class Robot(Job): """个性化自己的机器人 """ def __init__(self, config: Config, wcf: Wcf, chat_type: int) -> None: super().__init__() self.wcf = wcf self.config = config self.LOG = logging.getLogger("Robot") self.wxid = self.wcf.get_self_wxid() # 获取机器人自己的wxid self.allContacts = self.getAllContacts() self._msg_timestamps = [] try: db_path = "data/message_history.db" # 使用 getattr 安全地获取 MAX_HISTORY,如果不存在则默认为 300 max_hist = getattr(config, 'MAX_HISTORY', 300) self.message_summary = MessageSummary(max_history=max_hist, db_path=db_path) self.LOG.info(f"消息历史记录器已初始化 (max_history={self.message_summary.max_history})") except Exception as e: self.LOG.error(f"初始化 MessageSummary 失败: {e}", exc_info=True) self.message_summary = None # 保持失败时的处理 self.xml_processor = XmlProcessor(self.LOG) self.chat_models = {} self.LOG.info("开始初始化各种AI模型...") # 初始化ChatGPT if ChatGPT.value_check(self.config.CHATGPT): try: # 传入 message_summary 和 wxid self.chat_models[ChatType.CHATGPT.value] = ChatGPT( self.config.CHATGPT, message_summary_instance=self.message_summary, bot_wxid=self.wxid ) self.LOG.info(f"已加载 ChatGPT 模型") except Exception as e: self.LOG.error(f"初始化 ChatGPT 模型时出错: {str(e)}") # 初始化DeepSeek if DeepSeek.value_check(self.config.DEEPSEEK): try: # 传入 message_summary 和 wxid self.chat_models[ChatType.DEEPSEEK.value] = DeepSeek( self.config.DEEPSEEK, message_summary_instance=self.message_summary, bot_wxid=self.wxid ) self.LOG.info(f"已加载 DeepSeek 模型") except Exception as e: self.LOG.error(f"初始化 DeepSeek 模型时出错: {str(e)}") # 初始化Perplexity if Perplexity.value_check(self.config.PERPLEXITY): self.chat_models[ChatType.PERPLEXITY.value] = Perplexity(self.config.PERPLEXITY) self.perplexity = self.chat_models[ChatType.PERPLEXITY.value] # 单独保存一个引用用于特殊处理 self.LOG.info(f"已加载 Perplexity 模型") # 根据chat_type参数选择默认模型 if chat_type > 0 and chat_type in self.chat_models: self.chat = self.chat_models[chat_type] self.default_model_id = chat_type else: # 如果没有指定chat_type或指定的模型不可用,尝试使用配置文件中指定的默认模型 self.default_model_id = self.config.GROUP_MODELS.get('default', 0) if self.default_model_id in self.chat_models: self.chat = self.chat_models[self.default_model_id] elif self.chat_models: # 如果有任何可用模型,使用第一个 self.default_model_id = list(self.chat_models.keys())[0] self.chat = self.chat_models[self.default_model_id] else: self.LOG.warning("未配置任何可用的模型") self.chat = None self.default_model_id = 0 self.LOG.info(f"默认模型: {self.chat},模型ID: {self.default_model_id}") # 显示群组-模型映射信息 if hasattr(self.config, 'GROUP_MODELS'): # 显示群聊映射信息 if self.config.GROUP_MODELS.get('mapping'): self.LOG.info("群聊-模型映射配置:") for mapping in self.config.GROUP_MODELS.get('mapping', []): room_id = mapping.get('room_id', '') model_id = mapping.get('model', 0) if room_id and model_id in self.chat_models: model_name = self.chat_models[model_id].__class__.__name__ self.LOG.info(f" 群聊 {room_id} -> 模型 {model_name}(ID:{model_id})") elif room_id: self.LOG.warning(f" 群聊 {room_id} 配置的模型ID {model_id} 不可用") # 显示私聊映射信息 if self.config.GROUP_MODELS.get('private_mapping'): self.LOG.info("私聊-模型映射配置:") for mapping in self.config.GROUP_MODELS.get('private_mapping', []): wxid = mapping.get('wxid', '') model_id = mapping.get('model', 0) if wxid and model_id in self.chat_models: model_name = self.chat_models[model_id].__class__.__name__ contact_name = self.allContacts.get(wxid, wxid) self.LOG.info(f" 私聊用户 {contact_name}({wxid}) -> 模型 {model_name}(ID:{model_id})") elif wxid: self.LOG.warning(f" 私聊用户 {wxid} 配置的模型ID {model_id} 不可用") # 初始化图像生成管理器 self.image_manager = ImageGenerationManager(self.config, self.wcf, self.LOG, self.sendTextMsg) # 初始化Function Call路由器 self.function_call_router = FunctionCallRouter(robot_instance=self) from function_calls.registry import list_functions functions = list_functions() self.LOG.info(f"Function Call路由系统初始化完成,共注册 {len(functions)} 个函数") # 初始化提醒管理器 try: # 使用与MessageSummary相同的数据库路径 db_path = getattr(self.message_summary, 'db_path', "data/message_history.db") self.reminder_manager = ReminderManager(self, db_path) self.LOG.info("提醒管理器已初始化,与消息历史使用相同数据库。") except Exception as e: self.LOG.error(f"初始化提醒管理器失败: {e}", exc_info=True) # 输出命令列表信息,便于调试 # self.LOG.debug(get_commands_info()) # 如果需要在日志中输出所有命令信息,取消本行注释 @staticmethod def value_check(args: dict) -> bool: if args: return all(value is not None for key, value in args.items() if key != 'proxy') return False def processMsg(self, msg: WxMsg) -> None: """ 处理收到的微信消息 :param msg: 微信消息对象 """ try: # 1. 使用MessageSummary记录消息(保持不变) self.message_summary.process_message_from_wxmsg(msg, self.wcf, self.allContacts, self.wxid) # 2. 根据消息来源选择使用的AI模型 self._select_model_for_message(msg) # 3. 获取本次对话特定的历史消息限制 specific_limit = self._get_specific_history_limit(msg) self.LOG.debug(f"本次对话 ({msg.sender} in {msg.roomid or msg.sender}) 使用历史限制: {specific_limit}") # 4. 系统消息优先处理 if msg.type == 37: # 好友请求 self.autoAcceptFriendRequest(msg) return if msg.type == 10000: if "加入了群聊" in msg.content and msg.from_group(): new_member_match = re.search(r'"(.+?)"邀请"(.+?)"加入了群聊', msg.content) if new_member_match: inviter = new_member_match.group(1) new_member = new_member_match.group(2) welcome_msg = self.config.WELCOME_MSG.format(new_member=new_member, inviter=inviter) self.sendTextMsg(welcome_msg, msg.roomid) self.LOG.info(f"已发送欢迎消息给新成员 {new_member} 在群 {msg.roomid}") return if "你已添加了" in msg.content: self.sayHiToNewFriend(msg) return # 5. 预处理消息,生成MessageContext ctx = self.preprocess(msg) # 确保context能访问到当前选定的chat模型及特定历史限制 setattr(ctx, 'chat', self.chat) setattr(ctx, 'specific_max_history', specific_limit) # 群聊白名单与@控制 if ctx.is_group: allowed_groups = getattr(self.config, 'GROUPS', []) or [] if ctx.msg.roomid not in allowed_groups: self.LOG.debug(f"忽略未在白名单中的群聊消息: {ctx.msg.roomid}") return if not ctx.is_at_bot: self.LOG.debug(f"群聊 {ctx.msg.roomid} 未@机器人,忽略消息") return # 6. 根据配置选择路由系统 handled = False function_call_config = getattr(self.config, 'FUNCTION_CALL_ROUTER', {}) use_function_call = function_call_config.get('enable', True) debug_function_call = function_call_config.get('debug', False) if use_function_call: try: if debug_function_call: self.LOG.debug(f"[Function Call] 开始处理消息: {msg.content}") handled = self.function_call_router.dispatch(ctx) if debug_function_call: self.LOG.debug(f"[Function Call] 处理结果: {handled}") except Exception as e: self.LOG.error(f"Function Call路由器处理异常: {e}") if handled: self.LOG.info("消息已由Function Call路由器处理") return # 7. Function Call 未处理,则执行闲聊兜底或特殊逻辑 if not (ctx.is_group and not ctx.is_at_bot): if not run_chat_fallback(ctx): self.LOG.warning("闲聊兜底失败或未发送回复") except Exception as e: self.LOG.error(f"处理消息时发生错误: {str(e)}", exc_info=True) def enableRecvMsg(self) -> None: self.wcf.enable_recv_msg(self.onMsg) def enableReceivingMsg(self) -> None: def innerProcessMsg(wcf: Wcf): while wcf.is_receiving_msg(): try: msg = wcf.get_msg() self.LOG.info(msg) self.processMsg(msg) except Empty: continue # Empty message except Exception as e: self.LOG.error(f"Receiving message error: {e}") self.wcf.enable_receiving_msg() Thread(target=innerProcessMsg, name="GetMessage", args=(self.wcf,), daemon=True).start() def sendTextMsg(self, msg: str, receiver: str, at_list: str = "") -> None: """ 发送消息并记录 :param msg: 消息字符串 :param receiver: 接收人wxid或者群id :param at_list: 要@的wxid, @所有人的wxid为:notify@all """ # 延迟和频率限制 (逻辑不变) time.sleep(float(str(time.time()).split('.')[-1][-2:]) / 100.0 + 0.3) now = time.time() if self.config.SEND_RATE_LIMIT > 0: self._msg_timestamps = [t for t in self._msg_timestamps if now - t < 60] if len(self._msg_timestamps) >= self.config.SEND_RATE_LIMIT: self.LOG.warning(f"发送消息过快,已达到每分钟{self.config.SEND_RATE_LIMIT}条上限。") return self._msg_timestamps.append(now) ats = "" message_to_send = msg # 保存原始消息用于记录 if at_list: if at_list == "notify@all": ats = " @所有人" else: wxids = at_list.split(",") for wxid_at in wxids: # Renamed variable ats += f" @{self.wcf.get_alias_in_chatroom(wxid_at, receiver)}" try: # 发送消息 (逻辑不变) if ats == "": self.LOG.info(f"To {receiver}: {msg}") self.wcf.send_text(f"{msg}", receiver, at_list) else: full_msg_content = f"{ats}\n\n{msg}" self.LOG.info(f"To {receiver}:\n{ats}\n{msg}") self.wcf.send_text(full_msg_content, receiver, at_list) if self.message_summary: # 检查 message_summary 是否初始化成功 # 确定机器人的名字 robot_name = self.allContacts.get(self.wxid, "机器人") # 使用 self.wxid 作为 sender_wxid # 注意:这里不生成时间戳,让 record_message 内部生成 self.message_summary.record_message( chat_id=receiver, sender_name=robot_name, sender_wxid=self.wxid, # 传入机器人自己的 wxid content=message_to_send ) self.LOG.debug(f"已记录机器人发送的消息到 {receiver}") else: self.LOG.warning("MessageSummary 未初始化,无法记录发送的消息") except Exception as e: self.LOG.error(f"发送消息失败: {e}") def getAllContacts(self) -> dict: """ 获取联系人(包括好友、公众号、服务号、群成员……) 格式: {"wxid": "NickName"} """ contacts = self.wcf.query_sql("MicroMsg.db", "SELECT UserName, NickName FROM Contact;") return {contact["UserName"]: contact["NickName"] for contact in contacts} def keepRunningAndBlockProcess(self) -> None: """ 保持机器人运行,不让进程退出 """ while True: self.runPendingJobs() time.sleep(1) def autoAcceptFriendRequest(self, msg: WxMsg) -> None: try: xml = ET.fromstring(msg.content) v3 = xml.attrib["encryptusername"] v4 = xml.attrib["ticket"] scene = int(xml.attrib["scene"]) self.wcf.accept_new_friend(v3, v4, scene) except Exception as e: self.LOG.error(f"同意好友出错:{e}") def sayHiToNewFriend(self, msg: WxMsg) -> None: nickName = re.findall(r"你已添加了(.*),现在可以开始聊天了。", msg.content) if nickName: # 添加了好友,更新好友列表 self.allContacts[msg.sender] = nickName[0] self.sendTextMsg(f"Hi {nickName[0]},我是泡泡,我自动通过了你的好友请求。", msg.sender) def newsReport(self) -> None: receivers = self.config.NEWS if not receivers: self.LOG.info("未配置定时新闻接收人,跳过。") return self.LOG.info("开始执行定时新闻推送任务...") # 获取新闻,解包返回的元组 is_today, news_content = News().get_important_news() # 必须是当天的新闻 (is_today=True) 并且有有效内容 (news_content非空) 才发送 if is_today and news_content: self.LOG.info(f"成功获取当天新闻,准备推送给 {len(receivers)} 个接收人...") for r in receivers: self.sendTextMsg(news_content, r) self.LOG.info("定时新闻推送完成。") else: # 记录没有发送的原因 if not is_today and news_content: self.LOG.warning("获取到的是旧闻,定时推送已跳过。") elif not news_content: self.LOG.warning("获取新闻内容失败或为空,定时推送已跳过。") else: # 理论上不会执行到这里 self.LOG.warning("获取新闻失败(未知原因),定时推送已跳过。") def weatherReport(self, receivers: list = None) -> None: if receivers is None: receivers = self.config.WEATHER if not receivers or not self.config.CITY_CODE: self.LOG.warning("未配置天气城市代码或接收人") return report = Weather(self.config.CITY_CODE).get_weather() for r in receivers: self.sendTextMsg(report, r) def cleanup_perplexity_threads(self): """清理所有Perplexity线程""" # 如果已初始化Perplexity实例,调用其清理方法 perplexity_instance = self.get_perplexity_instance() if perplexity_instance: perplexity_instance.cleanup() def cleanup(self): """清理所有资源,在程序退出前调用""" self.LOG.info("开始清理机器人资源...") # 清理Perplexity线程 self.cleanup_perplexity_threads() # 关闭消息历史数据库连接 if hasattr(self, 'message_summary') and self.message_summary: self.LOG.info("正在关闭消息历史数据库...") self.message_summary.close_db() self.LOG.info("机器人资源清理完成") def get_perplexity_instance(self): """获取Perplexity实例 Returns: Perplexity: Perplexity实例,如果未配置则返回None """ # 检查是否已有Perplexity实例 if hasattr(self, 'perplexity'): return self.perplexity # 检查config中是否有Perplexity配置 if hasattr(self.config, 'PERPLEXITY') and Perplexity.value_check(self.config.PERPLEXITY): self.perplexity = Perplexity(self.config.PERPLEXITY) return self.perplexity # 检查chat是否是Perplexity类型 if isinstance(self.chat, Perplexity): return self.chat # 如果存在chat_models字典,尝试从中获取 if hasattr(self, 'chat_models') and ChatType.PERPLEXITY.value in self.chat_models: return self.chat_models[ChatType.PERPLEXITY.value] return None def _select_model_for_message(self, msg: WxMsg) -> None: """根据消息来源选择对应的AI模型 :param msg: 接收到的消息 """ if not hasattr(self, 'chat_models') or not self.chat_models: return # 没有可用模型,无需切换 # 获取消息来源ID source_id = msg.roomid if msg.from_group() else msg.sender # 检查配置 if not hasattr(self.config, 'GROUP_MODELS'): # 没有配置,使用默认模型 if self.default_model_id in self.chat_models: self.chat = self.chat_models[self.default_model_id] return # 群聊消息处理 if msg.from_group(): model_mappings = self.config.GROUP_MODELS.get('mapping', []) for mapping in model_mappings: if mapping.get('room_id') == source_id: model_id = mapping.get('model') if model_id in self.chat_models: # 切换到指定模型 if self.chat != self.chat_models[model_id]: self.chat = self.chat_models[model_id] self.LOG.info(f"已为群 {source_id} 切换到模型: {self.chat.__class__.__name__}") else: self.LOG.warning(f"群 {source_id} 配置的模型ID {model_id} 不可用,使用默认模型") if self.default_model_id in self.chat_models: self.chat = self.chat_models[self.default_model_id] return # 私聊消息处理 else: private_mappings = self.config.GROUP_MODELS.get('private_mapping', []) for mapping in private_mappings: if mapping.get('wxid') == source_id: model_id = mapping.get('model') if model_id in self.chat_models: # 切换到指定模型 if self.chat != self.chat_models[model_id]: self.chat = self.chat_models[model_id] self.LOG.info(f"已为私聊用户 {source_id} 切换到模型: {self.chat.__class__.__name__}") else: self.LOG.warning(f"私聊用户 {source_id} 配置的模型ID {model_id} 不可用,使用默认模型") if self.default_model_id in self.chat_models: self.chat = self.chat_models[self.default_model_id] return # 如果没有找到对应配置,使用默认模型 if self.default_model_id in self.chat_models: self.chat = self.chat_models[self.default_model_id] def _get_specific_history_limit(self, msg: WxMsg) -> int: """根据消息来源和配置,获取特定的历史消息数量限制 :param msg: 微信消息对象 :return: 历史消息数量限制,如果没有特定配置则返回None """ if not hasattr(self.config, 'GROUP_MODELS'): # 没有配置,使用当前模型默认值 return getattr(self.chat, 'max_history_messages', None) # 获取消息来源ID source_id = msg.roomid if msg.from_group() else msg.sender # 确定查找的映射和字段名 if msg.from_group(): mappings = self.config.GROUP_MODELS.get('mapping', []) key_field = 'room_id' else: mappings = self.config.GROUP_MODELS.get('private_mapping', []) key_field = 'wxid' # 在映射中查找特定配置 for mapping in mappings: if mapping.get(key_field) == source_id: # 找到了对应的配置 if 'max_history' in mapping: specific_limit = mapping['max_history'] self.LOG.debug(f"为 {source_id} 找到特定历史限制: {specific_limit}") return specific_limit else: # 找到了配置但没有max_history,使用模型默认值 self.LOG.debug(f"为 {source_id} 找到映射但无特定历史限制,使用模型默认值") break # 没有找到特定限制,使用当前模型的默认值 default_limit = getattr(self.chat, 'max_history_messages', None) self.LOG.debug(f"未找到 {source_id} 的特定历史限制,使用模型默认值: {default_limit}") return default_limit def onMsg(self, msg: WxMsg) -> int: try: self.LOG.info(msg) self.processMsg(msg) except Exception as e: self.LOG.error(e) return 0 def preprocess(self, msg: WxMsg) -> MessageContext: """ 预处理消息,生成MessageContext对象 :param msg: 微信消息对象 :return: MessageContext对象 """ is_group = msg.from_group() is_at_bot = False pure_text = msg.content # 默认使用原始内容 # 初始化引用图片相关属性 is_quoted_image = False quoted_msg_id = None quoted_image_extra = None # 处理引用消息等特殊情况 if msg.type == 49 and ("