diff --git a/commands/handlers.py b/commands/handlers.py index 49058b1..97ac8db 100644 --- a/commands/handlers.py +++ b/commands/handlers.py @@ -4,6 +4,8 @@ import json # 确保已导入json from datetime import datetime # 确保已导入datetime import os # 导入os模块用于文件路径操作 +from function.func_persona import build_persona_system_prompt + # 前向引用避免循环导入 from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -440,11 +442,11 @@ def handle_chitchat(ctx: 'MessageContext', match: Optional[Match]) -> bool: tools = [history_lookup_tool] tool_handler = handle_tool_call - system_prompt_override = None persona_text = getattr(ctx, 'persona', None) - if persona_text and getattr(ctx, 'robot', None): + system_prompt_override = None + if persona_text: try: - system_prompt_override = ctx.robot._build_system_prompt(chat_model, persona_text) + system_prompt_override = build_persona_system_prompt(chat_model, persona_text) except Exception as persona_exc: if ctx.logger: ctx.logger.error(f"构建人设系统提示失败: {persona_exc}", exc_info=True) @@ -575,9 +577,13 @@ def handle_perplexity_ask(ctx: 'MessageContext', match: Optional[Match]) -> bool specific_max_history = getattr(ctx, 'specific_max_history', None) override_prompt = fallback_prompt persona_text = getattr(ctx, 'persona', None) - if persona_text and getattr(ctx, 'robot', None): + if persona_text: try: - override_prompt = ctx.robot._build_system_prompt(chat_model, persona_text, override_prompt=fallback_prompt) + override_prompt = build_persona_system_prompt( + chat_model, + persona_text, + override_prompt=fallback_prompt + ) except Exception as persona_exc: if ctx.logger: ctx.logger.error(f"构建人设系统提示失败: {persona_exc}", exc_info=True) diff --git a/function/func_persona.py b/function/func_persona.py index 0a04b3a..d71061b 100644 --- a/function/func_persona.py +++ b/function/func_persona.py @@ -2,7 +2,15 @@ import logging import os import sqlite3 from datetime import datetime -from typing import Optional +from typing import Optional, Tuple + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: # pragma: no cover + from commands.context import MessageContext + from robot import Robot + +PERSONA_PREFIX = "## 角色\n" class PersonaManager: @@ -113,3 +121,144 @@ class PersonaManager: self.LOG.info("PersonaManager database connection closed") except sqlite3.Error as exc: self.LOG.error(f"Failed to close persona database connection: {exc}") + + +def fetch_persona_for_context(robot: "Robot", ctx: "MessageContext") -> Optional[str]: + """Return persona text for the context receiver.""" + manager = getattr(robot, "persona_manager", None) + if not manager: + return None + try: + persona_text = manager.get_persona(ctx.get_receiver()) + if persona_text: + persona_text = persona_text.strip() + return persona_text or None + except Exception as exc: + robot.LOG.error(f"获取会话人设失败: {exc}", exc_info=True) + return None + + +def handle_persona_command(robot: "Robot", ctx: "MessageContext") -> bool: + """Process /set and /persona commands.""" + text = (ctx.text or "").strip() + if not text or not text.startswith("/"): + return False + + parts = text.split(None, 1) + command = parts[0].lower() + payload = parts[1] if len(parts) > 1 else "" + + at_list = ctx.msg.sender if ctx.is_group else "" + scope_label = "本群" if ctx.is_group else "当前会话" + + manager = getattr(robot, "persona_manager", None) + if command in {"/persona", "/set"} and not manager: + ctx.send_text("❌ 人设功能暂不可用。", at_list) + return True + + if command == "/persona": + persona_text = getattr(ctx, "persona", None) + if persona_text is None and manager: + try: + persona_text = manager.get_persona(ctx.get_receiver()) + if persona_text: + persona_text = persona_text.strip() + setattr(ctx, "persona", persona_text) + except Exception as exc: + robot.LOG.error(f"查询人设失败: {exc}", exc_info=True) + persona_text = None + + if persona_text: + ctx.send_text(f"{scope_label}当前的人设是:\n{PERSONA_PREFIX}{persona_text}", at_list) + else: + ctx.send_text(f"{scope_label}当前没有设置人设,可发送“/set 你的人设描述”来设定。", at_list) + return True + + if command != "/set": + return False + + persona_body = payload.strip() + chat_id = ctx.get_receiver() + + if not persona_body: + current = getattr(ctx, "persona", None) + if current: + ctx.send_text( + f"{scope_label}当前的人设是:\n{PERSONA_PREFIX}{current}\n发送“/set clear”可以清空,或重新发送“/set + 新人设”进行更新。\n也可以使用“/persona”随时查看当前人设。", + at_list + ) + else: + ctx.send_text("请在 /set 后输入人设描述,例如:/set 你是一个幽默的机器人助手。", at_list) + return True + + if persona_body.lower() in {"clear", "reset"}: + cleared = manager.clear_persona(chat_id) + setattr(ctx, "persona", None) + if cleared: + ctx.send_text(f"✅ 已清空{scope_label}的人设。", at_list) + else: + ctx.send_text(f"{scope_label}当前没有设置人设。", at_list) + return True + + try: + manager.set_persona(chat_id, persona_body, setter_wxid=ctx.msg.sender) + persona_body = persona_body.strip() + setattr(ctx, "persona", persona_body) + preview, truncated = _build_preview(persona_body) + ellipsis = "..." if truncated else "" + ctx.send_text( + f"✅ {scope_label}人设设定成功:\n{PERSONA_PREFIX}{preview}{ellipsis}\n如需查看完整内容,可发送“/persona”。", + at_list + ) + except Exception as exc: + robot.LOG.error(f"设置人设失败: {exc}", exc_info=True) + ctx.send_text("❌ 设置人设时遇到问题,请稍后再试。", at_list) + return True + + +def _build_preview(persona: str, limit: int = 120) -> Tuple[str, bool]: + if len(persona) <= limit: + return persona, False + return persona[:limit], True + + +def build_persona_system_prompt(chat_model, persona: Optional[str] = None, override_prompt: Optional[str] = None) -> Optional[str]: + """Merge persona section with existing system prompt.""" + base_prompt = override_prompt if override_prompt is not None else _get_model_base_prompt(chat_model) + return _merge_prompt_with_persona(base_prompt, persona) + + +def _get_model_base_prompt(chat_model) -> Optional[str]: + if not chat_model: + return None + + system_msg = getattr(chat_model, "system_content_msg", None) + if isinstance(system_msg, dict): + prompt = system_msg.get("content") + if prompt: + return prompt + + if hasattr(chat_model, "_base_prompt"): + prompt = getattr(chat_model, "_base_prompt") + if prompt: + return prompt + + if hasattr(chat_model, "prompt"): + prompt = getattr(chat_model, "prompt") + if prompt: + return prompt + + return None + + +def _merge_prompt_with_persona(prompt: Optional[str], persona: Optional[str]) -> Optional[str]: + persona = (persona or "").strip() + prompt = (prompt or "").strip() if prompt else "" + + if persona: + persona_section = f"{PERSONA_PREFIX}{persona}" + if prompt: + return f"{persona_section}\n\n{prompt}" + return persona_section + + return prompt or None diff --git a/robot.py b/robot.py index d5fb0e1..1ca21c9 100644 --- a/robot.py +++ b/robot.py @@ -21,7 +21,11 @@ from function.func_weather import Weather from function.func_news import News from function.func_summary import MessageSummary # 导入新的MessageSummary类 from function.func_reminder import ReminderManager # 导入ReminderManager类 -from function.func_persona import PersonaManager # 导入PersonaManager用于人设存储 +from function.func_persona import ( + PersonaManager, + fetch_persona_for_context, + handle_persona_command, +) # 导入人设相关工具 from configuration import Config from constants import ChatType from job_mgmt import Job @@ -276,13 +280,7 @@ class Robot(Job): # 确保context能访问到当前选定的chat模型及特定历史限制 setattr(ctx, 'chat', self.chat) setattr(ctx, 'specific_max_history', specific_limit) - persona_text = None - if getattr(self, 'persona_manager', None): - try: - persona_text = self.persona_manager.get_persona(ctx.get_receiver()) - except Exception as persona_error: - self.LOG.error(f"获取会话人设失败: {persona_error}", exc_info=True) - persona_text = None + persona_text = fetch_persona_for_context(self, ctx) setattr(ctx, 'persona', persona_text) ctx.reasoning_requested = bool( ctx.text @@ -290,7 +288,7 @@ class Robot(Job): and (not ctx.is_group or ctx.is_at_bot) ) - if self._handle_persona_command(ctx): + if handle_persona_command(self, ctx): return if ctx.reasoning_requested: @@ -609,126 +607,6 @@ class Robot(Job): return None - def _handle_persona_command(self, ctx: MessageContext) -> bool: - """处理 /set 人设命令""" - text = (ctx.text or "").strip() - if not text or not text.startswith("/"): - return False - - parts = text.split(None, 1) - command = parts[0].lower() - payload = parts[1] if len(parts) > 1 else "" - - at_list = ctx.msg.sender if ctx.is_group else "" - scope_label = "本群" if ctx.is_group else "当前会话" - - if command == "/persona": - if not getattr(self, "persona_manager", None): - ctx.send_text("❌ 人设功能暂不可用。", at_list) - return True - - persona_text = getattr(ctx, "persona", None) - if persona_text is None: - try: - persona_text = self.persona_manager.get_persona(ctx.get_receiver()) - setattr(ctx, "persona", persona_text) - except Exception as exc: - self.LOG.error(f"查询人设失败: {exc}", exc_info=True) - persona_text = None - - if persona_text: - ctx.send_text(f"{scope_label}当前的人设是:\n## 角色\n{persona_text}", at_list) - else: - ctx.send_text(f"{scope_label}当前没有设置人设,可发送“/set 你的人设描述”来设定。", at_list) - return True - - if command != "/set": - return False - - if not getattr(self, "persona_manager", None): - ctx.send_text("❌ 人设功能暂不可用。", at_list) - return True - - persona_body = payload.strip() - chat_id = ctx.get_receiver() - - if not persona_body: - current = getattr(ctx, "persona", None) - if current: - ctx.send_text( - f"{scope_label}当前的人设是:\n{current}\n\n发送“/set clear”可以清空,或重新发送“/set + 新人设”进行更新。\n也可以使用“/persona”随时查看当前人设。", - at_list - ) - else: - ctx.send_text("请在 /set 后输入人设描述,例如:/set 你是一个幽默的机器人助手。", at_list) - return True - - if persona_body.lower() in {"clear", "reset"}: - cleared = self.persona_manager.clear_persona(chat_id) - setattr(ctx, "persona", None) - if cleared: - ctx.send_text(f"✅ 已清空{scope_label}的人设。", at_list) - else: - ctx.send_text(f"{scope_label}当前没有设置人设。", at_list) - return True - - try: - self.persona_manager.set_persona(chat_id, persona_body, setter_wxid=ctx.msg.sender) - setattr(ctx, "persona", persona_body) - preview = persona_body if len(persona_body) <= 120 else persona_body[:120] + "..." - ctx.send_text( - f"✅ {scope_label}人设设定成功:\n## 角色\n{preview}" - f"{'' if len(preview) == len(persona_body) else '...'}\n\n如需查看完整内容,可发送“/persona”\n如需清空人设,可发送“/set clear”", - at_list - ) - except Exception as exc: - self.LOG.error(f"设置人设失败: {exc}", exc_info=True) - ctx.send_text("❌ 设置人设时遇到问题,请稍后再试。", at_list) - return True - - @staticmethod - def _get_model_base_prompt(chat_model): - """获取模型默认的系统提示""" - if not chat_model: - return None - - system_msg = getattr(chat_model, "system_content_msg", None) - if isinstance(system_msg, dict): - prompt = system_msg.get("content") - if prompt: - return prompt - - if hasattr(chat_model, "_base_prompt"): - prompt = getattr(chat_model, "_base_prompt") - if prompt: - return prompt - - if hasattr(chat_model, "prompt"): - prompt = getattr(chat_model, "prompt") - if prompt: - return prompt - - return None - - @staticmethod - def _merge_prompt_with_persona(prompt, persona): - """将人设信息附加到系统提示后""" - persona = (persona or "").strip() - prompt = (prompt or "").strip() if prompt else "" - - if persona: - persona_section = f"## 角色\n{persona}" - if prompt: - return f"{persona_section}\n\n{prompt}" - return persona_section - - return prompt or None - - def _build_system_prompt(self, chat_model, persona=None, override_prompt=None): - """生成包含人设的系统提示""" - base_prompt = override_prompt if override_prompt is not None else self._get_model_base_prompt(chat_model) - return self._merge_prompt_with_persona(base_prompt, persona) - def _get_reasoning_chat_model(self): """获取当前聊天模型对应的推理模型实例""" model_id = getattr(self, 'current_model_id', None)