diff --git a/.gitignore b/.gitignore index a8e62ae..7b46dd5 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,4 @@ logs/ *.log.* config.yaml -duel_ranks.json data/ diff --git a/ai_providers/ai_chatgpt.py b/ai_providers/ai_chatgpt.py index 7184c3a..dd31daa 100644 --- a/ai_providers/ai_chatgpt.py +++ b/ai_providers/ai_chatgpt.py @@ -56,57 +56,70 @@ class ChatGPT(): return True return False - def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None) -> str: - # 获取并格式化数据库历史记录 - api_messages = [] - - # 1. 添加系统提示 - effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"] - if effective_system_prompt: # 确保有内容才添加 - api_messages.append({"role": "system", "content": effective_system_prompt}) - - # 添加当前时间提示(可选,但原代码有) - now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - time_mk = "Current time is: " # 或者其他合适的提示 - api_messages.append({"role": "system", "content": f"{time_mk}{now_time}"}) - - - # 2. 获取并格式化历史消息 - if self.message_summary and self.bot_wxid: - history = self.message_summary.get_messages(wxid) - - # -限制历史消息数量 - # 优先使用传入的特定限制,如果没有则使用模型默认限制 - limit_to_use = specific_max_history if specific_max_history is not None else self.max_history_messages - self.LOG.debug(f"获取历史记录 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit_to_use}") - - if limit_to_use is not None and limit_to_use > 0: - history = history[-limit_to_use:] # 取最新的 N 条 - elif limit_to_use == 0: # 如果设置为0,则不包含历史 - history = [] - - self.LOG.debug(f"应用限制后历史条数: {len(history)}") - - for msg in history: - role = "assistant" if msg.get("sender_wxid") == self.bot_wxid else "user" - content = msg.get('content', '') - if content: # 避免添加空内容 - if role == "user": - sender_name = msg.get('sender', '未知用户') # 获取发送者名字,如果不存在则使用默认值 - formatted_content = f"{sender_name}: {content}" # 格式化内容,加入发送者名字 - api_messages.append({"role": role, "content": formatted_content}) - else: # 如果是助手(机器人自己)的消息,则不加名字 - api_messages.append({"role": role, "content": content}) + def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None, tools=None, conversation_history=None): + # 标准Function Call模式:使用传入的对话历史 + if conversation_history: + api_messages = [] + # 添加系统提示 + effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"] + if effective_system_prompt: + api_messages.append({"role": "system", "content": effective_system_prompt}) + # 添加当前时间提示 + now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + api_messages.append({"role": "system", "content": f"Current time is: {now_time}"}) + # 使用传入的对话历史 + api_messages.extend(conversation_history) else: - self.LOG.warning(f"无法为 wxid={wxid} 获取历史记录,因为 message_summary 或 bot_wxid 未设置。") + # 传统模式:从数据库获取历史记录 + api_messages = [] - # 3. 添加当前用户问题 - if question: # 确保问题非空 - api_messages.append({"role": "user", "content": question}) + # 1. 添加系统提示 + effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"] + if effective_system_prompt: # 确保有内容才添加 + api_messages.append({"role": "system", "content": effective_system_prompt}) + + # 添加当前时间提示(可选,但原代码有) + now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + time_mk = "Current time is: " # 或者其他合适的提示 + api_messages.append({"role": "system", "content": f"{time_mk}{now_time}"}) + + + # 2. 获取并格式化历史消息 + if self.message_summary and self.bot_wxid: + history = self.message_summary.get_messages(wxid) + + # -限制历史消息数量 + # 优先使用传入的特定限制,如果没有则使用模型默认限制 + limit_to_use = specific_max_history if specific_max_history is not None else self.max_history_messages + self.LOG.debug(f"获取历史记录 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit_to_use}") + + if limit_to_use is not None and limit_to_use > 0: + history = history[-limit_to_use:] # 取最新的 N 条 + elif limit_to_use == 0: # 如果设置为0,则不包含历史 + history = [] + + self.LOG.debug(f"应用限制后历史条数: {len(history)}") + + for msg in history: + role = "assistant" if msg.get("sender_wxid") == self.bot_wxid else "user" + content = msg.get('content', '') + if content: # 避免添加空内容 + if role == "user": + sender_name = msg.get('sender', '未知用户') # 获取发送者名字,如果不存在则使用默认值 + formatted_content = f"{sender_name}: {content}" # 格式化内容,加入发送者名字 + api_messages.append({"role": role, "content": formatted_content}) + else: # 如果是助手(机器人自己)的消息,则不加名字 + api_messages.append({"role": role, "content": content}) + else: + self.LOG.warning(f"无法为 wxid={wxid} 获取历史记录,因为 message_summary 或 bot_wxid 未设置。") + + # 3. 添加当前用户问题 + if question: # 确保问题非空 + api_messages.append({"role": "user", "content": question}) rsp = "" try: - # 使用格式化后的 api_messages + # 使用格式化后的 api_messages params = { "model": self.model, "messages": api_messages # 使用从数据库构建的消息列表 @@ -116,10 +129,32 @@ class ChatGPT(): if not self.model.startswith("o"): params["temperature"] = 0.2 + # 如果提供了tools,添加到参数中 + if tools: + params["tools"] = tools + params["tool_choice"] = "auto" # 让AI自动决定是否调用function + ret = self.client.chat.completions.create(**params) - rsp = ret.choices[0].message.content - rsp = rsp[2:] if rsp.startswith("\n\n") else rsp - rsp = rsp.replace("\n\n", "\n") + + # 检查是否有tool_calls + if tools and ret.choices[0].message.tool_calls: + # 返回tool_calls而不是普通文本 + return { + "tool_calls": [ + { + "function": { + "name": tool_call.function.name, + "arguments": tool_call.function.arguments + } + } + for tool_call in ret.choices[0].message.tool_calls + ] + } + else: + # 普通文本响应 + rsp = ret.choices[0].message.content or "" + rsp = rsp[2:] if rsp.startswith("\n\n") else rsp + rsp = rsp.replace("\n\n", "\n") except AuthenticationError: self.LOG.error("OpenAI API 认证失败,请检查 API 密钥是否正确") diff --git a/ai_providers/ai_deepseek.py b/ai_providers/ai_deepseek.py index 2e11e1c..b45a74e 100644 --- a/ai_providers/ai_deepseek.py +++ b/ai_providers/ai_deepseek.py @@ -52,65 +52,99 @@ class DeepSeek(): return True return False - def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None) -> str: - # 获取并格式化数据库历史记录 - api_messages = [] + def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None, tools=None, conversation_history=None): + # 标准Function Call模式:使用传入的对话历史 + if conversation_history: + api_messages = [] + # 添加系统提示 + effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"] + if effective_system_prompt: + api_messages.append({"role": "system", "content": effective_system_prompt}) + # 添加当前时间提示 + now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + api_messages.append({"role": "system", "content": f"Current time is: {now_time}"}) + # 使用传入的对话历史 + api_messages.extend(conversation_history) + else: + # 传统模式:从数据库获取历史记录 + api_messages = [] - # 1. 添加系统提示 - effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"] - if effective_system_prompt: - api_messages.append({"role": "system", "content": effective_system_prompt}) + # 1. 添加系统提示 + effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"] + if effective_system_prompt: + api_messages.append({"role": "system", "content": effective_system_prompt}) - # 添加当前时间提示 (可选) - now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - time_mk = "Current time is: " - api_messages.append({"role": "system", "content": f"{time_mk}{now_time}"}) + # 添加当前时间提示 (可选) + now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + time_mk = "Current time is: " + api_messages.append({"role": "system", "content": f"{time_mk}{now_time}"}) - # 2. 获取并格式化历史消息 - if self.message_summary and self.bot_wxid: - history = self.message_summary.get_messages(wxid) + # 2. 获取并格式化历史消息 + if self.message_summary and self.bot_wxid: + history = self.message_summary.get_messages(wxid) - # 限制历史消息数量 - # 优先使用传入的特定限制,如果没有则使用模型默认限制 - limit_to_use = specific_max_history if specific_max_history is not None else self.max_history_messages - self.LOG.debug(f"获取历史记录 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit_to_use}") - - if limit_to_use is not None and limit_to_use > 0: - history = history[-limit_to_use:] # 取最新的 N 条 - elif limit_to_use == 0: # 如果设置为0,则不包含历史 - history = [] - - self.LOG.debug(f"应用限制后历史条数: {len(history)}") + # 限制历史消息数量 + # 优先使用传入的特定限制,如果没有则使用模型默认限制 + limit_to_use = specific_max_history if specific_max_history is not None else self.max_history_messages + self.LOG.debug(f"获取历史记录 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit_to_use}") - for msg in history: - role = "assistant" if msg.get("sender_wxid") == self.bot_wxid else "user" - content = msg.get('content', '') - if content: - if role == "user": - sender_name = msg.get('sender', '未知用户') # 获取发送者名字 + if limit_to_use is not None and limit_to_use > 0: + history = history[-limit_to_use:] # 取最新的 N 条 + elif limit_to_use == 0: # 如果设置为0,则不包含历史 + history = [] + + self.LOG.debug(f"应用限制后历史条数: {len(history)}") + + for msg in history: + role = "assistant" if msg.get("sender_wxid") == self.bot_wxid else "user" + content = msg.get('content', '') + if content: + if role == "user": + sender_name = msg.get('sender', '未知用户') # 获取发送者名字 formatted_content = f"{sender_name}: {content}" # 格式化内容 api_messages.append({"role": role, "content": formatted_content}) else: # 助手消息 api_messages.append({"role": role, "content": content}) - else: - self.LOG.warning(f"无法为 wxid={wxid} 获取历史记录,因为 message_summary 或 bot_wxid 未设置。") + else: + self.LOG.warning(f"无法为 wxid={wxid} 获取历史记录,因为 message_summary 或 bot_wxid 未设置。") - # 3. 添加当前用户问题 - if question: - api_messages.append({"role": "user", "content": question}) + # 3. 添加当前用户问题 + if question: + api_messages.append({"role": "user", "content": question}) try: - # 使用格式化后的 api_messages - response = self.client.chat.completions.create( - model=self.model, - messages=api_messages, # 使用构建的消息列表 - stream=False - ) - final_response = response.choices[0].message.content + # 构建API参数 + params = { + "model": self.model, + "messages": api_messages, # 使用构建的消息列表 + "stream": False + } + # 如果提供了tools,添加到参数中 + if tools: + params["tools"] = tools + params["tool_choice"] = "auto" # 让AI自动决定是否调用function - return final_response + response = self.client.chat.completions.create(**params) + + # 检查是否有tool_calls + if tools and response.choices[0].message.tool_calls: + # 返回tool_calls而不是普通文本 + return { + "tool_calls": [ + { + "function": { + "name": tool_call.function.name, + "arguments": tool_call.function.arguments + } + } + for tool_call in response.choices[0].message.tool_calls + ] + } + else: + # 普通文本响应 + return response.choices[0].message.content or "" except (APIConnectionError, APIError, AuthenticationError) as e1: self.LOG.error(f"DeepSeek API 返回了错误:{str(e1)}") diff --git a/ai_providers/ai_gemini.py b/ai_providers/ai_gemini.py index a99a5eb..73d2638 100644 --- a/ai_providers/ai_gemini.py +++ b/ai_providers/ai_gemini.py @@ -206,7 +206,12 @@ class Gemini: return rsp_text.strip() - def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None) -> str: + def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None, tools=None): + # Function Call支持检查 + if tools: + # Gemini暂时不支持function calling,返回提示 + return "当前Gemini模型暂不支持Function Call功能,请使用ChatGPT或DeepSeek模型来使用智能功能。" + if not self._model: return "Gemini 模型未成功初始化,请检查配置和网络。" diff --git a/ai_providers/ai_perplexity.py b/ai_providers/ai_perplexity.py index 39d9d61..584689c 100644 --- a/ai_providers/ai_perplexity.py +++ b/ai_providers/ai_perplexity.py @@ -341,7 +341,7 @@ class Perplexity: return all(value is not None for key, value in args.items() if key != 'proxy') return False - def get_answer(self, prompt, session_id=None): + def get_answer(self, prompt, session_id=None, wxid=None, system_prompt_override=None, specific_max_history=None, tools=None): """获取Perplexity回答 Args: @@ -351,6 +351,11 @@ class Perplexity: Returns: str: Perplexity的回答 """ + # Function Call支持检查 + if tools: + # Perplexity暂时不支持function calling,返回提示 + return "当前Perplexity模型暂不支持Function Call功能,请使用ChatGPT或DeepSeek模型来使用智能功能。" + try: if not self.api_key or not self.client: return "Perplexity API key 未配置或客户端初始化失败" diff --git a/commands/ai_functions.py b/commands/ai_functions.py index 8e2140c..bb13f72 100644 --- a/commands/ai_functions.py +++ b/commands/ai_functions.py @@ -15,12 +15,20 @@ from .context import MessageContext @ai_router.register( name="weather_query", description="查询城市未来五天的简要天气预报", - examples=["北京天气怎么样", "上海天气"], - params_description="城市名称" + parameters={ + "type": "object", + "properties": { + "city": { + "type": "string", + "description": "要查询天气的城市名称,如:北京、上海、深圳" + } + }, + "required": ["city"] + } ) -def ai_handle_weather(ctx: MessageContext, params: str) -> bool: +def ai_handle_weather(ctx: MessageContext, city: str, **kwargs) -> bool: """AI路由的天气查询处理""" - city_name = params.strip() + city_name = city.strip() if not city_name: ctx.send_text("🤔 请告诉我你想查询哪个城市的天气") return True @@ -66,11 +74,14 @@ def ai_handle_weather(ctx: MessageContext, params: str) -> bool: # ======== 新闻功能 ======== @ai_router.register( name="news_query", - description="获取今日新闻", - examples=["看看今天的新闻", "今日要闻"], - params_description="无需参数" + description="获取今日重要新闻和要闻", + parameters={ + "type": "object", + "properties": {}, + "required": [] + } ) -def ai_handle_news(ctx: MessageContext, params: str) -> bool: +def ai_handle_news(ctx: MessageContext, **kwargs) -> bool: """AI路由的新闻查询处理""" try: from function.func_news import News @@ -95,76 +106,259 @@ def ai_handle_news(ctx: MessageContext, params: str) -> bool: # ======== 提醒功能 ======== @ai_router.register( name="reminder_set", - description="设置提醒", - examples=["提醒我明天下午3点开会", "每天早上8点提醒我吃早餐"], - params_description="时间和内容" + description="为用户设置一个新的提醒", + parameters={ + "type": "object", + "properties": { + "content": { + "type": "string", + "description": "提醒的具体内容和时间,如:明天下午3点开会、每天早上8点吃早餐" + } + }, + "required": ["content"] + } ) -def ai_handle_reminder_set(ctx: MessageContext, params: str) -> bool: +def ai_handle_reminder_set(ctx: MessageContext, content: str, **kwargs) -> bool: """AI路由的提醒设置处理""" - if not params.strip(): + if not content.strip(): at_list = ctx.msg.sender if ctx.is_group else "" ctx.send_text("请告诉我需要提醒什么内容和时间呀~", at_list) return True - + # 调用原有的提醒处理逻辑 from .handlers import handle_reminder - + # 临时修改消息内容以适配原有处理器 original_content = ctx.msg.content - ctx.msg.content = f"提醒我{params}" - + ctx.msg.content = f"提醒我{content}" + # handle_reminder不使用match参数,直接传None result = handle_reminder(ctx, None) - + # 恢复原始内容 ctx.msg.content = original_content - + return result @ai_router.register( name="reminder_list", - description="查看所有提醒", - examples=["查看我的提醒", "我有哪些提醒"], - params_description="无需参数" + description="查看用户已经设置的所有提醒列表", + parameters={ + "type": "object", + "properties": {}, + "required": [] + } ) -def ai_handle_reminder_list(ctx: MessageContext, params: str) -> bool: +def ai_handle_reminder_list(ctx: MessageContext, **kwargs) -> bool: """AI路由的提醒列表查看处理""" from .handlers import handle_list_reminders return handle_list_reminders(ctx, None) @ai_router.register( name="reminder_delete", - description="删除提醒", - examples=["删除开会的提醒", "取消明天的提醒"], - params_description="提醒描述" + description="删除指定的提醒", + parameters={ + "type": "object", + "properties": { + "description": { + "type": "string", + "description": "要删除的提醒的描述或关键词,如:开会、早餐、明天的提醒" + } + }, + "required": ["description"] + } ) -def ai_handle_reminder_delete(ctx: MessageContext, params: str) -> bool: +def ai_handle_reminder_delete(ctx: MessageContext, description: str, **kwargs) -> bool: """AI路由的提醒删除处理""" # 调用原有的删除提醒逻辑 from .handlers import handle_delete_reminder - + # 临时修改消息内容 original_content = ctx.msg.content - ctx.msg.content = f"删除提醒 {params}" - + ctx.msg.content = f"删除提醒 {description}" + # handle_delete_reminder不使用match参数,直接传None result = handle_delete_reminder(ctx, None) - + # 恢复原始内容 ctx.msg.content = original_content - + + return result + +# ======== 帮助功能 ======== +@ai_router.register( + name="help", + description="显示机器人的帮助信息和可用指令列表", + parameters={ + "type": "object", + "properties": {}, + "required": [] + } +) +def ai_handle_help(ctx: MessageContext, **kwargs) -> bool: + """AI路由的帮助处理""" + help_text = [ + "🤖 泡泡智能助手 🤖", + "", + "🌟 我现在支持自然语言交互!你可以用平常说话的方式和我对话:", + "", + "【天气查询】", + "💬 \"北京今天天气怎么样\"", + "💬 \"上海明天会下雨吗\"", + "💬 \"查一下深圳的天气预报\"", + "", + "【新闻资讯】", + "💬 \"看看今天的新闻\"", + "💬 \"有什么重要新闻吗\"", + "", + "【智能提醒】", + "💬 \"提醒我明天下午3点开会\"", + "💬 \"每天早上8点提醒我吃早餐\"", + "💬 \"查看我的提醒\"", + "💬 \"删掉开会的提醒\"", + "", + "【智能搜索】", + "💬 \"搜索Python最新特性\"", + "💬 \"查一下机器学习教程\"", + "", + "【群聊管理】", + "💬 \"总结一下最近的聊天\" (仅群聊)", + "💬 \"清除聊天历史\" (仅群聊)", + "💬 \"查看我的装备\" (仅群聊)", + "", + "【娱乐功能】", + "💬 \"骂一下@张三\" (仅群聊)", + "", + "✨ 直接用自然语言告诉我你想做什么,我会智能理解你的意图!", + "🔧 在群聊中需要@我才能使用功能哦~" + ] + help_message = "\n".join(help_text) + + # 发送消息 + ctx.send_text(help_message) + return True + +# ======== 消息管理功能 ======== +@ai_router.register( + name="summary", + description="总结群聊中最近的聊天消息内容", + parameters={ + "type": "object", + "properties": {}, + "required": [] + } +) +def ai_handle_summary(ctx: MessageContext, **kwargs) -> bool: + """AI路由的消息总结处理""" + if not ctx.is_group: + ctx.send_text("⚠️ 消息总结功能仅支持群聊") + return True + + from .handlers import handle_summary + return handle_summary(ctx, None) + +@ai_router.register( + name="clear_messages", + description="清除当前群聊的历史消息记录", + parameters={ + "type": "object", + "properties": {}, + "required": [] + } +) +def ai_handle_clear_messages(ctx: MessageContext, **kwargs) -> bool: + """AI路由的消息历史清除处理""" + if not ctx.is_group: + ctx.send_text("⚠️ 消息历史管理功能仅支持群聊") + return True + + from .handlers import handle_clear_messages + return handle_clear_messages(ctx, None) + +# ======== 决斗功能 ======== +@ai_router.register( + name="check_equipment", + description="查看玩家在决斗游戏中的魔法装备和道具", + parameters={ + "type": "object", + "properties": {}, + "required": [] + } +) +def ai_handle_check_equipment(ctx: MessageContext, **kwargs) -> bool: + """AI路由的装备查看处理""" + if not ctx.is_group: + ctx.send_text("❌ 装备查看功能只支持群聊") + return True + + from .handlers import handle_check_equipment + return handle_check_equipment(ctx, None) + +# ======== 娱乐功能 ======== +@ai_router.register( + name="insult_user", + description="骂指定的用户(娱乐功能,无恶意)", + parameters={ + "type": "object", + "properties": { + "target_user": { + "type": "string", + "description": "要骂的目标用户的名称或昵称" + } + }, + "required": ["target_user"] + } +) +def ai_handle_insult(ctx: MessageContext, target_user: str, **kwargs) -> bool: + """AI路由的骂人处理""" + if not ctx.is_group: + ctx.send_text("❌ 骂人功能只支持群聊") + return True + + # 解析参数,提取用户名 + user_name = target_user.strip() + if not user_name: + ctx.send_text("🤔 请告诉我要骂谁") + return True + + # 移除@符号 + user_name = user_name.replace("@", "").strip() + + # 创建一个假的match对象,因为原始处理器需要 + fake_match = type('MockMatch', (), { + 'group': lambda self, n: user_name if n == 1 else None + })() + + from .handlers import handle_insult + # 临时修改消息内容以适配原有处理器 + original_content = ctx.msg.content + ctx.msg.content = f"骂一下@{user_name}" + + result = handle_insult(ctx, fake_match) + + # 恢复原始内容 + ctx.msg.content = original_content + return result # ======== Perplexity搜索功能 ======== @ai_router.register( name="perplexity_search", - description="搜索查询资料并深度研究某个专业问题", - examples=["搜索Python最新特性", "查查机器学习教程"], - params_description="搜索内容" + description="使用Perplexity搜索查询资料并深度研究某个专业问题", + parameters={ + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "要搜索的关键词或问题,如:Python最新特性、机器学习教程" + } + }, + "required": ["query"] + } ) -def ai_handle_perplexity(ctx: MessageContext, params: str) -> bool: +def ai_handle_perplexity(ctx: MessageContext, query: str, **kwargs) -> bool: """AI路由的Perplexity搜索处理""" - if not params.strip(): + if not query.strip(): at_list = ctx.msg.sender if ctx.is_group else "" ctx.send_text("请告诉我你想搜索什么内容", at_list) return True @@ -176,7 +370,7 @@ def ai_handle_perplexity(ctx: MessageContext, params: str) -> bool: return True # 调用Perplexity处理 - content_for_perplexity = f"ask {params}" + content_for_perplexity = f"ask {query}" chat_id = ctx.get_receiver() sender_wxid = ctx.msg.sender room_id = ctx.msg.roomid if ctx.is_group else None @@ -198,7 +392,7 @@ def ai_handle_perplexity(ctx: MessageContext, params: str) -> bool: try: import time current_time = time.strftime("%H:%M", time.localtime()) - q_with_info = f"[{current_time}] {ctx.sender_name}: {params}" + q_with_info = f"[{current_time}] {ctx.sender_name}: {query}" rsp = chat_model.get_answer( question=q_with_info, diff --git a/commands/ai_router.py b/commands/ai_router.py index 341a81f..763c8ac 100644 --- a/commands/ai_router.py +++ b/commands/ai_router.py @@ -9,12 +9,26 @@ logger = logging.getLogger(__name__) @dataclass class AIFunction: - """AI可调用的功能定义""" + """AI可调用的功能定义 - 最原生实现""" name: str # 功能唯一标识名 handler: Callable # 处理函数 description: str # 功能描述(给AI看的) - examples: list[str] = field(default_factory=list) # 示例用法 - params_description: str = "" # 参数说明 + parameters: dict = field(default_factory=dict) # OpenAI function call参数定义 + + def to_function_schema(self) -> dict: + """转换为OpenAI function call schema格式""" + return { + "type": "function", + "function": { + "name": self.name, + "description": self.description, + "parameters": self.parameters or { + "type": "object", + "properties": {}, + "required": [] + } + } + } class AIRouter: """AI智能路由器""" @@ -23,17 +37,28 @@ class AIRouter: self.functions: Dict[str, AIFunction] = {} self.logger = logger - def register(self, name: str, description: str, examples: list[str] = None, params_description: str = ""): + def register(self, name: str, description: str, parameters: dict = None): """ - 装饰器:注册一个功能到AI路由器 - + 装饰器:注册一个功能到AI路由器 - 最原生实现 + + Args: + name: 功能名称 + description: 功能描述 + parameters: OpenAI function call参数定义 + @ai_router.register( name="weather_query", description="查询指定城市的天气预报", - examples=["北京天气怎么样", "查一下上海的天气", "明天深圳会下雨吗"], - params_description="城市名称" + parameters={ + "type": "object", + "properties": { + "city": {"type": "string", "description": "城市名称"} + }, + "required": ["city"] + } ) - def handle_weather(ctx: MessageContext, params: str) -> bool: + def handle_weather(ctx: MessageContext, **kwargs) -> bool: + city = kwargs.get('city') # 实现天气查询逻辑 pass """ @@ -42,211 +67,154 @@ class AIRouter: name=name, handler=func, description=description, - examples=examples or [], - params_description=params_description + parameters=parameters or {} ) self.functions[name] = ai_func - self.logger.info(f"AI路由器注册功能: {name} - {description}") + self.logger.info(f"注册Function Call功能: {name} - {description}") return func - + return decorator - def _build_ai_prompt(self) -> str: - """构建给AI的系统提示词,包含所有可用功能的信息""" - prompt = """你是一个智能路由助手。根据用户的输入,判断用户的意图并返回JSON格式的响应。 - - ### 注意: - 1. 你需要优先判断自己是否可以直接回答用户的问题,如果你可以直接回答,则返回 "chat",无需返回 "function" - 2. 如果用户输入中包含多个功能,请优先匹配最符合用户意图的功能。如果无法判断,则返回 "chat"。 - 3. 优先考虑使用 chat 处理,需要外部资料或其他功能逻辑时,再返回 "function"。 - - ### 可用的功能列表: - """ - for name, func in self.functions.items(): - prompt += f"\n- {name}: {func.description}" - if func.params_description: - prompt += f"\n 参数: {func.params_description}" - if func.examples: - prompt += f"\n 示例: {', '.join(func.examples[:3])}" - prompt += "\n" - - prompt += """ - 请你分析用户输入,严格按照以下格式返回JSON: - - ### 返回格式: - - 1. 如果用户只是聊天或者不匹配任何功能,返回: - { - "action_type": "chat" - } - - 2.如果用户需要使用上述功能之一,返回: - { - "action_type": "function", - "function_name": "上述功能列表中的功能名", - "params": "从用户输入中提取的参数" - } - - #### 示例: - - 用户输入"北京天气怎么样" -> {"action_type": "function", "function_name": "weather_query", "params": "北京"} - - 用户输入"看看新闻" -> {"action_type": "function", "function_name": "news_query", "params": ""} - - 用户输入"你好" -> {"action_type": "chat"} - - 用户输入"查一下Python教程" -> {"action_type": "function", "function_name": "perplexity_search", "params": "Python教程"} - - #### 格式注意事项: - 1. action_type 只能是 "function" 或 "chat" - 2. 只返回JSON,无需其他解释 - 3. function_name 必须完全匹配上述功能列表中的名称 - """ - return prompt + def _build_function_tools(self, functions: Dict[str, AIFunction]) -> list: + """构建function call的tools参数""" + return [func.to_function_schema() for func in functions.values()] - def route(self, ctx: MessageContext) -> Tuple[bool, Optional[Dict[str, Any]]]: + def handle_standard_function_call(self, ctx: MessageContext) -> bool: """ - AI路由决策 - - 返回: (是否处理成功, AI决策结果) + 标准的OpenAI Function Call实现 + 支持多轮调用、函数结果反馈、AI最终回复 """ - print(f"[AI路由器] route方法被调用") - if not ctx.text: - print("[AI路由器] ctx.text为空,返回False") - return False, None - - # 获取AI模型 - chat_model = getattr(ctx, 'chat', None) - if not chat_model: - chat_model = getattr(ctx.robot, 'chat', None) if ctx.robot else None - - if not chat_model: - print("[AI路由器] 无可用的AI模型") - self.logger.error("AI路由器:无可用的AI模型") - return False, None - - print(f"[AI路由器] 找到AI模型: {type(chat_model)}") - - try: - # 构建系统提示词 - system_prompt = self._build_ai_prompt() - print(f"[AI路由器] 已构建系统提示词,长度: {len(system_prompt)}") - - # 让AI分析用户意图 - user_input = f"用户输入:{ctx.text}" - print(f"[AI路由器] 准备调用AI分析意图: {user_input}") - - ai_response = chat_model.get_answer( - user_input, - wxid=ctx.get_receiver(), - system_prompt_override=system_prompt - ) - - print(f"[AI路由器] AI响应: {ai_response}") - - # 解析AI返回的JSON - json_match = re.search(r'\{.*\}', ai_response, re.DOTALL) - if not json_match: - self.logger.warning(f"AI路由器:无法从AI响应中提取JSON - {ai_response}") - return False, None - - decision = json.loads(json_match.group(0)) - - # 验证决策格式 - action_type = decision.get("action_type") - if action_type not in ["chat", "function"]: - self.logger.warning(f"AI路由器:未知的action_type - {action_type}") - return False, None - - # 如果是功能调用,验证功能名 - if action_type == "function": - function_name = decision.get("function_name") - if function_name not in self.functions: - self.logger.warning(f"AI路由器:未知的功能名 - {function_name}") - return False, None - - self.logger.info(f"AI路由决策: {decision}") - return True, decision - - except json.JSONDecodeError as e: - self.logger.error(f"AI路由器:解析JSON失败 - {e}") - return False, None - except Exception as e: - self.logger.error(f"AI路由器:处理异常 - {e}") - return False, None - - def _check_permission(self, ctx: MessageContext) -> bool: - """ - 检查是否有权限使用AI路由功能 - - :param ctx: 消息上下文 - :return: 是否有权限 - """ - # 检查是否启用AI路由 - ai_router_config = getattr(ctx.config, 'AI_ROUTER', {}) - if not ai_router_config.get('enable', True): - self.logger.info("AI路由功能已禁用") - return False - - # 私聊始终允许 - if not ctx.is_group: - return True - - # 群聊需要检查白名单 - allowed_groups = ai_router_config.get('allowed_groups', []) - current_group = ctx.get_receiver() - - if current_group in allowed_groups: - self.logger.info(f"群聊 {current_group} 在AI路由白名单中,允许使用") - return True - else: - self.logger.info(f"群聊 {current_group} 不在AI路由白名单中,禁止使用") return False + # 获取AI模型 + chat_model = getattr(ctx, 'chat', None) or getattr(ctx.robot, 'chat', None) + if not chat_model: + self.logger.error("无可用的AI模型") + return False + + try: + # 构建所有可用函数的tools + tools = self._build_function_tools(self.functions) + specific_max_history = getattr(ctx, 'specific_max_history', None) + + # 初始化对话历史 + conversation = [{"role": "user", "content": ctx.text}] + + # 最多5轮function call,防止无限循环 + max_iterations = 5 + + for iteration in range(max_iterations): + self.logger.debug(f"Function Call第{iteration+1}轮") + + # 调用AI模型 + response = chat_model.get_answer( + question="", # 使用conversation模式,question可以为空 + wxid=ctx.get_receiver(), + tools=tools, + specific_max_history=specific_max_history, + conversation_history=conversation # 传递完整对话历史 + ) + + # 如果AI直接回复文本(不调用函数) + if isinstance(response, str): + at_list = ctx.msg.sender if ctx.is_group else "" + ctx.send_text(response, at_list) + return True + + # 如果AI调用函数 + if isinstance(response, dict) and 'tool_calls' in response: + tool_calls = response['tool_calls'] + + # 添加assistant消息到对话历史 + conversation.append({ + "role": "assistant", + "tool_calls": tool_calls + }) + + # 执行所有函数调用 + for tool_call in tool_calls: + function_name = tool_call['function']['name'] + arguments = json.loads(tool_call['function']['arguments']) + + self.logger.info(f"执行函数: {function_name}, 参数: {arguments}") + + # 执行函数 + func = self.functions.get(function_name) + if func: + try: + # 调用函数处理器 + success = func.handler(ctx, **arguments) + function_result = "执行成功" if success else "执行失败" + except Exception as e: + self.logger.error(f"函数{function_name}执行错误: {e}") + function_result = f"执行错误: {str(e)}" + else: + function_result = f"函数{function_name}不存在" + + # 添加函数结果到对话历史 + conversation.append({ + "role": "tool", + "tool_call_id": tool_call.get('id', f"call_{function_name}"), + "content": function_result + }) + + # 继续下一轮,让AI基于函数结果继续思考 + continue + + # 如果响应格式异常,跳出循环 + break + + # 如果达到最大迭代次数,让AI生成最终回复 + if iteration == max_iterations - 1: + final_response = chat_model.get_answer( + question="请基于以上函数调用结果,生成最终回复。", + wxid=ctx.get_receiver(), + specific_max_history=specific_max_history, + conversation_history=conversation + ) + + if isinstance(final_response, str): + at_list = ctx.msg.sender if ctx.is_group else "" + ctx.send_text(final_response, at_list) + return True + + return True + + except Exception as e: + self.logger.error(f"标准Function Call处理异常: {e}") + return False + def dispatch(self, ctx: MessageContext) -> bool: """ - 执行AI路由分发 - - 返回: 是否成功处理 + 标准Function Call分发器 """ - print(f"[AI路由器] dispatch被调用,消息内容: {ctx.text}") - - # 检查权限 - if not self._check_permission(ctx): - print("[AI路由器] 权限检查失败,返回False") + if not ctx.text: return False - - # 获取AI路由决策 - success, decision = self.route(ctx) - print(f"[AI路由器] route返回 - success: {success}, decision: {decision}") - - if not success or not decision: - print("[AI路由器] route失败或无决策,返回False") - return False - - action_type = decision.get("action_type") - - # 如果是聊天,返回False让后续处理器处理 - if action_type == "chat": - self.logger.info("AI路由器:识别为聊天意图,交给聊天处理器") - return False - - # 如果是功能调用 - if action_type == "function": - function_name = decision.get("function_name") - params = decision.get("params", "") - - func = self.functions.get(function_name) - if not func: - self.logger.error(f"AI路由器:功能 {function_name} 未找到") + + # 调用标准Function Call处理 + success = self.handle_standard_function_call(ctx) + + if not success: + # 如果Function Call失败,回退到聊天模式 + return self._handle_chitchat(ctx) + + return True + + def _handle_chitchat(self, ctx: MessageContext) -> bool: + """ + 处理闲聊逻辑 - 最简实现 + """ + try: + if not ctx.text: return False - - try: - self.logger.info(f"AI路由器:调用功能 {function_name},参数: {params}") - result = func.handler(ctx, params) - return result - except Exception as e: - self.logger.error(f"AI路由器:执行功能 {function_name} 出错 - {e}") - return False - - return False + + # 调用闲聊处理器 + from .handlers import handle_chitchat + return handle_chitchat(ctx, None) + except Exception as e: + self.logger.error(f"闲聊处理出错: {e}") + return False # 创建全局AI路由器实例 ai_router = AIRouter() \ No newline at end of file diff --git a/commands/handlers.py b/commands/handlers.py index 64938b2..6566bc9 100644 --- a/commands/handlers.py +++ b/commands/handlers.py @@ -4,88 +4,12 @@ from typing import Optional, Match, Dict, Any import json # 确保已导入json from datetime import datetime # 确保已导入datetime import os # 导入os模块用于文件路径操作 -# from function.func_duel import DuelRankSystem # 前向引用避免循环导入 from typing import TYPE_CHECKING if TYPE_CHECKING: from .context import MessageContext -def handle_help(ctx: 'MessageContext', match: Optional[Match]) -> bool: - """ - 处理 "帮助" 命令 - - 匹配: info/帮助/指令 - """ - help_text = [ - "🤖 泡泡的指令列表 🤖", - "", - "【实用工具】", - "- 天气/温度 [城市名]", - "- 天气预报/预报 [城市名]", - "- 新闻", - "- ask [问题]", - "", - "【决斗 & 偷袭】", - "- 决斗@XX", - "- 偷袭@XX", - "- 决斗排行/排行榜", - "- 我的战绩/决斗战绩", - "- 我的装备/查看装备", - "- 改名 [旧名] [新名]", - "", - "【提醒】", - "- 提醒xxxxx:一次性、每日、每周", - "- 查看提醒/我的提醒/提醒列表", - "- 删..提醒..", - "", - "【群聊工具】", - "- summary/总结", - "- clearmessages/清除历史", - "" - ] - help_text = "\n".join(help_text) - - # 发送消息 - return ctx.send_text(help_text) - -def handle_check_equipment(ctx: 'MessageContext', match: Optional[Match]) -> bool: - """ - 处理 "查看装备" 命令 - - 匹配: 我的装备/查看装备 - """ - if not ctx.is_group: - ctx.send_text("❌ 装备查看功能只支持群聊") - return True - - try: - from function.func_duel import DuelRankSystem - - player_name = ctx.sender_name - rank_system = DuelRankSystem(ctx.msg.roomid) - player_data = rank_system.get_player_data(player_name) - - if not player_data: - ctx.send_text(f"⚠️ 没有找到 {player_name} 的数据") - return True - - items = player_data.get("items", {"elder_wand": 0, "magic_stone": 0, "invisibility_cloak": 0}) - result = [ - f"🧙‍♂️ {player_name} 的魔法装备:", - f"🪄 老魔杖: {items.get('elder_wand', 0)}次 ", - f"💎 魔法石: {items.get('magic_stone', 0)}次", - f"🧥 隐身衣: {items.get('invisibility_cloak', 0)}次 " - ] - - ctx.send_text("\n".join(result)) - - return True - except Exception as e: - if ctx.logger: - ctx.logger.error(f"查看装备出错: {e}") - ctx.send_text("⚠️ 查看装备失败") - return False def handle_summary(ctx: 'MessageContext', match: Optional[Match]) -> bool: """ @@ -149,45 +73,6 @@ def handle_clear_messages(ctx: 'MessageContext', match: Optional[Match]) -> bool ctx.send_text("⚠️ 清除消息历史失败") return False -def handle_news_request(ctx: 'MessageContext', match: Optional[Match]) -> bool: - """ - 处理 "新闻" 命令 - - 匹配: 新闻 - """ - if ctx.logger: - ctx.logger.info(f"收到来自 {ctx.sender_name} (群聊: {ctx.msg.roomid if ctx.is_group else '无'}) 的新闻请求") - - try: - from function.func_news import News - news_instance = News() - # 调用方法,接收返回的元组(is_today, news_content) - is_today, news_content = news_instance.get_important_news() - - receiver = ctx.get_receiver() - sender_for_at = ctx.msg.sender if ctx.is_group else "" # 群聊中@请求者 - - if is_today: - # 是当天新闻,直接发送 - ctx.send_text(f"📰 今日要闻来啦:\n{news_content}", sender_for_at) - else: - # 不是当天新闻或获取失败 - if news_content: - # 有内容,说明是旧闻 - prompt = "ℹ️ 今日新闻暂未发布,为您找到最近的一条新闻:" - ctx.send_text(f"{prompt}\n{news_content}", sender_for_at) - else: - # 内容为空,说明获取彻底失败 - ctx.send_text("❌ 获取新闻失败,请稍后重试或联系管理员。", sender_for_at) - - return True # 无论结果如何,命令本身算成功处理 - - except Exception as e: - if ctx.logger: ctx.logger.error(f"处理新闻请求时出错: {e}") - receiver = ctx.get_receiver() - sender_for_at = ctx.msg.sender if ctx.is_group else "" - ctx.send_text("❌ 获取新闻时发生错误,请稍后重试。", sender_for_at) - return False # 处理失败 def handle_chitchat(ctx: 'MessageContext', match: Optional[Match]) -> bool: """ @@ -409,114 +294,6 @@ def handle_insult(ctx: 'MessageContext', match: Optional[Match]) -> bool: ctx.send_text("呃,我想骂但出错了...") return True -def handle_perplexity_ask(ctx: 'MessageContext', match: Optional[Match]) -> bool: - """ - 处理 "ask" 命令,调用 Perplexity AI - - 匹配: ask [问题内容] - """ - if not match: # 理论上正则匹配成功才会被调用,但加个检查更安全 - return False - - # 1. 尝试从 Robot 实例获取 Perplexity 实例 - perplexity_instance = getattr(ctx.robot, 'perplexity', None) - - # 2. 检查 Perplexity 实例是否存在 - if not perplexity_instance: - if ctx.logger: - ctx.logger.warning("尝试调用 Perplexity,但实例未初始化或未配置。") - ctx.send_text("❌ Perplexity 功能当前不可用或未正确配置。") - return True # 命令已被处理(错误处理也是处理) - - # 3. 从匹配结果中提取问题内容 - prompt = match.group(1).strip() - if not prompt: # 如果 'ask' 后面没有内容 - ctx.send_text("请在 'ask' 后面加上您想问的问题。", ctx.msg.sender if ctx.is_group else None) - return True # 命令已被处理 - - # 4. 准备调用 Perplexity 实例的 process_message 方法 - if ctx.logger: - ctx.logger.info(f"检测到 Perplexity 请求,发送者: {ctx.sender_name}, 问题: {prompt[:50]}...") - - # 准备参数并调用 process_message - # 确保无论用户输入有没有空格,都以标准格式"ask 问题"传给process_message - content_for_perplexity = f"ask {prompt}" # 重构包含触发词的内容 - chat_id = ctx.get_receiver() - sender_wxid = ctx.msg.sender - room_id = ctx.msg.roomid if ctx.is_group else None - is_group = ctx.is_group - - # 5. 调用 process_message 并返回其结果 - was_handled, fallback_prompt = perplexity_instance.process_message( - content=content_for_perplexity, - chat_id=chat_id, - sender=sender_wxid, - roomid=room_id, - from_group=is_group, - send_text_func=ctx.send_text - ) - - # 6. 如果没有被处理且有备选prompt,使用默认AI处理 - if not was_handled and fallback_prompt: - if ctx.logger: - ctx.logger.info(f"使用备选prompt '{fallback_prompt[:20]}...' 调用默认AI处理") - - # 获取当前选定的AI模型 - chat_model = None - if hasattr(ctx, 'chat'): - chat_model = ctx.chat - elif ctx.robot and hasattr(ctx.robot, 'chat'): - chat_model = ctx.robot.chat - - if chat_model: - # 使用与 handle_chitchat 类似的逻辑,但使用备选prompt - try: - # 格式化消息,与 handle_chitchat 保持一致 - if ctx.robot and hasattr(ctx.robot, "xml_processor"): - if ctx.is_group: - msg_data = ctx.robot.xml_processor.extract_quoted_message(ctx.msg) - q_with_info = ctx.robot.xml_processor.format_message_for_ai(msg_data, ctx.sender_name) - else: - msg_data = ctx.robot.xml_processor.extract_private_quoted_message(ctx.msg) - q_with_info = ctx.robot.xml_processor.format_message_for_ai(msg_data, ctx.sender_name) - - if not q_with_info: - import time - current_time = time.strftime("%H:%M", time.localtime()) - q_with_info = f"[{current_time}] {ctx.sender_name}: {prompt or '[空内容]'}" - else: - import time - current_time = time.strftime("%H:%M", time.localtime()) - q_with_info = f"[{current_time}] {ctx.sender_name}: {prompt or '[空内容]'}" - - if ctx.logger: - ctx.logger.info(f"发送给默认AI的消息内容: {q_with_info}") - - # 调用 AI 模型时传入备选 prompt - # 需要调整 get_answer 方法以支持 system_prompt_override 参数 - # 这里我们假设已对各AI模型实现了这个参数 - specific_max_history = getattr(ctx, 'specific_max_history', None) - rsp = chat_model.get_answer( - question=q_with_info, - wxid=ctx.get_receiver(), - system_prompt_override=fallback_prompt, - specific_max_history=specific_max_history - ) - - if rsp: - # 发送回复 - at_list = ctx.msg.sender if ctx.is_group else "" - ctx.send_text(rsp, at_list) - - return True - else: - if ctx.logger: - ctx.logger.error("无法从默认AI获得答案") - except Exception as e: - if ctx.logger: - ctx.logger.error(f"使用备选prompt调用默认AI时出错: {e}") - - return was_handled def handle_reminder(ctx: 'MessageContext', match: Optional[Match]) -> bool: """处理来自私聊或群聊的 '提醒' 命令,支持批量添加多个提醒""" @@ -1042,71 +819,3 @@ def handle_delete_reminder(ctx: 'MessageContext', match: Optional[Match]) -> boo ctx.logger.error(f"handle_delete_reminder AI 部分顶层错误: {e}", exc_info=True) return True -def handle_weather_forecast(ctx: 'MessageContext', match: Optional[Match]) -> bool: - """ - 处理 "天气预报" 或 "预报" 命令 - - 匹配: 天气预报 [城市名] 或 预报 [城市名] - """ - if not match: - return False - - city_name = match.group(1).strip() - if not city_name: - ctx.send_text("🤔 请告诉我你想查询哪个城市的天气预报,例如:天气预报 北京") - return True - - if ctx.logger: - ctx.logger.info(f"天气预报查询指令匹配: 城市={city_name}") - - # --- 加载城市代码 --- - city_codes: Dict[str, str] = {} - city_code_path = os.path.join(os.path.dirname(__file__), '..', 'function', 'main_city.json') # 确保路径正确 - try: - with open(city_code_path, 'r', encoding='utf-8') as f: - city_codes = json.load(f) - except FileNotFoundError: - if ctx.logger: - ctx.logger.error(f"城市代码文件未找到: {city_code_path}") - ctx.send_text("⚠️ 抱歉,天气功能所需的城市列表文件丢失了。") - return True - except json.JSONDecodeError: - if ctx.logger: - ctx.logger.error(f"无法解析城市代码文件: {city_code_path}") - ctx.send_text("⚠️ 抱歉,天气功能的城市列表文件格式错误。") - return True - except Exception as e: - if ctx.logger: - ctx.logger.error(f"加载城市代码时发生未知错误: {e}", exc_info=True) - ctx.send_text("⚠️ 抱歉,加载城市代码时发生错误。") - return True - # --- 城市代码加载完毕 --- - - city_code = city_codes.get(city_name) - - if not city_code: - # 尝试模糊匹配 (可选,如果需要) - found = False - for name, code in city_codes.items(): - if city_name in name: # 如果输入的名字是城市全名的一部分 - city_code = code - city_name = name # 使用找到的完整城市名 - if ctx.logger: - ctx.logger.info(f"城市 '{match.group(1).strip()}' 未精确匹配,使用模糊匹配结果: {city_name} ({city_code})") - found = True - break - if not found: - ctx.send_text(f"😕 找不到城市 '{city_name}' 的天气信息,请检查城市名称是否正确。") - return True - - # 获取天气信息 (包含预报) - try: - from function.func_weather import Weather - weather_info = Weather(city_code).get_weather(include_forecast=True) # 注意这里传入True - ctx.send_text(weather_info) - except Exception as e: - if ctx.logger: - ctx.logger.error(f"获取城市 {city_name}({city_code}) 天气预报时出错: {e}", exc_info=True) - ctx.send_text(f"😥 获取 {city_name} 天气预报时遇到问题,请稍后再试。") - - return True \ No newline at end of file diff --git a/commands/models.py b/commands/models.py deleted file mode 100644 index 4034e3f..0000000 --- a/commands/models.py +++ /dev/null @@ -1,38 +0,0 @@ -import re -from dataclasses import dataclass -from typing import Pattern, Callable, Literal, Optional, Any, Union, Match - -# 导入 MessageContext,使用前向引用避免循环导入 -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from .context import MessageContext - - -@dataclass -class Command: - """ - 命令定义类,封装命令的匹配条件和处理函数 - """ - name: str # 命令名称,用于日志和调试 - pattern: Union[Pattern, Callable[['MessageContext'], Optional[Match]]] # 匹配规则:正则表达式或自定义匹配函数 - scope: Literal["group", "private", "both"] # 生效范围: "group"-仅群聊, "private"-仅私聊, "both"-两者都可 - handler: Callable[['MessageContext', Optional[Match]], bool] # 处理函数 - need_at: bool = False # 在群聊中是否必须@机器人才能触发 - priority: int = 100 # 优先级,数字越小越先匹配 - description: str = "" # 命令的描述,用于生成帮助信息 - - def __post_init__(self): - """验证命令配置的有效性""" - if self.scope not in ["group", "private", "both"]: - raise ValueError(f"无效的作用域: {self.scope},必须是 'group', 'private' 或 'both'") - - # 检查pattern是否为正则表达式或可调用对象 - if not isinstance(self.pattern, (Pattern, Callable)): - # 如果是字符串,尝试转换为正则表达式 - if isinstance(self.pattern, str): - try: - self.pattern = re.compile(self.pattern) - except re.error: - raise ValueError(f"无效的正则表达式: {self.pattern}") - else: - raise TypeError(f"pattern 必须是正则表达式或可调用对象,而不是 {type(self.pattern)}") \ No newline at end of file diff --git a/commands/registry.py b/commands/registry.py deleted file mode 100644 index 6b6bcd6..0000000 --- a/commands/registry.py +++ /dev/null @@ -1,136 +0,0 @@ -import re -from .models import Command -from .handlers import ( - handle_help, - # handle_duel, handle_sneak_attack, handle_duel_rank, - # handle_duel_stats, handle_check_equipment, handle_rename, - handle_summary, handle_clear_messages, handle_news_request, - handle_chitchat, handle_insult, - handle_perplexity_ask, handle_reminder, handle_list_reminders, handle_delete_reminder, - handle_weather_forecast -) - -# 命令列表,按优先级排序 -# 优先级越小越先匹配 -COMMANDS = [ - # ======== 基础系统命令 ======== - Command( - name="help", - pattern=re.compile(r"^(info|帮助|指令)$", re.IGNORECASE), - scope="both", # 群聊和私聊都支持 - need_at=False, # 不需要@机器人 - priority=10, # 优先级较高 - handler=handle_help, - description="显示机器人的帮助信息" - ), - - # ======== Perplexity AI 命令 ======== - Command( - name="perplexity_ask", - pattern=re.compile(r"^ask\s*(.+)", re.IGNORECASE | re.DOTALL), - scope="both", # 群聊和私聊都支持 - need_at=True, # 需要@机器人 - priority=25, # 较高优先级,确保在闲聊之前处理 - handler=handle_perplexity_ask, - description="使用 Perplexity AI 进行深度查询" - ), - - # ======== 消息管理命令 ======== - Command( - name="summary", - pattern=re.compile(r"^(summary|总结)$", re.IGNORECASE), - scope="group", # 仅群聊支持 - need_at=True, # 需要@机器人 - priority=30, # 优先级一般 - handler=handle_summary, - description="总结群聊最近的消息" - ), - - Command( - name="clear_messages", - pattern=re.compile(r"^(clearmessages|清除历史)$", re.IGNORECASE), - scope="group", # 仅群聊支持 - need_at=True, # 需要@机器人 - priority=31, # 优先级一般 - handler=handle_clear_messages, - description="从数据库中清除群聊的历史消息记录" - ), - - # ======== 提醒功能 ======== - Command( - name="reminder", - pattern=re.compile(r"提醒我", re.IGNORECASE), - scope="both", # 支持群聊和私聊 - need_at=True, # 在群聊中需要@机器人 - priority=35, # 优先级适中,在基础命令后,复杂功能或闲聊前 - handler=handle_reminder, - description="设置一个提醒 (包含 '提醒我' 关键字即可, 例如:提醒我明天下午3点开会)" - ), - - Command( - name="list_reminders", - pattern=re.compile(r"^(查看提醒|我的提醒|提醒列表)$", re.IGNORECASE), - scope="both", # 支持群聊和私聊 - need_at=True, # 在群聊中需要@机器人 - priority=36, # 优先级略低于设置提醒 - handler=handle_list_reminders, - description="查看您设置的所有提醒" - ), - - Command( - name="delete_reminder", - # 修改为只匹配包含"删"、"删除"或"取消"的消息,不再要求特定格式 - pattern=re.compile(r"(?:删|删除|取消)", re.IGNORECASE), - scope="both", # 支持群聊和私聊 - need_at=True, # 在群聊中需要@机器人 - priority=37, - handler=handle_delete_reminder, - description="删除提醒 (包含'删'和'提醒'关键字即可,如: 把开会的提醒删了)" - ), - - # ======== 新闻和实用工具 ======== - Command( - name="weather_forecast", - pattern=re.compile(r"^(?:天气预报|天气)\s+(.+)$"), # 匹配 天气预报/预报 城市名 - scope="both", # 群聊和私聊都支持 - need_at=True, # 需要@机器人 - priority=38, # 优先级比天气高一点 - handler=handle_weather_forecast, - description="查询指定城市未来几天的天气预报 (例如:天气预报 北京)" - ), - - Command( - name="news", - pattern=re.compile(r"^新闻$"), - scope="both", # 群聊和私聊都支持 - need_at=True, # 需要@机器人 - priority=40, # 优先级一般 - handler=handle_news_request, - description="获取最新新闻" - ), - - # ======== 骂人命令 ======== - Command( - name="insult", - pattern=re.compile(r"骂一下\s*@([^\s@]+)"), - scope="group", # 仅群聊支持 - need_at=True, # 需要@机器人 - priority=100, # 优先级较高 - handler=handle_insult, - description="骂指定用户" - ), - -] - -# 可以添加一个函数,获取命令列表的简单描述 -def get_commands_info(): - """获取所有命令的简要信息,用于调试""" - info = [] - for i, cmd in enumerate(COMMANDS): - scope_str = {"group": "仅群聊", "private": "仅私聊", "both": "群聊私聊"}[cmd.scope] - at_str = "需要@" if cmd.need_at else "不需@" - info.append(f"{i+1}. [{cmd.priority}] {cmd.name} ({scope_str},{at_str}) - {cmd.description or '无描述'}") - return "\n".join(info) - -# 导出所有命令 -__all__ = ["COMMANDS", "get_commands_info"] \ No newline at end of file diff --git a/commands/router.py b/commands/router.py deleted file mode 100644 index 8fd769d..0000000 --- a/commands/router.py +++ /dev/null @@ -1,117 +0,0 @@ -import re -import logging -from typing import List, Optional, Any, Dict, Match -import traceback - -from .models import Command -from .context import MessageContext - -# 获取模块级 logger -logger = logging.getLogger(__name__) - - -class CommandRouter: - """ - 命令路由器,负责将消息路由到对应的命令处理函数 - """ - def __init__(self, commands: List[Command], robot_instance: Optional[Any] = None): - # 按优先级排序命令列表,数字越小优先级越高 - self.commands = sorted(commands, key=lambda cmd: cmd.priority) - self.robot_instance = robot_instance - - # 分析并输出命令注册信息,便于调试 - scope_count = {"group": 0, "private": 0, "both": 0} - for cmd in commands: - scope_count[cmd.scope] += 1 - - logger.info(f"命令路由器初始化成功,共加载 {len(commands)} 个命令") - logger.info(f"命令作用域分布: 仅群聊 {scope_count['group']},仅私聊 {scope_count['private']},两者均可 {scope_count['both']}") - - # 按优先级输出命令信息 - for i, cmd in enumerate(self.commands[:10]): # 只输出前10个 - logger.info(f"{i+1}. [{cmd.priority}] {cmd.name} - {cmd.description or '无描述'}") - if len(self.commands) > 10: - logger.info(f"... 共 {len(self.commands)} 个命令") - - def dispatch(self, ctx: MessageContext) -> bool: - """ - 根据消息上下文分发命令 - :param ctx: 消息上下文对象 - :return: 是否有命令成功处理 - """ - # 确保context可以访问到robot实例 - if self.robot_instance and not ctx.robot: - ctx.robot = self.robot_instance - # 如果robot有logger属性且ctx没有logger,则使用robot的logger - if hasattr(self.robot_instance, 'LOG') and not ctx.logger: - ctx.logger = self.robot_instance.LOG - - # 记录日志,便于调试 - if ctx.logger: - ctx.logger.debug(f"开始路由消息: '{ctx.text}', 来自: {ctx.sender_name}, 群聊: {ctx.is_group}, @机器人: {ctx.is_at_bot}") - - # 遍历命令列表,按优先级顺序匹配 - for cmd in self.commands: - # 1. 检查作用域 (scope) - if cmd.scope != "both": - if (cmd.scope == "group" and not ctx.is_group) or \ - (cmd.scope == "private" and ctx.is_group): - continue # 作用域不匹配,跳过 - - # 2. 检查是否需要 @ (need_at) - 仅在群聊中有效 - if ctx.is_group and cmd.need_at and not ctx.is_at_bot: - continue # 需要@机器人但未被@,跳过 - - # 3. 执行匹配逻辑 - match_result = None - try: - # 根据pattern类型执行匹配 - if callable(cmd.pattern): - # 自定义匹配函数 - match_result = cmd.pattern(ctx) - else: - # 正则表达式匹配 - match_obj = cmd.pattern.search(ctx.text) - match_result = match_obj - - # 匹配失败,尝试下一个命令 - if match_result is None: - continue - - # 匹配成功,记录日志 - if ctx.logger: - ctx.logger.info(f"命令 '{cmd.name}' 匹配成功,准备处理") - - # 4. 执行命令处理函数 - try: - result = cmd.handler(ctx, match_result) - if result: - if ctx.logger: - ctx.logger.info(f"命令 '{cmd.name}' 处理成功") - return True - else: - if ctx.logger: - ctx.logger.warning(f"命令 '{cmd.name}' 处理返回False,尝试下一个命令") - except Exception as e: - if ctx.logger: - ctx.logger.error(f"执行命令 '{cmd.name}' 处理函数时出错: {e}") - ctx.logger.error(traceback.format_exc()) - else: - logger.error(f"执行命令 '{cmd.name}' 处理函数时出错: {e}", exc_info=True) - # 出错后继续尝试下一个命令 - except Exception as e: - # 匹配过程出错,记录并继续 - if ctx.logger: - ctx.logger.error(f"匹配命令 '{cmd.name}' 时出错: {e}") - else: - logger.error(f"匹配命令 '{cmd.name}' 时出错: {e}", exc_info=True) - continue - - # 所有命令都未匹配或处理失败 - if ctx.logger: - ctx.logger.debug("所有命令匹配失败或处理失败") - return False - - def get_command_descriptions(self) -> Dict[str, str]: - """获取所有命令的描述,用于生成帮助信息""" - return {cmd.name: cmd.description for cmd in self.commands if cmd.description} \ No newline at end of file diff --git a/function/func_duel.py b/function/func_duel.py deleted file mode 100644 index a8357a2..0000000 --- a/function/func_duel.py +++ /dev/null @@ -1,1546 +0,0 @@ -import random -import logging -import time -import json -import os -import sqlite3 -from typing import List, Dict, Tuple, Optional, Any -from threading import Thread, Lock - -# 获取 Logger 实例 -logger_duel = logging.getLogger("DuelRankSystem") - -# 排位积分系统 -class DuelRankSystem: - # 使用线程锁确保数据库操作的线程安全 - _db_lock = Lock() - - def __init__(self, group_id=None, db_path="data/message_history.db"): - """ - 初始化排位系统 - - Args: - group_id: 群组ID - db_path: 数据库文件路径 - """ - # 确保group_id不为空,现在只支持群聊 - if not group_id: - raise ValueError("决斗功能只支持群聊") - - self.group_id = group_id - self.db_path = db_path - self._init_db() # 初始化数据库 - - def _get_db_conn(self) -> sqlite3.Connection: - """获取数据库连接""" - try: - conn = sqlite3.connect(self.db_path, timeout=10, check_same_thread=False) - conn.row_factory = sqlite3.Row # 让查询结果可以像字典一样访问列 - return conn - except sqlite3.Error as e: - logger_duel.error(f"无法连接到 SQLite 数据库 '{self.db_path}': {e}", exc_info=True) - raise # 连接失败是严重问题,直接抛出异常 - - def _init_db(self): - """初始化数据库,创建表(如果不存在)""" - sql_create_players = """ - CREATE TABLE IF NOT EXISTS duel_players ( - group_id TEXT NOT NULL, - player_name TEXT NOT NULL, - score INTEGER DEFAULT 1000, - wins INTEGER DEFAULT 0, - losses INTEGER DEFAULT 0, - total_matches INTEGER DEFAULT 0, - elder_wand INTEGER DEFAULT 0, - magic_stone INTEGER DEFAULT 0, - invisibility_cloak INTEGER DEFAULT 0, - last_updated TEXT, - PRIMARY KEY (group_id, player_name) - ); - """ - # 移除了 duel_history 表的创建语句 - # 移除了相关索引的创建语句 - - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - cursor.execute(sql_create_players) - # 移除了执行创建 duel_history 表和索引的命令 - conn.commit() - logger_duel.info("数据库表 'duel_players' 检查/创建 完成。") - except sqlite3.Error as e: - logger_duel.error(f"创建/检查数据库表失败: {e}", exc_info=True) - raise # 初始化失败是严重问题 - - def get_player_data(self, player_name: str) -> Dict: - """获取玩家数据,如果不存在则创建""" - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - # 查询玩家数据 - sql_query = """ - SELECT * FROM duel_players - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_query, (self.group_id, player_name)) - result = cursor.fetchone() - - if result: - # 将 sqlite3.Row 转换为字典 - player_data = dict(result) - # 构造特殊的 items 字典 - player_data["items"] = { - "elder_wand": player_data.pop("elder_wand", 0), - "magic_stone": player_data.pop("magic_stone", 0), - "invisibility_cloak": player_data.pop("invisibility_cloak", 0) - } - return player_data - else: - # 玩家不存在,创建新玩家 - default_data = { - "score": 1000, - "wins": 0, - "losses": 0, - "total_matches": 0, - "items": { - "elder_wand": 0, - "magic_stone": 0, - "invisibility_cloak": 0 - } - } - - # 插入新玩家数据 - sql_insert = """ - INSERT INTO duel_players - (group_id, player_name, score, wins, losses, total_matches, - elder_wand, magic_stone, invisibility_cloak, last_updated) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) - """ - cursor.execute(sql_insert, ( - self.group_id, - player_name, - default_data["score"], - default_data["wins"], - default_data["losses"], - default_data["total_matches"], - default_data["items"]["elder_wand"], - default_data["items"]["magic_stone"], - default_data["items"]["invisibility_cloak"] - )) - conn.commit() - - logger_duel.info(f"创建了新玩家: {player_name} 在群组 {self.group_id}") - return default_data - - except sqlite3.Error as e: - logger_duel.error(f"获取玩家数据失败: {e}", exc_info=True) - # 出错时返回默认数据 - return { - "score": 1000, - "wins": 0, - "losses": 0, - "total_matches": 0, - "items": { - "elder_wand": 0, - "magic_stone": 0, - "invisibility_cloak": 0 - } - } - - def update_score(self, winner: str, loser: str, winner_hp: int, rounds: int) -> Tuple[int, int]: - """更新玩家积分 - - Args: - winner: 胜利者名称 - loser: 失败者名称 - winner_hp: 胜利者剩余生命值 - rounds: 决斗回合数 - - Returns: - Tuple[int, int]: (胜利者获得积分, 失败者失去积分) - """ - # 获取玩家数据 - winner_data = self.get_player_data(winner) - loser_data = self.get_player_data(loser) - - # 基础积分计算 - 回合数越少积分越高 - base_points = 100 - if rounds <= 5: # 速战速决 - base_points = 100 - elif rounds <= 10: - base_points = 60 - elif rounds >= 15: # 长时间战斗 - base_points = 40 - - # 计算总积分变化(剩余生命值作为百分比加成) - hp_percent_bonus = winner_hp / 100.0 # 血量百分比 - points = int(base_points * (hp_percent_bonus)) # 血量越多,积分越高 - - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - - # 更新胜利者数据 - sql_update_winner = """ - UPDATE duel_players SET - score = score + ?, - wins = wins + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_winner, (points, self.group_id, winner)) - - # 更新失败者数据 - sql_update_loser = """ - UPDATE duel_players SET - score = MAX(1, score - ?), - losses = losses + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_loser, (points, self.group_id, loser)) - - # 移除了记录对战历史的代码 - - conn.commit() - logger_duel.info(f"{winner} 击败 {loser},获得 {points} 积分") - - return (points, points) # 返回胜者得分和败者失分(相同) - - except sqlite3.Error as e: - logger_duel.error(f"更新积分失败: {e}", exc_info=True) - return (0, 0) # 出错时返回0分 - - def get_rank_list(self, top_n: int = 10) -> List[Dict]: - """获取排行榜 - - Args: - top_n: 返回前几名 - - Returns: - List[Dict]: 排行榜数据 - """ - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - sql_query = """ - SELECT player_name, score, wins, losses, total_matches, - elder_wand, magic_stone, invisibility_cloak - FROM duel_players - WHERE group_id = ? - ORDER BY score DESC - LIMIT ? - """ - cursor.execute(sql_query, (self.group_id, top_n)) - results = cursor.fetchall() - - # 转换结果为字典列表,格式与原JSON格式相同 - ranked_players = [] - for row in results: - player_dict = dict(row) - player_name = player_dict.pop("player_name") - - # 构造与原格式相同的字典 - player = { - "name": player_name, - "score": player_dict["score"], - "wins": player_dict["wins"], - "losses": player_dict["losses"], - "total_matches": player_dict["total_matches"], - "items": { - "elder_wand": player_dict["elder_wand"], - "magic_stone": player_dict["magic_stone"], - "invisibility_cloak": player_dict["invisibility_cloak"] - } - } - ranked_players.append(player) - - return ranked_players - - except sqlite3.Error as e: - logger_duel.error(f"获取排行榜失败: {e}", exc_info=True) - return [] # 出错时返回空列表 - - def get_player_rank(self, player_name: str) -> Tuple[Optional[int], Dict]: - """获取玩家排名 - - Args: - player_name: 玩家名称 - - Returns: - Tuple[Optional[int], Dict]: (排名, 玩家数据) - """ - # 获取玩家数据 - player_data = self.get_player_data(player_name) - - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - - # 查询排行榜中有哪些分数比该玩家高 - sql_rank = """ - SELECT COUNT(*) + 1 as rank - FROM duel_players - WHERE group_id = ? AND score > ( - SELECT score FROM duel_players - WHERE group_id = ? AND player_name = ? - ) - """ - cursor.execute(sql_rank, (self.group_id, self.group_id, player_name)) - result = cursor.fetchone() - - if result: - rank = result["rank"] - return rank, player_data - else: - # 找不到玩家排名,可能是新玩家 - return None, player_data - - except sqlite3.Error as e: - logger_duel.error(f"获取玩家排名失败: {e}", exc_info=True) - return None, player_data # 出错时返回None作为排名 - - def change_player_name(self, old_name: str, new_name: str) -> bool: - """更改玩家名称 - - Args: - old_name: 旧名称 - new_name: 新名称 - - Returns: - bool: 是否成功更改 - """ - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - - # 开启事务 - conn.execute("BEGIN TRANSACTION") - - # 检查旧名称是否存在 - sql_check_old = """ - SELECT COUNT(*) as count FROM duel_players - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_check_old, (self.group_id, old_name)) - if cursor.fetchone()["count"] == 0: - conn.rollback() - return False - - # 检查新名称是否已存在 - sql_check_new = """ - SELECT COUNT(*) as count FROM duel_players - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_check_new, (self.group_id, new_name)) - if cursor.fetchone()["count"] > 0: - conn.rollback() - return False - - # 更新玩家表 - sql_update_player = """ - UPDATE duel_players SET player_name = ? - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_player, (new_name, self.group_id, old_name)) - - # 移除了更新历史记录表中的胜者和败者名称的代码 - - # 提交事务 - conn.commit() - logger_duel.info(f"成功将玩家 {old_name} 改名为 {new_name}") - - return True - - except sqlite3.Error as e: - logger_duel.error(f"更改玩家名称失败: {e}", exc_info=True) - return False # 出错时返回失败 - - def update_score_by_magic(self, winner: str, loser: str, magic_power: int) -> Tuple[int, int]: - """根据魔法分数更新玩家积分 - - Args: - winner: 胜利者名称 - loser: 失败者名称 - magic_power: 决斗中所有参与者使用的魔法总分数 - - Returns: - Tuple[int, int]: (胜利者获得积分, 失败者失去积分) - """ - # 获取玩家数据 (这里只是为了确保玩家存在) - self.get_player_data(winner) - self.get_player_data(loser) - - # 使用魔法总分作为积分变化值 - points = magic_power - - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - - # 更新胜利者数据 - sql_update_winner = """ - UPDATE duel_players SET - score = score + ?, - wins = wins + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_winner, (points, self.group_id, winner)) - - # 更新失败者数据 - sql_update_loser = """ - UPDATE duel_players SET - score = MAX(1, score - ?), - losses = losses + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_loser, (points, self.group_id, loser)) - - # 移除了记录对战历史的代码 - - conn.commit() - logger_duel.info(f"{winner} 使用魔法击败 {loser},获得 {points} 积分") - - return (points, points) # 返回胜者得分和败者失分(相同) - - except sqlite3.Error as e: - logger_duel.error(f"根据魔法分数更新积分失败: {e}", exc_info=True) - return (0, 0) # 出错时返回0分 - - def record_duel_result(self, winner: str, loser: str, winner_points: int, loser_points: int, total_magic_power: int, used_item: Optional[str] = None) -> Tuple[int, int]: - """记录决斗结果,更新玩家数据和历史记录 - - Args: - winner: 胜利者名称 - loser: 失败者名称 - winner_points: 胜利者获得的积分 - loser_points: 失败者失去的积分 - total_magic_power: 决斗中使用的总魔法力 - used_item: 本次决斗中使用的道具名称 (可选) - 可能是 "elder_wand"(老魔杖), "magic_stone"(魔法石), "invisibility_cloak"(隐身衣) - - Returns: - Tuple[int, int]: (胜利者实际获得积分, 失败者实际失去积分) - """ - # 获取玩家数据 (确保玩家存在) - self.get_player_data(winner) - self.get_player_data(loser) - - # 注意:loser_points 是正数,表示要扣除的分数 - - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - - # 更新胜利者数据 - sql_update_winner = """ - UPDATE duel_players SET - score = score + ?, - wins = wins + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_winner, (winner_points, self.group_id, winner)) - - # 更新失败者数据 - sql_update_loser = """ - UPDATE duel_players SET - score = MAX(1, score - ?), - losses = losses + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_loser, (loser_points, self.group_id, loser)) - - # --- 改进处理道具消耗逻辑 --- - if used_item == "elder_wand": - # 老魔杖是胜利者使用的 - cursor.execute("UPDATE duel_players SET elder_wand = MAX(0, elder_wand - 1) WHERE group_id = ? AND player_name = ?", (self.group_id, winner)) - logger_duel.info(f"消耗了 {winner} 的老魔杖 (剩余数量将被更新)") - elif used_item == "magic_stone": - # 魔法石是失败者使用的 - cursor.execute("UPDATE duel_players SET magic_stone = MAX(0, magic_stone - 1) WHERE group_id = ? AND player_name = ?", (self.group_id, loser)) - logger_duel.info(f"消耗了 {loser} 的魔法石 (剩余数量将被更新)") - elif used_item == "invisibility_cloak": - # 隐身衣由胜利者使用 - cursor.execute("UPDATE duel_players SET invisibility_cloak = MAX(0, invisibility_cloak - 1) WHERE group_id = ? AND player_name = ?", (self.group_id, winner)) - logger_duel.info(f"消耗了 {winner} 的隐身衣 (剩余数量将被更新)") - # -------------------------- - - # 移除了记录对战历史的代码 - - conn.commit() - logger_duel.info(f"{winner} 在决斗中击败 {loser},胜者积分 +{winner_points},败者积分 -{loser_points},使用道具: {used_item or '无'}") - - return (winner_points, loser_points) # 返回实际积分变化 - - except sqlite3.Error as e: - logger_duel.error(f"记录决斗结果失败: {e}", exc_info=True) - return (0, 0) # 出错时返回0分 - except Exception as e: - logger_duel.error(f"记录决斗结果时发生未知错误: {e}", exc_info=True) - return (0, 0) # 出错时返回0分 - -class HarryPotterDuel: - """决斗功能""" - - def __init__(self, player1, player2, group_id, player1_is_challenger=True): - """ - 初始化决斗 - :param player1: 玩家1的名称 - :param player2: 玩家2的名称 - :param group_id: 群组ID - :param player1_is_challenger: 玩家1是否为决斗发起者 - """ - # 确保只在群聊中决斗 - if not group_id: - raise ValueError("决斗功能只支持群聊") - - self.player1 = { - "name": player1, - "hp": 100, - "spells": [], - "is_challenger": player1_is_challenger - } - self.player2 = { - "name": player2, - "hp": 100, - "spells": [], - "is_challenger": not player1_is_challenger - } - self.rounds = 0 - self.steps = [] - self.group_id = group_id # 记录群组ID - - # 检测是否为Boss战(对手是AI"泡泡") - self.is_boss_fight = (player2 == "泡泡") - - # Boss战特殊设置 - if self.is_boss_fight: - # Boss战胜率极低,设为10% - self.player_win_chance = 0.1 - # 添加Boss战提示信息 - self.steps.append("⚔️ Boss战开始 ⚔️\n挑战强大的魔法师泡泡!") - - # 设置防御成功率 - self.defense_success_rate = 0.3 - - # 咒语列表(名称、威力、权重)- 权重越小越稀有 - self.spells = [ - {"name": "除你武器", "power": 10, "weight": 30, "desc": "🪄", - "attack_desc": ["挥动魔杖划出一道弧线,魔杖尖端发出红光,释放", "伸手一指对手的魔杖,大声喊道", "用魔杖直指对手,施放缴械咒"], - "damage_desc": ["被红光击中,魔杖瞬间脱手飞出", "的魔杖被一股无形力量扯离手掌,飞向远处", "手中魔杖突然被击飞,不得不空手应对"]}, - - {"name": "昏昏倒地", "power": 25, "weight": 25, "desc": "✨", - "attack_desc": ["魔杖发出耀眼的红光,发射昏迷咒", "快速挥舞魔杖,释放出一道猩红色闪光", "高声呼喊咒语,杖尖喷射出红色火花"], - "damage_desc": ["被红光击中,意识开始模糊,几近昏迷", "躲闪不及,被击中后身体摇晃,眼神涣散", "被咒语命中,双腿一软,差点跪倒在地"]}, - - {"name": "统统石化", "power": 40, "weight": 20, "desc": "💫", - "attack_desc": ["直指对手,魔杖尖端射出蓝白色光芒,施放石化咒", "魔杖在空中划过一道蓝光,精准施放", "双目紧盯对手,冷静施展全身束缚咒"], - "damage_desc": ["身体被蓝光罩住,四肢瞬间变得僵硬如石", "全身突然绷紧,像被无形的绳索紧紧束缚", "动作突然凝固,仿佛变成了一座雕像"]}, - - {"name": "障碍重重", "power": 55, "weight": 15, "desc": "⚡", - "attack_desc": ["魔杖猛地向前一挥,发射出闪亮的紫色光束", "大声念出咒语,同时杖尖射出炫目光芒", "旋转魔杖制造出一道旋转的障碍咒"], - "damage_desc": ["被一股无形的力量狠狠推开,猛烈撞上后方障碍物", "身体被击中后像断线风筝般飞出数米,重重摔落", "被强大的冲击波掀翻在地,一时无法站起"]}, - - {"name": "神锋无影", "power": 70, "weight": 10, "desc": "🗡️", - "attack_desc": ["低声念诵,魔杖如剑般挥下", "以危险的低沉嗓音念诵咒语,杖尖闪烁着寒光", "用魔杖在空中划出复杂轨迹,释放斯内普的秘咒"], - "damage_desc": ["身上突然出现多道无形的切割伤口,鲜血喷涌而出", "惨叫一声,胸前与面部浮现出深深的伤痕,鲜血直流", "被无形的刀刃划过全身,衣物和皮肤同时被割裂,伤痕累累"]}, - - {"name": "钻心剜骨", "power": 85, "weight": 5, "desc": "🔥", - "attack_desc": ["眼中闪过一丝狠厉,用尖利的声音喊出不可饶恕咒", "面露残忍笑容,魔杖直指对手施放酷刑咒", "用充满恶意的声音施放黑魔法,享受对方的痛苦"], - "damage_desc": ["被咒语击中,全身每一根神经都在燃烧般剧痛,倒地挣扎哀嚎", "发出撕心裂肺的惨叫,痛苦地在地上痉挛扭曲", "遭受前所未有的剧痛折磨,脸上血管暴起,痛不欲生"]}, - - {"name": "阿瓦达索命", "power": 100, "weight": 1, "desc": "💀", - "attack_desc": ["用充满杀意的声音念出死咒,魔杖喷射出刺目的绿光", "冷酷无情地发出致命死咒,绿光直射对手", "毫无犹豫地使用了最邪恶的不可饶恕咒,绿光闪耀"], - "damage_desc": ["被绿光正面击中,生命瞬间被夺走,眼神空洞地倒下", "还未来得及反应,生命便随着绿光的接触戛然而止", "被死咒击中,身体僵直地倒下,生命气息完全消失"]} - ] - - # 防御咒语列表(名称、描述)- 统一使用self.defense_success_rate作为成功率 - self.defense_spells = [ - {"name": "盔甲护身", "desc": "🛡️", - "defense_desc": ["迅速在身前制造出一道透明魔法屏障,挡住了攻击", "挥动魔杖在周身形成一道金色防御光幕,抵消了咒语", "大声喊出咒语,召唤出强力的防护盾牌"]}, - - {"name": "除你武器", "desc": "⚔️", - "defense_desc": ["用缴械咒反击,成功击飞对方魔杖", "喊道出魔咒,让对手的魔咒偏离方向", "巧妙反击,用缴械咒化解了对手的攻击"]}, - - {"name": "呼神护卫", "desc": "🧿", - "defense_desc": ["全神贯注地召唤出银色守护神,抵挡住了攻击", "魔杖射出耀眼银光,形成守护屏障吸收了咒语", "集中思念快乐回忆,释放出强大的守护神魔法"]} - ] - - # 设置胜利描述 - self.victory_descriptions = [ - "让对手失去了战斗能力", - "最终击倒了对手", - "的魔法取得了胜利", - "的致命一击决定了结果", - "的魔法赢得了这场决斗", - "对魔法的控制带来了胜利", - "在激烈的对决中占据上风", - "毫无悬念地获胜" - ] - - # 记录开场信息 - if not self.is_boss_fight: - self.steps.append(f"⚔️ 决斗开始 ⚔️\n{self.player1['name']} VS {self.player2['name']}") - - def select_spell(self): - """随机选择一个咒语,威力越高出现概率越低""" - weights = [spell["weight"] for spell in self.spells] - total_weight = sum(weights) - normalized_weights = [w/total_weight for w in weights] - return random.choices(self.spells, weights=normalized_weights, k=1)[0] - - def attempt_defense(self): - """尝试防御,返回是否成功和使用的防御咒语""" - defense = random.choice(self.defense_spells) - success = random.random() < self.defense_success_rate - return success, defense - - def start_duel(self): - """开始决斗,返回决斗过程的步骤列表""" - # 创建积分系统实例,整个方法中重用 - rank_system = DuelRankSystem(self.group_id) - - # --- 修改:提前获取双方玩家数据 --- - player1_data = rank_system.get_player_data(self.player1["name"]) - player2_data = rank_system.get_player_data(self.player2["name"]) - # --------------------------------------------- - - # Boss战特殊处理 - if self.is_boss_fight: - # 生成随机的Boss战斗过程 - boss_battle_descriptions = [ - f"🔮 强大的Boss泡泡挥动魔杖,释放出一道耀眼的紫色光束,{self.player1['name']}勉强躲开!", - f"⚡ {self.player1['name']}尝试施放昏昏倒地,但泡泡像预知一般轻松侧身避过!", - f"🌪️ 泡泡召唤出一阵魔法旋风,将{self.player1['name']}的咒语全部吹散!", - f"🔥 {self.player1['name']}使出全力施放火焰咒,泡泡却用一道水盾将其熄灭!", - f"✨ 双方魔杖相对,杖尖迸发出耀眼的金色火花,魔力在空中碰撞!", - f"🌟 泡泡释放出数十个魔法分身,{self.player1['name']}不知道哪个是真身!", - f"🧙 {self.player1['name']}召唤出守护神,但在泡泡强大的黑魔法面前迅速消散!", - f"⚔️ 一连串快速的魔咒交锋,魔法光束在空中交织成绚丽的网!", - f"🛡️ 泡泡创造出一道几乎无法破解的魔法屏障,{self.player1['name']}的咒语无法穿透!", - f"💫 {self.player1['name']}施放最强一击,能量波动让整个决斗场地震颤!" - ] - - # 只随机选择一条战斗描述添加(减少刷屏) - self.steps.append(random.choice(boss_battle_descriptions)) - - # 检查是否战胜Boss(极低概率) - if random.random() < self.player_win_chance: # 玩家赢了 - winner, loser = self.player1, self.player2 - - # 添加胜利转折点描述 - victory_turn = [ - f"✨ 关键时刻,{winner['name']}找到了泡泡防御的破绽!", - f"🌟 命运女神眷顾了{winner['name']},一个意外的反弹击中了泡泡的要害!", - f"💥 在泡泡即将施放致命一击时,{winner['name']}突然爆发出前所未有的魔法力量!" - ] - self.steps.append(random.choice(victory_turn)) - - # 随机获得一件装备 - items = ["elder_wand", "magic_stone", "invisibility_cloak"] - item_names = {"elder_wand": "老魔杖", "magic_stone": "魔法石", "invisibility_cloak": "隐身衣"} - - try: - with rank_system._db_lock: - with rank_system._get_db_conn() as conn: - cursor = conn.cursor() - - # 获取当前玩家的道具数量 - sql_query = """ - SELECT elder_wand, magic_stone, invisibility_cloak - FROM duel_players - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_query, (self.group_id, winner["name"])) - result = cursor.fetchone() - - if result: - # 更新玩家数据,增加道具 - sql_update = """ - UPDATE duel_players SET - elder_wand = elder_wand + 1, - magic_stone = magic_stone + 1, - invisibility_cloak = invisibility_cloak + 1, - score = score + ?, - wins = wins + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - winner_points = 300 # 胜利积分固定为500分 - cursor.execute(sql_update, (winner_points, self.group_id, winner["name"])) - - # 移除了记录对战历史的代码 - - conn.commit() - - # 查询更新后玩家排名 - sql_rank = """ - SELECT COUNT(*) + 1 as rank - FROM duel_players - WHERE group_id = ? AND score > ( - SELECT score FROM duel_players - WHERE group_id = ? AND player_name = ? - ) - """ - cursor.execute(sql_rank, (self.group_id, self.group_id, winner["name"])) - rank_result = cursor.fetchone() - rank = rank_result["rank"] if rank_result else None - - rank_text = f"第{rank}名" if rank else "暂无排名" - - # 添加获得装备的信息 - result = ( - f"🏆 {winner['name']} 以不可思议的实力击败了强大的Boss泡泡!\n\n" - f"获得了三件死亡圣器!\n" - f" 🪄 💎 🧥 \n\n" - f"积分: +{winner_points}分 ({rank_text})" - ) - - self.steps.append(result) - return self.steps - else: - # 玩家不存在,这种情况理论上不可能发生,但为安全添加 - logger_duel.error(f"Boss战获胜但找不到玩家 {winner['name']} 数据") - except sqlite3.Error as e: - logger_duel.error(f"处理Boss战胜利时出错: {e}", exc_info=True) - self.steps.append(f"⚠️ 处理战利品时遇到问题: {e}") - return self.steps - - else: # 玩家输了 - winner, loser = self.player2, self.player1 - - # 添加失败结局描述 - 更恐怖、更简洁的描述 - defeat_end = [ - f"💀 泡泡瞬间爆发出令人胆寒的强大魔力,{loser['name']}甚至来不及反应就被击倒在地!", - f"⚰️ 只见泡泡轻轻挥动魔杖,{loser['name']}如遭雷击,整个人被恐怖的魔法能量碾压!", - f"☠️ 泡泡展现出真正的实力,一道黑色闪电瞬间击穿{loser['name']}的所有防御!" - ] - self.steps.append(random.choice(defeat_end)) - - try: - with rank_system._db_lock: - with rank_system._get_db_conn() as conn: - cursor = conn.cursor() - - # 更新失败者数据 - sql_update = """ - UPDATE duel_players SET - score = MAX(1, score - 100), - losses = losses + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update, (self.group_id, loser["name"])) - - # 移除了记录对战历史的代码 - - conn.commit() - except sqlite3.Error as e: - logger_duel.error(f"处理Boss战失败时出错: {e}", exc_info=True) - - result = ( - f"💀 {loser['name']} 不敌强大的Boss泡泡!\n\n" - f"积分: -100分\n" - f"再接再厉,下次挑战吧!" - ) - - self.steps.append(result) - return self.steps - - # --- 新增:开局检查双方隐身衣 --- - p1_cloak = player1_data["items"].get("invisibility_cloak", 0) > 0 - p2_cloak = player2_data["items"].get("invisibility_cloak", 0) > 0 - - if p1_cloak and not p2_cloak: # 只有 Player1 有隐身衣 - winner, loser = self.player1, self.player2 - winner_points = 30 - loser_points = 30 - used_item = "invisibility_cloak" - self.steps.append(f"🧥 {winner['name']} 开局使用了隐身衣,潜行偷袭,直接获胜!") - # 直接调用记录结果函数处理数据库和返回消息 - return self._handle_direct_win(rank_system, winner, loser, winner_points, loser_points, used_item, player1_data) - elif not p1_cloak and p2_cloak: # 只有 Player2 有隐身衣 - winner, loser = self.player2, self.player1 - winner_points = 30 - loser_points = 30 - used_item = "invisibility_cloak" - self.steps.append(f"🧥 {winner['name']} 开局使用了隐身衣,潜行偷袭,直接获胜!") - # 直接调用记录结果函数处理数据库和返回消息 - return self._handle_direct_win(rank_system, winner, loser, winner_points, loser_points, used_item, player2_data) - elif p1_cloak and p2_cloak: # 双方都有隐身衣 - self.steps.append(f"🧥 双方都试图使用隐身衣,魔法相互干扰,隐身效果失效!决斗正常进行!") - # (可选)可以在这里添加消耗双方隐身衣的逻辑,但为了简化,暂时不加 - # --- 隐身衣检查结束 --- - - # 普通决斗流程,保持原有逻辑 - # 根据决斗发起者设置先手概率 - if self.player1["is_challenger"]: - # 获取挑战者的排名和总玩家数 - challenger = self.player1["name"] - challenger_rank, _ = rank_system.get_player_rank(challenger) - - # 获取总玩家数 - all_players = rank_system.get_rank_list(9999) # 获取所有玩家 - total_players = len(all_players) - - # 计算先手概率:基础概率50% + (排名/总人数)*30% - # 如果没有排名或总玩家数为0,则使用基础概率50% - if challenger_rank is not None and total_players > 0: - # 排名越大(越靠后),先手优势越大 - first_attack_prob = 0.5 + (challenger_rank / total_players) * 0.3 - else: - first_attack_prob = 0.5 # 默认概率 - - current_attacker = "player1" if random.random() < first_attack_prob else "player2" - else: - # 获取挑战者的排名和总玩家数 - challenger = self.player2["name"] - challenger_rank, _ = rank_system.get_player_rank(challenger) - - # 获取总玩家数 - all_players = rank_system.get_rank_list(9999) # 获取所有玩家 - total_players = len(all_players) - - # 计算先手概率:基础概率50% + (排名/总人数)*30% - # 如果没有排名或总玩家数为0,则使用基础概率50% - if challenger_rank is not None and total_players > 0: - # 排名越大(越靠后),先手优势越大 - first_attack_prob = 0.5 + (challenger_rank / total_players) * 0.3 - else: - first_attack_prob = 0.5 # 默认概率 - - current_attacker = "player2" if random.random() < first_attack_prob else "player1" - - # 随机选择先手介绍语 - first_move_descriptions = [ - "抢先出手,迅速进入战斗状态,", - "反应更快,抢得先机,", - "魔杖一挥,率先发动攻击,", - "眼疾手快,先发制人,", - "气势如虹,先声夺人,", - "以迅雷不及掩耳之势抢先出手," - ] - - # 记录所有魔法分数的总和 - total_magic_power = 0 - - # 一击必胜模式,只有一回合 - self.rounds = 1 - - # 确定当前回合的攻击者和防御者 - if current_attacker == "player1": - attacker = self.player1 - defender = self.player2 - else: - attacker = self.player2 - defender = self.player1 - - # 选择咒语 - spell = self.select_spell() - - # 记录使用的魔法分数 - total_magic_power += spell["power"] - attacker["spells"].append(spell) - - # 先手介绍与咒语专属攻击描述组合在一起 - first_move_desc = random.choice(first_move_descriptions) - # 从咒语的专属攻击描述中随机选择一个 - spell_attack_desc = random.choice(spell["attack_desc"]) - attack_info = f"🎲 {attacker['name']} {first_move_desc}{spell_attack_desc} {spell['name']}{spell['desc']}" - self.steps.append(attack_info) - - # 尝试防御 - defense_success, defense = self.attempt_defense() - - if defense_success: - # 防御成功,使用防御咒语的专属描述 - defense_desc = random.choice(defense["defense_desc"]) - defense_info = f"{defender['name']} {defense_desc},使用 {defense['name']}{defense['desc']} 防御成功!" - self.steps.append(defense_info) - - # 记录防御使用的魔法分数 - for defense_spell in self.defense_spells: - if defense_spell["name"] == defense["name"]: - total_magic_power += 20 # 防御魔法固定20分 - break - - # 转折描述与反击描述组合 - counter_transition = [ - "防御成功后立即抓住机会反击,", - "挡下攻击的同时,立刻准备反攻,", - "借着防御的势头,迅速转为攻势,", - "一个漂亮的防御后,立刻发起反击,", - "丝毫不给对手喘息的机会,立即反击," - ] - - # 反制:防守方变为攻击方 - counter_spell = self.select_spell() - - # 记录反制使用的魔法分数 - total_magic_power += counter_spell["power"] - defender["spells"].append(counter_spell) - - # 转折与咒语专属反击描述组合在一起 - counter_transition_desc = random.choice(counter_transition) - # 从反制咒语的专属攻击描述中随机选择一个 - counter_spell_attack_desc = random.choice(counter_spell["attack_desc"]) - counter_info = f"{defender['name']} {counter_transition_desc}{counter_spell_attack_desc} {counter_spell['name']}{counter_spell['desc']}" - self.steps.append(counter_info) - - # 显示反击造成的伤害描述 - counter_damage_desc = random.choice(counter_spell["damage_desc"]) - if current_attacker == "player1": - damage_info = f"{self.player1['name']} {counter_damage_desc}!" - else: - damage_info = f"{self.player2['name']} {counter_damage_desc}!" - self.steps.append(damage_info) - - # 防御成功并反制,原攻击者直接失败 - if current_attacker == "player1": - self.player1["hp"] = 0 - winner, loser = self.player2, self.player1 - else: - self.player2["hp"] = 0 - winner, loser = self.player1, self.player2 - else: - # 防御失败,直接被击败 - # 从攻击咒语的专属伤害描述中随机选择一个 - damage_desc = random.choice(spell["damage_desc"]) - damage_info = f"{defender['name']} {damage_desc}!" - self.steps.append(damage_info) - - if current_attacker == "player1": - self.player2["hp"] = 0 - winner, loser = self.player1, self.player2 - else: - self.player1["hp"] = 0 - winner, loser = self.player2, self.player1 - - # --- 修改:获取胜利者和失败者的最新数据 --- - # 在决斗结束后,重新获取双方数据以确保道具数量是当前的 - winner_data = rank_system.get_player_data(winner["name"]) - loser_data = rank_system.get_player_data(loser["name"]) - # --------------------------------------- - - # --- 修改:道具效果处理逻辑 --- - used_item_winner = None # 记录胜利者使用的道具 - used_item_loser = None # 记录失败者使用的道具 - winner_points = total_magic_power # 基础胜利积分 - loser_points = total_magic_power # 基础失败扣分 - - # 检查失败者是否有魔法石 - 失败不扣分 - if loser_data["items"].get("magic_stone", 0) > 0: - self.steps.append(f"💎 {loser['name']} 使用了魔法石,虽然失败但是痊愈了!") - used_item_loser = "magic_stone" - loser_points = 0 # 不扣分 - - # 检查胜利者是否有老魔杖 - 胜利积分×5 (独立于魔法石判断) - if winner_data["items"].get("elder_wand", 0) > 0: - # 如果失败者没用魔法石,才显示胜利加成信息(避免信息重复) - if used_item_loser != "magic_stone": - self.steps.append(f"🪄 {winner['name']} 使用了老魔杖,魔法威力增加了五倍!") - else: # 如果失败者用了魔法石,补充说明胜利者也用了老魔杖 - self.steps.append(f"🪄 同时,{winner['name']} 使用了老魔杖,得分加倍!") - used_item_winner = "elder_wand" - winner_points *= 5 # 积分乘以5 - - # --- 整合使用的道具信息 --- - # 注意:record_duel_result 目前只支持记录一个 used_item - # 为了兼容,优先记录影响积分计算的道具 - final_used_item = used_item_winner or used_item_loser # 优先记录胜利者道具,其次失败者道具 - # -------------------------- - - # 使用 record_duel_result 方法记录结果并更新数据库 - try: - # 调用新的记录结果方法,它会处理积分更新、道具消耗和历史记录 - # **注意:需要修改 record_duel_result 来正确处理道具消耗** - actual_winner_points, actual_loser_points = rank_system.record_duel_result( - winner=winner["name"], - loser=loser["name"], - winner_points=winner_points, - loser_points=loser_points, # 传递可能为0的扣分值 - total_magic_power=total_magic_power, - used_item=final_used_item # 传递最终决定的使用道具 - ) - logger_duel.info(f"数据库更新成功: 胜者 {winner['name']} +{actual_winner_points}, 败者 {loser['name']} -{actual_loser_points}") - except Exception as e: - logger_duel.error(f"调用 record_duel_result 时发生错误: {e}", exc_info=True) - self.steps.append(f"⚠️ 保存决斗结果时发生错误: {e}") - - # 获取胜利者当前排名 - rank, _ = rank_system.get_player_rank(winner["name"]) - rank_text = f"第{rank}名" if rank else "暂无排名" - - # 重新获取玩家数据以显示正确的道具数量 (在调用 record_duel_result 后获取) - updated_winner_data = rank_system.get_player_data(winner["name"]) - updated_loser_data = rank_system.get_player_data(loser["name"]) - - # 选择胜利描述 - victory_desc = random.choice(self.victory_descriptions) - - # 结果信息 - result = ( - f"🏆 {winner['name']} {victory_desc}!\n\n" - f"积分: {winner['name']} +{winner_points}分 ({rank_text})\n" # 显示计算出的得分 - f"{loser['name']} -{loser_points}分" # 显示计算出的扣分 (可能为0) - ) - - # 如果使用了道具,显示剩余次数 - if used_item_winner == "elder_wand": - result += f"\n\n📦 {winner['name']} 剩余老魔杖: {updated_winner_data['items'].get('elder_wand', 0)}次" - if used_item_loser == "magic_stone": - result += f"\n\n📦 {loser['name']} 剩余魔法石: {updated_loser_data['items'].get('magic_stone', 0)}次" - # 如果开局隐身衣获胜,这里不会执行 - - # 添加结果 - self.steps.append(result) - return self.steps - - # --- 新增:处理隐身衣直接获胜的辅助方法 --- - def _handle_direct_win(self, rank_system, winner, loser, winner_points, loser_points, used_item, winner_original_data): - """处理因隐身衣直接获胜的情况,更新数据库并格式化消息""" - try: - # 直接调用 record_duel_result 来处理数据库更新 - # 注意:这里 total_magic_power 为 0,因为没有进行魔法对决 - rank_system.record_duel_result( - winner=winner["name"], - loser=loser["name"], - winner_points=winner_points, - loser_points=loser_points, - total_magic_power=0, # 隐身衣获胜没有魔法力计算 - used_item=used_item - ) - logger_duel.info(f"{winner['name']} 使用隐身衣击败 {loser['name']},积分 +{winner_points}") - - # 重新获取更新后的玩家数据以显示剩余道具 - updated_winner_data = rank_system.get_player_data(winner["name"]) - - except sqlite3.Error as e: - logger_duel.error(f"处理隐身衣胜利时数据库出错: {e}", exc_info=True) - self.steps.append(f"⚠️ 处理隐身衣胜利时遇到数据库问题: {e}") - # 数据库出错时,仍使用原始数据显示结果,避免程序崩溃 - updated_winner_data = winner_original_data - except Exception as e: # 捕获其他可能的异常 - logger_duel.error(f"处理隐身衣胜利时发生未知错误: {e}", exc_info=True) - self.steps.append(f"⚠️ 处理隐身衣胜利时发生内部错误: {e}") - updated_winner_data = winner_original_data - - # 获取胜利者当前排名 - rank, _ = rank_system.get_player_rank(winner["name"]) - rank_text = f"第{rank}名" if rank else "暂无排名" - - # 添加结果 - result = ( - f"🏆 {winner['name']} 使用隐身衣获胜!\n\n" - f"积分: {winner['name']} +{winner_points}分 ({rank_text})\n" - f"{loser['name']} -{loser_points}分\n\n" - f"📦 剩余隐身衣: {updated_winner_data['items'].get('invisibility_cloak', 0)}次" - ) - self.steps.append(result) - return self.steps - # --- 辅助方法结束 --- - -def start_duel(player1: str, player2: str, group_id=None, player1_is_challenger=True) -> List[str]: - """ - 启动一场决斗 - - Args: - player1: 玩家1的名称 - player2: 玩家2的名称 - group_id: 群组ID,必须提供 - player1_is_challenger: 玩家1是否为挑战发起者 - - Returns: - List[str]: 决斗过程的步骤 - """ - # 确保只在群聊中决斗 - if not group_id: - return ["❌ 决斗功能只支持群聊"] - - try: - duel = HarryPotterDuel(player1, player2, group_id, player1_is_challenger) - return duel.start_duel() - except Exception as e: - logging.error(f"决斗过程中发生错误: {e}") - return [f"决斗过程中发生错误: {e}"] - -def get_rank_list(top_n: int = 10, group_id=None) -> str: - """获取排行榜信息 - - Args: - top_n: 返回前几名 - group_id: 群组ID,必须提供 - """ - # 确保只在群聊中获取排行榜 - if not group_id: - return "❌ 决斗排行榜功能只支持群聊" - - try: - rank_system = DuelRankSystem(group_id) - ranks = rank_system.get_rank_list(top_n) - - if not ranks: - return "📊 决斗排行榜还没有数据" - - result = [f"📊 本群决斗排行榜 Top {len(ranks)}"] - for i, player in enumerate(ranks): - medal = "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else f"{i+1}." - win_rate = int((player["wins"] / player["total_matches"]) * 100) if player["total_matches"] > 0 else 0 - result.append(f"{medal} {player['name']}: {player['score']}分 ({player['wins']}/{player['losses']}/{win_rate}%)") - - return "\n".join(result) - except Exception as e: - logging.error(f"获取排行榜失败: {e}") - return f"获取排行榜失败: {e}" - -def get_player_stats(player_name: str, group_id=None) -> str: - """获取玩家战绩 - - Args: - player_name: 玩家名称 - group_id: 群组ID,必须提供 - """ - # 确保只在群聊中获取战绩 - if not group_id: - return "❌ 决斗战绩查询功能只支持群聊" - - try: - rank_system = DuelRankSystem(group_id) - rank, player_data = rank_system.get_player_rank(player_name) - - win_rate = int((player_data["wins"] / player_data["total_matches"]) * 100) if player_data["total_matches"] > 0 else 0 - - result = [ - f"📊 {player_name} 的本群决斗战绩", - f"排名: {rank if rank else '暂无排名'}", - f"积分: {player_data['score']}", - f"胜场: {player_data['wins']}", - f"败场: {player_data['losses']}", - f"总场次: {player_data['total_matches']}", - f"胜率: {win_rate}%" - ] - - return "\n".join(result) - except Exception as e: - logging.error(f"获取玩家战绩失败: {e}") - return f"获取玩家战绩失败: {e}" - -def change_player_name(old_name: str, new_name: str, group_id=None) -> str: - """更改玩家名称 - - Args: - old_name: 旧名称 - new_name: 新名称 - group_id: 群组ID,必须提供 - - Returns: - str: 操作结果消息 - """ - # 确保只在群聊中更改玩家名称 - if not group_id: - return "❌ 更改玩家名称功能只支持群聊" - - try: - rank_system = DuelRankSystem(group_id) - result = rank_system.change_player_name(old_name, new_name) - - if result: - return f"✅ 已成功将决斗记录中的玩家 \"{old_name}\" 重命名为 \"{new_name}\"" - else: - return f"❌ 改名失败:请确认 \"{old_name}\" 在本群中有战绩记录,且 \"{new_name}\" 名称未被使用" - except Exception as e: - logging.error(f"更改玩家名称失败: {e}") - return f"❌ 更改玩家名称失败: {e}" - -class DuelManager: - """决斗管理器,处理决斗线程和消息发送""" - - def __init__(self, message_sender_func): - """ - 初始化决斗管理器 - - Args: - message_sender_func: 消息发送函数,接收(message, receiver)两个参数 - """ - self.message_sender = message_sender_func - self._duel_thread = None - self._duel_lock = Lock() - self.LOG = logging.getLogger("DuelManager") - - def send_duel_message(self, msg: str, receiver: str) -> None: - """发送决斗消息 - - Args: - msg: 消息内容 - receiver: 接收者ID(通常是群ID) - """ - try: - self.LOG.info(f"发送决斗消息 To {receiver}: {msg[:20]}...") - self.message_sender(msg, receiver) - except Exception as e: - self.LOG.error(f"发送决斗消息失败: {e}") - - def run_duel(self, challenger_name, opponent_name, receiver, is_group=False): - """在单独线程中运行决斗 - - Args: - challenger_name: 挑战者名称 - opponent_name: 对手名称 - receiver: 消息接收者(群id) - is_group: 是否是群聊 - """ - try: - # 确保只在群聊中运行决斗 - if not is_group: - self.send_duel_message("❌ 决斗功能只支持群聊", receiver) - return - - # 开始决斗 - # 传递群组ID参数 - group_id = receiver - duel_steps = start_duel(challenger_name, opponent_name, group_id, True) # challenger_name是发起者 - - # 逐步发送决斗过程 - for step in duel_steps: - self.send_duel_message(step, receiver) - time.sleep(1.5) # 每步之间添加适当延迟 - except Exception as e: - self.LOG.error(f"决斗过程中发生错误: {e}") - self.send_duel_message(f"决斗过程中发生错误: {e}", receiver) - finally: - # 释放决斗线程 - with self._duel_lock: - self._duel_thread = None - self.LOG.info("决斗线程已结束并销毁") - - def start_duel_thread(self, challenger_name, opponent_name, receiver, is_group=False): - """启动决斗线程 - - Args: - challenger_name: 挑战者名称 - opponent_name: 对手名称 - receiver: 消息接收者 - is_group: 是否是群聊 - - Returns: - bool: 是否成功启动决斗线程 - """ - with self._duel_lock: - if self._duel_thread is not None and self._duel_thread.is_alive(): - return False - - self._duel_thread = Thread( - target=self.run_duel, - args=(challenger_name, opponent_name, receiver, is_group), - daemon=True - ) - self._duel_thread.start() - return True - - def is_duel_running(self): - """检查是否有决斗正在进行 - - Returns: - bool: 是否有决斗正在进行 - """ - with self._duel_lock: - return self._duel_thread is not None and self._duel_thread.is_alive() - -# --- 新增:偷袭成功/失败的随机句子 --- -SNEAK_ATTACK_SUCCESS_MESSAGES = [ - "趁其不备,{attacker} 悄悄从 {target} 的口袋里摸走了 {points} 积分!真是个小机灵鬼!👻", - "月黑风高夜,正是下手时!{attacker} 成功偷袭 {target},顺走了 {points} 积分!🌙", - "{target} 一时大意,被 {attacker} 抓住了破绽,损失了 {points} 积分!💸", - "神不知鬼不觉,{attacker} 从 {target} 那里\"借\"来了 {points} 积分!🤫", - "手法娴熟!{attacker} 像一阵风一样掠过,{target} 发现时已经少了 {points} 积分!💨", -] - -SNEAK_ATTACK_FAILURE_MESSAGES = [ - "哎呀!{attacker} 的鬼祟行踪被 {target} 发现了,偷袭失败!👀", - "{target} 警惕性很高,{attacker} 的小动作没能得逞。🛡️", - "差点就成功了!可惜 {attacker} 不小心弄出了声响,被 {target} 逮个正着!🔔", - "{target} 哼了一声:\"就这点伎俩?\" {attacker} 的偷袭计划泡汤了。😏", - "运气不佳,{attacker} 刚伸手就被 {target} 的护身符弹开了,偷袭失败!✨", - "{attacker} 脚底一滑,在 {target} 面前摔了个狗啃泥,偷袭什么的早就忘光了!🤣", - "{target} 突然转身,和 {attacker} 对视,场面一度十分尴尬... 偷袭失败!😅", - "{attacker} 刚准备动手,{target} 的口袋里突然钻出一只嗅嗅,叼走了 {attacker} 的...嗯?偷袭失败!👃", - "{target} 拍了拍 {attacker} 的肩膀:\"兄弟,想啥呢?\",{attacker} 只好悻悻收手。🤝", - "一阵妖风刮过,把 {attacker} 准备用来偷袭的工具吹跑了... 时运不济啊!🌬️", - "{attacker} 发现 {target} 的口袋是画上去的!可恶,被摆了一道!🖌️", -] - -# --- 新增:偷到道具的随机句子 --- -SNEAK_ATTACK_ITEM_SUCCESS_MESSAGES = [ - "趁乱摸鱼!{attacker} 竟然从 {target} 身上摸走了一件 {item_name_cn}!真是妙手空空!👏", - "运气爆棚!{attacker} 偷袭失败,但顺走了 {target} 的一件 {item_name_cn}!🥳", - "{target} 光顾着得意,没注意到 {attacker} 悄悄拿走了一件 {item_name_cn}!🤭", - "失之东隅,收之桑榆。{attacker} 虽然没偷到分,但拐走了一件 {item_name_cn}!🎁", - "神偷再现!{attacker} 从 {target} 那里顺走了一件 {item_name_cn}!🔮", -] - -# --- 新增:道具英文名到中文名的映射 --- -ITEM_NAME_MAP = { - "elder_wand": "老魔杖 🪄", - "magic_stone": "魔法石 💎", - "invisibility_cloak": "隐身衣 🧥" -} - -# --- 新增:处理偷袭逻辑的函数 --- -def attempt_sneak_attack(attacker_name: str, target_name: str, group_id: str) -> str: - """ - 处理玩家尝试偷袭另一个玩家的逻辑 - - Args: - attacker_name: 偷袭者名称 - target_name: 被偷袭者名称 - group_id: 群组ID - - Returns: - str: 偷袭结果的消息 - """ - if not group_id: - return "❌ 偷袭功能也只支持群聊哦。" - - try: - rank_system = DuelRankSystem(group_id) - - # 检查玩家是否存在 - with rank_system._db_lock: - with rank_system._get_db_conn() as conn: - cursor = conn.cursor() - - # 检查偷袭者是否存在 - cursor.execute( - "SELECT COUNT(*) as count FROM duel_players WHERE group_id = ? AND player_name = ?", - (group_id, attacker_name) - ) - if cursor.fetchone()["count"] == 0: - return f"❌ 偷袭发起者 {attacker_name} 还没有决斗记录。" - - # 检查目标是否存在 - cursor.execute( - "SELECT COUNT(*) as count FROM duel_players WHERE group_id = ? AND player_name = ?", - (group_id, target_name) - ) - if cursor.fetchone()["count"] == 0: - return f"❌ 目标 {target_name} 还没有决斗记录。" - - # 获取偷袭者排名 - cursor.execute(""" - SELECT COUNT(*) + 1 as rank FROM duel_players - WHERE group_id = ? AND score > ( - SELECT score FROM duel_players - WHERE group_id = ? AND player_name = ? - )""", (group_id, group_id, attacker_name)) - attacker_rank_result = cursor.fetchone() - attacker_rank = attacker_rank_result["rank"] if attacker_rank_result else None - - # 获取目标排名 - cursor.execute(""" - SELECT COUNT(*) + 1 as rank FROM duel_players - WHERE group_id = ? AND score > ( - SELECT score FROM duel_players - WHERE group_id = ? AND player_name = ? - )""", (group_id, group_id, target_name)) - target_rank_result = cursor.fetchone() - target_rank = target_rank_result["rank"] if target_rank_result else None - - # 获取总玩家数 - cursor.execute("SELECT COUNT(*) as count FROM duel_players WHERE group_id = ?", (group_id,)) - total_players = cursor.fetchone()["count"] - - # 计算成功率 - success_prob = 0.3 # 基础成功率 30% - - # 计算概率加成(仅当双方都有排名且总人数大于0时) - if attacker_rank is not None and target_rank is not None and total_players > 0: - if attacker_rank > target_rank: # 偷袭者排名更低 - rank_difference = attacker_rank - target_rank - # 排名差值影响概率,最多增加 40% - success_prob += min((rank_difference / total_players) * 0.4, 0.4) - # else: 偷袭者排名更高或相同,使用基础概率 30% - - # 确保概率在 0 到 1 之间 - success_prob = max(0, min(1, success_prob)) - - # 格式化概率显示为0-100%的百分比 - prob_percent = success_prob * 100 - logger_duel.info(f"偷袭计算: {attacker_name}({attacker_rank}) vs {target_name}({target_rank}), 总人数: {total_players}, 成功率: {prob_percent:.1f}%") - - roll_successful = random.random() < success_prob - points_exchanged_successfully = False # 标记是否成功转移了分数 - - # 决定偷袭是否成功 - if roll_successful: - # --- 偷袭概率判定成功,尝试计算分数转移 --- - # 获取分数差 - cursor.execute(""" - SELECT t1.score as attacker_score, t2.score as target_score - FROM duel_players t1, duel_players t2 - WHERE t1.group_id = ? AND t1.player_name = ? - AND t2.group_id = ? AND t2.player_name = ? - """, (group_id, attacker_name, group_id, target_name)) - result = cursor.fetchone() - # 添加检查,以防万一查询不到结果 - if not result: - logger_duel.error(f"偷袭成功后查询分数失败: {attacker_name} vs {target_name}") - return "❌ 处理偷袭时发生内部错误:无法获取玩家分数。" - - attacker_score = result["attacker_score"] - target_score = result["target_score"] - - # 1. 计算潜在偷取分数 - score_difference = abs(attacker_score - target_score) - potential_points_stolen = max(random.randint(10, 50), int(score_difference * 0.1)) # 偷取(10-50)或分数差的10%,取最大值 - - # 2. 计算目标实际能损失的最大分数 (最低保留1分) - max_points_target_can_lose = max(0, target_score - 1) - - # 3. 确定实际交换的分数 - actual_points_exchanged = min(potential_points_stolen, max_points_target_can_lose) - - # 只有实际交换分数大于0时才更新数据库和记录历史 - if actual_points_exchanged > 0: - # 更新分数 (零和交换) - cursor.execute( - "UPDATE duel_players SET score = score + ? WHERE group_id = ? AND player_name = ?", - (actual_points_exchanged, group_id, attacker_name) - ) - cursor.execute( - "UPDATE duel_players SET score = score - ? WHERE group_id = ? AND player_name = ?", - (actual_points_exchanged, group_id, target_name) - ) - - # 移除了记录到历史记录的代码 - - # 提交事务 - conn.commit() - logger_duel.info(f"偷袭成功: {attacker_name} 偷取 {target_name} {actual_points_exchanged} 分 (原目标分数: {target_score}, 潜在偷取: {potential_points_stolen})") - - # 选择并格式化成功消息 (使用 actual_points_exchanged) - message_template = random.choice(SNEAK_ATTACK_SUCCESS_MESSAGES) - result_message = message_template.format(attacker=attacker_name, target=target_name, points=actual_points_exchanged) - - points_exchanged_successfully = True # 标记成功转移了分数 - return result_message # 只有在成功转移分数时才直接返回 - else: - # 如果实际交换分数为0 (例如目标只有1分) - logger_duel.info(f"偷袭概率判定成功但未发生分数转移: {attacker_name} 偷袭 {target_name} (目标分数: {target_score}),转为尝试偷道具...") - # 不设置 points_exchanged_successfully = True - # 不返回,继续执行下面的偷道具逻辑 - - # --- 如果偷袭概率判定失败,或者判定成功但未转移分数,则尝试偷道具 --- - if not points_exchanged_successfully: # 这个条件覆盖了概率判定失败和概率判定成功但未转移分数两种情况 - # 根据情况选择日志消息 - if not roll_successful: # 如果是概率判定失败的情况 - logger_duel.info(f"偷袭分数失败: {attacker_name} 偷袭 {target_name}. 尝试根据目标道具数量计算偷道具概率...") - - # --- 修改:提前获取目标道具信息以计算概率 --- - cursor.execute(""" - SELECT elder_wand, magic_stone, invisibility_cloak - FROM duel_players - WHERE group_id = ? AND player_name = ? - """, (group_id, target_name)) - target_items_result = cursor.fetchone() # 使用新变量名避免混淆 - - item_steal_prob = 0.0 # 初始化概率为 0 - total_items_count = 0 - - if target_items_result: - # 计算总道具数量 - total_items_count = (target_items_result["elder_wand"] + - target_items_result["magic_stone"] + - target_items_result["invisibility_cloak"]) - - # 计算动态概率,每件道具增加 1% - item_steal_prob = total_items_count * 0.01 - logger_duel.info(f"目标共有 {total_items_count} 件道具,计算出的偷道具概率为: {item_steal_prob*100:.1f}% ") - else: - # 如果查询不到目标道具信息(理论上不应发生,因为前面检查过玩家存在) - logger_duel.warning(f"未能查询到目标 {target_name} 的道具信息,无法计算偷道具概率。") - item_steal_prob = 0.0 # 无法计算则概率为0 - - # --- 使用计算出的 item_steal_prob 进行判断 --- - if total_items_count > 0 and random.random() < item_steal_prob: - # --- 概率判定成功,且目标确实有道具可偷 --- - logger_duel.info(f"偷道具判定成功 (概率 {item_steal_prob*100:.1f}%),开始选择道具...") - - # --- 复用之前获取的 target_items_result 构建列表 --- - available_item_names = [] - item_weights = [] - - if target_items_result["elder_wand"] > 0: - available_item_names.append("elder_wand") - item_weights.append(target_items_result["elder_wand"]) - - if target_items_result["magic_stone"] > 0: - available_item_names.append("magic_stone") - item_weights.append(target_items_result["magic_stone"]) - - if target_items_result["invisibility_cloak"] > 0: - available_item_names.append("invisibility_cloak") - item_weights.append(target_items_result["invisibility_cloak"]) - - # 这个检查理论上可以省略,因为前面 total_items_count > 0 已经保证了列表非空 - # 但为了代码健壮性可以保留 - if available_item_names: - # 根据权重随机选择一件道具 - item_stolen = random.choices(available_item_names, weights=item_weights, k=1)[0] - item_name_cn = ITEM_NAME_MAP.get(item_stolen, item_stolen) - - # 更新数据库:目标减道具,攻击者加道具 - sql_update_target = f"UPDATE duel_players SET {item_stolen} = MAX(0, {item_stolen} - 1) WHERE group_id = ? AND player_name = ?" - sql_update_attacker = f"UPDATE duel_players SET {item_stolen} = {item_stolen} + 1 WHERE group_id = ? AND player_name = ?" - - cursor.execute(sql_update_target, (group_id, target_name)) - cursor.execute(sql_update_attacker, (group_id, attacker_name)) - conn.commit() # 偷道具成功,提交事务 - - # 选择并格式化偷道具成功消息 - message_template = random.choice(SNEAK_ATTACK_ITEM_SUCCESS_MESSAGES) - result_message = message_template.format(attacker=attacker_name, target=target_name, item_name_cn=item_name_cn) - logger_duel.info(f"偷道具成功: {attacker_name} 偷取了 {target_name} 的 {item_stolen}") - # 偷到道具直接返回,不再执行后面的失败逻辑 - return result_message - else: - # 如果因为某种原因(例如并发问题),刚才还有道具现在没了 - logger_duel.warning(f"尝试偷取 {target_name} 道具时发现其道具列表为空,虽然 total_items_count > 0。") - # 这里会继续执行下面的通用失败逻辑 - - # --- 偷道具判定失败 或 目标没有任何道具 --- - # (包括 total_items_count 为 0 的情况, 以及 random.random() >= item_steal_prob 的情况) - message_template = random.choice(SNEAK_ATTACK_FAILURE_MESSAGES) - result_message = message_template.format(attacker=attacker_name, target=target_name) - if total_items_count == 0: - logger_duel.info(f"偷袭完全失败: {attacker_name} 偷袭 {target_name},且目标没有任何道具。") - else: - logger_duel.info(f"偷袭完全失败: {attacker_name} 偷袭 {target_name},未达到偷道具概率 {item_steal_prob*100:.1f}%。") - return result_message - - except sqlite3.Error as e: - logger_duel.error(f"处理偷袭时发生数据库错误: {e}", exc_info=True) - return f"处理偷袭时发生内部错误: {e}" - except Exception as e: - logger_duel.error(f"处理偷袭时发生未知错误: {e}", exc_info=True) - return f"处理偷袭时发生内部错误: {e}" - diff --git a/robot.py b/robot.py index f452b00..e06d745 100644 --- a/robot.py +++ b/robot.py @@ -20,7 +20,6 @@ from ai_providers.ai_gemini import Gemini from ai_providers.ai_perplexity import Perplexity from function.func_weather import Weather from function.func_news import News -from function.func_duel import start_duel, get_rank_list, get_player_stats, change_player_name, DuelManager, attempt_sneak_attack from function.func_summary import MessageSummary # 导入新的MessageSummary类 from function.func_reminder import ReminderManager # 导入ReminderManager类 from configuration import Config @@ -28,13 +27,8 @@ from constants import ChatType from job_mgmt import Job from function.func_xml_process import XmlProcessor -# 导入命令路由系统 +# 导入Function Call系统 from commands.context import MessageContext -from commands.router import CommandRouter -from commands.registry import COMMANDS, get_commands_info -from commands.handlers import handle_chitchat # 导入闲聊处理函数 - -# 导入AI路由系统 from commands.ai_router import ai_router import commands.ai_functions # 导入以注册所有AI功能 @@ -54,7 +48,6 @@ class Robot(Job): self.wxid = self.wcf.get_self_wxid() # 获取机器人自己的wxid self.allContacts = self.getAllContacts() self._msg_timestamps = [] - self.duel_manager = DuelManager(self.sendDuelMsg) try: db_path = "data/message_history.db" @@ -165,12 +158,8 @@ class Robot(Job): # 初始化图像生成管理器 self.image_manager = ImageGenerationManager(self.config, self.wcf, self.LOG, self.sendTextMsg) - # 初始化命令路由器 - self.command_router = CommandRouter(COMMANDS, robot_instance=self) - self.LOG.info(f"命令路由系统初始化完成,共加载 {len(COMMANDS)} 条命令") - - # 初始化AI路由器 - self.LOG.info(f"AI路由系统初始化完成,共加载 {len(ai_router.functions)} 个AI功能") + # Function Call系统已自动加载 + self.LOG.info(f"🚀 Function Call系统初始化完成,共加载 {len(ai_router.functions)} 个智能功能") # 初始化提醒管理器 try: @@ -181,8 +170,10 @@ class Robot(Job): except Exception as e: self.LOG.error(f"初始化提醒管理器失败: {e}", exc_info=True) - # 输出命令列表信息,便于调试 - # self.LOG.debug(get_commands_info()) # 如果需要在日志中输出所有命令信息,取消本行注释 + # 输出AI功能列表信息,便于调试 + if self.LOG.isEnabledFor(logging.DEBUG): + for name, func in ai_router.functions.items(): + self.LOG.debug(f"AI功能: {name} - {func.description} (scope: {func.scope}, need_at: {func.need_at})") @staticmethod def value_check(args: dict) -> bool: @@ -192,85 +183,43 @@ class Robot(Job): def processMsg(self, msg: WxMsg) -> None: """ - 处理收到的微信消息 + 处理收到的微信消息 - 纯Function Call实现 :param msg: 微信消息对象 """ try: - # 1. 使用MessageSummary记录消息(保持不变) + # 1. 使用MessageSummary记录消息 self.message_summary.process_message_from_wxmsg(msg, self.wcf, self.allContacts, self.wxid) - - # 2. 根据消息来源选择使用的AI模型 - self._select_model_for_message(msg) - - # 3. 获取本次对话特定的历史消息限制 - specific_limit = self._get_specific_history_limit(msg) - self.LOG.debug(f"本次对话 ({msg.sender} in {msg.roomid or msg.sender}) 使用历史限制: {specific_limit}") - - # 4. 预处理消息,生成MessageContext + + # 2. 预处理消息,生成MessageContext ctx = self.preprocess(msg) - # 确保context能访问到当前选定的chat模型及特定历史限制 setattr(ctx, 'chat', self.chat) - setattr(ctx, 'specific_max_history', specific_limit) - - # 5. 使用命令路由器分发处理消息 - handled = self.command_router.dispatch(ctx) - - # 6. 如果正则路由器没有处理,尝试AI路由器 - if not handled: - # 只在被@或私聊时才使用AI路由 - if (msg.from_group() and msg.is_at(self.wxid)) or not msg.from_group(): - print(f"[AI路由调试] 准备调用AI路由器处理消息: {msg.content}") - ai_handled = ai_router.dispatch(ctx) - print(f"[AI路由调试] AI路由器处理结果: {ai_handled}") - if ai_handled: - self.LOG.info("消息已由AI路由器处理") - print("[AI路由调试] 消息已成功由AI路由器处理") - return - else: - print("[AI路由调试] AI路由器未处理该消息") - - # 7. 如果没有命令处理器处理,则进行特殊逻辑处理 - if not handled: - # 7.1 好友请求自动处理 - if msg.type == 37: # 好友请求 - self.autoAcceptFriendRequest(msg) + + # 3. 直接使用Function Call系统处理所有消息 + handled = ai_router.dispatch(ctx) + if handled: + return + + # 4. 特殊系统消息处理 + if msg.type == 37: # 好友请求 + self.autoAcceptFriendRequest(msg) + return + + elif msg.type == 10000: + # 处理新成员入群 + if "加入了群聊" in msg.content and msg.from_group(): + new_member_match = re.search(r'"(.+?)"邀请"(.+?)"加入了群聊', msg.content) + if new_member_match: + inviter = new_member_match.group(1) + new_member = new_member_match.group(2) + welcome_msg = self.config.WELCOME_MSG.format(new_member=new_member, inviter=inviter) + self.sendTextMsg(welcome_msg, msg.roomid) + self.LOG.info(f"已发送欢迎消息给新成员 {new_member} 在群 {msg.roomid}") return - - # 7.2 系统消息处理 - elif msg.type == 10000: - # 7.2.1 处理新成员入群 - if "加入了群聊" in msg.content and msg.from_group(): - new_member_match = re.search(r'"(.+?)"邀请"(.+?)"加入了群聊', msg.content) - if new_member_match: - inviter = new_member_match.group(1) # 邀请人 - new_member = new_member_match.group(2) # 新成员 - # 使用配置文件中的欢迎语,支持变量替换 - welcome_msg = self.config.WELCOME_MSG.format(new_member=new_member, inviter=inviter) - self.sendTextMsg(welcome_msg, msg.roomid) - self.LOG.info(f"已发送欢迎消息给新成员 {new_member} 在群 {msg.roomid}") - return - # 7.2.2 处理新好友添加 - elif "你已添加了" in msg.content: - self.sayHiToNewFriend(msg) - return - - # 7.3 群聊消息,且配置了响应该群 - if msg.from_group() and msg.roomid in self.config.GROUPS: - # 如果在群里被@了,但命令路由器没有处理,则进行闲聊 - if msg.is_at(self.wxid): - # 调用handle_chitchat函数处理闲聊,传递完整的上下文 - handle_chitchat(ctx, None) - else: - pass - - # 7.4 私聊消息,未被命令处理,进行闲聊 - elif not msg.from_group() and not msg.from_self(): - # 检查是否是文本消息(type 1)或者是包含用户输入的类型49消息 - if msg.type == 1 or (msg.type == 49 and ctx.text): - self.LOG.info(f"准备回复私聊消息: 类型={msg.type}, 文本内容='{ctx.text}'") - # 调用handle_chitchat函数处理闲聊,传递完整的上下文 - handle_chitchat(ctx, None) - + # 处理新好友添加 + elif "你已添加了" in msg.content: + self.sayHiToNewFriend(msg) + return + except Exception as e: self.LOG.error(f"处理消息时发生错误: {str(e)}", exc_info=True) @@ -416,36 +365,12 @@ class Robot(Job): for r in receivers: self.sendTextMsg(report, r) - def sendDuelMsg(self, msg: str, receiver: str) -> None: - """发送决斗消息,不受消息频率限制,不记入历史记录 - :param msg: 消息字符串 - :param receiver: 接收人wxid或者群id - """ - try: - self.wcf.send_text(f"{msg}", receiver, "") - except Exception as e: - self.LOG.error(f"发送决斗消息失败: {e}") - def cleanup_perplexity_threads(self): """清理所有Perplexity线程""" # 如果已初始化Perplexity实例,调用其清理方法 perplexity_instance = self.get_perplexity_instance() if perplexity_instance: perplexity_instance.cleanup() - - # 检查并等待决斗线程结束 - if hasattr(self, 'duel_manager') and self.duel_manager.is_duel_running(): - self.LOG.info("等待决斗线程结束...") - # 最多等待5秒 - for i in range(5): - if not self.duel_manager.is_duel_running(): - break - time.sleep(1) - - if self.duel_manager.is_duel_running(): - self.LOG.warning("决斗线程在退出时仍在运行") - else: - self.LOG.info("决斗线程已结束") def cleanup(self): """清理所有资源,在程序退出前调用""" @@ -485,7 +410,6 @@ class Robot(Job): return self.chat_models[ChatType.PERPLEXITY.value] return None - def _select_model_for_message(self, msg: WxMsg) -> None: """根据消息来源选择对应的AI模型 @@ -493,17 +417,17 @@ class Robot(Job): """ if not hasattr(self, 'chat_models') or not self.chat_models: return # 没有可用模型,无需切换 - + # 获取消息来源ID source_id = msg.roomid if msg.from_group() else msg.sender - + # 检查配置 if not hasattr(self.config, 'GROUP_MODELS'): # 没有配置,使用默认模型 if self.default_model_id in self.chat_models: self.chat = self.chat_models[self.default_model_id] return - + # 群聊消息处理 if msg.from_group(): model_mappings = self.config.GROUP_MODELS.get('mapping', []) @@ -536,24 +460,24 @@ class Robot(Job): if self.default_model_id in self.chat_models: self.chat = self.chat_models[self.default_model_id] return - + # 如果没有找到对应配置,使用默认模型 if self.default_model_id in self.chat_models: self.chat = self.chat_models[self.default_model_id] - + def _get_specific_history_limit(self, msg: WxMsg) -> int: """根据消息来源和配置,获取特定的历史消息数量限制 - + :param msg: 微信消息对象 :return: 历史消息数量限制,如果没有特定配置则返回None """ if not hasattr(self.config, 'GROUP_MODELS'): # 没有配置,使用当前模型默认值 return getattr(self.chat, 'max_history_messages', None) - + # 获取消息来源ID source_id = msg.roomid if msg.from_group() else msg.sender - + # 确定查找的映射和字段名 if msg.from_group(): mappings = self.config.GROUP_MODELS.get('mapping', []) @@ -561,7 +485,7 @@ class Robot(Job): else: mappings = self.config.GROUP_MODELS.get('private_mapping', []) key_field = 'wxid' - + # 在映射中查找特定配置 for mapping in mappings: if mapping.get(key_field) == source_id: @@ -574,7 +498,7 @@ class Robot(Job): # 找到了配置但没有max_history,使用模型默认值 self.LOG.debug(f"为 {source_id} 找到映射但无特定历史限制,使用模型默认值") break - + # 没有找到特定限制,使用当前模型的默认值 default_limit = getattr(self.chat, 'max_history_messages', None) self.LOG.debug(f"未找到 {source_id} 的特定历史限制,使用模型默认值: {default_limit}")