From 4419f16843112b9ece141a9f06323b7a5b212a05 Mon Sep 17 00:00:00 2001 From: zihanjian Date: Thu, 25 Sep 2025 11:32:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E4=BA=86=E7=82=B9=E4=B8=9C=E8=A5=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 - README.MD | 1 - ai_providers/ai_deepseek.py | 43 +- ai_providers/ai_gemini.py | 398 ------- commands/handlers.py | 12 +- config.yaml.template | 16 - configuration.py | 2 - constants.py | 14 +- function/func_duel.py | 1546 --------------------------- function_call_agent_flow.md | 87 ++ function_call_argument_reference.md | 105 ++ function_calls/handlers.py | 81 +- function_calls/models.py | 22 +- function_calls/router.py | 94 +- function_calls/services/__init__.py | 11 +- function_calls/services/help.py | 8 - image/__init__.py | 10 +- image/img_gemini_image.py | 113 -- image/img_manager.py | 31 +- image/文生图功能的使用说明.MD | 38 +- requirements.txt | 2 - robot.py | 42 - 22 files changed, 278 insertions(+), 2399 deletions(-) delete mode 100644 ai_providers/ai_gemini.py delete mode 100644 function/func_duel.py create mode 100644 function_call_agent_flow.md create mode 100644 function_call_argument_reference.md delete mode 100644 image/img_gemini_image.py diff --git a/.gitignore b/.gitignore index a8e62ae..7b46dd5 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,4 @@ logs/ *.log.* config.yaml -duel_ranks.json data/ diff --git a/README.MD b/README.MD index c85f6b8..109b71b 100644 --- a/README.MD +++ b/README.MD @@ -67,7 +67,6 @@ Bubbles 是一个功能丰富的微信机器人框架,基于 [wcferry](https:/ - 支持为不同的群聊和私聊设置不同的 AI 模型和 system prompt - OpenAI (ChatGPT) - DeepSeek - - Gemini #### 🛠️ 双重路由系统 - **命令路由系统**:基于正则表达式的精确匹配,高效处理特定命令 diff --git a/ai_providers/ai_deepseek.py b/ai_providers/ai_deepseek.py index 2e11e1c..f39acc9 100644 --- a/ai_providers/ai_deepseek.py +++ b/ai_providers/ai_deepseek.py @@ -3,6 +3,7 @@ # -*- coding: utf-8 -*- import logging +from copy import deepcopy from datetime import datetime import time # 引入 time 模块 @@ -24,6 +25,10 @@ class DeepSeek(): self.model = conf.get("model", "deepseek-chat") # 读取最大历史消息数配置 self.max_history_messages = conf.get("max_history_messages", 10) # 读取配置,默认10条 + self.strict_functions = bool(conf.get("function_call_strict", False)) + if self.strict_functions and conf.get("api") is None: + # 严格模式需要 beta 端点 + api = "https://api.deepseek.com/beta" self.LOG = logging.getLogger("DeepSeek") # 存储传入的实例和wxid @@ -119,8 +124,44 @@ class DeepSeek(): self.LOG.error(f"发生未知错误:{str(e0)}") return "抱歉,处理您的请求时出现了错误" + def call_with_functions(self, messages, functions, wxid): + """Invoke DeepSeek function-calling API.""" + try: + tools = [] + for fn in functions: + payload = deepcopy(fn) + tool_entry = { + "type": "function", + "function": { + "name": payload.get("name"), + "description": payload.get("description", ""), + "parameters": payload.get("parameters", {}), + }, + } + if self.strict_functions: + tool_entry["function"]["strict"] = True + tools.append(tool_entry) + + params = { + "model": self.model, + "messages": messages, + "tools": tools, + "tool_choice": "auto", + "stream": False, + } + + response = self.client.chat.completions.create(**params) + return response + + except (APIConnectionError, APIError, AuthenticationError) as exc: + self.LOG.error(f"DeepSeek 函数调用失败: {exc}") + raise + except Exception as exc: # pragma: no cover - 兜底保护 + self.LOG.error(f"调用 DeepSeek 函数接口时发生未知错误: {exc}", exc_info=True) + raise + if __name__ == "__main__": # --- 测试代码需要调整 --- print("请注意:直接运行此文件进行测试需要模拟 MessageSummary 并提供 bot_wxid。") - pass \ No newline at end of file + pass diff --git a/ai_providers/ai_gemini.py b/ai_providers/ai_gemini.py deleted file mode 100644 index a99a5eb..0000000 --- a/ai_providers/ai_gemini.py +++ /dev/null @@ -1,398 +0,0 @@ -# ai_providers/ai_gemini.py -#! /usr/bin/env python3 -# -*- coding: utf-8 -*- - -import logging -import os -import time -import httpx -import pathlib # 用于处理文件路径 -import mimetypes # 用于猜测图片类型 -import google.generativeai as genai -from google.generativeai.types import generation_types, safety_types # 显式导入需要的类型 -from google.api_core.exceptions import GoogleAPICallError, ClientError - -# 引入 MessageSummary 类型提示 -try: - from function.func_summary import MessageSummary -except ImportError: - MessageSummary = object # Fallback - -class Gemini: - DEFAULT_MODEL = "gemini-1.5-pro-latest" - DEFAULT_PROMPT = "You are a helpful assistant." - DEFAULT_MAX_HISTORY = 15 - SAFETY_SETTINGS = { # 默认安全设置 - 可根据需要调整或从配置加载 - safety_types.HarmCategory.HARM_CATEGORY_HARASSMENT: safety_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, - safety_types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: safety_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, - safety_types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: safety_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, - safety_types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: safety_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, - } - - def __init__(self, conf: dict, message_summary_instance: MessageSummary = None, bot_wxid: str = None) -> None: - self.LOG = logging.getLogger("Gemini") - self._api_key = conf.get("api_key") - self._model_name = conf.get("model_name", self.DEFAULT_MODEL) - # 存储原始配置的 prompt,用于初始化和可能的重载 - self._base_prompt = conf.get("prompt", self.DEFAULT_PROMPT) - self._proxy = conf.get("proxy") - self.max_history_messages = conf.get("max_history_messages", self.DEFAULT_MAX_HISTORY) - - self.message_summary = message_summary_instance - self.bot_wxid = bot_wxid - self._model = None - self.support_vision = False # 初始化时假设不支持,成功加载模型后再判断 - - if not self._api_key: - self.LOG.error("Gemini API Key 未在配置中提供!") - return # 没有 API Key 无法继续 - - if not self.message_summary: - self.LOG.warning("MessageSummary 实例未提供给 Gemini,上下文功能将不可用!") - if not self.bot_wxid: - self.LOG.warning("bot_wxid 未提供给 Gemini,可能无法正确识别机器人自身消息!") - - try: - # 1. 配置代理 (如果提供) - transport = None - if self._proxy: - try: - transport = httpx.HTTPTransport(proxy=self._proxy) - self.LOG.info(f"Gemini 使用代理: {self._proxy}") - except Exception as proxy_err: - self.LOG.error(f"配置 Gemini 代理失败: {proxy_err}", exc_info=True) - # 代理配置失败,可以选择不使用代理继续或直接失败 - # 这里选择继续,不使用代理 - transport = None - - # 2. 配置 Google AI Client - genai.configure(api_key=self._api_key, transport=transport) - - # 3. 初始化模型 - # 将基础 prompt 作为 system_instruction 传递 - self._model = genai.GenerativeModel( - self._model_name, - system_instruction=self._base_prompt, - safety_settings=self.SAFETY_SETTINGS # 应用安全设置 - ) - self.LOG.info(f"已加载 Gemini 模型") - - # 4. 检查视觉能力 (依赖模型名称的简单检查) - # 注意:更可靠的方式是调用 list_models 并检查支持的方法 - if "vision" in self._model_name or "pro" in self._model_name or "gemini-1.5" in self._model_name or "flash" in self._model_name: - self.support_vision = True - self.LOG.info(f"模型 {self._model_name} 被认为支持视觉能力。") - else: - self.LOG.info(f"模型 {self._model_name} 根据名称判断可能不支持视觉能力。") - - except (GoogleAPICallError, ClientError) as api_error: - self.LOG.error(f"初始化 Gemini 时发生 API 错误: {api_error}", exc_info=True) - self._model = None - except Exception as e: - self.LOG.error(f"初始化 Gemini 时发生未知错误: {e}", exc_info=True) - self._model = None - - def __repr__(self): - return f'Gemini(model={self._model_name}, initialized={self._model is not None})' - - @staticmethod - def value_check(conf: dict) -> bool: - # 只需要 API Key 是必须的 - return bool(conf and conf.get("api_key")) - - def _format_history(self, history: list) -> list: - """将数据库历史消息转换为 Gemini API 的 contents 格式""" - contents = [] - for msg in history: - role = "model" if msg.get("sender_wxid") == self.bot_wxid else "user" - content = msg.get('content', '') - sender_name = msg.get('sender', '未知用户') # 获取发送者名称 - - if content: # 避免添加空内容 - # Gemini 推荐 user role 包含发言者信息,model role 不需要 - if role == "user": - formatted_content = f"[{sender_name}]: {content}" # 添加发送者标记 - contents.append({'role': role, 'parts': [{'text': formatted_content}]}) - else: # role == "model" - contents.append({'role': role, 'parts': [{'text': content}]}) - return contents - - def _generate_response(self, contents: list, generation_config_override: generation_types.GenerationConfig | None = None) -> str: - """内部方法,用于调用 Gemini API 并处理响应""" - if not self._model: - return "Gemini 模型未成功初始化,请检查配置和网络。" - - # 配置生成参数 (可以从 config 中读取更多参数) - # 默认使用适中的温度,可以根据需要调整 - default_config = generation_types.GenerationConfig(temperature=0.7) - config_to_use = generation_config_override if generation_config_override else default_config - - self.LOG.debug(f"发送给 Gemini API 的内容条数: {len(contents)}") - # self.LOG.debug(f"使用的 GenerationConfig: {config_to_use}") - # self.LOG.debug(f"发送内容详情: {contents}") # DEBUG: 打印发送内容 - - rsp_text = "" - try: - # 调用 API - response = self._model.generate_content( - contents=contents, - generation_config=config_to_use, - # safety_settings=... # 如果需要覆盖初始化时的安全设置 - stream=False # 非流式响应 - ) - - # 1. 检查 Prompt 是否被阻止 (在获取 candidates 之前) - if response.prompt_feedback and response.prompt_feedback.block_reason: - reason = response.prompt_feedback.block_reason.name - self.LOG.warning(f"Gemini 提示被阻止,原因: {reason}") - return f"抱歉,您的请求因包含不适内容而被阻止 (原因: {reason})。" - - # 2. 检查 Candidates 和 Finish Reason - # 尝试从 response 中安全地提取文本 - try: - rsp_text = response.text # .text 是获取聚合文本的便捷方式 - except ValueError: - # 如果 .text 不可用 (例如,因为 finish_reason 不是 STOP 或 MAX_TOKENS) - # 检查具体的 finish_reason - if response.candidates: - candidate = response.candidates[0] - finish_reason = candidate.finish_reason # 类型是 FinishReason 枚举 - finish_reason_name = finish_reason.name - - if finish_reason == generation_types.FinishReason.SAFETY: - self.LOG.warning(f"Gemini 响应被安全策略阻止。") - # 可以尝试查看 safety_ratings 获取更详细信息 - ratings_info = getattr(candidate, 'safety_ratings', []) - self.LOG.debug(f"Safety Ratings: {ratings_info}") - return "抱歉,生成的响应可能包含不安全内容,已被阻止。" - elif finish_reason == generation_types.FinishReason.RECITATION: - self.LOG.warning(f"Gemini 响应因引用保护被阻止。") - return "抱歉,回答可能包含受版权保护的内容,已被部分阻止。" - elif finish_reason == generation_types.FinishReason.OTHER: - self.LOG.warning(f"Gemini 响应因未知原因停止 (FinishReason: OTHER)") - return "抱歉,生成响应时遇到未知问题。" - else: - # 包括 MAX_TOKENS, STOP 等预期情况,但没有文本 - self.LOG.warning(f"Gemini 未返回文本内容,但完成原因可接受: {finish_reason_name}") - return f"生成内容时遇到问题 (完成原因: {finish_reason_name})" - else: - # 没有 candidates,也没有 prompt block,未知情况 - self.LOG.error("Gemini API 调用成功但未返回任何候选内容或提示反馈。") - return "抱歉,Gemini未能生成响应。" - - except (GoogleAPICallError, ClientError) as api_error: - self.LOG.error(f"Gemini API 调用错误:{api_error}", exc_info=True) - if "API key not valid" in str(api_error): - return "Gemini API 密钥无效或已过期。" - elif "quota" in str(api_error).lower(): - return "Gemini API 调用已达配额限制。" - elif "Model not found" in str(api_error): - return f"配置的 Gemini 模型 '{self._model_name}' 未找到或不可用。" - elif "Resource has been exhausted" in str(api_error): - return "Gemini API 资源耗尽,请稍后再试或检查配额。" - else: - return f"与 Gemini 通信时出错: {type(api_error).__name__}" - except generation_types.StopCandidateException as sce: # 明确捕获这个 - self.LOG.error(f"Gemini API 响应被停止 (StopCandidateException): {sce}", exc_info=True) - # 通常在流式处理中遇到,但也可能在非流式中因某些原因触发 - return "抱歉,Gemini 生成的响应被意外停止了。" - # BlockedPromptException 似乎不直接抛出,而是通过 prompt_feedback 反馈 - # except generation_types.BlockedPromptException as bpe: - # self.LOG.error(f"Gemini API 提示被阻止:{bpe}", exc_info=True) - # return "抱歉,您的请求内容被 Gemini 阻止了。" - except Exception as e: - self.LOG.error(f"调用 Gemini 时发生未知错误: {e}", exc_info=True) - return f"处理您的请求时发生未知错误: {type(e).__name__}" - - return rsp_text.strip() - - def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None) -> str: - if not self._model: - return "Gemini 模型未成功初始化,请检查配置和网络。" - - if not question: - self.LOG.warning(f"尝试为 wxid={wxid} 获取答案,但问题为空。") - return "您没有提问哦。" - - # 1. 准备历史消息 - contents = [] - if self.message_summary and self.bot_wxid: - history = self.message_summary.get_messages(wxid) - limit = specific_max_history if specific_max_history is not None else self.max_history_messages - self.LOG.debug(f"获取 Gemini 历史 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit}") - - if limit > 0: - history = history[-limit:] - elif limit == 0: - history = [] # 明确清空历史 - - self.LOG.debug(f"应用限制后 Gemini 历史条数: {len(history)}") - contents.extend(self._format_history(history)) - else: - self.LOG.warning(f"无法为 wxid={wxid} 获取 Gemini 历史记录。") - - # 2. 添加当前用户问题 - # 注意:格式化时已包含发送者信息 - contents.append({'role': 'user', 'parts': [{'text': question}]}) - - # 3. 处理 System Prompt Override (如果提供) - # 注意:Gemini API 目前不直接支持在 generate_content 中覆盖 system_instruction - # 如果需要动态改变系统提示,通常需要重新初始化模型或在用户消息前插入一条 'user' role 的指令 - # 这里我们暂时忽略 system_prompt_override,因为标准 API 调用不支持 - if system_prompt_override: - self.LOG.warning("Gemini API 当前不支持单次请求覆盖系统提示,将使用初始化时的提示。") - # 可以考虑在这里将 override 的内容作为一条 user message 添加到 contents 开头 - # 例如: contents.insert(0, {'role': 'user', 'parts': [{'text': f"[System Instruction Override]: {system_prompt_override}"}]}) - # 但这会影响对话历史的结构,需要谨慎使用 - - # 4. 添加当前时间信息(可选,作为用户消息的一部分) - now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - # 可以将时间信息添加到最近的用户消息中,或作为一条新的 user message - # 为了简单,暂不自动添加时间信息到内容中,如果需要,可以在 prompt 中说明 - - # 5. 调用内部生成方法 - return self._generate_response(contents) - - def get_image_description(self, image_path: str, prompt: str = "请详细描述这张图片中的内容") -> str: - if not self._model: - return "Gemini 模型未初始化。" - if not self.support_vision: - return f"当前 Gemini 模型 '{self._model_name}' 不支持图片理解。" - - image_path_obj = pathlib.Path(image_path) - if not image_path_obj.is_file(): - self.LOG.error(f"图片文件不存在或不是文件: {image_path}") - return "无法读取图片文件" - - try: - # 猜测 MIME 类型 - mime_type, _ = mimetypes.guess_type(image_path_obj) - if not mime_type or not mime_type.startswith("image/"): - self.LOG.warning(f"无法确定图片 MIME 类型或类型不是 image/*: {image_path}, 猜测为 jpeg") - mime_type = "image/jpeg" # 使用默认值 - - self.LOG.info(f"使用 Gemini 分析图片: {image_path} (MIME: {mime_type})") - - # 使用 pathlib 生成 file URI - image_uri = image_path_obj.absolute().as_uri() - - image_part = {'mime_type': mime_type, 'data': image_path_obj.read_bytes()} - - # 构建包含文本提示和图片的消息 - contents = [ - # Gemini 处理多模态输入时,推荐 prompt 和 image 都在 user role 的 parts 里 - {'role': 'user', 'parts': [ - {'text': prompt}, - image_part - # 或者使用 from_uri (如果 API 支持且网络可访问该文件 URI): - # genai.types.Part.from_uri(mime_type=mime_type, uri=image_uri) - # 使用原始字节通常更可靠 - ]} - ] - - # 可以为图片分析设置不同的生成参数,例如更低的温度以获得更客观的描述 - image_gen_config = generation_types.GenerationConfig(temperature=0.4) - - # 调用内部生成方法 - return self._generate_response(contents, generation_config_override=image_gen_config) - - except FileNotFoundError: - self.LOG.error(f"读取图片文件时发生 FileNotFoundError: {image_path}", exc_info=True) - return "读取图片文件时出错。" - except Exception as e: - self.LOG.error(f"使用 Gemini 分析图片时发生未知错误: {e}", exc_info=True) - return f"分析图片时发生未知错误: {type(e).__name__}" - -# --- Main 测试部分 --- -if __name__ == "__main__": - print("--- 运行 Gemini 本地测试 ---") - # 配置日志记录 - logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') - - # --- 配置加载 --- - # !!! 强烈建议从环境变量或安全的配置文件加载 API Key !!! - # 例如: api_key = os.environ.get("GEMINI_API_KEY") - # 不要将 API Key 硬编码在代码中提交 - api_key_from_env = os.environ.get("GEMINI_API_KEY") - proxy_from_env = os.environ.get("HTTP_PROXY") # 支持 http/https 代理 - - if not api_key_from_env: - print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") - print("!!! 警告:环境变量 GEMINI_API_KEY 未设置。请设置该变量。 !!!") - print("!!! 测试将无法连接到 Gemini API。 !!!") - print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") - # 可以选择退出或继续(如果只想测试初始化逻辑) - # exit(1) - api_key_to_use = "DUMMY_KEY_FOR_INIT_TEST" # 仅用于测试初始化日志,无法实际调用 - else: - api_key_to_use = api_key_from_env - - mock_config = { - "api_key": api_key_to_use, - "model_name": "gemini-1.5-flash-latest", # 使用较快的模型测试 - "prompt": "你是一个风趣幽默的AI助手,擅长讲冷笑话。", - "proxy": proxy_from_env, - "max_history_messages": 3 # 测试时减少历史记录 - } - print(f"测试配置: Model={mock_config['model_name']}, Proxy={'已设置' if mock_config['proxy'] else '未设置'}") - - # --- 初始化 Gemini --- - # 在测试中不依赖 MessageSummary - print("\n--- 初始化 Gemini 实例 ---") - gemini_assistant = Gemini(mock_config, bot_wxid="test_bot_wxid") # 提供一个测试 bot_wxid - - # --- 测试文本生成 --- - if gemini_assistant._model: # 检查模型是否成功初始化 - print("\n--- 测试文本生成 (get_answer) ---") - test_question = "你好!今天天气怎么样?给我讲个关于程序员的冷笑话吧。" - print(f"提问: {test_question}") - start_time = time.time() - answer = gemini_assistant.get_answer(test_question, "test_user_wxid") # 提供测试 wxid - end_time = time.time() - print(f"\nGemini 回答 (耗时: {end_time - start_time:.2f}s):\n{answer}") - - # 测试空问题 - print("\n--- 测试空问题 ---") - empty_answer = gemini_assistant.get_answer("", "test_user_wxid") - print(f"空问题回答: {empty_answer}") - - # 测试长对话历史(如果需要,可以手动构建一个 contents 列表来模拟) - # print("\n--- 模拟长对话测试 ---") - # mock_history = [ - # {'role': 'user', 'parts': [{'text': "[UserA]: 第一次提问"}]}, - # {'role': 'model', 'parts': [{'text': "第一次回答"}]}, - # {'role': 'user', 'parts': [{'text': "[UserB]: 第二次提问"}]}, - # {'role': 'model', 'parts': [{'text': "第二次回答"}]}, - # {'role': 'user', 'parts': [{'text': "[UserA]: 第三次提问,关于第一次提问的内容"}]}, - # ] - # mock_history.append({'role': 'user', 'parts': [{'text': "当前的第四个问题"}]}) - # long_hist_answer = gemini_assistant._generate_response(mock_history) - # print(f"长历史回答:\n{long_hist_answer}") - - else: - print("\n--- Gemini 初始化失败,跳过文本生成测试 ---") - - # --- 测试图片描述 (可选) --- - if gemini_assistant._model and gemini_assistant.support_vision: - print("\n--- 测试图片描述 (get_image_description) ---") - # 将 'path/to/your/test_image.jpg' 替换为实际的图片路径 - image_test_path_str = "test_image.jpg" # 假设图片在脚本同目录下 - image_test_path = pathlib.Path(image_test_path_str) - - if image_test_path.exists(): - desc_prompt = "详细描述这张图片里的所有元素和场景氛围。" - print(f"图片路径: {image_test_path.absolute()}") - print(f"描述提示: {desc_prompt}") - start_time = time.time() - description = gemini_assistant.get_image_description(str(image_test_path), desc_prompt) - end_time = time.time() - print(f"\n图片描述 (耗时: {end_time - start_time:.2f}s):\n{description}") - else: - print(f"\n跳过图片测试,测试图片文件未找到: {image_test_path.absolute()}") - print("请将一张名为 test_image.jpg 的图片放在脚本相同目录下进行测试。") - elif gemini_assistant._model: - print(f"\n--- 跳过图片测试,当前模型 {gemini_assistant._model_name} 不支持视觉 ---") - else: - print("\n--- Gemini 初始化失败,跳过图片描述测试 ---") - - print("\n--- Gemini 本地测试结束 ---") \ No newline at end of file diff --git a/commands/handlers.py b/commands/handlers.py index ca5a0ef..851d38a 100644 --- a/commands/handlers.py +++ b/commands/handlers.py @@ -4,8 +4,6 @@ from typing import Optional, Match, Dict, Any import json # 确保已导入json from datetime import datetime # 确保已导入datetime import os # 导入os模块用于文件路径操作 -# from function.func_duel import DuelRankSystem - # 前向引用避免循环导入 from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -26,14 +24,6 @@ def handle_help(ctx: 'MessageContext', match: Optional[Match]) -> bool: "- 新闻", "- ask [问题]", "", - "【决斗 & 偷袭】", - "- 决斗@XX", - "- 偷袭@XX", - "- 决斗排行/排行榜", - "- 我的战绩/决斗战绩", - "- 我的装备/查看装备", - "- 改名 [旧名] [新名]", - "", "【提醒】", "- 提醒xxxxx:一次性、每日、每周", "- 查看提醒/我的提醒/提醒列表", @@ -1072,4 +1062,4 @@ def handle_weather_forecast(ctx: 'MessageContext', match: Optional[Match]) -> bo ctx.logger.error(f"获取城市 {city_name}({city_code}) 天气预报时出错: {e}", exc_info=True) ctx.send_text(f"😥 获取 {city_name} 天气预报时遇到问题,请稍后再试。") - return True \ No newline at end of file + return True diff --git a/config.yaml.template b/config.yaml.template index ed6005b..d517f03 100644 --- a/config.yaml.template +++ b/config.yaml.template @@ -107,13 +107,6 @@ deepseek: # -----deepseek配置这行不填----- show_reasoning: false # 是否在回复中显示思维过程,仅在启用思维链功能时有效 max_history_messages: 10 # <--- 添加这一行,设置 DeepSeek 最多回顾 10 条历史消息 -gemini: # -----gemini配置----- - api_key: "YOUR_GOOGLE_API_KEY" # 必须 - model_name: "gemini-1.5-pro-latest" # 可选, 默认是 "gemini-1.5-pro-latest" - prompt: "你是一个AI助手,请用通俗易懂的语言回答用户的问题。" # 可选 - proxy: "http://127.0.0.1:7890" # 可选, 代理地址 - max_history_messages: 20 # 可选, 对话历史长度 - aliyun_image: # -----如果要使用阿里云文生图,取消下面的注释并填写相关内容,模型到阿里云百炼找通义万相-文生图2.1-Turbo----- enable: true # 是否启用阿里文生图功能,false为关闭,默认开启,如果未配置,则会将消息发送给聊天大模型 api_key: sk-xxxxxxxxxxxxxxxxxxxxxxxx # 替换为你的DashScope API密钥 @@ -124,15 +117,6 @@ aliyun_image: # -----如果要使用阿里云文生图,取消下面的注释 trigger_keyword: 牛阿里 # 触发词,默认为"牛阿里" fallback_to_chat: true # 当服务不可用时是否转发给聊天模型处理 -gemini_image: # -----谷歌AI画图配置这行不填----- - enable: true # 是否启用谷歌AI画图功能 - api_key: # 谷歌Gemini API密钥,必填 - model: gemini-2.0-flash-exp-image-generation # 模型名称,建议保持默认,只有这一个模型可以进行绘画 - temp_dir: ./geminiimg # 图片保存目录,可选 - trigger_keyword: 牛谷歌 # 触发词,默认为"牛谷歌" - fallback_to_chat: false # 未启用时是否回退到聊天模式 - proxy: http://127.0.0.1:7890 # 使用Clash代理,格式为:http://域名或者IP地址:端口号 - perplexity: # -----perplexity配置这行不填----- key: # 填写你的Perplexity API Key api: https://api.perplexity.ai # API地址 diff --git a/configuration.py b/configuration.py index 7e7e9cd..fc86b39 100644 --- a/configuration.py +++ b/configuration.py @@ -40,7 +40,5 @@ class Config(object): self.DEEPSEEK = yconfig.get("deepseek", {}) self.PERPLEXITY = yconfig.get("perplexity", {}) self.ALIYUN_IMAGE = yconfig.get("aliyun_image", {}) - self.GEMINI_IMAGE = yconfig.get("gemini_image", {}) - self.GEMINI = yconfig.get("gemini", {}) self.MAX_HISTORY = yconfig.get("MAX_HISTORY", 300) self.SEND_RATE_LIMIT = yconfig.get("send_rate_limit", 0) diff --git a/constants.py b/constants.py index f34e8aa..0dc7cd0 100644 --- a/constants.py +++ b/constants.py @@ -6,17 +6,15 @@ class ChatType(IntEnum): # UnKnown = 0 # 未知, 即未设置 CHATGPT = 1 # ChatGPT DEEPSEEK = 2 # DeepSeek - GEMINI = 3 # Gemini - PERPLEXITY = 4 # Perplexity + PERPLEXITY = 3 # Perplexity @staticmethod def is_in_chat_types(chat_type: int) -> bool: - if chat_type in [ChatType.CHATGPT.value, - ChatType.DEEPSEEK.value, - ChatType.PERPLEXITY.value, - ChatType.GEMINI.value]: - return True - return False + return chat_type in { + ChatType.CHATGPT.value, + ChatType.DEEPSEEK.value, + ChatType.PERPLEXITY.value, + } @staticmethod def help_hint() -> str: diff --git a/function/func_duel.py b/function/func_duel.py deleted file mode 100644 index a8357a2..0000000 --- a/function/func_duel.py +++ /dev/null @@ -1,1546 +0,0 @@ -import random -import logging -import time -import json -import os -import sqlite3 -from typing import List, Dict, Tuple, Optional, Any -from threading import Thread, Lock - -# 获取 Logger 实例 -logger_duel = logging.getLogger("DuelRankSystem") - -# 排位积分系统 -class DuelRankSystem: - # 使用线程锁确保数据库操作的线程安全 - _db_lock = Lock() - - def __init__(self, group_id=None, db_path="data/message_history.db"): - """ - 初始化排位系统 - - Args: - group_id: 群组ID - db_path: 数据库文件路径 - """ - # 确保group_id不为空,现在只支持群聊 - if not group_id: - raise ValueError("决斗功能只支持群聊") - - self.group_id = group_id - self.db_path = db_path - self._init_db() # 初始化数据库 - - def _get_db_conn(self) -> sqlite3.Connection: - """获取数据库连接""" - try: - conn = sqlite3.connect(self.db_path, timeout=10, check_same_thread=False) - conn.row_factory = sqlite3.Row # 让查询结果可以像字典一样访问列 - return conn - except sqlite3.Error as e: - logger_duel.error(f"无法连接到 SQLite 数据库 '{self.db_path}': {e}", exc_info=True) - raise # 连接失败是严重问题,直接抛出异常 - - def _init_db(self): - """初始化数据库,创建表(如果不存在)""" - sql_create_players = """ - CREATE TABLE IF NOT EXISTS duel_players ( - group_id TEXT NOT NULL, - player_name TEXT NOT NULL, - score INTEGER DEFAULT 1000, - wins INTEGER DEFAULT 0, - losses INTEGER DEFAULT 0, - total_matches INTEGER DEFAULT 0, - elder_wand INTEGER DEFAULT 0, - magic_stone INTEGER DEFAULT 0, - invisibility_cloak INTEGER DEFAULT 0, - last_updated TEXT, - PRIMARY KEY (group_id, player_name) - ); - """ - # 移除了 duel_history 表的创建语句 - # 移除了相关索引的创建语句 - - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - cursor.execute(sql_create_players) - # 移除了执行创建 duel_history 表和索引的命令 - conn.commit() - logger_duel.info("数据库表 'duel_players' 检查/创建 完成。") - except sqlite3.Error as e: - logger_duel.error(f"创建/检查数据库表失败: {e}", exc_info=True) - raise # 初始化失败是严重问题 - - def get_player_data(self, player_name: str) -> Dict: - """获取玩家数据,如果不存在则创建""" - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - # 查询玩家数据 - sql_query = """ - SELECT * FROM duel_players - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_query, (self.group_id, player_name)) - result = cursor.fetchone() - - if result: - # 将 sqlite3.Row 转换为字典 - player_data = dict(result) - # 构造特殊的 items 字典 - player_data["items"] = { - "elder_wand": player_data.pop("elder_wand", 0), - "magic_stone": player_data.pop("magic_stone", 0), - "invisibility_cloak": player_data.pop("invisibility_cloak", 0) - } - return player_data - else: - # 玩家不存在,创建新玩家 - default_data = { - "score": 1000, - "wins": 0, - "losses": 0, - "total_matches": 0, - "items": { - "elder_wand": 0, - "magic_stone": 0, - "invisibility_cloak": 0 - } - } - - # 插入新玩家数据 - sql_insert = """ - INSERT INTO duel_players - (group_id, player_name, score, wins, losses, total_matches, - elder_wand, magic_stone, invisibility_cloak, last_updated) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) - """ - cursor.execute(sql_insert, ( - self.group_id, - player_name, - default_data["score"], - default_data["wins"], - default_data["losses"], - default_data["total_matches"], - default_data["items"]["elder_wand"], - default_data["items"]["magic_stone"], - default_data["items"]["invisibility_cloak"] - )) - conn.commit() - - logger_duel.info(f"创建了新玩家: {player_name} 在群组 {self.group_id}") - return default_data - - except sqlite3.Error as e: - logger_duel.error(f"获取玩家数据失败: {e}", exc_info=True) - # 出错时返回默认数据 - return { - "score": 1000, - "wins": 0, - "losses": 0, - "total_matches": 0, - "items": { - "elder_wand": 0, - "magic_stone": 0, - "invisibility_cloak": 0 - } - } - - def update_score(self, winner: str, loser: str, winner_hp: int, rounds: int) -> Tuple[int, int]: - """更新玩家积分 - - Args: - winner: 胜利者名称 - loser: 失败者名称 - winner_hp: 胜利者剩余生命值 - rounds: 决斗回合数 - - Returns: - Tuple[int, int]: (胜利者获得积分, 失败者失去积分) - """ - # 获取玩家数据 - winner_data = self.get_player_data(winner) - loser_data = self.get_player_data(loser) - - # 基础积分计算 - 回合数越少积分越高 - base_points = 100 - if rounds <= 5: # 速战速决 - base_points = 100 - elif rounds <= 10: - base_points = 60 - elif rounds >= 15: # 长时间战斗 - base_points = 40 - - # 计算总积分变化(剩余生命值作为百分比加成) - hp_percent_bonus = winner_hp / 100.0 # 血量百分比 - points = int(base_points * (hp_percent_bonus)) # 血量越多,积分越高 - - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - - # 更新胜利者数据 - sql_update_winner = """ - UPDATE duel_players SET - score = score + ?, - wins = wins + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_winner, (points, self.group_id, winner)) - - # 更新失败者数据 - sql_update_loser = """ - UPDATE duel_players SET - score = MAX(1, score - ?), - losses = losses + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_loser, (points, self.group_id, loser)) - - # 移除了记录对战历史的代码 - - conn.commit() - logger_duel.info(f"{winner} 击败 {loser},获得 {points} 积分") - - return (points, points) # 返回胜者得分和败者失分(相同) - - except sqlite3.Error as e: - logger_duel.error(f"更新积分失败: {e}", exc_info=True) - return (0, 0) # 出错时返回0分 - - def get_rank_list(self, top_n: int = 10) -> List[Dict]: - """获取排行榜 - - Args: - top_n: 返回前几名 - - Returns: - List[Dict]: 排行榜数据 - """ - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - sql_query = """ - SELECT player_name, score, wins, losses, total_matches, - elder_wand, magic_stone, invisibility_cloak - FROM duel_players - WHERE group_id = ? - ORDER BY score DESC - LIMIT ? - """ - cursor.execute(sql_query, (self.group_id, top_n)) - results = cursor.fetchall() - - # 转换结果为字典列表,格式与原JSON格式相同 - ranked_players = [] - for row in results: - player_dict = dict(row) - player_name = player_dict.pop("player_name") - - # 构造与原格式相同的字典 - player = { - "name": player_name, - "score": player_dict["score"], - "wins": player_dict["wins"], - "losses": player_dict["losses"], - "total_matches": player_dict["total_matches"], - "items": { - "elder_wand": player_dict["elder_wand"], - "magic_stone": player_dict["magic_stone"], - "invisibility_cloak": player_dict["invisibility_cloak"] - } - } - ranked_players.append(player) - - return ranked_players - - except sqlite3.Error as e: - logger_duel.error(f"获取排行榜失败: {e}", exc_info=True) - return [] # 出错时返回空列表 - - def get_player_rank(self, player_name: str) -> Tuple[Optional[int], Dict]: - """获取玩家排名 - - Args: - player_name: 玩家名称 - - Returns: - Tuple[Optional[int], Dict]: (排名, 玩家数据) - """ - # 获取玩家数据 - player_data = self.get_player_data(player_name) - - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - - # 查询排行榜中有哪些分数比该玩家高 - sql_rank = """ - SELECT COUNT(*) + 1 as rank - FROM duel_players - WHERE group_id = ? AND score > ( - SELECT score FROM duel_players - WHERE group_id = ? AND player_name = ? - ) - """ - cursor.execute(sql_rank, (self.group_id, self.group_id, player_name)) - result = cursor.fetchone() - - if result: - rank = result["rank"] - return rank, player_data - else: - # 找不到玩家排名,可能是新玩家 - return None, player_data - - except sqlite3.Error as e: - logger_duel.error(f"获取玩家排名失败: {e}", exc_info=True) - return None, player_data # 出错时返回None作为排名 - - def change_player_name(self, old_name: str, new_name: str) -> bool: - """更改玩家名称 - - Args: - old_name: 旧名称 - new_name: 新名称 - - Returns: - bool: 是否成功更改 - """ - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - - # 开启事务 - conn.execute("BEGIN TRANSACTION") - - # 检查旧名称是否存在 - sql_check_old = """ - SELECT COUNT(*) as count FROM duel_players - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_check_old, (self.group_id, old_name)) - if cursor.fetchone()["count"] == 0: - conn.rollback() - return False - - # 检查新名称是否已存在 - sql_check_new = """ - SELECT COUNT(*) as count FROM duel_players - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_check_new, (self.group_id, new_name)) - if cursor.fetchone()["count"] > 0: - conn.rollback() - return False - - # 更新玩家表 - sql_update_player = """ - UPDATE duel_players SET player_name = ? - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_player, (new_name, self.group_id, old_name)) - - # 移除了更新历史记录表中的胜者和败者名称的代码 - - # 提交事务 - conn.commit() - logger_duel.info(f"成功将玩家 {old_name} 改名为 {new_name}") - - return True - - except sqlite3.Error as e: - logger_duel.error(f"更改玩家名称失败: {e}", exc_info=True) - return False # 出错时返回失败 - - def update_score_by_magic(self, winner: str, loser: str, magic_power: int) -> Tuple[int, int]: - """根据魔法分数更新玩家积分 - - Args: - winner: 胜利者名称 - loser: 失败者名称 - magic_power: 决斗中所有参与者使用的魔法总分数 - - Returns: - Tuple[int, int]: (胜利者获得积分, 失败者失去积分) - """ - # 获取玩家数据 (这里只是为了确保玩家存在) - self.get_player_data(winner) - self.get_player_data(loser) - - # 使用魔法总分作为积分变化值 - points = magic_power - - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - - # 更新胜利者数据 - sql_update_winner = """ - UPDATE duel_players SET - score = score + ?, - wins = wins + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_winner, (points, self.group_id, winner)) - - # 更新失败者数据 - sql_update_loser = """ - UPDATE duel_players SET - score = MAX(1, score - ?), - losses = losses + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_loser, (points, self.group_id, loser)) - - # 移除了记录对战历史的代码 - - conn.commit() - logger_duel.info(f"{winner} 使用魔法击败 {loser},获得 {points} 积分") - - return (points, points) # 返回胜者得分和败者失分(相同) - - except sqlite3.Error as e: - logger_duel.error(f"根据魔法分数更新积分失败: {e}", exc_info=True) - return (0, 0) # 出错时返回0分 - - def record_duel_result(self, winner: str, loser: str, winner_points: int, loser_points: int, total_magic_power: int, used_item: Optional[str] = None) -> Tuple[int, int]: - """记录决斗结果,更新玩家数据和历史记录 - - Args: - winner: 胜利者名称 - loser: 失败者名称 - winner_points: 胜利者获得的积分 - loser_points: 失败者失去的积分 - total_magic_power: 决斗中使用的总魔法力 - used_item: 本次决斗中使用的道具名称 (可选) - 可能是 "elder_wand"(老魔杖), "magic_stone"(魔法石), "invisibility_cloak"(隐身衣) - - Returns: - Tuple[int, int]: (胜利者实际获得积分, 失败者实际失去积分) - """ - # 获取玩家数据 (确保玩家存在) - self.get_player_data(winner) - self.get_player_data(loser) - - # 注意:loser_points 是正数,表示要扣除的分数 - - try: - with self._db_lock: - with self._get_db_conn() as conn: - cursor = conn.cursor() - - # 更新胜利者数据 - sql_update_winner = """ - UPDATE duel_players SET - score = score + ?, - wins = wins + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_winner, (winner_points, self.group_id, winner)) - - # 更新失败者数据 - sql_update_loser = """ - UPDATE duel_players SET - score = MAX(1, score - ?), - losses = losses + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update_loser, (loser_points, self.group_id, loser)) - - # --- 改进处理道具消耗逻辑 --- - if used_item == "elder_wand": - # 老魔杖是胜利者使用的 - cursor.execute("UPDATE duel_players SET elder_wand = MAX(0, elder_wand - 1) WHERE group_id = ? AND player_name = ?", (self.group_id, winner)) - logger_duel.info(f"消耗了 {winner} 的老魔杖 (剩余数量将被更新)") - elif used_item == "magic_stone": - # 魔法石是失败者使用的 - cursor.execute("UPDATE duel_players SET magic_stone = MAX(0, magic_stone - 1) WHERE group_id = ? AND player_name = ?", (self.group_id, loser)) - logger_duel.info(f"消耗了 {loser} 的魔法石 (剩余数量将被更新)") - elif used_item == "invisibility_cloak": - # 隐身衣由胜利者使用 - cursor.execute("UPDATE duel_players SET invisibility_cloak = MAX(0, invisibility_cloak - 1) WHERE group_id = ? AND player_name = ?", (self.group_id, winner)) - logger_duel.info(f"消耗了 {winner} 的隐身衣 (剩余数量将被更新)") - # -------------------------- - - # 移除了记录对战历史的代码 - - conn.commit() - logger_duel.info(f"{winner} 在决斗中击败 {loser},胜者积分 +{winner_points},败者积分 -{loser_points},使用道具: {used_item or '无'}") - - return (winner_points, loser_points) # 返回实际积分变化 - - except sqlite3.Error as e: - logger_duel.error(f"记录决斗结果失败: {e}", exc_info=True) - return (0, 0) # 出错时返回0分 - except Exception as e: - logger_duel.error(f"记录决斗结果时发生未知错误: {e}", exc_info=True) - return (0, 0) # 出错时返回0分 - -class HarryPotterDuel: - """决斗功能""" - - def __init__(self, player1, player2, group_id, player1_is_challenger=True): - """ - 初始化决斗 - :param player1: 玩家1的名称 - :param player2: 玩家2的名称 - :param group_id: 群组ID - :param player1_is_challenger: 玩家1是否为决斗发起者 - """ - # 确保只在群聊中决斗 - if not group_id: - raise ValueError("决斗功能只支持群聊") - - self.player1 = { - "name": player1, - "hp": 100, - "spells": [], - "is_challenger": player1_is_challenger - } - self.player2 = { - "name": player2, - "hp": 100, - "spells": [], - "is_challenger": not player1_is_challenger - } - self.rounds = 0 - self.steps = [] - self.group_id = group_id # 记录群组ID - - # 检测是否为Boss战(对手是AI"泡泡") - self.is_boss_fight = (player2 == "泡泡") - - # Boss战特殊设置 - if self.is_boss_fight: - # Boss战胜率极低,设为10% - self.player_win_chance = 0.1 - # 添加Boss战提示信息 - self.steps.append("⚔️ Boss战开始 ⚔️\n挑战强大的魔法师泡泡!") - - # 设置防御成功率 - self.defense_success_rate = 0.3 - - # 咒语列表(名称、威力、权重)- 权重越小越稀有 - self.spells = [ - {"name": "除你武器", "power": 10, "weight": 30, "desc": "🪄", - "attack_desc": ["挥动魔杖划出一道弧线,魔杖尖端发出红光,释放", "伸手一指对手的魔杖,大声喊道", "用魔杖直指对手,施放缴械咒"], - "damage_desc": ["被红光击中,魔杖瞬间脱手飞出", "的魔杖被一股无形力量扯离手掌,飞向远处", "手中魔杖突然被击飞,不得不空手应对"]}, - - {"name": "昏昏倒地", "power": 25, "weight": 25, "desc": "✨", - "attack_desc": ["魔杖发出耀眼的红光,发射昏迷咒", "快速挥舞魔杖,释放出一道猩红色闪光", "高声呼喊咒语,杖尖喷射出红色火花"], - "damage_desc": ["被红光击中,意识开始模糊,几近昏迷", "躲闪不及,被击中后身体摇晃,眼神涣散", "被咒语命中,双腿一软,差点跪倒在地"]}, - - {"name": "统统石化", "power": 40, "weight": 20, "desc": "💫", - "attack_desc": ["直指对手,魔杖尖端射出蓝白色光芒,施放石化咒", "魔杖在空中划过一道蓝光,精准施放", "双目紧盯对手,冷静施展全身束缚咒"], - "damage_desc": ["身体被蓝光罩住,四肢瞬间变得僵硬如石", "全身突然绷紧,像被无形的绳索紧紧束缚", "动作突然凝固,仿佛变成了一座雕像"]}, - - {"name": "障碍重重", "power": 55, "weight": 15, "desc": "⚡", - "attack_desc": ["魔杖猛地向前一挥,发射出闪亮的紫色光束", "大声念出咒语,同时杖尖射出炫目光芒", "旋转魔杖制造出一道旋转的障碍咒"], - "damage_desc": ["被一股无形的力量狠狠推开,猛烈撞上后方障碍物", "身体被击中后像断线风筝般飞出数米,重重摔落", "被强大的冲击波掀翻在地,一时无法站起"]}, - - {"name": "神锋无影", "power": 70, "weight": 10, "desc": "🗡️", - "attack_desc": ["低声念诵,魔杖如剑般挥下", "以危险的低沉嗓音念诵咒语,杖尖闪烁着寒光", "用魔杖在空中划出复杂轨迹,释放斯内普的秘咒"], - "damage_desc": ["身上突然出现多道无形的切割伤口,鲜血喷涌而出", "惨叫一声,胸前与面部浮现出深深的伤痕,鲜血直流", "被无形的刀刃划过全身,衣物和皮肤同时被割裂,伤痕累累"]}, - - {"name": "钻心剜骨", "power": 85, "weight": 5, "desc": "🔥", - "attack_desc": ["眼中闪过一丝狠厉,用尖利的声音喊出不可饶恕咒", "面露残忍笑容,魔杖直指对手施放酷刑咒", "用充满恶意的声音施放黑魔法,享受对方的痛苦"], - "damage_desc": ["被咒语击中,全身每一根神经都在燃烧般剧痛,倒地挣扎哀嚎", "发出撕心裂肺的惨叫,痛苦地在地上痉挛扭曲", "遭受前所未有的剧痛折磨,脸上血管暴起,痛不欲生"]}, - - {"name": "阿瓦达索命", "power": 100, "weight": 1, "desc": "💀", - "attack_desc": ["用充满杀意的声音念出死咒,魔杖喷射出刺目的绿光", "冷酷无情地发出致命死咒,绿光直射对手", "毫无犹豫地使用了最邪恶的不可饶恕咒,绿光闪耀"], - "damage_desc": ["被绿光正面击中,生命瞬间被夺走,眼神空洞地倒下", "还未来得及反应,生命便随着绿光的接触戛然而止", "被死咒击中,身体僵直地倒下,生命气息完全消失"]} - ] - - # 防御咒语列表(名称、描述)- 统一使用self.defense_success_rate作为成功率 - self.defense_spells = [ - {"name": "盔甲护身", "desc": "🛡️", - "defense_desc": ["迅速在身前制造出一道透明魔法屏障,挡住了攻击", "挥动魔杖在周身形成一道金色防御光幕,抵消了咒语", "大声喊出咒语,召唤出强力的防护盾牌"]}, - - {"name": "除你武器", "desc": "⚔️", - "defense_desc": ["用缴械咒反击,成功击飞对方魔杖", "喊道出魔咒,让对手的魔咒偏离方向", "巧妙反击,用缴械咒化解了对手的攻击"]}, - - {"name": "呼神护卫", "desc": "🧿", - "defense_desc": ["全神贯注地召唤出银色守护神,抵挡住了攻击", "魔杖射出耀眼银光,形成守护屏障吸收了咒语", "集中思念快乐回忆,释放出强大的守护神魔法"]} - ] - - # 设置胜利描述 - self.victory_descriptions = [ - "让对手失去了战斗能力", - "最终击倒了对手", - "的魔法取得了胜利", - "的致命一击决定了结果", - "的魔法赢得了这场决斗", - "对魔法的控制带来了胜利", - "在激烈的对决中占据上风", - "毫无悬念地获胜" - ] - - # 记录开场信息 - if not self.is_boss_fight: - self.steps.append(f"⚔️ 决斗开始 ⚔️\n{self.player1['name']} VS {self.player2['name']}") - - def select_spell(self): - """随机选择一个咒语,威力越高出现概率越低""" - weights = [spell["weight"] for spell in self.spells] - total_weight = sum(weights) - normalized_weights = [w/total_weight for w in weights] - return random.choices(self.spells, weights=normalized_weights, k=1)[0] - - def attempt_defense(self): - """尝试防御,返回是否成功和使用的防御咒语""" - defense = random.choice(self.defense_spells) - success = random.random() < self.defense_success_rate - return success, defense - - def start_duel(self): - """开始决斗,返回决斗过程的步骤列表""" - # 创建积分系统实例,整个方法中重用 - rank_system = DuelRankSystem(self.group_id) - - # --- 修改:提前获取双方玩家数据 --- - player1_data = rank_system.get_player_data(self.player1["name"]) - player2_data = rank_system.get_player_data(self.player2["name"]) - # --------------------------------------------- - - # Boss战特殊处理 - if self.is_boss_fight: - # 生成随机的Boss战斗过程 - boss_battle_descriptions = [ - f"🔮 强大的Boss泡泡挥动魔杖,释放出一道耀眼的紫色光束,{self.player1['name']}勉强躲开!", - f"⚡ {self.player1['name']}尝试施放昏昏倒地,但泡泡像预知一般轻松侧身避过!", - f"🌪️ 泡泡召唤出一阵魔法旋风,将{self.player1['name']}的咒语全部吹散!", - f"🔥 {self.player1['name']}使出全力施放火焰咒,泡泡却用一道水盾将其熄灭!", - f"✨ 双方魔杖相对,杖尖迸发出耀眼的金色火花,魔力在空中碰撞!", - f"🌟 泡泡释放出数十个魔法分身,{self.player1['name']}不知道哪个是真身!", - f"🧙 {self.player1['name']}召唤出守护神,但在泡泡强大的黑魔法面前迅速消散!", - f"⚔️ 一连串快速的魔咒交锋,魔法光束在空中交织成绚丽的网!", - f"🛡️ 泡泡创造出一道几乎无法破解的魔法屏障,{self.player1['name']}的咒语无法穿透!", - f"💫 {self.player1['name']}施放最强一击,能量波动让整个决斗场地震颤!" - ] - - # 只随机选择一条战斗描述添加(减少刷屏) - self.steps.append(random.choice(boss_battle_descriptions)) - - # 检查是否战胜Boss(极低概率) - if random.random() < self.player_win_chance: # 玩家赢了 - winner, loser = self.player1, self.player2 - - # 添加胜利转折点描述 - victory_turn = [ - f"✨ 关键时刻,{winner['name']}找到了泡泡防御的破绽!", - f"🌟 命运女神眷顾了{winner['name']},一个意外的反弹击中了泡泡的要害!", - f"💥 在泡泡即将施放致命一击时,{winner['name']}突然爆发出前所未有的魔法力量!" - ] - self.steps.append(random.choice(victory_turn)) - - # 随机获得一件装备 - items = ["elder_wand", "magic_stone", "invisibility_cloak"] - item_names = {"elder_wand": "老魔杖", "magic_stone": "魔法石", "invisibility_cloak": "隐身衣"} - - try: - with rank_system._db_lock: - with rank_system._get_db_conn() as conn: - cursor = conn.cursor() - - # 获取当前玩家的道具数量 - sql_query = """ - SELECT elder_wand, magic_stone, invisibility_cloak - FROM duel_players - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_query, (self.group_id, winner["name"])) - result = cursor.fetchone() - - if result: - # 更新玩家数据,增加道具 - sql_update = """ - UPDATE duel_players SET - elder_wand = elder_wand + 1, - magic_stone = magic_stone + 1, - invisibility_cloak = invisibility_cloak + 1, - score = score + ?, - wins = wins + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - winner_points = 300 # 胜利积分固定为500分 - cursor.execute(sql_update, (winner_points, self.group_id, winner["name"])) - - # 移除了记录对战历史的代码 - - conn.commit() - - # 查询更新后玩家排名 - sql_rank = """ - SELECT COUNT(*) + 1 as rank - FROM duel_players - WHERE group_id = ? AND score > ( - SELECT score FROM duel_players - WHERE group_id = ? AND player_name = ? - ) - """ - cursor.execute(sql_rank, (self.group_id, self.group_id, winner["name"])) - rank_result = cursor.fetchone() - rank = rank_result["rank"] if rank_result else None - - rank_text = f"第{rank}名" if rank else "暂无排名" - - # 添加获得装备的信息 - result = ( - f"🏆 {winner['name']} 以不可思议的实力击败了强大的Boss泡泡!\n\n" - f"获得了三件死亡圣器!\n" - f" 🪄 💎 🧥 \n\n" - f"积分: +{winner_points}分 ({rank_text})" - ) - - self.steps.append(result) - return self.steps - else: - # 玩家不存在,这种情况理论上不可能发生,但为安全添加 - logger_duel.error(f"Boss战获胜但找不到玩家 {winner['name']} 数据") - except sqlite3.Error as e: - logger_duel.error(f"处理Boss战胜利时出错: {e}", exc_info=True) - self.steps.append(f"⚠️ 处理战利品时遇到问题: {e}") - return self.steps - - else: # 玩家输了 - winner, loser = self.player2, self.player1 - - # 添加失败结局描述 - 更恐怖、更简洁的描述 - defeat_end = [ - f"💀 泡泡瞬间爆发出令人胆寒的强大魔力,{loser['name']}甚至来不及反应就被击倒在地!", - f"⚰️ 只见泡泡轻轻挥动魔杖,{loser['name']}如遭雷击,整个人被恐怖的魔法能量碾压!", - f"☠️ 泡泡展现出真正的实力,一道黑色闪电瞬间击穿{loser['name']}的所有防御!" - ] - self.steps.append(random.choice(defeat_end)) - - try: - with rank_system._db_lock: - with rank_system._get_db_conn() as conn: - cursor = conn.cursor() - - # 更新失败者数据 - sql_update = """ - UPDATE duel_players SET - score = MAX(1, score - 100), - losses = losses + 1, - total_matches = total_matches + 1, - last_updated = datetime('now') - WHERE group_id = ? AND player_name = ? - """ - cursor.execute(sql_update, (self.group_id, loser["name"])) - - # 移除了记录对战历史的代码 - - conn.commit() - except sqlite3.Error as e: - logger_duel.error(f"处理Boss战失败时出错: {e}", exc_info=True) - - result = ( - f"💀 {loser['name']} 不敌强大的Boss泡泡!\n\n" - f"积分: -100分\n" - f"再接再厉,下次挑战吧!" - ) - - self.steps.append(result) - return self.steps - - # --- 新增:开局检查双方隐身衣 --- - p1_cloak = player1_data["items"].get("invisibility_cloak", 0) > 0 - p2_cloak = player2_data["items"].get("invisibility_cloak", 0) > 0 - - if p1_cloak and not p2_cloak: # 只有 Player1 有隐身衣 - winner, loser = self.player1, self.player2 - winner_points = 30 - loser_points = 30 - used_item = "invisibility_cloak" - self.steps.append(f"🧥 {winner['name']} 开局使用了隐身衣,潜行偷袭,直接获胜!") - # 直接调用记录结果函数处理数据库和返回消息 - return self._handle_direct_win(rank_system, winner, loser, winner_points, loser_points, used_item, player1_data) - elif not p1_cloak and p2_cloak: # 只有 Player2 有隐身衣 - winner, loser = self.player2, self.player1 - winner_points = 30 - loser_points = 30 - used_item = "invisibility_cloak" - self.steps.append(f"🧥 {winner['name']} 开局使用了隐身衣,潜行偷袭,直接获胜!") - # 直接调用记录结果函数处理数据库和返回消息 - return self._handle_direct_win(rank_system, winner, loser, winner_points, loser_points, used_item, player2_data) - elif p1_cloak and p2_cloak: # 双方都有隐身衣 - self.steps.append(f"🧥 双方都试图使用隐身衣,魔法相互干扰,隐身效果失效!决斗正常进行!") - # (可选)可以在这里添加消耗双方隐身衣的逻辑,但为了简化,暂时不加 - # --- 隐身衣检查结束 --- - - # 普通决斗流程,保持原有逻辑 - # 根据决斗发起者设置先手概率 - if self.player1["is_challenger"]: - # 获取挑战者的排名和总玩家数 - challenger = self.player1["name"] - challenger_rank, _ = rank_system.get_player_rank(challenger) - - # 获取总玩家数 - all_players = rank_system.get_rank_list(9999) # 获取所有玩家 - total_players = len(all_players) - - # 计算先手概率:基础概率50% + (排名/总人数)*30% - # 如果没有排名或总玩家数为0,则使用基础概率50% - if challenger_rank is not None and total_players > 0: - # 排名越大(越靠后),先手优势越大 - first_attack_prob = 0.5 + (challenger_rank / total_players) * 0.3 - else: - first_attack_prob = 0.5 # 默认概率 - - current_attacker = "player1" if random.random() < first_attack_prob else "player2" - else: - # 获取挑战者的排名和总玩家数 - challenger = self.player2["name"] - challenger_rank, _ = rank_system.get_player_rank(challenger) - - # 获取总玩家数 - all_players = rank_system.get_rank_list(9999) # 获取所有玩家 - total_players = len(all_players) - - # 计算先手概率:基础概率50% + (排名/总人数)*30% - # 如果没有排名或总玩家数为0,则使用基础概率50% - if challenger_rank is not None and total_players > 0: - # 排名越大(越靠后),先手优势越大 - first_attack_prob = 0.5 + (challenger_rank / total_players) * 0.3 - else: - first_attack_prob = 0.5 # 默认概率 - - current_attacker = "player2" if random.random() < first_attack_prob else "player1" - - # 随机选择先手介绍语 - first_move_descriptions = [ - "抢先出手,迅速进入战斗状态,", - "反应更快,抢得先机,", - "魔杖一挥,率先发动攻击,", - "眼疾手快,先发制人,", - "气势如虹,先声夺人,", - "以迅雷不及掩耳之势抢先出手," - ] - - # 记录所有魔法分数的总和 - total_magic_power = 0 - - # 一击必胜模式,只有一回合 - self.rounds = 1 - - # 确定当前回合的攻击者和防御者 - if current_attacker == "player1": - attacker = self.player1 - defender = self.player2 - else: - attacker = self.player2 - defender = self.player1 - - # 选择咒语 - spell = self.select_spell() - - # 记录使用的魔法分数 - total_magic_power += spell["power"] - attacker["spells"].append(spell) - - # 先手介绍与咒语专属攻击描述组合在一起 - first_move_desc = random.choice(first_move_descriptions) - # 从咒语的专属攻击描述中随机选择一个 - spell_attack_desc = random.choice(spell["attack_desc"]) - attack_info = f"🎲 {attacker['name']} {first_move_desc}{spell_attack_desc} {spell['name']}{spell['desc']}" - self.steps.append(attack_info) - - # 尝试防御 - defense_success, defense = self.attempt_defense() - - if defense_success: - # 防御成功,使用防御咒语的专属描述 - defense_desc = random.choice(defense["defense_desc"]) - defense_info = f"{defender['name']} {defense_desc},使用 {defense['name']}{defense['desc']} 防御成功!" - self.steps.append(defense_info) - - # 记录防御使用的魔法分数 - for defense_spell in self.defense_spells: - if defense_spell["name"] == defense["name"]: - total_magic_power += 20 # 防御魔法固定20分 - break - - # 转折描述与反击描述组合 - counter_transition = [ - "防御成功后立即抓住机会反击,", - "挡下攻击的同时,立刻准备反攻,", - "借着防御的势头,迅速转为攻势,", - "一个漂亮的防御后,立刻发起反击,", - "丝毫不给对手喘息的机会,立即反击," - ] - - # 反制:防守方变为攻击方 - counter_spell = self.select_spell() - - # 记录反制使用的魔法分数 - total_magic_power += counter_spell["power"] - defender["spells"].append(counter_spell) - - # 转折与咒语专属反击描述组合在一起 - counter_transition_desc = random.choice(counter_transition) - # 从反制咒语的专属攻击描述中随机选择一个 - counter_spell_attack_desc = random.choice(counter_spell["attack_desc"]) - counter_info = f"{defender['name']} {counter_transition_desc}{counter_spell_attack_desc} {counter_spell['name']}{counter_spell['desc']}" - self.steps.append(counter_info) - - # 显示反击造成的伤害描述 - counter_damage_desc = random.choice(counter_spell["damage_desc"]) - if current_attacker == "player1": - damage_info = f"{self.player1['name']} {counter_damage_desc}!" - else: - damage_info = f"{self.player2['name']} {counter_damage_desc}!" - self.steps.append(damage_info) - - # 防御成功并反制,原攻击者直接失败 - if current_attacker == "player1": - self.player1["hp"] = 0 - winner, loser = self.player2, self.player1 - else: - self.player2["hp"] = 0 - winner, loser = self.player1, self.player2 - else: - # 防御失败,直接被击败 - # 从攻击咒语的专属伤害描述中随机选择一个 - damage_desc = random.choice(spell["damage_desc"]) - damage_info = f"{defender['name']} {damage_desc}!" - self.steps.append(damage_info) - - if current_attacker == "player1": - self.player2["hp"] = 0 - winner, loser = self.player1, self.player2 - else: - self.player1["hp"] = 0 - winner, loser = self.player2, self.player1 - - # --- 修改:获取胜利者和失败者的最新数据 --- - # 在决斗结束后,重新获取双方数据以确保道具数量是当前的 - winner_data = rank_system.get_player_data(winner["name"]) - loser_data = rank_system.get_player_data(loser["name"]) - # --------------------------------------- - - # --- 修改:道具效果处理逻辑 --- - used_item_winner = None # 记录胜利者使用的道具 - used_item_loser = None # 记录失败者使用的道具 - winner_points = total_magic_power # 基础胜利积分 - loser_points = total_magic_power # 基础失败扣分 - - # 检查失败者是否有魔法石 - 失败不扣分 - if loser_data["items"].get("magic_stone", 0) > 0: - self.steps.append(f"💎 {loser['name']} 使用了魔法石,虽然失败但是痊愈了!") - used_item_loser = "magic_stone" - loser_points = 0 # 不扣分 - - # 检查胜利者是否有老魔杖 - 胜利积分×5 (独立于魔法石判断) - if winner_data["items"].get("elder_wand", 0) > 0: - # 如果失败者没用魔法石,才显示胜利加成信息(避免信息重复) - if used_item_loser != "magic_stone": - self.steps.append(f"🪄 {winner['name']} 使用了老魔杖,魔法威力增加了五倍!") - else: # 如果失败者用了魔法石,补充说明胜利者也用了老魔杖 - self.steps.append(f"🪄 同时,{winner['name']} 使用了老魔杖,得分加倍!") - used_item_winner = "elder_wand" - winner_points *= 5 # 积分乘以5 - - # --- 整合使用的道具信息 --- - # 注意:record_duel_result 目前只支持记录一个 used_item - # 为了兼容,优先记录影响积分计算的道具 - final_used_item = used_item_winner or used_item_loser # 优先记录胜利者道具,其次失败者道具 - # -------------------------- - - # 使用 record_duel_result 方法记录结果并更新数据库 - try: - # 调用新的记录结果方法,它会处理积分更新、道具消耗和历史记录 - # **注意:需要修改 record_duel_result 来正确处理道具消耗** - actual_winner_points, actual_loser_points = rank_system.record_duel_result( - winner=winner["name"], - loser=loser["name"], - winner_points=winner_points, - loser_points=loser_points, # 传递可能为0的扣分值 - total_magic_power=total_magic_power, - used_item=final_used_item # 传递最终决定的使用道具 - ) - logger_duel.info(f"数据库更新成功: 胜者 {winner['name']} +{actual_winner_points}, 败者 {loser['name']} -{actual_loser_points}") - except Exception as e: - logger_duel.error(f"调用 record_duel_result 时发生错误: {e}", exc_info=True) - self.steps.append(f"⚠️ 保存决斗结果时发生错误: {e}") - - # 获取胜利者当前排名 - rank, _ = rank_system.get_player_rank(winner["name"]) - rank_text = f"第{rank}名" if rank else "暂无排名" - - # 重新获取玩家数据以显示正确的道具数量 (在调用 record_duel_result 后获取) - updated_winner_data = rank_system.get_player_data(winner["name"]) - updated_loser_data = rank_system.get_player_data(loser["name"]) - - # 选择胜利描述 - victory_desc = random.choice(self.victory_descriptions) - - # 结果信息 - result = ( - f"🏆 {winner['name']} {victory_desc}!\n\n" - f"积分: {winner['name']} +{winner_points}分 ({rank_text})\n" # 显示计算出的得分 - f"{loser['name']} -{loser_points}分" # 显示计算出的扣分 (可能为0) - ) - - # 如果使用了道具,显示剩余次数 - if used_item_winner == "elder_wand": - result += f"\n\n📦 {winner['name']} 剩余老魔杖: {updated_winner_data['items'].get('elder_wand', 0)}次" - if used_item_loser == "magic_stone": - result += f"\n\n📦 {loser['name']} 剩余魔法石: {updated_loser_data['items'].get('magic_stone', 0)}次" - # 如果开局隐身衣获胜,这里不会执行 - - # 添加结果 - self.steps.append(result) - return self.steps - - # --- 新增:处理隐身衣直接获胜的辅助方法 --- - def _handle_direct_win(self, rank_system, winner, loser, winner_points, loser_points, used_item, winner_original_data): - """处理因隐身衣直接获胜的情况,更新数据库并格式化消息""" - try: - # 直接调用 record_duel_result 来处理数据库更新 - # 注意:这里 total_magic_power 为 0,因为没有进行魔法对决 - rank_system.record_duel_result( - winner=winner["name"], - loser=loser["name"], - winner_points=winner_points, - loser_points=loser_points, - total_magic_power=0, # 隐身衣获胜没有魔法力计算 - used_item=used_item - ) - logger_duel.info(f"{winner['name']} 使用隐身衣击败 {loser['name']},积分 +{winner_points}") - - # 重新获取更新后的玩家数据以显示剩余道具 - updated_winner_data = rank_system.get_player_data(winner["name"]) - - except sqlite3.Error as e: - logger_duel.error(f"处理隐身衣胜利时数据库出错: {e}", exc_info=True) - self.steps.append(f"⚠️ 处理隐身衣胜利时遇到数据库问题: {e}") - # 数据库出错时,仍使用原始数据显示结果,避免程序崩溃 - updated_winner_data = winner_original_data - except Exception as e: # 捕获其他可能的异常 - logger_duel.error(f"处理隐身衣胜利时发生未知错误: {e}", exc_info=True) - self.steps.append(f"⚠️ 处理隐身衣胜利时发生内部错误: {e}") - updated_winner_data = winner_original_data - - # 获取胜利者当前排名 - rank, _ = rank_system.get_player_rank(winner["name"]) - rank_text = f"第{rank}名" if rank else "暂无排名" - - # 添加结果 - result = ( - f"🏆 {winner['name']} 使用隐身衣获胜!\n\n" - f"积分: {winner['name']} +{winner_points}分 ({rank_text})\n" - f"{loser['name']} -{loser_points}分\n\n" - f"📦 剩余隐身衣: {updated_winner_data['items'].get('invisibility_cloak', 0)}次" - ) - self.steps.append(result) - return self.steps - # --- 辅助方法结束 --- - -def start_duel(player1: str, player2: str, group_id=None, player1_is_challenger=True) -> List[str]: - """ - 启动一场决斗 - - Args: - player1: 玩家1的名称 - player2: 玩家2的名称 - group_id: 群组ID,必须提供 - player1_is_challenger: 玩家1是否为挑战发起者 - - Returns: - List[str]: 决斗过程的步骤 - """ - # 确保只在群聊中决斗 - if not group_id: - return ["❌ 决斗功能只支持群聊"] - - try: - duel = HarryPotterDuel(player1, player2, group_id, player1_is_challenger) - return duel.start_duel() - except Exception as e: - logging.error(f"决斗过程中发生错误: {e}") - return [f"决斗过程中发生错误: {e}"] - -def get_rank_list(top_n: int = 10, group_id=None) -> str: - """获取排行榜信息 - - Args: - top_n: 返回前几名 - group_id: 群组ID,必须提供 - """ - # 确保只在群聊中获取排行榜 - if not group_id: - return "❌ 决斗排行榜功能只支持群聊" - - try: - rank_system = DuelRankSystem(group_id) - ranks = rank_system.get_rank_list(top_n) - - if not ranks: - return "📊 决斗排行榜还没有数据" - - result = [f"📊 本群决斗排行榜 Top {len(ranks)}"] - for i, player in enumerate(ranks): - medal = "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else f"{i+1}." - win_rate = int((player["wins"] / player["total_matches"]) * 100) if player["total_matches"] > 0 else 0 - result.append(f"{medal} {player['name']}: {player['score']}分 ({player['wins']}/{player['losses']}/{win_rate}%)") - - return "\n".join(result) - except Exception as e: - logging.error(f"获取排行榜失败: {e}") - return f"获取排行榜失败: {e}" - -def get_player_stats(player_name: str, group_id=None) -> str: - """获取玩家战绩 - - Args: - player_name: 玩家名称 - group_id: 群组ID,必须提供 - """ - # 确保只在群聊中获取战绩 - if not group_id: - return "❌ 决斗战绩查询功能只支持群聊" - - try: - rank_system = DuelRankSystem(group_id) - rank, player_data = rank_system.get_player_rank(player_name) - - win_rate = int((player_data["wins"] / player_data["total_matches"]) * 100) if player_data["total_matches"] > 0 else 0 - - result = [ - f"📊 {player_name} 的本群决斗战绩", - f"排名: {rank if rank else '暂无排名'}", - f"积分: {player_data['score']}", - f"胜场: {player_data['wins']}", - f"败场: {player_data['losses']}", - f"总场次: {player_data['total_matches']}", - f"胜率: {win_rate}%" - ] - - return "\n".join(result) - except Exception as e: - logging.error(f"获取玩家战绩失败: {e}") - return f"获取玩家战绩失败: {e}" - -def change_player_name(old_name: str, new_name: str, group_id=None) -> str: - """更改玩家名称 - - Args: - old_name: 旧名称 - new_name: 新名称 - group_id: 群组ID,必须提供 - - Returns: - str: 操作结果消息 - """ - # 确保只在群聊中更改玩家名称 - if not group_id: - return "❌ 更改玩家名称功能只支持群聊" - - try: - rank_system = DuelRankSystem(group_id) - result = rank_system.change_player_name(old_name, new_name) - - if result: - return f"✅ 已成功将决斗记录中的玩家 \"{old_name}\" 重命名为 \"{new_name}\"" - else: - return f"❌ 改名失败:请确认 \"{old_name}\" 在本群中有战绩记录,且 \"{new_name}\" 名称未被使用" - except Exception as e: - logging.error(f"更改玩家名称失败: {e}") - return f"❌ 更改玩家名称失败: {e}" - -class DuelManager: - """决斗管理器,处理决斗线程和消息发送""" - - def __init__(self, message_sender_func): - """ - 初始化决斗管理器 - - Args: - message_sender_func: 消息发送函数,接收(message, receiver)两个参数 - """ - self.message_sender = message_sender_func - self._duel_thread = None - self._duel_lock = Lock() - self.LOG = logging.getLogger("DuelManager") - - def send_duel_message(self, msg: str, receiver: str) -> None: - """发送决斗消息 - - Args: - msg: 消息内容 - receiver: 接收者ID(通常是群ID) - """ - try: - self.LOG.info(f"发送决斗消息 To {receiver}: {msg[:20]}...") - self.message_sender(msg, receiver) - except Exception as e: - self.LOG.error(f"发送决斗消息失败: {e}") - - def run_duel(self, challenger_name, opponent_name, receiver, is_group=False): - """在单独线程中运行决斗 - - Args: - challenger_name: 挑战者名称 - opponent_name: 对手名称 - receiver: 消息接收者(群id) - is_group: 是否是群聊 - """ - try: - # 确保只在群聊中运行决斗 - if not is_group: - self.send_duel_message("❌ 决斗功能只支持群聊", receiver) - return - - # 开始决斗 - # 传递群组ID参数 - group_id = receiver - duel_steps = start_duel(challenger_name, opponent_name, group_id, True) # challenger_name是发起者 - - # 逐步发送决斗过程 - for step in duel_steps: - self.send_duel_message(step, receiver) - time.sleep(1.5) # 每步之间添加适当延迟 - except Exception as e: - self.LOG.error(f"决斗过程中发生错误: {e}") - self.send_duel_message(f"决斗过程中发生错误: {e}", receiver) - finally: - # 释放决斗线程 - with self._duel_lock: - self._duel_thread = None - self.LOG.info("决斗线程已结束并销毁") - - def start_duel_thread(self, challenger_name, opponent_name, receiver, is_group=False): - """启动决斗线程 - - Args: - challenger_name: 挑战者名称 - opponent_name: 对手名称 - receiver: 消息接收者 - is_group: 是否是群聊 - - Returns: - bool: 是否成功启动决斗线程 - """ - with self._duel_lock: - if self._duel_thread is not None and self._duel_thread.is_alive(): - return False - - self._duel_thread = Thread( - target=self.run_duel, - args=(challenger_name, opponent_name, receiver, is_group), - daemon=True - ) - self._duel_thread.start() - return True - - def is_duel_running(self): - """检查是否有决斗正在进行 - - Returns: - bool: 是否有决斗正在进行 - """ - with self._duel_lock: - return self._duel_thread is not None and self._duel_thread.is_alive() - -# --- 新增:偷袭成功/失败的随机句子 --- -SNEAK_ATTACK_SUCCESS_MESSAGES = [ - "趁其不备,{attacker} 悄悄从 {target} 的口袋里摸走了 {points} 积分!真是个小机灵鬼!👻", - "月黑风高夜,正是下手时!{attacker} 成功偷袭 {target},顺走了 {points} 积分!🌙", - "{target} 一时大意,被 {attacker} 抓住了破绽,损失了 {points} 积分!💸", - "神不知鬼不觉,{attacker} 从 {target} 那里\"借\"来了 {points} 积分!🤫", - "手法娴熟!{attacker} 像一阵风一样掠过,{target} 发现时已经少了 {points} 积分!💨", -] - -SNEAK_ATTACK_FAILURE_MESSAGES = [ - "哎呀!{attacker} 的鬼祟行踪被 {target} 发现了,偷袭失败!👀", - "{target} 警惕性很高,{attacker} 的小动作没能得逞。🛡️", - "差点就成功了!可惜 {attacker} 不小心弄出了声响,被 {target} 逮个正着!🔔", - "{target} 哼了一声:\"就这点伎俩?\" {attacker} 的偷袭计划泡汤了。😏", - "运气不佳,{attacker} 刚伸手就被 {target} 的护身符弹开了,偷袭失败!✨", - "{attacker} 脚底一滑,在 {target} 面前摔了个狗啃泥,偷袭什么的早就忘光了!🤣", - "{target} 突然转身,和 {attacker} 对视,场面一度十分尴尬... 偷袭失败!😅", - "{attacker} 刚准备动手,{target} 的口袋里突然钻出一只嗅嗅,叼走了 {attacker} 的...嗯?偷袭失败!👃", - "{target} 拍了拍 {attacker} 的肩膀:\"兄弟,想啥呢?\",{attacker} 只好悻悻收手。🤝", - "一阵妖风刮过,把 {attacker} 准备用来偷袭的工具吹跑了... 时运不济啊!🌬️", - "{attacker} 发现 {target} 的口袋是画上去的!可恶,被摆了一道!🖌️", -] - -# --- 新增:偷到道具的随机句子 --- -SNEAK_ATTACK_ITEM_SUCCESS_MESSAGES = [ - "趁乱摸鱼!{attacker} 竟然从 {target} 身上摸走了一件 {item_name_cn}!真是妙手空空!👏", - "运气爆棚!{attacker} 偷袭失败,但顺走了 {target} 的一件 {item_name_cn}!🥳", - "{target} 光顾着得意,没注意到 {attacker} 悄悄拿走了一件 {item_name_cn}!🤭", - "失之东隅,收之桑榆。{attacker} 虽然没偷到分,但拐走了一件 {item_name_cn}!🎁", - "神偷再现!{attacker} 从 {target} 那里顺走了一件 {item_name_cn}!🔮", -] - -# --- 新增:道具英文名到中文名的映射 --- -ITEM_NAME_MAP = { - "elder_wand": "老魔杖 🪄", - "magic_stone": "魔法石 💎", - "invisibility_cloak": "隐身衣 🧥" -} - -# --- 新增:处理偷袭逻辑的函数 --- -def attempt_sneak_attack(attacker_name: str, target_name: str, group_id: str) -> str: - """ - 处理玩家尝试偷袭另一个玩家的逻辑 - - Args: - attacker_name: 偷袭者名称 - target_name: 被偷袭者名称 - group_id: 群组ID - - Returns: - str: 偷袭结果的消息 - """ - if not group_id: - return "❌ 偷袭功能也只支持群聊哦。" - - try: - rank_system = DuelRankSystem(group_id) - - # 检查玩家是否存在 - with rank_system._db_lock: - with rank_system._get_db_conn() as conn: - cursor = conn.cursor() - - # 检查偷袭者是否存在 - cursor.execute( - "SELECT COUNT(*) as count FROM duel_players WHERE group_id = ? AND player_name = ?", - (group_id, attacker_name) - ) - if cursor.fetchone()["count"] == 0: - return f"❌ 偷袭发起者 {attacker_name} 还没有决斗记录。" - - # 检查目标是否存在 - cursor.execute( - "SELECT COUNT(*) as count FROM duel_players WHERE group_id = ? AND player_name = ?", - (group_id, target_name) - ) - if cursor.fetchone()["count"] == 0: - return f"❌ 目标 {target_name} 还没有决斗记录。" - - # 获取偷袭者排名 - cursor.execute(""" - SELECT COUNT(*) + 1 as rank FROM duel_players - WHERE group_id = ? AND score > ( - SELECT score FROM duel_players - WHERE group_id = ? AND player_name = ? - )""", (group_id, group_id, attacker_name)) - attacker_rank_result = cursor.fetchone() - attacker_rank = attacker_rank_result["rank"] if attacker_rank_result else None - - # 获取目标排名 - cursor.execute(""" - SELECT COUNT(*) + 1 as rank FROM duel_players - WHERE group_id = ? AND score > ( - SELECT score FROM duel_players - WHERE group_id = ? AND player_name = ? - )""", (group_id, group_id, target_name)) - target_rank_result = cursor.fetchone() - target_rank = target_rank_result["rank"] if target_rank_result else None - - # 获取总玩家数 - cursor.execute("SELECT COUNT(*) as count FROM duel_players WHERE group_id = ?", (group_id,)) - total_players = cursor.fetchone()["count"] - - # 计算成功率 - success_prob = 0.3 # 基础成功率 30% - - # 计算概率加成(仅当双方都有排名且总人数大于0时) - if attacker_rank is not None and target_rank is not None and total_players > 0: - if attacker_rank > target_rank: # 偷袭者排名更低 - rank_difference = attacker_rank - target_rank - # 排名差值影响概率,最多增加 40% - success_prob += min((rank_difference / total_players) * 0.4, 0.4) - # else: 偷袭者排名更高或相同,使用基础概率 30% - - # 确保概率在 0 到 1 之间 - success_prob = max(0, min(1, success_prob)) - - # 格式化概率显示为0-100%的百分比 - prob_percent = success_prob * 100 - logger_duel.info(f"偷袭计算: {attacker_name}({attacker_rank}) vs {target_name}({target_rank}), 总人数: {total_players}, 成功率: {prob_percent:.1f}%") - - roll_successful = random.random() < success_prob - points_exchanged_successfully = False # 标记是否成功转移了分数 - - # 决定偷袭是否成功 - if roll_successful: - # --- 偷袭概率判定成功,尝试计算分数转移 --- - # 获取分数差 - cursor.execute(""" - SELECT t1.score as attacker_score, t2.score as target_score - FROM duel_players t1, duel_players t2 - WHERE t1.group_id = ? AND t1.player_name = ? - AND t2.group_id = ? AND t2.player_name = ? - """, (group_id, attacker_name, group_id, target_name)) - result = cursor.fetchone() - # 添加检查,以防万一查询不到结果 - if not result: - logger_duel.error(f"偷袭成功后查询分数失败: {attacker_name} vs {target_name}") - return "❌ 处理偷袭时发生内部错误:无法获取玩家分数。" - - attacker_score = result["attacker_score"] - target_score = result["target_score"] - - # 1. 计算潜在偷取分数 - score_difference = abs(attacker_score - target_score) - potential_points_stolen = max(random.randint(10, 50), int(score_difference * 0.1)) # 偷取(10-50)或分数差的10%,取最大值 - - # 2. 计算目标实际能损失的最大分数 (最低保留1分) - max_points_target_can_lose = max(0, target_score - 1) - - # 3. 确定实际交换的分数 - actual_points_exchanged = min(potential_points_stolen, max_points_target_can_lose) - - # 只有实际交换分数大于0时才更新数据库和记录历史 - if actual_points_exchanged > 0: - # 更新分数 (零和交换) - cursor.execute( - "UPDATE duel_players SET score = score + ? WHERE group_id = ? AND player_name = ?", - (actual_points_exchanged, group_id, attacker_name) - ) - cursor.execute( - "UPDATE duel_players SET score = score - ? WHERE group_id = ? AND player_name = ?", - (actual_points_exchanged, group_id, target_name) - ) - - # 移除了记录到历史记录的代码 - - # 提交事务 - conn.commit() - logger_duel.info(f"偷袭成功: {attacker_name} 偷取 {target_name} {actual_points_exchanged} 分 (原目标分数: {target_score}, 潜在偷取: {potential_points_stolen})") - - # 选择并格式化成功消息 (使用 actual_points_exchanged) - message_template = random.choice(SNEAK_ATTACK_SUCCESS_MESSAGES) - result_message = message_template.format(attacker=attacker_name, target=target_name, points=actual_points_exchanged) - - points_exchanged_successfully = True # 标记成功转移了分数 - return result_message # 只有在成功转移分数时才直接返回 - else: - # 如果实际交换分数为0 (例如目标只有1分) - logger_duel.info(f"偷袭概率判定成功但未发生分数转移: {attacker_name} 偷袭 {target_name} (目标分数: {target_score}),转为尝试偷道具...") - # 不设置 points_exchanged_successfully = True - # 不返回,继续执行下面的偷道具逻辑 - - # --- 如果偷袭概率判定失败,或者判定成功但未转移分数,则尝试偷道具 --- - if not points_exchanged_successfully: # 这个条件覆盖了概率判定失败和概率判定成功但未转移分数两种情况 - # 根据情况选择日志消息 - if not roll_successful: # 如果是概率判定失败的情况 - logger_duel.info(f"偷袭分数失败: {attacker_name} 偷袭 {target_name}. 尝试根据目标道具数量计算偷道具概率...") - - # --- 修改:提前获取目标道具信息以计算概率 --- - cursor.execute(""" - SELECT elder_wand, magic_stone, invisibility_cloak - FROM duel_players - WHERE group_id = ? AND player_name = ? - """, (group_id, target_name)) - target_items_result = cursor.fetchone() # 使用新变量名避免混淆 - - item_steal_prob = 0.0 # 初始化概率为 0 - total_items_count = 0 - - if target_items_result: - # 计算总道具数量 - total_items_count = (target_items_result["elder_wand"] + - target_items_result["magic_stone"] + - target_items_result["invisibility_cloak"]) - - # 计算动态概率,每件道具增加 1% - item_steal_prob = total_items_count * 0.01 - logger_duel.info(f"目标共有 {total_items_count} 件道具,计算出的偷道具概率为: {item_steal_prob*100:.1f}% ") - else: - # 如果查询不到目标道具信息(理论上不应发生,因为前面检查过玩家存在) - logger_duel.warning(f"未能查询到目标 {target_name} 的道具信息,无法计算偷道具概率。") - item_steal_prob = 0.0 # 无法计算则概率为0 - - # --- 使用计算出的 item_steal_prob 进行判断 --- - if total_items_count > 0 and random.random() < item_steal_prob: - # --- 概率判定成功,且目标确实有道具可偷 --- - logger_duel.info(f"偷道具判定成功 (概率 {item_steal_prob*100:.1f}%),开始选择道具...") - - # --- 复用之前获取的 target_items_result 构建列表 --- - available_item_names = [] - item_weights = [] - - if target_items_result["elder_wand"] > 0: - available_item_names.append("elder_wand") - item_weights.append(target_items_result["elder_wand"]) - - if target_items_result["magic_stone"] > 0: - available_item_names.append("magic_stone") - item_weights.append(target_items_result["magic_stone"]) - - if target_items_result["invisibility_cloak"] > 0: - available_item_names.append("invisibility_cloak") - item_weights.append(target_items_result["invisibility_cloak"]) - - # 这个检查理论上可以省略,因为前面 total_items_count > 0 已经保证了列表非空 - # 但为了代码健壮性可以保留 - if available_item_names: - # 根据权重随机选择一件道具 - item_stolen = random.choices(available_item_names, weights=item_weights, k=1)[0] - item_name_cn = ITEM_NAME_MAP.get(item_stolen, item_stolen) - - # 更新数据库:目标减道具,攻击者加道具 - sql_update_target = f"UPDATE duel_players SET {item_stolen} = MAX(0, {item_stolen} - 1) WHERE group_id = ? AND player_name = ?" - sql_update_attacker = f"UPDATE duel_players SET {item_stolen} = {item_stolen} + 1 WHERE group_id = ? AND player_name = ?" - - cursor.execute(sql_update_target, (group_id, target_name)) - cursor.execute(sql_update_attacker, (group_id, attacker_name)) - conn.commit() # 偷道具成功,提交事务 - - # 选择并格式化偷道具成功消息 - message_template = random.choice(SNEAK_ATTACK_ITEM_SUCCESS_MESSAGES) - result_message = message_template.format(attacker=attacker_name, target=target_name, item_name_cn=item_name_cn) - logger_duel.info(f"偷道具成功: {attacker_name} 偷取了 {target_name} 的 {item_stolen}") - # 偷到道具直接返回,不再执行后面的失败逻辑 - return result_message - else: - # 如果因为某种原因(例如并发问题),刚才还有道具现在没了 - logger_duel.warning(f"尝试偷取 {target_name} 道具时发现其道具列表为空,虽然 total_items_count > 0。") - # 这里会继续执行下面的通用失败逻辑 - - # --- 偷道具判定失败 或 目标没有任何道具 --- - # (包括 total_items_count 为 0 的情况, 以及 random.random() >= item_steal_prob 的情况) - message_template = random.choice(SNEAK_ATTACK_FAILURE_MESSAGES) - result_message = message_template.format(attacker=attacker_name, target=target_name) - if total_items_count == 0: - logger_duel.info(f"偷袭完全失败: {attacker_name} 偷袭 {target_name},且目标没有任何道具。") - else: - logger_duel.info(f"偷袭完全失败: {attacker_name} 偷袭 {target_name},未达到偷道具概率 {item_steal_prob*100:.1f}%。") - return result_message - - except sqlite3.Error as e: - logger_duel.error(f"处理偷袭时发生数据库错误: {e}", exc_info=True) - return f"处理偷袭时发生内部错误: {e}" - except Exception as e: - logger_duel.error(f"处理偷袭时发生未知错误: {e}", exc_info=True) - return f"处理偷袭时发生内部错误: {e}" - diff --git a/function_call_agent_flow.md b/function_call_agent_flow.md new file mode 100644 index 0000000..083c28e --- /dev/null +++ b/function_call_agent_flow.md @@ -0,0 +1,87 @@ +# Function Call Agent 消息流程 + +## 概览 +- 所有传入的微信消息都会先由机器人入口处理,并在路由前写入消息记录(`robot.py:190-274`)。 +- 使用装饰器注册的函数处理器在运行时组成结构化的注册表,供路由器查询(`function_calls/init_handlers.py:1-9`, `function_calls/handlers.py:37-180`)。 +- 路由器优先采用确定性的命令解析,只有在必要时才升级到 LLM 编排循环(`function_calls/router.py:81-200`)。 +- LLM 协调器同时支持原生函数调用模型和基于提示词的回退模式,统一返回 `LLMRunResult`(`function_calls/llm.py:33-186`)。 +- 结构化处理器返回 `FunctionResult`,路由器可以直接发送回复或将 JSON 结果反馈给 LLM(`function_calls/spec.py:12-36`)。 + +## 流程图 +```mermaid +flowchart TD + A[来自 Wcf 的 WxMsg] --> B[Robot.processMsg] + B --> C[MessageSummary 记录消息] + B --> D[_select_model_for_message] + B --> E[_get_specific_history_limit] + B --> F[preprocess → MessageContext] + F --> G{FunctionCallRouter.dispatch} + G -->|直接匹配| H[_check_scope & require_at] + H --> I[_extract_arguments] + I --> J[_invoke_function] + J --> K{FunctionResult.handled?} + K -->|是| L[FunctionResult.dispatch → ctx.send_text] + K -->|否| M[升级到 LLM] + G -->|无直接匹配| M + M --> N[FunctionCallLLM.run] + N --> O{模型是否支持 call_with_functions?} + O -->|是| P[原生工具循环] + P --> Q[调用 call_with_functions] + Q --> R{是否返回 tool call?} + R -->|是| S[_invoke_function → FunctionResult] + S --> T[formatter → 追加工具 JSON] + T --> P + R -->|否| U[模型最终回答] + U --> V[ctx.send_text 最终回复] + P --> W[达到最大轮次] + W --> X[handled = False] + O -->|否| Y[提示词回退] + Y --> Z[get_answer + 解析 JSON] + Z --> AA{action_type == "function"?} + AA -->|是| S + AA -->|否| X + X --> AB[路由返回 False] + AB --> AC[回退:闲聊/帮助路径] +``` + +## 分步说明 +### 1. 消息接入(`robot.py:190-274`) +- 每条消息都会通过 `MessageSummary.process_message_from_wxmsg` 持久化,方便后续上下文检索。 +- 机器人依据群聊/私聊映射与历史限制选择 AI 模型,再构建包含发送者信息与清洗文本的 `MessageContext`。 +- 在进入路由前,Context 会注入当前 `chat` 模型及会话级的历史上限。 + +### 2. 函数注册(`function_calls/init_handlers.py:1-9`) +- 机器人启动时导入 `function_calls.init_handlers`,所有 `@tool_function` 装饰器即被执行。 +- 每个处理器声明名称、描述、JSON Schema、作用域以及是否需要 @,注册表因此具备自描述能力。 + +### 3. 直接命令快路(`function_calls/router.py:81-175`) +- 路由器先对 `ctx.text` 归一化,再走 `_try_direct_command_match` 匹配已知关键词。 +- 作用域、@ 以及(待实现的)权限检查会阻止不符合条件的调用。参数会按 JSON Schema 校验,避免脏数据。 +- 命中后处理器立即执行,`FunctionResult.dispatch` 会直接向聊天对象推送,无需经过模型。 + +### 4. LLM 编排(`function_calls/llm.py:33-186`) +- 若无直接命中,`FunctionCallLLM.run` 会判断当前模型是否支持 OpenAI 风格的工具调用。 +- **原生循环**:协调器不断发送最新对话,执行指定工具,并把结构化 JSON 响应回灌给模型,直到拿到最终回复或达到轮次上限。 +- **提示词回退**:不支持原生工具的模型会收到列出所有函数的 system prompt,必须返回 JSON 决策供路由器执行。 +- 两种路径最终都返回 `LLMRunResult`,路由器据此决定是否直接回复或继续走其他兜底逻辑。 + +### 5. 处理器执行(`function_calls/handlers.py:37-180`) +- 处理器依赖 `function_calls/services` 中的业务封装,统一返回 `FunctionResult`。 +- 群聊场景会在 `at` 字段写入 `ctx.msg.sender`,确保回复时点名原始请求者。 + +### 6. 兜底逻辑(`robot.py:229-273`) +- 当路由返回未处理状态时,机器人会回退到旧流程:自动通过好友请求、发送欢迎消息或调用 `handle_chitchat`。 +- 即使 Function Call 路由失败,整体对话体验依旧有保障。 + +## 优势 +- 对已知命令走直连路径,既避免额外的模型耗时,又能通过 JSON Schema 保证参数质量(`function_calls/router.py:103-175`)。 +- LLM 协调器清晰区分原生工具与提示词回退,后续替换模型时无需大改(`function_calls/llm.py:33-186`)。 +- `FunctionResult` 既可直接回复,也能作为工具输出反馈给模型,减少重复实现(`function_calls/spec.py:12-36`)。 + +## 仍需关注的点 +- 进入 LLM 流程后,工具输出依赖模型二次组织文本;关键函数可考虑直接派发 `FunctionResult`,避免模型返回空字符串时用户无感知(`function_calls/llm.py:83-136`)。 +- 天气命令的直接路径默认关键词与城市之间存在空格;若要支持“天气北京”这类写法,需要放宽解析逻辑(`function_calls/router.py:148-156`)。 +- 权限检查字段(`spec.auth`)仍是占位符,新增高权限工具前需补齐校验实现(`function_calls/router.py:35-38`)。 + +--- +为快速理解全新的 Function Call Agent 流程而生成。 diff --git a/function_call_argument_reference.md b/function_call_argument_reference.md new file mode 100644 index 0000000..cfe048f --- /dev/null +++ b/function_call_argument_reference.md @@ -0,0 +1,105 @@ +# Function Call 参数调用参考 + +## 调用流程概览 +- Function Call 路由器会将所有消息交给 `FunctionCallLLM.run`,模型基于已注册的函数规格选择要调用的工具(`function_calls/router.py:47`)。 +- 每个处理器都通过 `@tool_function` 装饰器注册,它会读取参数类型注解并生成 JSON Schema,用于 LLM 约束和运行期验证(`function_calls/handlers.py:24`, `function_calls/registry.py:64`)。 +- LLM 返回的参数会在 `_create_args_instance` 中转为对应的 Pydantic 模型;若字段缺失或类型不符则会抛错并被记录(`function_calls/router.py:147`)。 + +下表总结了当前仍启用的 5 个函数及其参数 Schema 与业务用途。 +所有关键字段已通过 Pydantic `Field(..., description=...)` 写入属性说明,LLM 在工具定义中可以读取这些 `description` 以理解含义。 + +## 函数明细 + +### reminder_set +- **Handler**:`function_calls/handlers.py:24` +- **模型**:`ReminderArgs`(`function_calls/models.py:19`) +- **Schema 摘要**: + ```json + { + "type": "object", + "required": ["type", "time", "content"], + "properties": { + "type": { + "type": "string", + "enum": ["once", "daily", "weekly"], + "description": "提醒类型:once=一次性提醒,daily=每天,weekly=每周" + }, + "time": { + "type": "string", + "description": "提醒时间。once 使用 'YYYY-MM-DD HH:MM',daily/weekly 使用 'HH:MM'" + }, + "content": { + "type": "string", + "description": "提醒内容,将直接发送给用户" + }, + "weekday": { + "type": ["integer", "null"], + "default": null, + "description": "当 type=weekly 时的星期索引,0=周一 … 6=周日" + } + } + } + ``` +- **参数含义**: + - `type`:提醒频率。一次性(`once`)用于绝对时间点;`daily` 为每天定时;`weekly` 需要同时给出星期索引。 + - `time`:触发时间。一次性提醒要求未来时间,格式 `YYYY-MM-DD HH:MM`;每日/每周提醒使用 `HH:MM` 24 小时制。 + - `content`:提醒内容文本,最终会被发送给触发对象。 + - `weekday`:仅在 `type=weekly` 时有效,使用 0-6 表示周一到周日。 +- **调用方式**:处理器从 `args.model_dump()` 得到完整参数字典并传给提醒服务的 `create_reminder`(`function_calls/handlers.py:37`)。`weekday` 仅在 `type="weekly"` 时使用;未提供值时默认为 `null`。 + +### reminder_list +- **Handler**:`function_calls/handlers.py:46` +- **模型**:`ReminderListArgs`(`function_calls/models.py:28`) +- **Schema 摘要**:空对象,没有任何必填字段。 +- **调用方式**:函数只验证请求体为空对象,然后调用 `list_reminders` 读取当前用户/群的提醒列表(`function_calls/handlers.py:59`)。 + +### reminder_delete +- **Handler**:`function_calls/handlers.py:63` +- **模型**:`ReminderDeleteArgs`(`function_calls/models.py:33`) +- **Schema 摘要**: + ```json + { + "type": "object", + "required": ["reminder_id"], + "properties": { + "reminder_id": { + "type": "string", + "description": "提醒列表中的 ID(日志或界面可显示前几位帮助用户复制)" + } + } + } + ``` +- **参数含义**:`reminder_id` 即提醒列表返回的唯一 ID,用于定位要删除的记录。 +- **调用方式**:直接读取 `args.reminder_id` 并转交给 `delete_reminder` 执行删除逻辑(`function_calls/handlers.py:76`)。 + +### perplexity_search +- **Handler**:`function_calls/handlers.py:80` +- **模型**:`PerplexityArgs`(`function_calls/models.py:39`) +- **Schema 摘要**: + ```json + { + "type": "object", + "required": ["query"], + "properties": { + "query": { + "type": "string", + "description": "要搜索的问题或主题" + } + } + } + ``` +- **参数含义**:`query` 即用户想让 Perplexity 查找的自然语言问题或主题。 +- **调用方式**:使用 `args.query` 调用 `run_perplexity`,若外部服务已自行回复则返回空消息,否则把搜索结果转成回复文本(`function_calls/handlers.py:87`)。 + +### summary +- **Handler**:`function_calls/handlers.py:96` +- **模型**:`SummaryArgs`(`function_calls/models.py:49`) +- **Schema 摘要**:空对象,无需参数。 +- **调用方式**:只要模型选择该函数即执行 `summarize_messages(ctx)`,返回群聊近期消息总结(`function_calls/handlers.py:104`)。 + +## 运行时验证逻辑 +1. **模型侧约束**:`FunctionCallLLM` 会把上述 Schema 转成 OpenAI 样式的工具定义,使模型在生成参数时遵循字段、类型与枚举限制(`function_calls/llm.py:112`)。 +2. **路由二次校验**:收到工具调用后,路由器会再次调用 `validate_arguments` 确认必填字段存在且类型正确(`function_calls/llm.py:215`)。 +3. **Pydantic 转换**:最后在 `_create_args_instance` 中实例化 Pydantic 模型,确保所有字段通过严格校验并可在 handler 中以 `args` 访问(`function_calls/router.py:147`)。 + +若未来新增函数,只需为 handler 的 `args` 参数提供 Pydantic 模型或 dataclass,装饰器会自动生成相应 Schema 并同步到本流程中。 diff --git a/function_calls/handlers.py b/function_calls/handlers.py index 6f314ed..c31b611 100644 --- a/function_calls/handlers.py +++ b/function_calls/handlers.py @@ -1,64 +1,25 @@ """Function Call handlers built on top of structured services.""" from __future__ import annotations -import logging from commands.context import MessageContext from .models import ( - WeatherArgs, - NewsArgs, ReminderArgs, ReminderListArgs, ReminderDeleteArgs, PerplexityArgs, - HelpArgs, SummaryArgs, - ClearMessagesArgs, - InsultArgs, ) from .registry import tool_function from .spec import FunctionResult from .services import ( - build_help_text, - build_insult, - clear_group_messages, create_reminder, delete_reminder, - get_news_digest, - get_weather_report, list_reminders, run_perplexity, summarize_messages, ) -logger = logging.getLogger(__name__) - - -@tool_function( - name="weather_query", - description="查询城市天气预报", - examples=["北京天气怎么样", "上海天气", "深圳明天会下雨吗"], - scope="both", - require_at=True, -) -def handle_weather(ctx: MessageContext, args: WeatherArgs) -> FunctionResult: - result = get_weather_report(args.city) - at = ctx.msg.sender if ctx.is_group else "" - return FunctionResult(handled=True, messages=[result.message], at=at if at else "") - - -@tool_function( - name="news_query", - description="获取今日新闻", - examples=["看看今天的新闻", "今日要闻", "新闻"], - scope="both", - require_at=True, -) -def handle_news(ctx: MessageContext, args: NewsArgs) -> FunctionResult: - result = get_news_digest() - at = ctx.msg.sender if ctx.is_group else "" - return FunctionResult(handled=True, messages=[result.message], at=at if at else "") - @tool_function( name="reminder_set", @@ -68,7 +29,7 @@ def handle_news(ctx: MessageContext, args: NewsArgs) -> FunctionResult: require_at=True, ) def handle_reminder_set(ctx: MessageContext, args: ReminderArgs) -> FunctionResult: - manager = getattr(ctx.robot, 'reminder_manager', None) + manager = getattr(ctx.robot, "reminder_manager", None) at = ctx.msg.sender if ctx.is_group else "" if not manager: return FunctionResult(handled=True, messages=["❌ 内部错误:提醒管理器未初始化。"], at=at) @@ -90,7 +51,7 @@ def handle_reminder_set(ctx: MessageContext, args: ReminderArgs) -> FunctionResu require_at=True, ) def handle_reminder_list(ctx: MessageContext, args: ReminderListArgs) -> FunctionResult: - manager = getattr(ctx.robot, 'reminder_manager', None) + manager = getattr(ctx.robot, "reminder_manager", None) at = ctx.msg.sender if ctx.is_group else "" if not manager: return FunctionResult(handled=True, messages=["❌ 内部错误:提醒管理器未初始化。"], at=at) @@ -107,7 +68,7 @@ def handle_reminder_list(ctx: MessageContext, args: ReminderListArgs) -> Functio require_at=True, ) def handle_reminder_delete(ctx: MessageContext, args: ReminderDeleteArgs) -> FunctionResult: - manager = getattr(ctx.robot, 'reminder_manager', None) + manager = getattr(ctx.robot, "reminder_manager", None) at = ctx.msg.sender if ctx.is_group else "" if not manager: return FunctionResult(handled=True, messages=["❌ 内部错误:提醒管理器未初始化。"], at=at) @@ -132,18 +93,6 @@ def handle_perplexity_search(ctx: MessageContext, args: PerplexityArgs) -> Funct return FunctionResult(handled=True, messages=service_result.messages, at=at if at else "") -@tool_function( - name="help", - description="显示机器人帮助信息", - examples=["help", "帮助", "指令"], - scope="both", - require_at=False, -) -def handle_help(ctx: MessageContext, args: HelpArgs) -> FunctionResult: - help_text = build_help_text() - return FunctionResult(handled=True, messages=[help_text]) - - @tool_function( name="summary", description="总结群聊最近的消息", @@ -154,27 +103,3 @@ def handle_help(ctx: MessageContext, args: HelpArgs) -> FunctionResult: def handle_summary(ctx: MessageContext, args: SummaryArgs) -> FunctionResult: result = summarize_messages(ctx) return FunctionResult(handled=True, messages=[result.message]) - - -@tool_function( - name="clear_messages", - description="清除群聊历史消息记录", - examples=["clearmessages", "清除历史"], - scope="group", - require_at=True, -) -def handle_clear_messages(ctx: MessageContext, args: ClearMessagesArgs) -> FunctionResult: - result = clear_group_messages(ctx) - return FunctionResult(handled=True, messages=[result.message]) - - -@tool_function( - name="insult", - description="骂指定用户(仅限群聊)", - examples=["骂一下@某人"], - scope="group", - require_at=True, -) -def handle_insult(ctx: MessageContext, args: InsultArgs) -> FunctionResult: - result = build_insult(ctx, args.target_user) - return FunctionResult(handled=True, messages=[result.message]) diff --git a/function_calls/models.py b/function_calls/models.py index 87bada2..6c2039d 100644 --- a/function_calls/models.py +++ b/function_calls/models.py @@ -3,7 +3,7 @@ Function Call 参数模型定义 """ from typing import Literal, Optional -from pydantic import BaseModel +from pydantic import BaseModel, Field class WeatherArgs(BaseModel): @@ -19,10 +19,17 @@ class NewsArgs(BaseModel): class ReminderArgs(BaseModel): """设置提醒参数""" - type: Literal["once", "daily", "weekly"] - time: str - content: str - weekday: Optional[int] = None + type: Literal["once", "daily", "weekly"] = Field( + ..., description="提醒类型:once=一次性提醒,daily=每天,weekly=每周" + ) + time: str = Field( + ..., description="提醒时间。once 使用 'YYYY-MM-DD HH:MM',daily/weekly 使用 'HH:MM'" + ) + content: str = Field(..., description="提醒内容,将直接发送给用户") + weekday: Optional[int] = Field( + default=None, + description="当 type=weekly 时的星期索引,0=周一 … 6=周日", + ) class ReminderListArgs(BaseModel): @@ -33,12 +40,13 @@ class ReminderListArgs(BaseModel): class ReminderDeleteArgs(BaseModel): """删除提醒参数""" - reminder_id: str + reminder_id: str = Field(..., description="提醒列表中的 ID(前端可展示前几位)") class PerplexityArgs(BaseModel): """Perplexity搜索参数""" - query: str + + query: str = Field(..., description="要搜索的问题或主题") class HelpArgs(BaseModel): diff --git a/function_calls/router.py b/function_calls/router.py index c94decd..23e2f56 100644 --- a/function_calls/router.py +++ b/function_calls/router.py @@ -1,7 +1,7 @@ """Function Call 路由器""" import logging -from typing import Any, Dict, Optional +from typing import Any, Dict from commands.context import MessageContext @@ -39,45 +39,6 @@ class FunctionCallRouter: return True - def _try_direct_command_match(self, ctx: MessageContext) -> Optional[str]: - """ - 尝试直接命令匹配,避免不必要的LLM调用 - - 返回匹配的函数名,如果没有匹配则返回None - """ - text = ctx.text.strip().lower() - - # 定义一些明确的命令关键字映射 - direct_commands = { - "help": "help", - "帮助": "help", - "指令": "help", - "新闻": "news_query", - "summary": "summary", - "总结": "summary", - "clearmessages": "clear_messages", - "清除历史": "clear_messages" - } - - # 检查完全匹配 - if text in direct_commands: - return direct_commands[text] - - # 检查以特定前缀开头的命令 - if text.startswith("ask ") and len(text) > 4: - return "perplexity_search" - - if text.startswith("天气") or text.startswith("天气预报"): - return "weather_query" - - if text in ["查看提醒", "我的提醒", "提醒列表"]: - return "reminder_list" - - if text.startswith("骂一下"): - return "insult" - - return None - def dispatch(self, ctx: MessageContext) -> bool: """ 分发消息到函数处理器 @@ -100,26 +61,7 @@ class FunctionCallRouter: self.logger.warning("没有注册任何函数") return False - # 第一步:尝试直接命令匹配 - direct_function = self._try_direct_command_match(ctx) - if direct_function and direct_function in functions: - spec = functions[direct_function] - - if not self._check_scope_and_permissions(ctx, spec): - return False - - arguments = self._extract_arguments_for_direct_command(ctx, direct_function) - if not self.llm.validate_arguments(arguments, spec.parameters_schema): - self.logger.warning(f"直接命令 {direct_function} 参数验证失败") - return False - - result = self._invoke_function(ctx, spec, arguments) - if result.handled: - result.dispatch(ctx) - return True - # 如果没有处理成功,继续尝试LLM流程 - - # 第二步:使用LLM执行多轮函数调用 + # 使用 LLM 执行函数调用流程 llm_result = self.llm.run( ctx, functions, @@ -141,38 +83,6 @@ class FunctionCallRouter: self.logger.error(f"FunctionCallRouter dispatch 异常: {e}") return False - def _extract_arguments_for_direct_command(self, ctx: MessageContext, function_name: str) -> Dict[str, Any]: - """为直接命令提取参数""" - text = ctx.text.strip() - - if function_name == "weather_query": - # 提取城市名 - if text.startswith("天气预报 "): - city = text[4:].strip() - elif text.startswith("天气 "): - city = text[3:].strip() - else: - city = "" - return {"city": city} - - elif function_name == "perplexity_search": - # 提取搜索查询 - if text.startswith("ask "): - query = text[4:].strip() - else: - query = text - return {"query": query} - - elif function_name == "insult": - # 提取要骂的用户 - import re - match = re.search(r"骂一下\s*@([^\s@]+)", text) - target_user = match.group(1) if match else "" - return {"target_user": target_user} - - # 对于不需要参数的函数,返回空字典 - return {} - def _invoke_function(self, ctx: MessageContext, spec: FunctionSpec, arguments: Dict[str, Any]) -> FunctionResult: """调用函数处理器,返回结构化结果""" try: diff --git a/function_calls/services/__init__.py b/function_calls/services/__init__.py index c340c18..e62ec68 100644 --- a/function_calls/services/__init__.py +++ b/function_calls/services/__init__.py @@ -1,22 +1,13 @@ """Service helpers for Function Call handlers.""" -from .weather import get_weather_report -from .news import get_news_digest from .reminder import create_reminder, list_reminders, delete_reminder -from .help import build_help_text -from .group_tools import summarize_messages, clear_group_messages +from .group_tools import summarize_messages from .perplexity import run_perplexity -from .insult import build_insult __all__ = [ - "get_weather_report", - "get_news_digest", "create_reminder", "list_reminders", "delete_reminder", - "build_help_text", "summarize_messages", - "clear_group_messages", "run_perplexity", - "build_insult", ] diff --git a/function_calls/services/help.py b/function_calls/services/help.py index 3de91db..46d8910 100644 --- a/function_calls/services/help.py +++ b/function_calls/services/help.py @@ -10,14 +10,6 @@ HELP_LINES = [ "- 新闻", "- ask [问题]", "", - "【决斗 & 偷袭】", - "- 决斗@XX", - "- 偷袭@XX", - "- 决斗排行/排行榜", - "- 我的战绩/决斗战绩", - "- 我的装备/查看装备", - "- 改名 [旧名] [新名]", - "", "【提醒】", "- 提醒xxxxx:一次性、每日、每周", "- 查看提醒/我的提醒/提醒列表", diff --git a/image/__init__.py b/image/__init__.py index 185416e..6688d6a 100644 --- a/image/__init__.py +++ b/image/__init__.py @@ -1,11 +1,5 @@ -"""图像生成功能模块 - -包含以下功能: -- AliyunImage: 阿里云文生图 -- GeminiImage: 谷歌Gemini文生图 -""" +"""图像生成功能模块""" from .img_aliyun_image import AliyunImage -from .img_gemini_image import GeminiImage -__all__ = ['AliyunImage', 'GeminiImage'] \ No newline at end of file +__all__ = ['AliyunImage'] diff --git a/image/img_gemini_image.py b/image/img_gemini_image.py deleted file mode 100644 index 708a2bd..0000000 --- a/image/img_gemini_image.py +++ /dev/null @@ -1,113 +0,0 @@ -#! /usr/bin/env python3 -# -*- coding: utf-8 -*- - -import logging -import os -import mimetypes -import time -import random -from google import genai -from google.genai import types - -class GeminiImage: - """谷歌AI画图API调用 - """ - - def __init__(self, config={}) -> None: - self.LOG = logging.getLogger("GeminiImage") - - self.enable = config.get("enable", True) - self.api_key = config.get("api_key", "") or os.environ.get("GEMINI_API_KEY", "") - self.model = config.get("model", "gemini-2.0-flash-exp-image-generation") - self.proxy = config.get("proxy", "") - - project_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - self.temp_dir = config.get("temp_dir", os.path.join(project_dir, "geminiimg")) - - if not os.path.exists(self.temp_dir): - os.makedirs(self.temp_dir) - - if not self.api_key: - self.enable = False - return - - try: - # 设置代理 - if self.proxy: - os.environ["HTTP_PROXY"] = self.proxy - os.environ["HTTPS_PROXY"] = self.proxy - - # 初始化客户端 - self.client = genai.Client(api_key=self.api_key) - except Exception: - self.enable = False - - def generate_image(self, prompt: str) -> str: - """生成图像并返回图像文件路径 - """ - try: - # 设置代理 - if self.proxy: - os.environ["HTTP_PROXY"] = self.proxy - os.environ["HTTPS_PROXY"] = self.proxy - - image_prompt = f"生成一张高质量的图片: {prompt}。请直接提供图像,不需要描述。" - - # 发送请求 - response = self.client.models.generate_content( - model=self.model, - contents=image_prompt, - config=types.GenerateContentConfig( - response_modalities=['Text', 'Image'] - ) - ) - - # 处理响应 - if hasattr(response, 'candidates') and response.candidates: - for candidate in response.candidates: - if hasattr(candidate, 'content') and candidate.content: - for part in candidate.content.parts: - if hasattr(part, 'inline_data') and part.inline_data: - # 保存图像 - file_name = f"gemini_image_{int(time.time())}_{random.randint(1000, 9999)}" - file_extension = mimetypes.guess_extension(part.inline_data.mime_type) or ".png" - file_path = os.path.join(self.temp_dir, f"{file_name}{file_extension}") - - with open(file_path, "wb") as f: - f.write(part.inline_data.data) - - return file_path - - # 如果没有找到图像,尝试获取文本响应 - try: - text_content = response.text - if text_content: - return f"模型未能生成图像: {text_content[:100]}..." - except (AttributeError, TypeError): - pass - - return "图像生成失败,可能需要更新模型或调整提示词" - - except Exception as e: - error_str = str(e) - self.LOG.error(f"图像生成出错: {error_str}") - - # 处理500错误 - if "500 INTERNAL" in error_str: - self.LOG.error("遇到谷歌服务器内部错误") - return "谷歌AI服务器临时故障,请稍后再试。这是谷歌服务器的问题,不是你的请求有误。" - - if "timeout" in error_str.lower(): - return "图像生成超时,请检查网络或代理设置" - - if "violated" in error_str.lower() or "policy" in error_str.lower(): - return "请求包含违规内容,无法生成图像" - - # 其他常见错误类型处理 - if "quota" in error_str.lower() or "rate" in error_str.lower(): - return "API使用配额已用尽或请求频率过高,请稍后再试" - - if "authentication" in error_str.lower() or "auth" in error_str.lower(): - return "API密钥验证失败,请联系管理员检查配置" - - return f"图像生成失败,错误原因: {error_str.split('.')[-1] if '.' in error_str else error_str}" \ No newline at end of file diff --git a/image/img_manager.py b/image/img_manager.py index b6a45af..4147112 100644 --- a/image/img_manager.py +++ b/image/img_manager.py @@ -5,7 +5,7 @@ import shutil import time from wcferry import Wcf from configuration import Config -from image import AliyunImage, GeminiImage +from image import AliyunImage class ImageGenerationManager: @@ -30,22 +30,9 @@ class ImageGenerationManager: # 初始化图像生成服务 self.aliyun_image = None - self.gemini_image = None self.LOG.info("开始初始化图像生成服务...") - # 初始化Gemini图像生成服务 - try: - if hasattr(self.config, 'GEMINI_IMAGE'): - self.gemini_image = GeminiImage(self.config.GEMINI_IMAGE) - else: - self.gemini_image = GeminiImage({}) - - if getattr(self.gemini_image, 'enable', False): - self.LOG.info("谷歌Gemini图像生成功能已启用") - except Exception as e: - self.LOG.error(f"初始化谷歌Gemini图像生成服务失败: {e}") - # 初始化AliyunImage服务 if hasattr(self.config, 'ALIYUN_IMAGE') and self.config.ALIYUN_IMAGE.get('enable', False): try: @@ -56,7 +43,7 @@ class ImageGenerationManager: def handle_image_generation(self, service_type, prompt, receiver, at_user=None): """处理图像生成请求的通用函数 - :param service_type: 服务类型,'aliyun'/'gemini' + :param service_type: 服务类型,当前支持 'aliyun' :param prompt: 图像生成提示词 :param receiver: 接收者ID :param at_user: 被@的用户ID,用于群聊 @@ -78,13 +65,6 @@ class ImageGenerationManager: wait_message = "当前模型为阿里V1模型,生成速度非常慢,可能需要等待较长时间,请耐心等候..." else: wait_message = "正在生成图像,请稍等..." - elif service_type == 'gemini': - if not self.gemini_image or not getattr(self.gemini_image, 'enable', False): - self.send_text("谷歌文生图服务未启用", receiver, at_user) - return True - - service = self.gemini_image - wait_message = "正在通过谷歌AI生成图像,请稍等..." else: self.LOG.error(f"未知的图像生成服务类型: {service_type}") return False @@ -93,12 +73,11 @@ class ImageGenerationManager: self.send_text(wait_message, receiver, at_user) image_url = service.generate_image(prompt) - + if image_url and (image_url.startswith("http") or os.path.exists(image_url)): try: self.LOG.info(f"开始处理图片: {image_url}") - # 谷歌API直接返回本地文件路径,无需下载 - image_path = image_url if service_type == 'gemini' else service.download_image(image_url) + image_path = service.download_image(image_url) if image_path: # 创建一个临时副本,避免文件占用问题 @@ -167,4 +146,4 @@ class ImageGenerationManager: else: self.LOG.error(f"无法删除文件 {file_path} 经过 {max_retries} 次尝试: {str(e)}") - return False \ No newline at end of file + return False diff --git a/image/文生图功能的使用说明.MD b/image/文生图功能的使用说明.MD index 3ef30cf..dec71f6 100644 --- a/image/文生图功能的使用说明.MD +++ b/image/文生图功能的使用说明.MD @@ -1,58 +1,38 @@ # 图像生成配置说明 -#### 文生图相关功能的加入,可在此说明文件内加入贡献者的GitHub链接,方便以后的更新,以及BUG的修改! - +#### 文生图相关功能的加入,可在此说明文件内加入贡献者的 GitHub 链接,方便以后的更新,以及 BUG 的修改! 智谱AI绘画:[JiQingzhe2004 (JiQingzhe)](https://github.com/JiQingzhe2004) 阿里云AI绘画:[JiQingzhe2004 (JiQingzhe)](https://github.com/JiQingzhe2004) -谷歌AI绘画:[JiQingzhe2004 (JiQingzhe)](https://github.com/JiQingzhe2004) - ------ -在`config.yaml`中进行以下配置才可以调用: +在 `config.yaml` 中进行以下配置才可以调用: ```yaml - aliyun_image: # -----如果要使用阿里云文生图,取消下面的注释并填写相关内容,模型到阿里云百炼找通义万相-文生图2.1-Turbo----- enable: true # 是否启用阿里文生图功能,false为关闭,默认开启,如果未配置,则会将消息发送给聊天大模型 - api_key: sk-xxxxxxxxxxxxxxxxxxxxxxxx # 替换为你的DashScope API密钥 - model: wanx2.1-t2i-turbo # 模型名称,默认使用wanx2.1-t2i-turbo(快),wanx2.1-t2i-plus(中),wanx-v1(慢),会给用户不同的提示! + api_key: sk-xxxxxxxxxxxxxxxxxxxxxxxx # 替换为你的 DashScope API 密钥 + model: wanx2.1-t2i-turbo # 模型名称,默认使用 wanx2.1-t2i-turbo(快), wanx2.1-t2i-plus(中), wanx-v1(慢) size: 1024*1024 # 图像尺寸,格式为宽*高 n: 1 # 生成图像的数量 temp_dir: ./temp # 临时文件存储路径 - trigger_keyword: 牛阿里 # 触发词,默认为"牛阿里" + trigger_keyword: 牛阿里 # 触发词,默认为 "牛阿里" fallback_to_chat: true # 当未启用绘画功能时:true=将请求发给聊天模型处理,false=回复固定的未启用提示信息 - -gemini_image: # -----谷歌AI画图配置这行不填----- - enable: true # 是否启用谷歌AI画图功能 - api_key: your-api-key-here # 谷歌Gemini API密钥,必填 - model: gemini-2.0-flash-exp-image-generation # 模型名称,建议保持默认,只有这一个模型可以进行绘画 - temp_dir: ./geminiimg # 图片保存目录,可选 - trigger_keyword: 牛谷歌 # 触发词,默认为"牛谷歌" - fallback_to_chat: false # 当未启用绘画功能时:true=将请求发给聊天模型处理,false=回复固定的未启用提示信息 ``` -## 如何获取API密钥 - -1. 访问 [Google AI Studio](https://aistudio.google.com/) -2. 创建一个账号或登录 -3. 访问 [API Keys](https://aistudio.google.com/app/apikeys) 页面 -4. 创建一个新的API密钥 -5. 复制API密钥并填入配置文件 - ## 使用方法 -直接发送消息或在群聊中@机器人,使用触发词加提示词,例如: +直接发送消息或在群聊中 @机器人,使用触发词加提示词,例如: -# 单人聊天的使用 +### 单人聊天示例 ``` 牛智谱 一只可爱的猫咪在阳光下玩耍 牛阿里 一只可爱的猫咪在阳光下玩耍 -牛谷歌 一只可爱的猫咪在阳光下玩耍 ``` -## 群组的使用方法 + +### 群组示例 ``` @ 牛图图 一只可爱的猫咪在阳光下玩耍 diff --git a/requirements.txt b/requirements.txt index a7f5eb0..c041dec 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,4 @@ pillow jupyter_client zhdate ipykernel -google-generativeai>=0.3.0 dashscope -google-genai \ No newline at end of file diff --git a/robot.py b/robot.py index 20ee095..7ac2055 100644 --- a/robot.py +++ b/robot.py @@ -9,18 +9,15 @@ from threading import Thread import os import random import shutil -from image import AliyunImage, GeminiImage from image.img_manager import ImageGenerationManager from wcferry import Wcf, WxMsg from ai_providers.ai_chatgpt import ChatGPT from ai_providers.ai_deepseek import DeepSeek -from ai_providers.ai_gemini import Gemini from ai_providers.ai_perplexity import Perplexity from function.func_weather import Weather from function.func_news import News -from function.func_duel import start_duel, get_rank_list, get_player_stats, change_player_name, DuelManager, attempt_sneak_attack from function.func_summary import MessageSummary # 导入新的MessageSummary类 from function.func_reminder import ReminderManager # 导入ReminderManager类 from configuration import Config @@ -52,8 +49,6 @@ class Robot(Job): self.wxid = self.wcf.get_self_wxid() # 获取机器人自己的wxid self.allContacts = self.getAllContacts() self._msg_timestamps = [] - self.duel_manager = DuelManager(self.sendDuelMsg) - try: db_path = "data/message_history.db" # 使用 getattr 安全地获取 MAX_HISTORY,如果不存在则默认为 300 @@ -95,19 +90,6 @@ class Robot(Job): except Exception as e: self.LOG.error(f"初始化 DeepSeek 模型时出错: {str(e)}") - # 初始化Gemini - if Gemini.value_check(self.config.GEMINI): - try: - # 传入 message_summary 和 wxid - self.chat_models[ChatType.GEMINI.value] = Gemini( - self.config.GEMINI, - message_summary_instance=self.message_summary, - bot_wxid=self.wxid - ) - self.LOG.info(f"已加载 Gemini 模型") - except Exception as e: - self.LOG.error(f"初始化 Gemini 模型时出错: {str(e)}") - # 初始化Perplexity if Perplexity.value_check(self.config.PERPLEXITY): self.chat_models[ChatType.PERPLEXITY.value] = Perplexity(self.config.PERPLEXITY) @@ -417,36 +399,12 @@ class Robot(Job): for r in receivers: self.sendTextMsg(report, r) - def sendDuelMsg(self, msg: str, receiver: str) -> None: - """发送决斗消息,不受消息频率限制,不记入历史记录 - :param msg: 消息字符串 - :param receiver: 接收人wxid或者群id - """ - try: - self.wcf.send_text(f"{msg}", receiver, "") - except Exception as e: - self.LOG.error(f"发送决斗消息失败: {e}") - def cleanup_perplexity_threads(self): """清理所有Perplexity线程""" # 如果已初始化Perplexity实例,调用其清理方法 perplexity_instance = self.get_perplexity_instance() if perplexity_instance: perplexity_instance.cleanup() - - # 检查并等待决斗线程结束 - if hasattr(self, 'duel_manager') and self.duel_manager.is_duel_running(): - self.LOG.info("等待决斗线程结束...") - # 最多等待5秒 - for i in range(5): - if not self.duel_manager.is_duel_running(): - break - time.sleep(1) - - if self.duel_manager.is_duel_running(): - self.LOG.warning("决斗线程在退出时仍在运行") - else: - self.LOG.info("决斗线程已结束") def cleanup(self): """清理所有资源,在程序退出前调用"""