diff --git a/README.MD b/README.MD index dfbee86..2f524da 100644 --- a/README.MD +++ b/README.MD @@ -64,7 +64,6 @@ Bubbles 是一个功能丰富的微信机器人框架,基于 [wcferry](https:/ - 支持为不同的群聊和私聊设置不同的 AI 模型和 system prompt - OpenAI (ChatGPT) - DeepSeek - - Gemini #### 🛠️ 智能路由系统 - 基于 AI 的意图识别,无需记住特定命令格式 diff --git a/ai_providers/ai_gemini.py b/ai_providers/ai_gemini.py deleted file mode 100644 index 56e4850..0000000 --- a/ai_providers/ai_gemini.py +++ /dev/null @@ -1,411 +0,0 @@ -# ai_providers/ai_gemini.py -#! /usr/bin/env python3 -# -*- coding: utf-8 -*- - -import logging -import os -import time -import httpx -import pathlib # 用于处理文件路径 -import mimetypes # 用于猜测图片类型 -import google.generativeai as genai -from google.generativeai.types import generation_types, safety_types # 显式导入需要的类型 -from google.api_core.exceptions import GoogleAPICallError, ClientError - -# 引入 MessageSummary 类型提示 -try: - from function.func_summary import MessageSummary -except ImportError: - MessageSummary = object # Fallback - -class Gemini: - DEFAULT_MODEL = "gemini-1.5-pro-latest" - DEFAULT_PROMPT = "You are a helpful assistant." - DEFAULT_MAX_HISTORY = 30 - SAFETY_SETTINGS = { # 默认安全设置 - 可根据需要调整或从配置加载 - safety_types.HarmCategory.HARM_CATEGORY_HARASSMENT: safety_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, - safety_types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: safety_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, - safety_types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: safety_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, - safety_types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: safety_types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE, - } - - def __init__(self, conf: dict, message_summary_instance: MessageSummary = None, bot_wxid: str = None) -> None: - self.LOG = logging.getLogger("Gemini") - self._api_key = conf.get("api_key") - self._model_name = conf.get("model_name", self.DEFAULT_MODEL) - # 存储原始配置的 prompt,用于初始化和可能的重载 - self._base_prompt = conf.get("prompt", self.DEFAULT_PROMPT) - self._proxy = conf.get("proxy") - self.max_history_messages = conf.get("max_history_messages", self.DEFAULT_MAX_HISTORY) - - self.message_summary = message_summary_instance - self.bot_wxid = bot_wxid - self._model = None - self.support_vision = False # 初始化时假设不支持,成功加载模型后再判断 - - if not self._api_key: - self.LOG.error("Gemini API Key 未在配置中提供!") - return # 没有 API Key 无法继续 - - if not self.message_summary: - self.LOG.warning("MessageSummary 实例未提供给 Gemini,上下文功能将不可用!") - if not self.bot_wxid: - self.LOG.warning("bot_wxid 未提供给 Gemini,可能无法正确识别机器人自身消息!") - - try: - # 1. 配置代理 (如果提供) - transport = None - if self._proxy: - try: - transport = httpx.HTTPTransport(proxy=self._proxy) - self.LOG.info(f"Gemini 使用代理: {self._proxy}") - except Exception as proxy_err: - self.LOG.error(f"配置 Gemini 代理失败: {proxy_err}", exc_info=True) - # 代理配置失败,可以选择不使用代理继续或直接失败 - # 这里选择继续,不使用代理 - transport = None - - # 2. 配置 Google AI Client - genai.configure(api_key=self._api_key, transport=transport) - - # 3. 初始化模型 - # 将基础 prompt 作为 system_instruction 传递 - self._model = genai.GenerativeModel( - self._model_name, - system_instruction=self._base_prompt, - safety_settings=self.SAFETY_SETTINGS # 应用安全设置 - ) - self.LOG.info(f"已加载 Gemini 模型") - - # 4. 检查视觉能力 (依赖模型名称的简单检查) - # 注意:更可靠的方式是调用 list_models 并检查支持的方法 - if "vision" in self._model_name or "pro" in self._model_name or "gemini-1.5" in self._model_name or "flash" in self._model_name: - self.support_vision = True - self.LOG.info(f"模型 {self._model_name} 被认为支持视觉能力。") - else: - self.LOG.info(f"模型 {self._model_name} 根据名称判断可能不支持视觉能力。") - - except (GoogleAPICallError, ClientError) as api_error: - self.LOG.error(f"初始化 Gemini 时发生 API 错误: {api_error}", exc_info=True) - self._model = None - except Exception as e: - self.LOG.error(f"初始化 Gemini 时发生未知错误: {e}", exc_info=True) - self._model = None - - def __repr__(self): - return f'Gemini(model={self._model_name}, initialized={self._model is not None})' - - @staticmethod - def value_check(conf: dict) -> bool: - # 只需要 API Key 是必须的 - return bool(conf and conf.get("api_key")) - - def _format_history(self, history: list) -> list: - """将数据库历史消息转换为 Gemini API 的 contents 格式""" - contents = [] - for msg in history: - role = "model" if msg.get("sender_wxid") == self.bot_wxid else "user" - content = msg.get('content', '') - sender_name = msg.get('sender', '未知用户') # 获取发送者名称 - - if content: # 避免添加空内容 - # Gemini 推荐 user role 包含发言者信息,model role 不需要 - if role == "user": - formatted_content = f"[{sender_name}]: {content}" # 添加发送者标记 - contents.append({'role': role, 'parts': [{'text': formatted_content}]}) - else: # role == "model" - contents.append({'role': role, 'parts': [{'text': content}]}) - return contents - - def _generate_response(self, contents: list, generation_config_override: generation_types.GenerationConfig | None = None) -> str: - """内部方法,用于调用 Gemini API 并处理响应""" - if not self._model: - return "Gemini 模型未成功初始化,请检查配置和网络。" - - # 配置生成参数 (可以从 config 中读取更多参数) - # 默认使用适中的温度,可以根据需要调整 - default_config = generation_types.GenerationConfig(temperature=0.7) - config_to_use = generation_config_override if generation_config_override else default_config - - self.LOG.debug(f"发送给 Gemini API 的内容条数: {len(contents)}") - # self.LOG.debug(f"使用的 GenerationConfig: {config_to_use}") - # self.LOG.debug(f"发送内容详情: {contents}") # DEBUG: 打印发送内容 - - rsp_text = "" - try: - # 调用 API - response = self._model.generate_content( - contents=contents, - generation_config=config_to_use, - # safety_settings=... # 如果需要覆盖初始化时的安全设置 - stream=False # 非流式响应 - ) - - # 1. 检查 Prompt 是否被阻止 (在获取 candidates 之前) - if response.prompt_feedback and response.prompt_feedback.block_reason: - reason = response.prompt_feedback.block_reason.name - self.LOG.warning(f"Gemini 提示被阻止,原因: {reason}") - return f"抱歉,您的请求因包含不适内容而被阻止 (原因: {reason})。" - - # 2. 检查 Candidates 和 Finish Reason - # 尝试从 response 中安全地提取文本 - try: - rsp_text = response.text # .text 是获取聚合文本的便捷方式 - except ValueError: - # 如果 .text 不可用 (例如,因为 finish_reason 不是 STOP 或 MAX_TOKENS) - # 检查具体的 finish_reason - if response.candidates: - candidate = response.candidates[0] - finish_reason = candidate.finish_reason # 类型是 FinishReason 枚举 - finish_reason_name = finish_reason.name - - if finish_reason == generation_types.FinishReason.SAFETY: - self.LOG.warning(f"Gemini 响应被安全策略阻止。") - # 可以尝试查看 safety_ratings 获取更详细信息 - ratings_info = getattr(candidate, 'safety_ratings', []) - self.LOG.debug(f"Safety Ratings: {ratings_info}") - return "抱歉,生成的响应可能包含不安全内容,已被阻止。" - elif finish_reason == generation_types.FinishReason.RECITATION: - self.LOG.warning(f"Gemini 响应因引用保护被阻止。") - return "抱歉,回答可能包含受版权保护的内容,已被部分阻止。" - elif finish_reason == generation_types.FinishReason.OTHER: - self.LOG.warning(f"Gemini 响应因未知原因停止 (FinishReason: OTHER)") - return "抱歉,生成响应时遇到未知问题。" - else: - # 包括 MAX_TOKENS, STOP 等预期情况,但没有文本 - self.LOG.warning(f"Gemini 未返回文本内容,但完成原因可接受: {finish_reason_name}") - return f"生成内容时遇到问题 (完成原因: {finish_reason_name})" - else: - # 没有 candidates,也没有 prompt block,未知情况 - self.LOG.error("Gemini API 调用成功但未返回任何候选内容或提示反馈。") - return "抱歉,Gemini未能生成响应。" - - except (GoogleAPICallError, ClientError) as api_error: - self.LOG.error(f"Gemini API 调用错误:{api_error}", exc_info=True) - if "API key not valid" in str(api_error): - return "Gemini API 密钥无效或已过期。" - elif "quota" in str(api_error).lower(): - return "Gemini API 调用已达配额限制。" - elif "Model not found" in str(api_error): - return f"配置的 Gemini 模型 '{self._model_name}' 未找到或不可用。" - elif "Resource has been exhausted" in str(api_error): - return "Gemini API 资源耗尽,请稍后再试或检查配额。" - else: - return f"与 Gemini 通信时出错: {type(api_error).__name__}" - except generation_types.StopCandidateException as sce: # 明确捕获这个 - self.LOG.error(f"Gemini API 响应被停止 (StopCandidateException): {sce}", exc_info=True) - # 通常在流式处理中遇到,但也可能在非流式中因某些原因触发 - return "抱歉,Gemini 生成的响应被意外停止了。" - # BlockedPromptException 似乎不直接抛出,而是通过 prompt_feedback 反馈 - # except generation_types.BlockedPromptException as bpe: - # self.LOG.error(f"Gemini API 提示被阻止:{bpe}", exc_info=True) - # return "抱歉,您的请求内容被 Gemini 阻止了。" - except Exception as e: - self.LOG.error(f"调用 Gemini 时发生未知错误: {e}", exc_info=True) - return f"处理您的请求时发生未知错误: {type(e).__name__}" - - return rsp_text.strip() - - def get_answer( - self, - question: str, - wxid: str, - system_prompt_override=None, - specific_max_history=None, - tools=None, - tool_handler=None, - tool_choice=None, - tool_max_iterations: int = 10 - ) -> str: - if not self._model: - return "Gemini 模型未成功初始化,请检查配置和网络。" - - if not question: - self.LOG.warning(f"尝试为 wxid={wxid} 获取答案,但问题为空。") - return "您没有提问哦。" - - if tools: - self.LOG.debug("Gemini 提供的实现暂不支持工具调用,请忽略 tools 参数。") - - # 1. 准备历史消息 - contents = [] - if self.message_summary and self.bot_wxid: - history = self.message_summary.get_messages(wxid) - limit = specific_max_history if specific_max_history is not None else self.max_history_messages - self.LOG.debug(f"获取 Gemini 历史 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit}") - - if limit > 0: - history = history[-limit:] - elif limit == 0: - history = [] # 明确清空历史 - - self.LOG.debug(f"应用限制后 Gemini 历史条数: {len(history)}") - contents.extend(self._format_history(history)) - else: - self.LOG.warning(f"无法为 wxid={wxid} 获取 Gemini 历史记录。") - - # 2. 添加当前用户问题 - # 注意:格式化时已包含发送者信息 - contents.append({'role': 'user', 'parts': [{'text': question}]}) - - # 3. 处理 System Prompt Override (如果提供) - # 注意:Gemini API 目前不直接支持在 generate_content 中覆盖 system_instruction - # 如果需要动态改变系统提示,通常需要重新初始化模型或在用户消息前插入一条 'user' role 的指令 - # 这里我们暂时忽略 system_prompt_override,因为标准 API 调用不支持 - if system_prompt_override: - self.LOG.warning("Gemini API 当前不支持单次请求覆盖系统提示,将使用初始化时的提示。") - # 可以考虑在这里将 override 的内容作为一条 user message 添加到 contents 开头 - # 例如: contents.insert(0, {'role': 'user', 'parts': [{'text': f"[System Instruction Override]: {system_prompt_override}"}]}) - # 但这会影响对话历史的结构,需要谨慎使用 - - # 4. 添加当前时间信息(可选,作为用户消息的一部分) - now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - # 可以将时间信息添加到最近的用户消息中,或作为一条新的 user message - # 为了简单,暂不自动添加时间信息到内容中,如果需要,可以在 prompt 中说明 - - # 5. 调用内部生成方法 - return self._generate_response(contents) - - def get_image_description(self, image_path: str, prompt: str = "请详细描述这张图片中的内容") -> str: - if not self._model: - return "Gemini 模型未初始化。" - if not self.support_vision: - return f"当前 Gemini 模型 '{self._model_name}' 不支持图片理解。" - - image_path_obj = pathlib.Path(image_path) - if not image_path_obj.is_file(): - self.LOG.error(f"图片文件不存在或不是文件: {image_path}") - return "无法读取图片文件" - - try: - # 猜测 MIME 类型 - mime_type, _ = mimetypes.guess_type(image_path_obj) - if not mime_type or not mime_type.startswith("image/"): - self.LOG.warning(f"无法确定图片 MIME 类型或类型不是 image/*: {image_path}, 猜测为 jpeg") - mime_type = "image/jpeg" # 使用默认值 - - self.LOG.info(f"使用 Gemini 分析图片: {image_path} (MIME: {mime_type})") - - # 使用 pathlib 生成 file URI - image_uri = image_path_obj.absolute().as_uri() - - image_part = {'mime_type': mime_type, 'data': image_path_obj.read_bytes()} - - # 构建包含文本提示和图片的消息 - contents = [ - # Gemini 处理多模态输入时,推荐 prompt 和 image 都在 user role 的 parts 里 - {'role': 'user', 'parts': [ - {'text': prompt}, - image_part - # 或者使用 from_uri (如果 API 支持且网络可访问该文件 URI): - # genai.types.Part.from_uri(mime_type=mime_type, uri=image_uri) - # 使用原始字节通常更可靠 - ]} - ] - - # 可以为图片分析设置不同的生成参数,例如更低的温度以获得更客观的描述 - image_gen_config = generation_types.GenerationConfig(temperature=0.4) - - # 调用内部生成方法 - return self._generate_response(contents, generation_config_override=image_gen_config) - - except FileNotFoundError: - self.LOG.error(f"读取图片文件时发生 FileNotFoundError: {image_path}", exc_info=True) - return "读取图片文件时出错。" - except Exception as e: - self.LOG.error(f"使用 Gemini 分析图片时发生未知错误: {e}", exc_info=True) - return f"分析图片时发生未知错误: {type(e).__name__}" - -# --- Main 测试部分 --- -if __name__ == "__main__": - print("--- 运行 Gemini 本地测试 ---") - # 配置日志记录 - logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') - - # --- 配置加载 --- - # !!! 强烈建议从环境变量或安全的配置文件加载 API Key !!! - # 例如: api_key = os.environ.get("GEMINI_API_KEY") - # 不要将 API Key 硬编码在代码中提交 - api_key_from_env = os.environ.get("GEMINI_API_KEY") - proxy_from_env = os.environ.get("HTTP_PROXY") # 支持 http/https 代理 - - if not api_key_from_env: - print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") - print("!!! 警告:环境变量 GEMINI_API_KEY 未设置。请设置该变量。 !!!") - print("!!! 测试将无法连接到 Gemini API。 !!!") - print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") - # 可以选择退出或继续(如果只想测试初始化逻辑) - # exit(1) - api_key_to_use = "DUMMY_KEY_FOR_INIT_TEST" # 仅用于测试初始化日志,无法实际调用 - else: - api_key_to_use = api_key_from_env - - mock_config = { - "api_key": api_key_to_use, - "model_name": "gemini-1.5-flash-latest", # 使用较快的模型测试 - "prompt": "你是一个风趣幽默的AI助手,擅长讲冷笑话。", - "proxy": proxy_from_env, - "max_history_messages": 3 # 测试时减少历史记录 - } - print(f"测试配置: Model={mock_config['model_name']}, Proxy={'已设置' if mock_config['proxy'] else '未设置'}") - - # --- 初始化 Gemini --- - # 在测试中不依赖 MessageSummary - print("\n--- 初始化 Gemini 实例 ---") - gemini_assistant = Gemini(mock_config, bot_wxid="test_bot_wxid") # 提供一个测试 bot_wxid - - # --- 测试文本生成 --- - if gemini_assistant._model: # 检查模型是否成功初始化 - print("\n--- 测试文本生成 (get_answer) ---") - test_question = "你好!今天天气怎么样?给我讲个关于程序员的冷笑话吧。" - print(f"提问: {test_question}") - start_time = time.time() - answer = gemini_assistant.get_answer(test_question, "test_user_wxid") # 提供测试 wxid - end_time = time.time() - print(f"\nGemini 回答 (耗时: {end_time - start_time:.2f}s):\n{answer}") - - # 测试空问题 - print("\n--- 测试空问题 ---") - empty_answer = gemini_assistant.get_answer("", "test_user_wxid") - print(f"空问题回答: {empty_answer}") - - # 测试长对话历史(如果需要,可以手动构建一个 contents 列表来模拟) - # print("\n--- 模拟长对话测试 ---") - # mock_history = [ - # {'role': 'user', 'parts': [{'text': "[UserA]: 第一次提问"}]}, - # {'role': 'model', 'parts': [{'text': "第一次回答"}]}, - # {'role': 'user', 'parts': [{'text': "[UserB]: 第二次提问"}]}, - # {'role': 'model', 'parts': [{'text': "第二次回答"}]}, - # {'role': 'user', 'parts': [{'text': "[UserA]: 第三次提问,关于第一次提问的内容"}]}, - # ] - # mock_history.append({'role': 'user', 'parts': [{'text': "当前的第四个问题"}]}) - # long_hist_answer = gemini_assistant._generate_response(mock_history) - # print(f"长历史回答:\n{long_hist_answer}") - - else: - print("\n--- Gemini 初始化失败,跳过文本生成测试 ---") - - # --- 测试图片描述 (可选) --- - if gemini_assistant._model and gemini_assistant.support_vision: - print("\n--- 测试图片描述 (get_image_description) ---") - # 将 'path/to/your/test_image.jpg' 替换为实际的图片路径 - image_test_path_str = "test_image.jpg" # 假设图片在脚本同目录下 - image_test_path = pathlib.Path(image_test_path_str) - - if image_test_path.exists(): - desc_prompt = "详细描述这张图片里的所有元素和场景氛围。" - print(f"图片路径: {image_test_path.absolute()}") - print(f"描述提示: {desc_prompt}") - start_time = time.time() - description = gemini_assistant.get_image_description(str(image_test_path), desc_prompt) - end_time = time.time() - print(f"\n图片描述 (耗时: {end_time - start_time:.2f}s):\n{description}") - else: - print(f"\n跳过图片测试,测试图片文件未找到: {image_test_path.absolute()}") - print("请将一张名为 test_image.jpg 的图片放在脚本相同目录下进行测试。") - elif gemini_assistant._model: - print(f"\n--- 跳过图片测试,当前模型 {gemini_assistant._model_name} 不支持视觉 ---") - else: - print("\n--- Gemini 初始化失败,跳过图片描述测试 ---") - - print("\n--- Gemini 本地测试结束 ---") diff --git a/commands/handlers.py b/commands/handlers.py index 30d8410..49058b1 100644 --- a/commands/handlers.py +++ b/commands/handlers.py @@ -440,9 +440,20 @@ def handle_chitchat(ctx: 'MessageContext', match: Optional[Match]) -> bool: tools = [history_lookup_tool] tool_handler = handle_tool_call + system_prompt_override = None + persona_text = getattr(ctx, 'persona', None) + if persona_text and getattr(ctx, 'robot', None): + try: + system_prompt_override = ctx.robot._build_system_prompt(chat_model, persona_text) + except Exception as persona_exc: + if ctx.logger: + ctx.logger.error(f"构建人设系统提示失败: {persona_exc}", exc_info=True) + system_prompt_override = None + rsp = chat_model.get_answer( question=latest_message_prompt, wxid=ctx.get_receiver(), + system_prompt_override=system_prompt_override, specific_max_history=specific_max_history, tools=tools, tool_handler=tool_handler, @@ -562,10 +573,20 @@ def handle_perplexity_ask(ctx: 'MessageContext', match: Optional[Match]) -> bool # 需要调整 get_answer 方法以支持 system_prompt_override 参数 # 这里我们假设已对各AI模型实现了这个参数 specific_max_history = getattr(ctx, 'specific_max_history', None) + override_prompt = fallback_prompt + persona_text = getattr(ctx, 'persona', None) + if persona_text and getattr(ctx, 'robot', None): + try: + override_prompt = ctx.robot._build_system_prompt(chat_model, persona_text, override_prompt=fallback_prompt) + except Exception as persona_exc: + if ctx.logger: + ctx.logger.error(f"构建人设系统提示失败: {persona_exc}", exc_info=True) + override_prompt = fallback_prompt + rsp = chat_model.get_answer( question=latest_message_prompt, wxid=ctx.get_receiver(), - system_prompt_override=fallback_prompt, + system_prompt_override=override_prompt, specific_max_history=specific_max_history ) diff --git a/config.yaml.template b/config.yaml.template index e53665b..83c0952 100644 --- a/config.yaml.template +++ b/config.yaml.template @@ -108,14 +108,6 @@ deepseek: # -----deepseek配置这行不填----- show_reasoning: false # 是否在回复中显示思维过程,仅在启用思维链功能时有效 max_history_messages: 10 # <--- 添加这一行,设置 DeepSeek 最多回顾 10 条历史消息 -gemini: # -----gemini配置----- - api_key: "YOUR_GOOGLE_API_KEY" # 必须 - model_flash: "gemini-1.5-pro-latest" # 快速回复模型(可选) - model_reasoning: "gemini-1.5-pro-latest" # 深度思考模型(可选) - prompt: "你是一个AI助手,请用通俗易懂的语言回答用户的问题。" # 可选 - proxy: "http://127.0.0.1:7890" # 可选, 代理地址 - max_history_messages: 20 # 可选, 对话历史长度 - aliyun_image: # -----如果要使用阿里云文生图,取消下面的注释并填写相关内容,模型到阿里云百炼找通义万相-文生图2.1-Turbo----- enable: true # 是否启用阿里文生图功能,false为关闭,默认开启,如果未配置,则会将消息发送给聊天大模型 api_key: sk-xxxxxxxxxxxxxxxxxxxxxxxx # 替换为你的DashScope API密钥 @@ -126,15 +118,6 @@ aliyun_image: # -----如果要使用阿里云文生图,取消下面的注释 trigger_keyword: 牛阿里 # 触发词,默认为"牛阿里" fallback_to_chat: true # 当服务不可用时是否转发给聊天模型处理 -gemini_image: # -----谷歌AI画图配置这行不填----- - enable: true # 是否启用谷歌AI画图功能 - api_key: # 谷歌Gemini API密钥,必填 - model: gemini-2.0-flash-exp-image-generation # 模型名称,建议保持默认,只有这一个模型可以进行绘画 - temp_dir: ./geminiimg # 图片保存目录,可选 - trigger_keyword: 牛谷歌 # 触发词,默认为"牛谷歌" - fallback_to_chat: false # 未启用时是否回退到聊天模式 - proxy: http://127.0.0.1:7890 # 使用Clash代理,格式为:http://域名或者IP地址:端口号 - perplexity: # -----perplexity配置这行不填----- key: # 填写你的Perplexity API Key api: https://api.perplexity.ai # API地址 diff --git a/configuration.py b/configuration.py index ca242df..71c4624 100644 --- a/configuration.py +++ b/configuration.py @@ -89,8 +89,6 @@ class Config(object): self.DEEPSEEK = yconfig.get("deepseek", {}) self.PERPLEXITY = yconfig.get("perplexity", {}) self.ALIYUN_IMAGE = yconfig.get("aliyun_image", {}) - self.GEMINI_IMAGE = yconfig.get("gemini_image", {}) - self.GEMINI = yconfig.get("gemini", {}) self.AI_ROUTER = yconfig.get("ai_router", {"enable": True, "allowed_groups": []}) self.AUTO_ACCEPT_FRIEND_REQUEST = yconfig.get("auto_accept_friend_request", False) self.MAX_HISTORY = yconfig.get("MAX_HISTORY", 300) diff --git a/constants.py b/constants.py index f34e8aa..10f7b3c 100644 --- a/constants.py +++ b/constants.py @@ -6,15 +6,13 @@ class ChatType(IntEnum): # UnKnown = 0 # 未知, 即未设置 CHATGPT = 1 # ChatGPT DEEPSEEK = 2 # DeepSeek - GEMINI = 3 # Gemini PERPLEXITY = 4 # Perplexity @staticmethod def is_in_chat_types(chat_type: int) -> bool: if chat_type in [ChatType.CHATGPT.value, ChatType.DEEPSEEK.value, - ChatType.PERPLEXITY.value, - ChatType.GEMINI.value]: + ChatType.PERPLEXITY.value]: return True return False diff --git a/function/func_persona.py b/function/func_persona.py new file mode 100644 index 0000000..0a04b3a --- /dev/null +++ b/function/func_persona.py @@ -0,0 +1,115 @@ +import logging +import os +import sqlite3 +from datetime import datetime +from typing import Optional + + +class PersonaManager: + """Manage persona profiles per chat session.""" + + def __init__(self, db_path: str = "data/message_history.db") -> None: + self.LOG = logging.getLogger("PersonaManager") + self.db_path = db_path + self.conn: Optional[sqlite3.Connection] = None + self.cursor: Optional[sqlite3.Cursor] = None + self._connect() + self._prepare_table() + + def _connect(self) -> None: + try: + db_dir = os.path.dirname(self.db_path) + if db_dir and not os.path.exists(db_dir): + os.makedirs(db_dir, exist_ok=True) + self.LOG.info(f"Created persona database directory: {db_dir}") + + self.conn = sqlite3.connect(self.db_path, check_same_thread=False) + self.cursor = self.conn.cursor() + self.LOG.info(f"PersonaManager connected to database: {self.db_path}") + except sqlite3.Error as exc: + self.LOG.error(f"Failed to connect persona database: {exc}") + raise + + def _prepare_table(self) -> None: + assert self.cursor is not None + try: + self.cursor.execute( + """ + CREATE TABLE IF NOT EXISTS personas ( + chat_id TEXT PRIMARY KEY, + persona TEXT NOT NULL, + setter_wxid TEXT, + updated_at TEXT NOT NULL + ) + """ + ) + self.conn.commit() + except sqlite3.Error as exc: + self.LOG.error(f"Failed to ensure personas table exists: {exc}") + raise + + def set_persona(self, chat_id: str, persona: str, setter_wxid: Optional[str] = None) -> None: + if not chat_id: + raise ValueError("chat_id must not be empty when setting persona") + if persona is None: + raise ValueError("persona must not be None when setting persona") + + persona = persona.strip() + assert self.cursor is not None + + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + self.cursor.execute( + """ + INSERT INTO personas (chat_id, persona, setter_wxid, updated_at) + VALUES (?, ?, ?, ?) + ON CONFLICT(chat_id) DO UPDATE SET + persona=excluded.persona, + setter_wxid=excluded.setter_wxid, + updated_at=excluded.updated_at + """, + (chat_id, persona, setter_wxid, timestamp), + ) + self.conn.commit() + self.LOG.info(f"Persona updated for chat_id={chat_id}") + except sqlite3.Error as exc: + self.conn.rollback() + self.LOG.error(f"Failed to set persona for {chat_id}: {exc}") + raise + + def clear_persona(self, chat_id: str) -> bool: + if not chat_id: + return False + assert self.cursor is not None + try: + self.cursor.execute("DELETE FROM personas WHERE chat_id = ?", (chat_id,)) + deleted = self.cursor.rowcount + self.conn.commit() + if deleted: + self.LOG.info(f"Persona cleared for chat_id={chat_id}") + return bool(deleted) + except sqlite3.Error as exc: + self.conn.rollback() + self.LOG.error(f"Failed to clear persona for {chat_id}: {exc}") + return False + + def get_persona(self, chat_id: str) -> Optional[str]: + if not chat_id: + return None + assert self.cursor is not None + try: + self.cursor.execute("SELECT persona FROM personas WHERE chat_id = ?", (chat_id,)) + row = self.cursor.fetchone() + return row[0] if row else None + except sqlite3.Error as exc: + self.LOG.error(f"Failed to fetch persona for {chat_id}: {exc}") + return None + + def close(self) -> None: + if self.conn: + try: + self.conn.commit() + self.conn.close() + self.LOG.info("PersonaManager database connection closed") + except sqlite3.Error as exc: + self.LOG.error(f"Failed to close persona database connection: {exc}") diff --git a/image/__init__.py b/image/__init__.py index 185416e..87f3422 100644 --- a/image/__init__.py +++ b/image/__init__.py @@ -2,10 +2,8 @@ 包含以下功能: - AliyunImage: 阿里云文生图 -- GeminiImage: 谷歌Gemini文生图 """ from .img_aliyun_image import AliyunImage -from .img_gemini_image import GeminiImage -__all__ = ['AliyunImage', 'GeminiImage'] \ No newline at end of file +__all__ = ['AliyunImage'] diff --git a/image/img_gemini_image.py b/image/img_gemini_image.py deleted file mode 100644 index 708a2bd..0000000 --- a/image/img_gemini_image.py +++ /dev/null @@ -1,113 +0,0 @@ -#! /usr/bin/env python3 -# -*- coding: utf-8 -*- - -import logging -import os -import mimetypes -import time -import random -from google import genai -from google.genai import types - -class GeminiImage: - """谷歌AI画图API调用 - """ - - def __init__(self, config={}) -> None: - self.LOG = logging.getLogger("GeminiImage") - - self.enable = config.get("enable", True) - self.api_key = config.get("api_key", "") or os.environ.get("GEMINI_API_KEY", "") - self.model = config.get("model", "gemini-2.0-flash-exp-image-generation") - self.proxy = config.get("proxy", "") - - project_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - self.temp_dir = config.get("temp_dir", os.path.join(project_dir, "geminiimg")) - - if not os.path.exists(self.temp_dir): - os.makedirs(self.temp_dir) - - if not self.api_key: - self.enable = False - return - - try: - # 设置代理 - if self.proxy: - os.environ["HTTP_PROXY"] = self.proxy - os.environ["HTTPS_PROXY"] = self.proxy - - # 初始化客户端 - self.client = genai.Client(api_key=self.api_key) - except Exception: - self.enable = False - - def generate_image(self, prompt: str) -> str: - """生成图像并返回图像文件路径 - """ - try: - # 设置代理 - if self.proxy: - os.environ["HTTP_PROXY"] = self.proxy - os.environ["HTTPS_PROXY"] = self.proxy - - image_prompt = f"生成一张高质量的图片: {prompt}。请直接提供图像,不需要描述。" - - # 发送请求 - response = self.client.models.generate_content( - model=self.model, - contents=image_prompt, - config=types.GenerateContentConfig( - response_modalities=['Text', 'Image'] - ) - ) - - # 处理响应 - if hasattr(response, 'candidates') and response.candidates: - for candidate in response.candidates: - if hasattr(candidate, 'content') and candidate.content: - for part in candidate.content.parts: - if hasattr(part, 'inline_data') and part.inline_data: - # 保存图像 - file_name = f"gemini_image_{int(time.time())}_{random.randint(1000, 9999)}" - file_extension = mimetypes.guess_extension(part.inline_data.mime_type) or ".png" - file_path = os.path.join(self.temp_dir, f"{file_name}{file_extension}") - - with open(file_path, "wb") as f: - f.write(part.inline_data.data) - - return file_path - - # 如果没有找到图像,尝试获取文本响应 - try: - text_content = response.text - if text_content: - return f"模型未能生成图像: {text_content[:100]}..." - except (AttributeError, TypeError): - pass - - return "图像生成失败,可能需要更新模型或调整提示词" - - except Exception as e: - error_str = str(e) - self.LOG.error(f"图像生成出错: {error_str}") - - # 处理500错误 - if "500 INTERNAL" in error_str: - self.LOG.error("遇到谷歌服务器内部错误") - return "谷歌AI服务器临时故障,请稍后再试。这是谷歌服务器的问题,不是你的请求有误。" - - if "timeout" in error_str.lower(): - return "图像生成超时,请检查网络或代理设置" - - if "violated" in error_str.lower() or "policy" in error_str.lower(): - return "请求包含违规内容,无法生成图像" - - # 其他常见错误类型处理 - if "quota" in error_str.lower() or "rate" in error_str.lower(): - return "API使用配额已用尽或请求频率过高,请稍后再试" - - if "authentication" in error_str.lower() or "auth" in error_str.lower(): - return "API密钥验证失败,请联系管理员检查配置" - - return f"图像生成失败,错误原因: {error_str.split('.')[-1] if '.' in error_str else error_str}" \ No newline at end of file diff --git a/image/img_manager.py b/image/img_manager.py index b6a45af..fa8d527 100644 --- a/image/img_manager.py +++ b/image/img_manager.py @@ -5,7 +5,7 @@ import shutil import time from wcferry import Wcf from configuration import Config -from image import AliyunImage, GeminiImage +from image import AliyunImage class ImageGenerationManager: @@ -30,22 +30,9 @@ class ImageGenerationManager: # 初始化图像生成服务 self.aliyun_image = None - self.gemini_image = None self.LOG.info("开始初始化图像生成服务...") - # 初始化Gemini图像生成服务 - try: - if hasattr(self.config, 'GEMINI_IMAGE'): - self.gemini_image = GeminiImage(self.config.GEMINI_IMAGE) - else: - self.gemini_image = GeminiImage({}) - - if getattr(self.gemini_image, 'enable', False): - self.LOG.info("谷歌Gemini图像生成功能已启用") - except Exception as e: - self.LOG.error(f"初始化谷歌Gemini图像生成服务失败: {e}") - # 初始化AliyunImage服务 if hasattr(self.config, 'ALIYUN_IMAGE') and self.config.ALIYUN_IMAGE.get('enable', False): try: @@ -56,7 +43,7 @@ class ImageGenerationManager: def handle_image_generation(self, service_type, prompt, receiver, at_user=None): """处理图像生成请求的通用函数 - :param service_type: 服务类型,'aliyun'/'gemini' + :param service_type: 服务类型,目前仅支持 'aliyun' :param prompt: 图像生成提示词 :param receiver: 接收者ID :param at_user: 被@的用户ID,用于群聊 @@ -78,13 +65,6 @@ class ImageGenerationManager: wait_message = "当前模型为阿里V1模型,生成速度非常慢,可能需要等待较长时间,请耐心等候..." else: wait_message = "正在生成图像,请稍等..." - elif service_type == 'gemini': - if not self.gemini_image or not getattr(self.gemini_image, 'enable', False): - self.send_text("谷歌文生图服务未启用", receiver, at_user) - return True - - service = self.gemini_image - wait_message = "正在通过谷歌AI生成图像,请稍等..." else: self.LOG.error(f"未知的图像生成服务类型: {service_type}") return False @@ -98,7 +78,7 @@ class ImageGenerationManager: try: self.LOG.info(f"开始处理图片: {image_url}") # 谷歌API直接返回本地文件路径,无需下载 - image_path = image_url if service_type == 'gemini' else service.download_image(image_url) + image_path = service.download_image(image_url) if image_path: # 创建一个临时副本,避免文件占用问题 @@ -167,4 +147,4 @@ class ImageGenerationManager: else: self.LOG.error(f"无法删除文件 {file_path} 经过 {max_retries} 次尝试: {str(e)}") - return False \ No newline at end of file + return False diff --git a/image/文生图功能的使用说明.MD b/image/文生图功能的使用说明.MD index 3ef30cf..8f28e14 100644 --- a/image/文生图功能的使用说明.MD +++ b/image/文生图功能的使用说明.MD @@ -7,8 +7,6 @@ 阿里云AI绘画:[JiQingzhe2004 (JiQingzhe)](https://github.com/JiQingzhe2004) -谷歌AI绘画:[JiQingzhe2004 (JiQingzhe)](https://github.com/JiQingzhe2004) - ------ 在`config.yaml`中进行以下配置才可以调用: @@ -25,23 +23,8 @@ aliyun_image: # -----如果要使用阿里云文生图,取消下面的注释 trigger_keyword: 牛阿里 # 触发词,默认为"牛阿里" fallback_to_chat: true # 当未启用绘画功能时:true=将请求发给聊天模型处理,false=回复固定的未启用提示信息 -gemini_image: # -----谷歌AI画图配置这行不填----- - enable: true # 是否启用谷歌AI画图功能 - api_key: your-api-key-here # 谷歌Gemini API密钥,必填 - model: gemini-2.0-flash-exp-image-generation # 模型名称,建议保持默认,只有这一个模型可以进行绘画 - temp_dir: ./geminiimg # 图片保存目录,可选 - trigger_keyword: 牛谷歌 # 触发词,默认为"牛谷歌" - fallback_to_chat: false # 当未启用绘画功能时:true=将请求发给聊天模型处理,false=回复固定的未启用提示信息 ``` -## 如何获取API密钥 - -1. 访问 [Google AI Studio](https://aistudio.google.com/) -2. 创建一个账号或登录 -3. 访问 [API Keys](https://aistudio.google.com/app/apikeys) 页面 -4. 创建一个新的API密钥 -5. 复制API密钥并填入配置文件 - ## 使用方法 直接发送消息或在群聊中@机器人,使用触发词加提示词,例如: @@ -50,7 +33,6 @@ gemini_image: # -----谷歌AI画图配置这行不填----- ``` 牛智谱 一只可爱的猫咪在阳光下玩耍 牛阿里 一只可爱的猫咪在阳光下玩耍 -牛谷歌 一只可爱的猫咪在阳光下玩耍 ``` ## 群组的使用方法 ``` diff --git a/robot.py b/robot.py index 3f84e9c..a3927ac 100644 --- a/robot.py +++ b/robot.py @@ -10,19 +10,18 @@ import os import random import shutil import copy -from image import AliyunImage, GeminiImage from image.img_manager import ImageGenerationManager from wcferry import Wcf, WxMsg from ai_providers.ai_chatgpt import ChatGPT from ai_providers.ai_deepseek import DeepSeek -from ai_providers.ai_gemini import Gemini from ai_providers.ai_perplexity import Perplexity from function.func_weather import Weather from function.func_news import News from function.func_summary import MessageSummary # 导入新的MessageSummary类 from function.func_reminder import ReminderManager # 导入ReminderManager类 +from function.func_persona import PersonaManager # 导入PersonaManager用于人设存储 from configuration import Config from constants import ChatType from job_mgmt import Job @@ -155,31 +154,6 @@ class Robot(Job): except Exception as e: self.LOG.error(f"初始化 DeepSeek 模型时出错: {str(e)}") - # 初始化Gemini - if Gemini.value_check(self.config.GEMINI): - try: - gemini_flash_conf = copy.deepcopy(self.config.GEMINI) - flash_model_name = gemini_flash_conf.get("model_flash", Gemini.DEFAULT_MODEL) - gemini_flash_conf["model_name"] = flash_model_name - self.chat_models[ChatType.GEMINI.value] = Gemini( - gemini_flash_conf, - message_summary_instance=self.message_summary, - bot_wxid=self.wxid - ) - self.LOG.info(f"已加载 Gemini 模型: {flash_model_name}") - - reasoning_model_name = self.config.GEMINI.get("model_reasoning") - if reasoning_model_name and reasoning_model_name != flash_model_name: - gemini_reason_conf = copy.deepcopy(self.config.GEMINI) - gemini_reason_conf["model_name"] = reasoning_model_name - self.reasoning_chat_models[ChatType.GEMINI.value] = Gemini( - gemini_reason_conf, - message_summary_instance=self.message_summary, - bot_wxid=self.wxid - ) - self.LOG.info(f"已加载 Gemini 推理模型: {reasoning_model_name}") - except Exception as e: - self.LOG.error(f"初始化 Gemini 模型时出错: {str(e)}") # 初始化Perplexity if Perplexity.value_check(self.config.PERPLEXITY): @@ -266,6 +240,15 @@ class Robot(Job): except Exception as e: self.LOG.error(f"初始化提醒管理器失败: {e}", exc_info=True) + # 初始化人设管理器 + persona_db_path = getattr(self.message_summary, 'db_path', "data/message_history.db") if getattr(self, 'message_summary', None) else "data/message_history.db" + try: + self.persona_manager = PersonaManager(persona_db_path) + self.LOG.info("人设管理器已初始化。") + except Exception as e: + self.LOG.error(f"初始化人设管理器失败: {e}", exc_info=True) + self.persona_manager = None + @staticmethod def value_check(args: dict) -> bool: if args: @@ -293,12 +276,23 @@ class Robot(Job): # 确保context能访问到当前选定的chat模型及特定历史限制 setattr(ctx, 'chat', self.chat) setattr(ctx, 'specific_max_history', specific_limit) + persona_text = None + if getattr(self, 'persona_manager', None): + try: + persona_text = self.persona_manager.get_persona(ctx.get_receiver()) + except Exception as persona_error: + self.LOG.error(f"获取会话人设失败: {persona_error}", exc_info=True) + persona_text = None + setattr(ctx, 'persona', persona_text) ctx.reasoning_requested = bool( ctx.text and "想想" in ctx.text and (not ctx.is_group or ctx.is_at_bot) ) + if self._handle_persona_command(ctx): + return + if ctx.reasoning_requested: self.LOG.info("检测到推理模式触发词,跳过AI路由,直接进入闲聊推理模式。") self._handle_chitchat(ctx, None) @@ -581,6 +575,12 @@ class Robot(Job): if hasattr(self, 'message_summary') and self.message_summary: self.LOG.info("正在关闭消息历史数据库...") self.message_summary.close_db() + if hasattr(self, 'persona_manager') and self.persona_manager: + self.LOG.info("正在关闭人设数据库连接...") + try: + self.persona_manager.close() + except Exception as e: + self.LOG.error(f"关闭人设数据库时出错: {e}") self.LOG.info("机器人资源清理完成") @@ -609,6 +609,126 @@ class Robot(Job): return None + def _handle_persona_command(self, ctx: MessageContext) -> bool: + """处理 /set 人设命令""" + text = (ctx.text or "").strip() + if not text or not text.startswith("/"): + return False + + parts = text.split(None, 1) + command = parts[0].lower() + payload = parts[1] if len(parts) > 1 else "" + + at_list = ctx.msg.sender if ctx.is_group else "" + scope_label = "本群" if ctx.is_group else "当前会话" + + if command == "/persona": + if not getattr(self, "persona_manager", None): + ctx.send_text("❌ 人设功能暂不可用。", at_list) + return True + + persona_text = getattr(ctx, "persona", None) + if persona_text is None: + try: + persona_text = self.persona_manager.get_persona(ctx.get_receiver()) + setattr(ctx, "persona", persona_text) + except Exception as exc: + self.LOG.error(f"查询人设失败: {exc}", exc_info=True) + persona_text = None + + if persona_text: + ctx.send_text(f"{scope_label}当前的人设是:\n## 角色\n{persona_text}", at_list) + else: + ctx.send_text(f"{scope_label}当前没有设置人设,可发送“/set 你的人设描述”来设定。", at_list) + return True + + if command != "/set": + return False + + if not getattr(self, "persona_manager", None): + ctx.send_text("❌ 人设功能暂不可用。", at_list) + return True + + persona_body = payload.strip() + chat_id = ctx.get_receiver() + + if not persona_body: + current = getattr(ctx, "persona", None) + if current: + ctx.send_text( + f"{scope_label}当前的人设是:\n{current}\n发送“/set clear”可以清空,或重新发送“/set + 新人设”进行更新。\n也可以使用“/persona”随时查看当前人设。", + at_list + ) + else: + ctx.send_text("请在 /set 后输入人设描述,例如:/set 你是一个幽默的机器人助手。", at_list) + return True + + if persona_body.lower() in {"clear", "reset"}: + cleared = self.persona_manager.clear_persona(chat_id) + setattr(ctx, "persona", None) + if cleared: + ctx.send_text(f"✅ 已清空{scope_label}的人设。", at_list) + else: + ctx.send_text(f"{scope_label}当前没有设置人设。", at_list) + return True + + try: + self.persona_manager.set_persona(chat_id, persona_body, setter_wxid=ctx.msg.sender) + setattr(ctx, "persona", persona_body) + preview = persona_body if len(persona_body) <= 120 else persona_body[:120] + "..." + ctx.send_text( + f"✅ {scope_label}人设设定成功:\n## 角色\n{preview}" + f"{'' if len(preview) == len(persona_body) else '...'}\n如需查看完整内容,可发送“/persona”。", + at_list + ) + except Exception as exc: + self.LOG.error(f"设置人设失败: {exc}", exc_info=True) + ctx.send_text("❌ 设置人设时遇到问题,请稍后再试。", at_list) + return True + + @staticmethod + def _get_model_base_prompt(chat_model): + """获取模型默认的系统提示""" + if not chat_model: + return None + + system_msg = getattr(chat_model, "system_content_msg", None) + if isinstance(system_msg, dict): + prompt = system_msg.get("content") + if prompt: + return prompt + + if hasattr(chat_model, "_base_prompt"): + prompt = getattr(chat_model, "_base_prompt") + if prompt: + return prompt + + if hasattr(chat_model, "prompt"): + prompt = getattr(chat_model, "prompt") + if prompt: + return prompt + + return None + + @staticmethod + def _merge_prompt_with_persona(prompt, persona): + """将人设信息附加到系统提示后""" + persona = (persona or "").strip() + prompt = (prompt or "").strip() if prompt else "" + + if persona: + persona_section = f"## 角色\n{persona}" + if prompt: + return f"{persona_section}\n\n{prompt}" + return persona_section + + return prompt or None + + def _build_system_prompt(self, chat_model, persona=None, override_prompt=None): + """生成包含人设的系统提示""" + base_prompt = override_prompt if override_prompt is not None else self._get_model_base_prompt(chat_model) + return self._merge_prompt_with_persona(base_prompt, persona) + def _get_reasoning_chat_model(self): """获取当前聊天模型对应的推理模型实例""" model_id = getattr(self, 'current_model_id', None) @@ -668,7 +788,6 @@ class Robot(Job): mapping = { ChatType.CHATGPT.value: getattr(self.config, 'CHATGPT', None), ChatType.DEEPSEEK.value: getattr(self.config, 'DEEPSEEK', None), - ChatType.GEMINI.value: getattr(self.config, 'GEMINI', None), ChatType.PERPLEXITY.value: getattr(self.config, 'PERPLEXITY', None), } return mapping.get(model_id)