functioncall

This commit is contained in:
zihanjian
2025-09-24 08:34:34 +08:00
parent a1b3799c0c
commit 3152319811
13 changed files with 620 additions and 2584 deletions

1
.gitignore vendored
View File

@@ -10,5 +10,4 @@ logs/
*.log.*
config.yaml
duel_ranks.json
data/

View File

@@ -56,57 +56,70 @@ class ChatGPT():
return True
return False
def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None) -> str:
# 获取并格式化数据库历史记录
api_messages = []
# 1. 添加系统提示
effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"]
if effective_system_prompt: # 确保有内容才添加
api_messages.append({"role": "system", "content": effective_system_prompt})
# 添加当前时间提示(可选,但原代码有)
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
time_mk = "Current time is: " # 或者其他合适的提示
api_messages.append({"role": "system", "content": f"{time_mk}{now_time}"})
# 2. 获取并格式化历史消息
if self.message_summary and self.bot_wxid:
history = self.message_summary.get_messages(wxid)
# -限制历史消息数量
# 优先使用传入的特定限制,如果没有则使用模型默认限制
limit_to_use = specific_max_history if specific_max_history is not None else self.max_history_messages
self.LOG.debug(f"获取历史记录 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit_to_use}")
if limit_to_use is not None and limit_to_use > 0:
history = history[-limit_to_use:] # 取最新的 N 条
elif limit_to_use == 0: # 如果设置为0则不包含历史
history = []
self.LOG.debug(f"应用限制后历史条数: {len(history)}")
for msg in history:
role = "assistant" if msg.get("sender_wxid") == self.bot_wxid else "user"
content = msg.get('content', '')
if content: # 避免添加空内容
if role == "user":
sender_name = msg.get('sender', '未知用户') # 获取发送者名字,如果不存在则使用默认值
formatted_content = f"{sender_name}: {content}" # 格式化内容,加入发送者名字
api_messages.append({"role": role, "content": formatted_content})
else: # 如果是助手(机器人自己)的消息,则不加名字
api_messages.append({"role": role, "content": content})
def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None, tools=None, conversation_history=None):
# 标准Function Call模式使用传入的对话历史
if conversation_history:
api_messages = []
# 添加系统提示
effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"]
if effective_system_prompt:
api_messages.append({"role": "system", "content": effective_system_prompt})
# 添加当前时间提示
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
api_messages.append({"role": "system", "content": f"Current time is: {now_time}"})
# 使用传入的对话历史
api_messages.extend(conversation_history)
else:
self.LOG.warning(f"无法为 wxid={wxid} 获取历史记录,因为 message_summary 或 bot_wxid 未设置。")
# 传统模式:从数据库获取历史记录
api_messages = []
# 3. 添加当前用户问题
if question: # 确保问题非空
api_messages.append({"role": "user", "content": question})
# 1. 添加系统提示
effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"]
if effective_system_prompt: # 确保有内容才添加
api_messages.append({"role": "system", "content": effective_system_prompt})
# 添加当前时间提示(可选,但原代码有)
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
time_mk = "Current time is: " # 或者其他合适的提示
api_messages.append({"role": "system", "content": f"{time_mk}{now_time}"})
# 2. 获取并格式化历史消息
if self.message_summary and self.bot_wxid:
history = self.message_summary.get_messages(wxid)
# -限制历史消息数量
# 优先使用传入的特定限制,如果没有则使用模型默认限制
limit_to_use = specific_max_history if specific_max_history is not None else self.max_history_messages
self.LOG.debug(f"获取历史记录 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit_to_use}")
if limit_to_use is not None and limit_to_use > 0:
history = history[-limit_to_use:] # 取最新的 N 条
elif limit_to_use == 0: # 如果设置为0则不包含历史
history = []
self.LOG.debug(f"应用限制后历史条数: {len(history)}")
for msg in history:
role = "assistant" if msg.get("sender_wxid") == self.bot_wxid else "user"
content = msg.get('content', '')
if content: # 避免添加空内容
if role == "user":
sender_name = msg.get('sender', '未知用户') # 获取发送者名字,如果不存在则使用默认值
formatted_content = f"{sender_name}: {content}" # 格式化内容,加入发送者名字
api_messages.append({"role": role, "content": formatted_content})
else: # 如果是助手(机器人自己)的消息,则不加名字
api_messages.append({"role": role, "content": content})
else:
self.LOG.warning(f"无法为 wxid={wxid} 获取历史记录,因为 message_summary 或 bot_wxid 未设置。")
# 3. 添加当前用户问题
if question: # 确保问题非空
api_messages.append({"role": "user", "content": question})
rsp = ""
try:
# 使用格式化后的 api_messages
# 使用格式化后的 api_messages
params = {
"model": self.model,
"messages": api_messages # 使用从数据库构建的消息列表
@@ -116,10 +129,32 @@ class ChatGPT():
if not self.model.startswith("o"):
params["temperature"] = 0.2
# 如果提供了tools添加到参数中
if tools:
params["tools"] = tools
params["tool_choice"] = "auto" # 让AI自动决定是否调用function
ret = self.client.chat.completions.create(**params)
rsp = ret.choices[0].message.content
rsp = rsp[2:] if rsp.startswith("\n\n") else rsp
rsp = rsp.replace("\n\n", "\n")
# 检查是否有tool_calls
if tools and ret.choices[0].message.tool_calls:
# 返回tool_calls而不是普通文本
return {
"tool_calls": [
{
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments
}
}
for tool_call in ret.choices[0].message.tool_calls
]
}
else:
# 普通文本响应
rsp = ret.choices[0].message.content or ""
rsp = rsp[2:] if rsp.startswith("\n\n") else rsp
rsp = rsp.replace("\n\n", "\n")
except AuthenticationError:
self.LOG.error("OpenAI API 认证失败,请检查 API 密钥是否正确")

View File

@@ -52,65 +52,99 @@ class DeepSeek():
return True
return False
def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None) -> str:
# 获取并格式化数据库历史记录
api_messages = []
def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None, tools=None, conversation_history=None):
# 标准Function Call模式使用传入的对话历史
if conversation_history:
api_messages = []
# 添加系统提示
effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"]
if effective_system_prompt:
api_messages.append({"role": "system", "content": effective_system_prompt})
# 添加当前时间提示
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
api_messages.append({"role": "system", "content": f"Current time is: {now_time}"})
# 使用传入的对话历史
api_messages.extend(conversation_history)
else:
# 传统模式:从数据库获取历史记录
api_messages = []
# 1. 添加系统提示
effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"]
if effective_system_prompt:
api_messages.append({"role": "system", "content": effective_system_prompt})
# 1. 添加系统提示
effective_system_prompt = system_prompt_override if system_prompt_override else self.system_content_msg["content"]
if effective_system_prompt:
api_messages.append({"role": "system", "content": effective_system_prompt})
# 添加当前时间提示 (可选)
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
time_mk = "Current time is: "
api_messages.append({"role": "system", "content": f"{time_mk}{now_time}"})
# 添加当前时间提示 (可选)
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
time_mk = "Current time is: "
api_messages.append({"role": "system", "content": f"{time_mk}{now_time}"})
# 2. 获取并格式化历史消息
if self.message_summary and self.bot_wxid:
history = self.message_summary.get_messages(wxid)
# 2. 获取并格式化历史消息
if self.message_summary and self.bot_wxid:
history = self.message_summary.get_messages(wxid)
# 限制历史消息数量
# 优先使用传入的特定限制,如果没有则使用模型默认限制
limit_to_use = specific_max_history if specific_max_history is not None else self.max_history_messages
self.LOG.debug(f"获取历史记录 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit_to_use}")
if limit_to_use is not None and limit_to_use > 0:
history = history[-limit_to_use:] # 取最新的 N 条
elif limit_to_use == 0: # 如果设置为0则不包含历史
history = []
self.LOG.debug(f"应用限制后历史条数: {len(history)}")
# 限制历史消息数量
# 优先使用传入的特定限制,如果没有则使用模型默认限制
limit_to_use = specific_max_history if specific_max_history is not None else self.max_history_messages
self.LOG.debug(f"获取历史记录 for {wxid}, 原始条数: {len(history)}, 使用限制: {limit_to_use}")
for msg in history:
role = "assistant" if msg.get("sender_wxid") == self.bot_wxid else "user"
content = msg.get('content', '')
if content:
if role == "user":
sender_name = msg.get('sender', '未知用户') # 获取发送者名字
if limit_to_use is not None and limit_to_use > 0:
history = history[-limit_to_use:] # 取最新的 N 条
elif limit_to_use == 0: # 如果设置为0则不包含历史
history = []
self.LOG.debug(f"应用限制后历史条数: {len(history)}")
for msg in history:
role = "assistant" if msg.get("sender_wxid") == self.bot_wxid else "user"
content = msg.get('content', '')
if content:
if role == "user":
sender_name = msg.get('sender', '未知用户') # 获取发送者名字
formatted_content = f"{sender_name}: {content}" # 格式化内容
api_messages.append({"role": role, "content": formatted_content})
else: # 助手消息
api_messages.append({"role": role, "content": content})
else:
self.LOG.warning(f"无法为 wxid={wxid} 获取历史记录,因为 message_summary 或 bot_wxid 未设置。")
else:
self.LOG.warning(f"无法为 wxid={wxid} 获取历史记录,因为 message_summary 或 bot_wxid 未设置。")
# 3. 添加当前用户问题
if question:
api_messages.append({"role": "user", "content": question})
# 3. 添加当前用户问题
if question:
api_messages.append({"role": "user", "content": question})
try:
# 使用格式化后的 api_messages
response = self.client.chat.completions.create(
model=self.model,
messages=api_messages, # 使用构建的消息列表
stream=False
)
final_response = response.choices[0].message.content
# 构建API参数
params = {
"model": self.model,
"messages": api_messages, # 使用构建的消息列表
"stream": False
}
# 如果提供了tools添加到参数中
if tools:
params["tools"] = tools
params["tool_choice"] = "auto" # 让AI自动决定是否调用function
return final_response
response = self.client.chat.completions.create(**params)
# 检查是否有tool_calls
if tools and response.choices[0].message.tool_calls:
# 返回tool_calls而不是普通文本
return {
"tool_calls": [
{
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments
}
}
for tool_call in response.choices[0].message.tool_calls
]
}
else:
# 普通文本响应
return response.choices[0].message.content or ""
except (APIConnectionError, APIError, AuthenticationError) as e1:
self.LOG.error(f"DeepSeek API 返回了错误:{str(e1)}")

View File

@@ -206,7 +206,12 @@ class Gemini:
return rsp_text.strip()
def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None) -> str:
def get_answer(self, question: str, wxid: str, system_prompt_override=None, specific_max_history=None, tools=None):
# Function Call支持检查
if tools:
# Gemini暂时不支持function calling返回提示
return "当前Gemini模型暂不支持Function Call功能请使用ChatGPT或DeepSeek模型来使用智能功能。"
if not self._model:
return "Gemini 模型未成功初始化,请检查配置和网络。"

View File

@@ -341,7 +341,7 @@ class Perplexity:
return all(value is not None for key, value in args.items() if key != 'proxy')
return False
def get_answer(self, prompt, session_id=None):
def get_answer(self, prompt, session_id=None, wxid=None, system_prompt_override=None, specific_max_history=None, tools=None):
"""获取Perplexity回答
Args:
@@ -351,6 +351,11 @@ class Perplexity:
Returns:
str: Perplexity的回答
"""
# Function Call支持检查
if tools:
# Perplexity暂时不支持function calling返回提示
return "当前Perplexity模型暂不支持Function Call功能请使用ChatGPT或DeepSeek模型来使用智能功能。"
try:
if not self.api_key or not self.client:
return "Perplexity API key 未配置或客户端初始化失败"

View File

@@ -15,12 +15,20 @@ from .context import MessageContext
@ai_router.register(
name="weather_query",
description="查询城市未来五天的简要天气预报",
examples=["北京天气怎么样", "上海天气"],
params_description="城市名称"
parameters={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "要查询天气的城市名称,如:北京、上海、深圳"
}
},
"required": ["city"]
}
)
def ai_handle_weather(ctx: MessageContext, params: str) -> bool:
def ai_handle_weather(ctx: MessageContext, city: str, **kwargs) -> bool:
"""AI路由的天气查询处理"""
city_name = params.strip()
city_name = city.strip()
if not city_name:
ctx.send_text("🤔 请告诉我你想查询哪个城市的天气")
return True
@@ -66,11 +74,14 @@ def ai_handle_weather(ctx: MessageContext, params: str) -> bool:
# ======== 新闻功能 ========
@ai_router.register(
name="news_query",
description="获取今日",
examples=["看看今天的新闻", "今日要闻"],
params_description="无需参数"
description="获取今日重要新闻和要",
parameters={
"type": "object",
"properties": {},
"required": []
}
)
def ai_handle_news(ctx: MessageContext, params: str) -> bool:
def ai_handle_news(ctx: MessageContext, **kwargs) -> bool:
"""AI路由的新闻查询处理"""
try:
from function.func_news import News
@@ -95,76 +106,259 @@ def ai_handle_news(ctx: MessageContext, params: str) -> bool:
# ======== 提醒功能 ========
@ai_router.register(
name="reminder_set",
description="设置提醒",
examples=["提醒我明天下午3点开会", "每天早上8点提醒我吃早餐"],
params_description="时间和内容"
description="为用户设置一个新的提醒",
parameters={
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "提醒的具体内容和时间明天下午3点开会、每天早上8点吃早餐"
}
},
"required": ["content"]
}
)
def ai_handle_reminder_set(ctx: MessageContext, params: str) -> bool:
def ai_handle_reminder_set(ctx: MessageContext, content: str, **kwargs) -> bool:
"""AI路由的提醒设置处理"""
if not params.strip():
if not content.strip():
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text("请告诉我需要提醒什么内容和时间呀~", at_list)
return True
# 调用原有的提醒处理逻辑
from .handlers import handle_reminder
# 临时修改消息内容以适配原有处理器
original_content = ctx.msg.content
ctx.msg.content = f"提醒我{params}"
ctx.msg.content = f"提醒我{content}"
# handle_reminder不使用match参数直接传None
result = handle_reminder(ctx, None)
# 恢复原始内容
ctx.msg.content = original_content
return result
@ai_router.register(
name="reminder_list",
description="查看所有提醒",
examples=["查看我的提醒", "我有哪些提醒"],
params_description="无需参数"
description="查看用户已经设置的所有提醒列表",
parameters={
"type": "object",
"properties": {},
"required": []
}
)
def ai_handle_reminder_list(ctx: MessageContext, params: str) -> bool:
def ai_handle_reminder_list(ctx: MessageContext, **kwargs) -> bool:
"""AI路由的提醒列表查看处理"""
from .handlers import handle_list_reminders
return handle_list_reminders(ctx, None)
@ai_router.register(
name="reminder_delete",
description="删除提醒",
examples=["删除开会的提醒", "取消明天的提醒"],
params_description="提醒描述"
description="删除指定的提醒",
parameters={
"type": "object",
"properties": {
"description": {
"type": "string",
"description": "要删除的提醒的描述或关键词,如:开会、早餐、明天的提醒"
}
},
"required": ["description"]
}
)
def ai_handle_reminder_delete(ctx: MessageContext, params: str) -> bool:
def ai_handle_reminder_delete(ctx: MessageContext, description: str, **kwargs) -> bool:
"""AI路由的提醒删除处理"""
# 调用原有的删除提醒逻辑
from .handlers import handle_delete_reminder
# 临时修改消息内容
original_content = ctx.msg.content
ctx.msg.content = f"删除提醒 {params}"
ctx.msg.content = f"删除提醒 {description}"
# handle_delete_reminder不使用match参数直接传None
result = handle_delete_reminder(ctx, None)
# 恢复原始内容
ctx.msg.content = original_content
return result
# ======== 帮助功能 ========
@ai_router.register(
name="help",
description="显示机器人的帮助信息和可用指令列表",
parameters={
"type": "object",
"properties": {},
"required": []
}
)
def ai_handle_help(ctx: MessageContext, **kwargs) -> bool:
"""AI路由的帮助处理"""
help_text = [
"🤖 泡泡智能助手 🤖",
"",
"🌟 我现在支持自然语言交互!你可以用平常说话的方式和我对话:",
"",
"【天气查询】",
"💬 \"北京今天天气怎么样\"",
"💬 \"上海明天会下雨吗\"",
"💬 \"查一下深圳的天气预报\"",
"",
"【新闻资讯】",
"💬 \"看看今天的新闻\"",
"💬 \"有什么重要新闻吗\"",
"",
"【智能提醒】",
"💬 \"提醒我明天下午3点开会\"",
"💬 \"每天早上8点提醒我吃早餐\"",
"💬 \"查看我的提醒\"",
"💬 \"删掉开会的提醒\"",
"",
"【智能搜索】",
"💬 \"搜索Python最新特性\"",
"💬 \"查一下机器学习教程\"",
"",
"【群聊管理】",
"💬 \"总结一下最近的聊天\" (仅群聊)",
"💬 \"清除聊天历史\" (仅群聊)",
"💬 \"查看我的装备\" (仅群聊)",
"",
"【娱乐功能】",
"💬 \"骂一下@张三\" (仅群聊)",
"",
"✨ 直接用自然语言告诉我你想做什么,我会智能理解你的意图!",
"🔧 在群聊中需要@我才能使用功能哦~"
]
help_message = "\n".join(help_text)
# 发送消息
ctx.send_text(help_message)
return True
# ======== 消息管理功能 ========
@ai_router.register(
name="summary",
description="总结群聊中最近的聊天消息内容",
parameters={
"type": "object",
"properties": {},
"required": []
}
)
def ai_handle_summary(ctx: MessageContext, **kwargs) -> bool:
"""AI路由的消息总结处理"""
if not ctx.is_group:
ctx.send_text("⚠️ 消息总结功能仅支持群聊")
return True
from .handlers import handle_summary
return handle_summary(ctx, None)
@ai_router.register(
name="clear_messages",
description="清除当前群聊的历史消息记录",
parameters={
"type": "object",
"properties": {},
"required": []
}
)
def ai_handle_clear_messages(ctx: MessageContext, **kwargs) -> bool:
"""AI路由的消息历史清除处理"""
if not ctx.is_group:
ctx.send_text("⚠️ 消息历史管理功能仅支持群聊")
return True
from .handlers import handle_clear_messages
return handle_clear_messages(ctx, None)
# ======== 决斗功能 ========
@ai_router.register(
name="check_equipment",
description="查看玩家在决斗游戏中的魔法装备和道具",
parameters={
"type": "object",
"properties": {},
"required": []
}
)
def ai_handle_check_equipment(ctx: MessageContext, **kwargs) -> bool:
"""AI路由的装备查看处理"""
if not ctx.is_group:
ctx.send_text("❌ 装备查看功能只支持群聊")
return True
from .handlers import handle_check_equipment
return handle_check_equipment(ctx, None)
# ======== 娱乐功能 ========
@ai_router.register(
name="insult_user",
description="骂指定的用户(娱乐功能,无恶意)",
parameters={
"type": "object",
"properties": {
"target_user": {
"type": "string",
"description": "要骂的目标用户的名称或昵称"
}
},
"required": ["target_user"]
}
)
def ai_handle_insult(ctx: MessageContext, target_user: str, **kwargs) -> bool:
"""AI路由的骂人处理"""
if not ctx.is_group:
ctx.send_text("❌ 骂人功能只支持群聊")
return True
# 解析参数,提取用户名
user_name = target_user.strip()
if not user_name:
ctx.send_text("🤔 请告诉我要骂谁")
return True
# 移除@符号
user_name = user_name.replace("@", "").strip()
# 创建一个假的match对象因为原始处理器需要
fake_match = type('MockMatch', (), {
'group': lambda self, n: user_name if n == 1 else None
})()
from .handlers import handle_insult
# 临时修改消息内容以适配原有处理器
original_content = ctx.msg.content
ctx.msg.content = f"骂一下@{user_name}"
result = handle_insult(ctx, fake_match)
# 恢复原始内容
ctx.msg.content = original_content
return result
# ======== Perplexity搜索功能 ========
@ai_router.register(
name="perplexity_search",
description="搜索查询资料并深度研究某个专业问题",
examples=["搜索Python最新特性", "查查机器学习教程"],
params_description="搜索内容"
description="使用Perplexity搜索查询资料并深度研究某个专业问题",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "要搜索的关键词或问题Python最新特性、机器学习教程"
}
},
"required": ["query"]
}
)
def ai_handle_perplexity(ctx: MessageContext, params: str) -> bool:
def ai_handle_perplexity(ctx: MessageContext, query: str, **kwargs) -> bool:
"""AI路由的Perplexity搜索处理"""
if not params.strip():
if not query.strip():
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text("请告诉我你想搜索什么内容", at_list)
return True
@@ -176,7 +370,7 @@ def ai_handle_perplexity(ctx: MessageContext, params: str) -> bool:
return True
# 调用Perplexity处理
content_for_perplexity = f"ask {params}"
content_for_perplexity = f"ask {query}"
chat_id = ctx.get_receiver()
sender_wxid = ctx.msg.sender
room_id = ctx.msg.roomid if ctx.is_group else None
@@ -198,7 +392,7 @@ def ai_handle_perplexity(ctx: MessageContext, params: str) -> bool:
try:
import time
current_time = time.strftime("%H:%M", time.localtime())
q_with_info = f"[{current_time}] {ctx.sender_name}: {params}"
q_with_info = f"[{current_time}] {ctx.sender_name}: {query}"
rsp = chat_model.get_answer(
question=q_with_info,

View File

@@ -9,12 +9,26 @@ logger = logging.getLogger(__name__)
@dataclass
class AIFunction:
"""AI可调用的功能定义"""
"""AI可调用的功能定义 - 最原生实现"""
name: str # 功能唯一标识名
handler: Callable # 处理函数
description: str # 功能描述给AI看的
examples: list[str] = field(default_factory=list) # 示例用法
params_description: str = "" # 参数说明
parameters: dict = field(default_factory=dict) # OpenAI function call参数定义
def to_function_schema(self) -> dict:
"""转换为OpenAI function call schema格式"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters or {
"type": "object",
"properties": {},
"required": []
}
}
}
class AIRouter:
"""AI智能路由器"""
@@ -23,17 +37,28 @@ class AIRouter:
self.functions: Dict[str, AIFunction] = {}
self.logger = logger
def register(self, name: str, description: str, examples: list[str] = None, params_description: str = ""):
def register(self, name: str, description: str, parameters: dict = None):
"""
装饰器注册一个功能到AI路由器
装饰器注册一个功能到AI路由器 - 最原生实现
Args:
name: 功能名称
description: 功能描述
parameters: OpenAI function call参数定义
@ai_router.register(
name="weather_query",
description="查询指定城市的天气预报",
examples=["北京天气怎么样", "查一下上海的天气", "明天深圳会下雨吗"],
params_description="城市名称"
parameters={
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名称"}
},
"required": ["city"]
}
)
def handle_weather(ctx: MessageContext, params: str) -> bool:
def handle_weather(ctx: MessageContext, **kwargs) -> bool:
city = kwargs.get('city')
# 实现天气查询逻辑
pass
"""
@@ -42,211 +67,154 @@ class AIRouter:
name=name,
handler=func,
description=description,
examples=examples or [],
params_description=params_description
parameters=parameters or {}
)
self.functions[name] = ai_func
self.logger.info(f"AI路由器注册功能: {name} - {description}")
self.logger.info(f"注册Function Call功能: {name} - {description}")
return func
return decorator
def _build_ai_prompt(self) -> str:
"""构建给AI的系统提示词包含所有可用功能的信息"""
prompt = """你是一个智能路由助手。根据用户的输入判断用户的意图并返回JSON格式的响应。
### 注意:
1. 你需要优先判断自己是否可以直接回答用户的问题,如果你可以直接回答,则返回 "chat",无需返回 "function"
2. 如果用户输入中包含多个功能,请优先匹配最符合用户意图的功能。如果无法判断,则返回 "chat"
3. 优先考虑使用 chat 处理,需要外部资料或其他功能逻辑时,再返回 "function"
### 可用的功能列表:
"""
for name, func in self.functions.items():
prompt += f"\n- {name}: {func.description}"
if func.params_description:
prompt += f"\n 参数: {func.params_description}"
if func.examples:
prompt += f"\n 示例: {', '.join(func.examples[:3])}"
prompt += "\n"
prompt += """
请你分析用户输入严格按照以下格式返回JSON
### 返回格式:
1. 如果用户只是聊天或者不匹配任何功能,返回:
{
"action_type": "chat"
}
2.如果用户需要使用上述功能之一,返回:
{
"action_type": "function",
"function_name": "上述功能列表中的功能名",
"params": "从用户输入中提取的参数"
}
#### 示例:
- 用户输入"北京天气怎么样" -> {"action_type": "function", "function_name": "weather_query", "params": "北京"}
- 用户输入"看看新闻" -> {"action_type": "function", "function_name": "news_query", "params": ""}
- 用户输入"你好" -> {"action_type": "chat"}
- 用户输入"查一下Python教程" -> {"action_type": "function", "function_name": "perplexity_search", "params": "Python教程"}
#### 格式注意事项:
1. action_type 只能是 "function""chat"
2. 只返回JSON无需其他解释
3. function_name 必须完全匹配上述功能列表中的名称
"""
return prompt
def _build_function_tools(self, functions: Dict[str, AIFunction]) -> list:
"""构建function call的tools参数"""
return [func.to_function_schema() for func in functions.values()]
def route(self, ctx: MessageContext) -> Tuple[bool, Optional[Dict[str, Any]]]:
def handle_standard_function_call(self, ctx: MessageContext) -> bool:
"""
AI路由决策
返回: (是否处理成功, AI决策结果)
标准的OpenAI Function Call实现
支持多轮调用、函数结果反馈、AI最终回复
"""
print(f"[AI路由器] route方法被调用")
if not ctx.text:
print("[AI路由器] ctx.text为空返回False")
return False, None
# 获取AI模型
chat_model = getattr(ctx, 'chat', None)
if not chat_model:
chat_model = getattr(ctx.robot, 'chat', None) if ctx.robot else None
if not chat_model:
print("[AI路由器] 无可用的AI模型")
self.logger.error("AI路由器无可用的AI模型")
return False, None
print(f"[AI路由器] 找到AI模型: {type(chat_model)}")
try:
# 构建系统提示词
system_prompt = self._build_ai_prompt()
print(f"[AI路由器] 已构建系统提示词,长度: {len(system_prompt)}")
# 让AI分析用户意图
user_input = f"用户输入:{ctx.text}"
print(f"[AI路由器] 准备调用AI分析意图: {user_input}")
ai_response = chat_model.get_answer(
user_input,
wxid=ctx.get_receiver(),
system_prompt_override=system_prompt
)
print(f"[AI路由器] AI响应: {ai_response}")
# 解析AI返回的JSON
json_match = re.search(r'\{.*\}', ai_response, re.DOTALL)
if not json_match:
self.logger.warning(f"AI路由器无法从AI响应中提取JSON - {ai_response}")
return False, None
decision = json.loads(json_match.group(0))
# 验证决策格式
action_type = decision.get("action_type")
if action_type not in ["chat", "function"]:
self.logger.warning(f"AI路由器未知的action_type - {action_type}")
return False, None
# 如果是功能调用,验证功能名
if action_type == "function":
function_name = decision.get("function_name")
if function_name not in self.functions:
self.logger.warning(f"AI路由器未知的功能名 - {function_name}")
return False, None
self.logger.info(f"AI路由决策: {decision}")
return True, decision
except json.JSONDecodeError as e:
self.logger.error(f"AI路由器解析JSON失败 - {e}")
return False, None
except Exception as e:
self.logger.error(f"AI路由器处理异常 - {e}")
return False, None
def _check_permission(self, ctx: MessageContext) -> bool:
"""
检查是否有权限使用AI路由功能
:param ctx: 消息上下文
:return: 是否有权限
"""
# 检查是否启用AI路由
ai_router_config = getattr(ctx.config, 'AI_ROUTER', {})
if not ai_router_config.get('enable', True):
self.logger.info("AI路由功能已禁用")
return False
# 私聊始终允许
if not ctx.is_group:
return True
# 群聊需要检查白名单
allowed_groups = ai_router_config.get('allowed_groups', [])
current_group = ctx.get_receiver()
if current_group in allowed_groups:
self.logger.info(f"群聊 {current_group} 在AI路由白名单中允许使用")
return True
else:
self.logger.info(f"群聊 {current_group} 不在AI路由白名单中禁止使用")
return False
# 获取AI模型
chat_model = getattr(ctx, 'chat', None) or getattr(ctx.robot, 'chat', None)
if not chat_model:
self.logger.error("无可用的AI模型")
return False
try:
# 构建所有可用函数的tools
tools = self._build_function_tools(self.functions)
specific_max_history = getattr(ctx, 'specific_max_history', None)
# 初始化对话历史
conversation = [{"role": "user", "content": ctx.text}]
# 最多5轮function call防止无限循环
max_iterations = 5
for iteration in range(max_iterations):
self.logger.debug(f"Function Call第{iteration+1}")
# 调用AI模型
response = chat_model.get_answer(
question="", # 使用conversation模式question可以为空
wxid=ctx.get_receiver(),
tools=tools,
specific_max_history=specific_max_history,
conversation_history=conversation # 传递完整对话历史
)
# 如果AI直接回复文本不调用函数
if isinstance(response, str):
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text(response, at_list)
return True
# 如果AI调用函数
if isinstance(response, dict) and 'tool_calls' in response:
tool_calls = response['tool_calls']
# 添加assistant消息到对话历史
conversation.append({
"role": "assistant",
"tool_calls": tool_calls
})
# 执行所有函数调用
for tool_call in tool_calls:
function_name = tool_call['function']['name']
arguments = json.loads(tool_call['function']['arguments'])
self.logger.info(f"执行函数: {function_name}, 参数: {arguments}")
# 执行函数
func = self.functions.get(function_name)
if func:
try:
# 调用函数处理器
success = func.handler(ctx, **arguments)
function_result = "执行成功" if success else "执行失败"
except Exception as e:
self.logger.error(f"函数{function_name}执行错误: {e}")
function_result = f"执行错误: {str(e)}"
else:
function_result = f"函数{function_name}不存在"
# 添加函数结果到对话历史
conversation.append({
"role": "tool",
"tool_call_id": tool_call.get('id', f"call_{function_name}"),
"content": function_result
})
# 继续下一轮让AI基于函数结果继续思考
continue
# 如果响应格式异常,跳出循环
break
# 如果达到最大迭代次数让AI生成最终回复
if iteration == max_iterations - 1:
final_response = chat_model.get_answer(
question="请基于以上函数调用结果,生成最终回复。",
wxid=ctx.get_receiver(),
specific_max_history=specific_max_history,
conversation_history=conversation
)
if isinstance(final_response, str):
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text(final_response, at_list)
return True
return True
except Exception as e:
self.logger.error(f"标准Function Call处理异常: {e}")
return False
def dispatch(self, ctx: MessageContext) -> bool:
"""
执行AI路由分发
返回: 是否成功处理
标准Function Call分发
"""
print(f"[AI路由器] dispatch被调用消息内容: {ctx.text}")
# 检查权限
if not self._check_permission(ctx):
print("[AI路由器] 权限检查失败返回False")
if not ctx.text:
return False
# 获取AI路由决策
success, decision = self.route(ctx)
print(f"[AI路由器] route返回 - success: {success}, decision: {decision}")
if not success or not decision:
print("[AI路由器] route失败或无决策返回False")
return False
action_type = decision.get("action_type")
# 如果是聊天返回False让后续处理器处理
if action_type == "chat":
self.logger.info("AI路由器识别为聊天意图交给聊天处理器")
return False
# 如果是功能调用
if action_type == "function":
function_name = decision.get("function_name")
params = decision.get("params", "")
func = self.functions.get(function_name)
if not func:
self.logger.error(f"AI路由器功能 {function_name} 未找到")
# 调用标准Function Call处理
success = self.handle_standard_function_call(ctx)
if not success:
# 如果Function Call失败回退到聊天模式
return self._handle_chitchat(ctx)
return True
def _handle_chitchat(self, ctx: MessageContext) -> bool:
"""
处理闲聊逻辑 - 最简实现
"""
try:
if not ctx.text:
return False
try:
self.logger.info(f"AI路由器调用功能 {function_name},参数: {params}")
result = func.handler(ctx, params)
return result
except Exception as e:
self.logger.error(f"AI路由器执行功能 {function_name} 出错 - {e}")
return False
return False
# 调用闲聊处理器
from .handlers import handle_chitchat
return handle_chitchat(ctx, None)
except Exception as e:
self.logger.error(f"闲聊处理出错: {e}")
return False
# 创建全局AI路由器实例
ai_router = AIRouter()

View File

@@ -4,88 +4,12 @@ from typing import Optional, Match, Dict, Any
import json # 确保已导入json
from datetime import datetime # 确保已导入datetime
import os # 导入os模块用于文件路径操作
# from function.func_duel import DuelRankSystem
# 前向引用避免循环导入
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .context import MessageContext
def handle_help(ctx: 'MessageContext', match: Optional[Match]) -> bool:
"""
处理 "帮助" 命令
匹配: info/帮助/指令
"""
help_text = [
"🤖 泡泡的指令列表 🤖",
"",
"【实用工具】",
"- 天气/温度 [城市名]",
"- 天气预报/预报 [城市名]",
"- 新闻",
"- ask [问题]",
"",
"【决斗 & 偷袭】",
"- 决斗@XX",
"- 偷袭@XX",
"- 决斗排行/排行榜",
"- 我的战绩/决斗战绩",
"- 我的装备/查看装备",
"- 改名 [旧名] [新名]",
"",
"【提醒】",
"- 提醒xxxxx一次性、每日、每周",
"- 查看提醒/我的提醒/提醒列表",
"- 删..提醒..",
"",
"【群聊工具】",
"- summary/总结",
"- clearmessages/清除历史",
""
]
help_text = "\n".join(help_text)
# 发送消息
return ctx.send_text(help_text)
def handle_check_equipment(ctx: 'MessageContext', match: Optional[Match]) -> bool:
"""
处理 "查看装备" 命令
匹配: 我的装备/查看装备
"""
if not ctx.is_group:
ctx.send_text("❌ 装备查看功能只支持群聊")
return True
try:
from function.func_duel import DuelRankSystem
player_name = ctx.sender_name
rank_system = DuelRankSystem(ctx.msg.roomid)
player_data = rank_system.get_player_data(player_name)
if not player_data:
ctx.send_text(f"⚠️ 没有找到 {player_name} 的数据")
return True
items = player_data.get("items", {"elder_wand": 0, "magic_stone": 0, "invisibility_cloak": 0})
result = [
f"🧙‍♂️ {player_name} 的魔法装备:",
f"🪄 老魔杖: {items.get('elder_wand', 0)}",
f"💎 魔法石: {items.get('magic_stone', 0)}",
f"🧥 隐身衣: {items.get('invisibility_cloak', 0)}"
]
ctx.send_text("\n".join(result))
return True
except Exception as e:
if ctx.logger:
ctx.logger.error(f"查看装备出错: {e}")
ctx.send_text("⚠️ 查看装备失败")
return False
def handle_summary(ctx: 'MessageContext', match: Optional[Match]) -> bool:
"""
@@ -149,45 +73,6 @@ def handle_clear_messages(ctx: 'MessageContext', match: Optional[Match]) -> bool
ctx.send_text("⚠️ 清除消息历史失败")
return False
def handle_news_request(ctx: 'MessageContext', match: Optional[Match]) -> bool:
"""
处理 "新闻" 命令
匹配: 新闻
"""
if ctx.logger:
ctx.logger.info(f"收到来自 {ctx.sender_name} (群聊: {ctx.msg.roomid if ctx.is_group else ''}) 的新闻请求")
try:
from function.func_news import News
news_instance = News()
# 调用方法,接收返回的元组(is_today, news_content)
is_today, news_content = news_instance.get_important_news()
receiver = ctx.get_receiver()
sender_for_at = ctx.msg.sender if ctx.is_group else "" # 群聊中@请求者
if is_today:
# 是当天新闻,直接发送
ctx.send_text(f"📰 今日要闻来啦:\n{news_content}", sender_for_at)
else:
# 不是当天新闻或获取失败
if news_content:
# 有内容,说明是旧闻
prompt = " 今日新闻暂未发布,为您找到最近的一条新闻:"
ctx.send_text(f"{prompt}\n{news_content}", sender_for_at)
else:
# 内容为空,说明获取彻底失败
ctx.send_text("❌ 获取新闻失败,请稍后重试或联系管理员。", sender_for_at)
return True # 无论结果如何,命令本身算成功处理
except Exception as e:
if ctx.logger: ctx.logger.error(f"处理新闻请求时出错: {e}")
receiver = ctx.get_receiver()
sender_for_at = ctx.msg.sender if ctx.is_group else ""
ctx.send_text("❌ 获取新闻时发生错误,请稍后重试。", sender_for_at)
return False # 处理失败
def handle_chitchat(ctx: 'MessageContext', match: Optional[Match]) -> bool:
"""
@@ -409,114 +294,6 @@ def handle_insult(ctx: 'MessageContext', match: Optional[Match]) -> bool:
ctx.send_text("呃,我想骂但出错了...")
return True
def handle_perplexity_ask(ctx: 'MessageContext', match: Optional[Match]) -> bool:
"""
处理 "ask" 命令,调用 Perplexity AI
匹配: ask [问题内容]
"""
if not match: # 理论上正则匹配成功才会被调用,但加个检查更安全
return False
# 1. 尝试从 Robot 实例获取 Perplexity 实例
perplexity_instance = getattr(ctx.robot, 'perplexity', None)
# 2. 检查 Perplexity 实例是否存在
if not perplexity_instance:
if ctx.logger:
ctx.logger.warning("尝试调用 Perplexity但实例未初始化或未配置。")
ctx.send_text("❌ Perplexity 功能当前不可用或未正确配置。")
return True # 命令已被处理(错误处理也是处理)
# 3. 从匹配结果中提取问题内容
prompt = match.group(1).strip()
if not prompt: # 如果 'ask' 后面没有内容
ctx.send_text("请在 'ask' 后面加上您想问的问题。", ctx.msg.sender if ctx.is_group else None)
return True # 命令已被处理
# 4. 准备调用 Perplexity 实例的 process_message 方法
if ctx.logger:
ctx.logger.info(f"检测到 Perplexity 请求,发送者: {ctx.sender_name}, 问题: {prompt[:50]}...")
# 准备参数并调用 process_message
# 确保无论用户输入有没有空格,都以标准格式"ask 问题"传给process_message
content_for_perplexity = f"ask {prompt}" # 重构包含触发词的内容
chat_id = ctx.get_receiver()
sender_wxid = ctx.msg.sender
room_id = ctx.msg.roomid if ctx.is_group else None
is_group = ctx.is_group
# 5. 调用 process_message 并返回其结果
was_handled, fallback_prompt = perplexity_instance.process_message(
content=content_for_perplexity,
chat_id=chat_id,
sender=sender_wxid,
roomid=room_id,
from_group=is_group,
send_text_func=ctx.send_text
)
# 6. 如果没有被处理且有备选prompt使用默认AI处理
if not was_handled and fallback_prompt:
if ctx.logger:
ctx.logger.info(f"使用备选prompt '{fallback_prompt[:20]}...' 调用默认AI处理")
# 获取当前选定的AI模型
chat_model = None
if hasattr(ctx, 'chat'):
chat_model = ctx.chat
elif ctx.robot and hasattr(ctx.robot, 'chat'):
chat_model = ctx.robot.chat
if chat_model:
# 使用与 handle_chitchat 类似的逻辑但使用备选prompt
try:
# 格式化消息,与 handle_chitchat 保持一致
if ctx.robot and hasattr(ctx.robot, "xml_processor"):
if ctx.is_group:
msg_data = ctx.robot.xml_processor.extract_quoted_message(ctx.msg)
q_with_info = ctx.robot.xml_processor.format_message_for_ai(msg_data, ctx.sender_name)
else:
msg_data = ctx.robot.xml_processor.extract_private_quoted_message(ctx.msg)
q_with_info = ctx.robot.xml_processor.format_message_for_ai(msg_data, ctx.sender_name)
if not q_with_info:
import time
current_time = time.strftime("%H:%M", time.localtime())
q_with_info = f"[{current_time}] {ctx.sender_name}: {prompt or '[空内容]'}"
else:
import time
current_time = time.strftime("%H:%M", time.localtime())
q_with_info = f"[{current_time}] {ctx.sender_name}: {prompt or '[空内容]'}"
if ctx.logger:
ctx.logger.info(f"发送给默认AI的消息内容: {q_with_info}")
# 调用 AI 模型时传入备选 prompt
# 需要调整 get_answer 方法以支持 system_prompt_override 参数
# 这里我们假设已对各AI模型实现了这个参数
specific_max_history = getattr(ctx, 'specific_max_history', None)
rsp = chat_model.get_answer(
question=q_with_info,
wxid=ctx.get_receiver(),
system_prompt_override=fallback_prompt,
specific_max_history=specific_max_history
)
if rsp:
# 发送回复
at_list = ctx.msg.sender if ctx.is_group else ""
ctx.send_text(rsp, at_list)
return True
else:
if ctx.logger:
ctx.logger.error("无法从默认AI获得答案")
except Exception as e:
if ctx.logger:
ctx.logger.error(f"使用备选prompt调用默认AI时出错: {e}")
return was_handled
def handle_reminder(ctx: 'MessageContext', match: Optional[Match]) -> bool:
"""处理来自私聊或群聊的 '提醒' 命令,支持批量添加多个提醒"""
@@ -1042,71 +819,3 @@ def handle_delete_reminder(ctx: 'MessageContext', match: Optional[Match]) -> boo
ctx.logger.error(f"handle_delete_reminder AI 部分顶层错误: {e}", exc_info=True)
return True
def handle_weather_forecast(ctx: 'MessageContext', match: Optional[Match]) -> bool:
"""
处理 "天气预报""预报" 命令
匹配: 天气预报 [城市名] 或 预报 [城市名]
"""
if not match:
return False
city_name = match.group(1).strip()
if not city_name:
ctx.send_text("🤔 请告诉我你想查询哪个城市的天气预报,例如:天气预报 北京")
return True
if ctx.logger:
ctx.logger.info(f"天气预报查询指令匹配: 城市={city_name}")
# --- 加载城市代码 ---
city_codes: Dict[str, str] = {}
city_code_path = os.path.join(os.path.dirname(__file__), '..', 'function', 'main_city.json') # 确保路径正确
try:
with open(city_code_path, 'r', encoding='utf-8') as f:
city_codes = json.load(f)
except FileNotFoundError:
if ctx.logger:
ctx.logger.error(f"城市代码文件未找到: {city_code_path}")
ctx.send_text("⚠️ 抱歉,天气功能所需的城市列表文件丢失了。")
return True
except json.JSONDecodeError:
if ctx.logger:
ctx.logger.error(f"无法解析城市代码文件: {city_code_path}")
ctx.send_text("⚠️ 抱歉,天气功能的城市列表文件格式错误。")
return True
except Exception as e:
if ctx.logger:
ctx.logger.error(f"加载城市代码时发生未知错误: {e}", exc_info=True)
ctx.send_text("⚠️ 抱歉,加载城市代码时发生错误。")
return True
# --- 城市代码加载完毕 ---
city_code = city_codes.get(city_name)
if not city_code:
# 尝试模糊匹配 (可选,如果需要)
found = False
for name, code in city_codes.items():
if city_name in name: # 如果输入的名字是城市全名的一部分
city_code = code
city_name = name # 使用找到的完整城市名
if ctx.logger:
ctx.logger.info(f"城市 '{match.group(1).strip()}' 未精确匹配,使用模糊匹配结果: {city_name} ({city_code})")
found = True
break
if not found:
ctx.send_text(f"😕 找不到城市 '{city_name}' 的天气信息,请检查城市名称是否正确。")
return True
# 获取天气信息 (包含预报)
try:
from function.func_weather import Weather
weather_info = Weather(city_code).get_weather(include_forecast=True) # 注意这里传入True
ctx.send_text(weather_info)
except Exception as e:
if ctx.logger:
ctx.logger.error(f"获取城市 {city_name}({city_code}) 天气预报时出错: {e}", exc_info=True)
ctx.send_text(f"😥 获取 {city_name} 天气预报时遇到问题,请稍后再试。")
return True

View File

@@ -1,38 +0,0 @@
import re
from dataclasses import dataclass
from typing import Pattern, Callable, Literal, Optional, Any, Union, Match
# 导入 MessageContext使用前向引用避免循环导入
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .context import MessageContext
@dataclass
class Command:
"""
命令定义类,封装命令的匹配条件和处理函数
"""
name: str # 命令名称,用于日志和调试
pattern: Union[Pattern, Callable[['MessageContext'], Optional[Match]]] # 匹配规则:正则表达式或自定义匹配函数
scope: Literal["group", "private", "both"] # 生效范围: "group"-仅群聊, "private"-仅私聊, "both"-两者都可
handler: Callable[['MessageContext', Optional[Match]], bool] # 处理函数
need_at: bool = False # 在群聊中是否必须@机器人才能触发
priority: int = 100 # 优先级,数字越小越先匹配
description: str = "" # 命令的描述,用于生成帮助信息
def __post_init__(self):
"""验证命令配置的有效性"""
if self.scope not in ["group", "private", "both"]:
raise ValueError(f"无效的作用域: {self.scope},必须是 'group', 'private''both'")
# 检查pattern是否为正则表达式或可调用对象
if not isinstance(self.pattern, (Pattern, Callable)):
# 如果是字符串,尝试转换为正则表达式
if isinstance(self.pattern, str):
try:
self.pattern = re.compile(self.pattern)
except re.error:
raise ValueError(f"无效的正则表达式: {self.pattern}")
else:
raise TypeError(f"pattern 必须是正则表达式或可调用对象,而不是 {type(self.pattern)}")

View File

@@ -1,136 +0,0 @@
import re
from .models import Command
from .handlers import (
handle_help,
# handle_duel, handle_sneak_attack, handle_duel_rank,
# handle_duel_stats, handle_check_equipment, handle_rename,
handle_summary, handle_clear_messages, handle_news_request,
handle_chitchat, handle_insult,
handle_perplexity_ask, handle_reminder, handle_list_reminders, handle_delete_reminder,
handle_weather_forecast
)
# 命令列表,按优先级排序
# 优先级越小越先匹配
COMMANDS = [
# ======== 基础系统命令 ========
Command(
name="help",
pattern=re.compile(r"^(info|帮助|指令)$", re.IGNORECASE),
scope="both", # 群聊和私聊都支持
need_at=False, # 不需要@机器人
priority=10, # 优先级较高
handler=handle_help,
description="显示机器人的帮助信息"
),
# ======== Perplexity AI 命令 ========
Command(
name="perplexity_ask",
pattern=re.compile(r"^ask\s*(.+)", re.IGNORECASE | re.DOTALL),
scope="both", # 群聊和私聊都支持
need_at=True, # 需要@机器人
priority=25, # 较高优先级,确保在闲聊之前处理
handler=handle_perplexity_ask,
description="使用 Perplexity AI 进行深度查询"
),
# ======== 消息管理命令 ========
Command(
name="summary",
pattern=re.compile(r"^(summary|总结)$", re.IGNORECASE),
scope="group", # 仅群聊支持
need_at=True, # 需要@机器人
priority=30, # 优先级一般
handler=handle_summary,
description="总结群聊最近的消息"
),
Command(
name="clear_messages",
pattern=re.compile(r"^(clearmessages|清除历史)$", re.IGNORECASE),
scope="group", # 仅群聊支持
need_at=True, # 需要@机器人
priority=31, # 优先级一般
handler=handle_clear_messages,
description="从数据库中清除群聊的历史消息记录"
),
# ======== 提醒功能 ========
Command(
name="reminder",
pattern=re.compile(r"提醒我", re.IGNORECASE),
scope="both", # 支持群聊和私聊
need_at=True, # 在群聊中需要@机器人
priority=35, # 优先级适中,在基础命令后,复杂功能或闲聊前
handler=handle_reminder,
description="设置一个提醒 (包含 '提醒我' 关键字即可, 例如提醒我明天下午3点开会)"
),
Command(
name="list_reminders",
pattern=re.compile(r"^(查看提醒|我的提醒|提醒列表)$", re.IGNORECASE),
scope="both", # 支持群聊和私聊
need_at=True, # 在群聊中需要@机器人
priority=36, # 优先级略低于设置提醒
handler=handle_list_reminders,
description="查看您设置的所有提醒"
),
Command(
name="delete_reminder",
# 修改为只匹配包含"删"、"删除"或"取消"的消息,不再要求特定格式
pattern=re.compile(r"(?:删|删除|取消)", re.IGNORECASE),
scope="both", # 支持群聊和私聊
need_at=True, # 在群聊中需要@机器人
priority=37,
handler=handle_delete_reminder,
description="删除提醒 (包含'''提醒'关键字即可,如: 把开会的提醒删了)"
),
# ======== 新闻和实用工具 ========
Command(
name="weather_forecast",
pattern=re.compile(r"^(?:天气预报|天气)\s+(.+)$"), # 匹配 天气预报/预报 城市名
scope="both", # 群聊和私聊都支持
need_at=True, # 需要@机器人
priority=38, # 优先级比天气高一点
handler=handle_weather_forecast,
description="查询指定城市未来几天的天气预报 (例如:天气预报 北京)"
),
Command(
name="news",
pattern=re.compile(r"^新闻$"),
scope="both", # 群聊和私聊都支持
need_at=True, # 需要@机器人
priority=40, # 优先级一般
handler=handle_news_request,
description="获取最新新闻"
),
# ======== 骂人命令 ========
Command(
name="insult",
pattern=re.compile(r"骂一下\s*@([^\s@]+)"),
scope="group", # 仅群聊支持
need_at=True, # 需要@机器人
priority=100, # 优先级较高
handler=handle_insult,
description="骂指定用户"
),
]
# 可以添加一个函数,获取命令列表的简单描述
def get_commands_info():
"""获取所有命令的简要信息,用于调试"""
info = []
for i, cmd in enumerate(COMMANDS):
scope_str = {"group": "仅群聊", "private": "仅私聊", "both": "群聊私聊"}[cmd.scope]
at_str = "需要@" if cmd.need_at else "不需@"
info.append(f"{i+1}. [{cmd.priority}] {cmd.name} ({scope_str},{at_str}) - {cmd.description or '无描述'}")
return "\n".join(info)
# 导出所有命令
__all__ = ["COMMANDS", "get_commands_info"]

View File

@@ -1,117 +0,0 @@
import re
import logging
from typing import List, Optional, Any, Dict, Match
import traceback
from .models import Command
from .context import MessageContext
# 获取模块级 logger
logger = logging.getLogger(__name__)
class CommandRouter:
"""
命令路由器,负责将消息路由到对应的命令处理函数
"""
def __init__(self, commands: List[Command], robot_instance: Optional[Any] = None):
# 按优先级排序命令列表,数字越小优先级越高
self.commands = sorted(commands, key=lambda cmd: cmd.priority)
self.robot_instance = robot_instance
# 分析并输出命令注册信息,便于调试
scope_count = {"group": 0, "private": 0, "both": 0}
for cmd in commands:
scope_count[cmd.scope] += 1
logger.info(f"命令路由器初始化成功,共加载 {len(commands)} 个命令")
logger.info(f"命令作用域分布: 仅群聊 {scope_count['group']},仅私聊 {scope_count['private']},两者均可 {scope_count['both']}")
# 按优先级输出命令信息
for i, cmd in enumerate(self.commands[:10]): # 只输出前10个
logger.info(f"{i+1}. [{cmd.priority}] {cmd.name} - {cmd.description or '无描述'}")
if len(self.commands) > 10:
logger.info(f"... 共 {len(self.commands)} 个命令")
def dispatch(self, ctx: MessageContext) -> bool:
"""
根据消息上下文分发命令
:param ctx: 消息上下文对象
:return: 是否有命令成功处理
"""
# 确保context可以访问到robot实例
if self.robot_instance and not ctx.robot:
ctx.robot = self.robot_instance
# 如果robot有logger属性且ctx没有logger则使用robot的logger
if hasattr(self.robot_instance, 'LOG') and not ctx.logger:
ctx.logger = self.robot_instance.LOG
# 记录日志,便于调试
if ctx.logger:
ctx.logger.debug(f"开始路由消息: '{ctx.text}', 来自: {ctx.sender_name}, 群聊: {ctx.is_group}, @机器人: {ctx.is_at_bot}")
# 遍历命令列表,按优先级顺序匹配
for cmd in self.commands:
# 1. 检查作用域 (scope)
if cmd.scope != "both":
if (cmd.scope == "group" and not ctx.is_group) or \
(cmd.scope == "private" and ctx.is_group):
continue # 作用域不匹配,跳过
# 2. 检查是否需要 @ (need_at) - 仅在群聊中有效
if ctx.is_group and cmd.need_at and not ctx.is_at_bot:
continue # 需要@机器人但未被@,跳过
# 3. 执行匹配逻辑
match_result = None
try:
# 根据pattern类型执行匹配
if callable(cmd.pattern):
# 自定义匹配函数
match_result = cmd.pattern(ctx)
else:
# 正则表达式匹配
match_obj = cmd.pattern.search(ctx.text)
match_result = match_obj
# 匹配失败,尝试下一个命令
if match_result is None:
continue
# 匹配成功,记录日志
if ctx.logger:
ctx.logger.info(f"命令 '{cmd.name}' 匹配成功,准备处理")
# 4. 执行命令处理函数
try:
result = cmd.handler(ctx, match_result)
if result:
if ctx.logger:
ctx.logger.info(f"命令 '{cmd.name}' 处理成功")
return True
else:
if ctx.logger:
ctx.logger.warning(f"命令 '{cmd.name}' 处理返回False尝试下一个命令")
except Exception as e:
if ctx.logger:
ctx.logger.error(f"执行命令 '{cmd.name}' 处理函数时出错: {e}")
ctx.logger.error(traceback.format_exc())
else:
logger.error(f"执行命令 '{cmd.name}' 处理函数时出错: {e}", exc_info=True)
# 出错后继续尝试下一个命令
except Exception as e:
# 匹配过程出错,记录并继续
if ctx.logger:
ctx.logger.error(f"匹配命令 '{cmd.name}' 时出错: {e}")
else:
logger.error(f"匹配命令 '{cmd.name}' 时出错: {e}", exc_info=True)
continue
# 所有命令都未匹配或处理失败
if ctx.logger:
ctx.logger.debug("所有命令匹配失败或处理失败")
return False
def get_command_descriptions(self) -> Dict[str, str]:
"""获取所有命令的描述,用于生成帮助信息"""
return {cmd.name: cmd.description for cmd in self.commands if cmd.description}

File diff suppressed because it is too large Load Diff

170
robot.py
View File

@@ -20,7 +20,6 @@ from ai_providers.ai_gemini import Gemini
from ai_providers.ai_perplexity import Perplexity
from function.func_weather import Weather
from function.func_news import News
from function.func_duel import start_duel, get_rank_list, get_player_stats, change_player_name, DuelManager, attempt_sneak_attack
from function.func_summary import MessageSummary # 导入新的MessageSummary类
from function.func_reminder import ReminderManager # 导入ReminderManager类
from configuration import Config
@@ -28,13 +27,8 @@ from constants import ChatType
from job_mgmt import Job
from function.func_xml_process import XmlProcessor
# 导入命令路由系统
# 导入Function Call系统
from commands.context import MessageContext
from commands.router import CommandRouter
from commands.registry import COMMANDS, get_commands_info
from commands.handlers import handle_chitchat # 导入闲聊处理函数
# 导入AI路由系统
from commands.ai_router import ai_router
import commands.ai_functions # 导入以注册所有AI功能
@@ -54,7 +48,6 @@ class Robot(Job):
self.wxid = self.wcf.get_self_wxid() # 获取机器人自己的wxid
self.allContacts = self.getAllContacts()
self._msg_timestamps = []
self.duel_manager = DuelManager(self.sendDuelMsg)
try:
db_path = "data/message_history.db"
@@ -165,12 +158,8 @@ class Robot(Job):
# 初始化图像生成管理器
self.image_manager = ImageGenerationManager(self.config, self.wcf, self.LOG, self.sendTextMsg)
# 初始化命令路由器
self.command_router = CommandRouter(COMMANDS, robot_instance=self)
self.LOG.info(f"命令路由系统初始化完成,共加载 {len(COMMANDS)} 条命令")
# 初始化AI路由器
self.LOG.info(f"AI路由系统初始化完成共加载 {len(ai_router.functions)} 个AI功能")
# Function Call系统已自动加载
self.LOG.info(f"🚀 Function Call系统初始化完成共加载 {len(ai_router.functions)} 个智能功能")
# 初始化提醒管理器
try:
@@ -181,8 +170,10 @@ class Robot(Job):
except Exception as e:
self.LOG.error(f"初始化提醒管理器失败: {e}", exc_info=True)
# 输出命令列表信息,便于调试
# self.LOG.debug(get_commands_info()) # 如果需要在日志中输出所有命令信息,取消本行注释
# 输出AI功能列表信息,便于调试
if self.LOG.isEnabledFor(logging.DEBUG):
for name, func in ai_router.functions.items():
self.LOG.debug(f"AI功能: {name} - {func.description} (scope: {func.scope}, need_at: {func.need_at})")
@staticmethod
def value_check(args: dict) -> bool:
@@ -192,85 +183,43 @@ class Robot(Job):
def processMsg(self, msg: WxMsg) -> None:
"""
处理收到的微信消息
处理收到的微信消息 - 纯Function Call实现
:param msg: 微信消息对象
"""
try:
# 1. 使用MessageSummary记录消息(保持不变)
# 1. 使用MessageSummary记录消息
self.message_summary.process_message_from_wxmsg(msg, self.wcf, self.allContacts, self.wxid)
# 2. 根据消息来源选择使用的AI模型
self._select_model_for_message(msg)
# 3. 获取本次对话特定的历史消息限制
specific_limit = self._get_specific_history_limit(msg)
self.LOG.debug(f"本次对话 ({msg.sender} in {msg.roomid or msg.sender}) 使用历史限制: {specific_limit}")
# 4. 预处理消息生成MessageContext
# 2. 预处理消息生成MessageContext
ctx = self.preprocess(msg)
# 确保context能访问到当前选定的chat模型及特定历史限制
setattr(ctx, 'chat', self.chat)
setattr(ctx, 'specific_max_history', specific_limit)
# 5. 使用命令路由器分发处理消息
handled = self.command_router.dispatch(ctx)
# 6. 如果正则路由器没有处理尝试AI路由器
if not handled:
# 只在被@或私聊时才使用AI路由
if (msg.from_group() and msg.is_at(self.wxid)) or not msg.from_group():
print(f"[AI路由调试] 准备调用AI路由器处理消息: {msg.content}")
ai_handled = ai_router.dispatch(ctx)
print(f"[AI路由调试] AI路由器处理结果: {ai_handled}")
if ai_handled:
self.LOG.info("消息已由AI路由器处理")
print("[AI路由调试] 消息已成功由AI路由器处理")
return
else:
print("[AI路由调试] AI路由器未处理该消息")
# 7. 如果没有命令处理器处理,则进行特殊逻辑处理
if not handled:
# 7.1 好友请求自动处理
if msg.type == 37: # 好友请求
self.autoAcceptFriendRequest(msg)
# 3. 直接使用Function Call系统处理所有消息
handled = ai_router.dispatch(ctx)
if handled:
return
# 4. 特殊系统消息处理
if msg.type == 37: # 好友请求
self.autoAcceptFriendRequest(msg)
return
elif msg.type == 10000:
# 处理新成员入群
if "加入了群聊" in msg.content and msg.from_group():
new_member_match = re.search(r'"(.+?)"邀请"(.+?)"加入了群聊', msg.content)
if new_member_match:
inviter = new_member_match.group(1)
new_member = new_member_match.group(2)
welcome_msg = self.config.WELCOME_MSG.format(new_member=new_member, inviter=inviter)
self.sendTextMsg(welcome_msg, msg.roomid)
self.LOG.info(f"已发送欢迎消息给新成员 {new_member} 在群 {msg.roomid}")
return
# 7.2 系统消息处理
elif msg.type == 10000:
# 7.2.1 处理新成员入群
if "加入了群聊" in msg.content and msg.from_group():
new_member_match = re.search(r'"(.+?)"邀请"(.+?)"加入了群聊', msg.content)
if new_member_match:
inviter = new_member_match.group(1) # 邀请人
new_member = new_member_match.group(2) # 新成员
# 使用配置文件中的欢迎语,支持变量替换
welcome_msg = self.config.WELCOME_MSG.format(new_member=new_member, inviter=inviter)
self.sendTextMsg(welcome_msg, msg.roomid)
self.LOG.info(f"已发送欢迎消息给新成员 {new_member} 在群 {msg.roomid}")
return
# 7.2.2 处理新好友添加
elif "你已添加了" in msg.content:
self.sayHiToNewFriend(msg)
return
# 7.3 群聊消息,且配置了响应该群
if msg.from_group() and msg.roomid in self.config.GROUPS:
# 如果在群里被@了,但命令路由器没有处理,则进行闲聊
if msg.is_at(self.wxid):
# 调用handle_chitchat函数处理闲聊传递完整的上下文
handle_chitchat(ctx, None)
else:
pass
# 7.4 私聊消息,未被命令处理,进行闲聊
elif not msg.from_group() and not msg.from_self():
# 检查是否是文本消息(type 1)或者是包含用户输入的类型49消息
if msg.type == 1 or (msg.type == 49 and ctx.text):
self.LOG.info(f"准备回复私聊消息: 类型={msg.type}, 文本内容='{ctx.text}'")
# 调用handle_chitchat函数处理闲聊传递完整的上下文
handle_chitchat(ctx, None)
# 处理新好友添加
elif "你已添加了" in msg.content:
self.sayHiToNewFriend(msg)
return
except Exception as e:
self.LOG.error(f"处理消息时发生错误: {str(e)}", exc_info=True)
@@ -416,36 +365,12 @@ class Robot(Job):
for r in receivers:
self.sendTextMsg(report, r)
def sendDuelMsg(self, msg: str, receiver: str) -> None:
"""发送决斗消息,不受消息频率限制,不记入历史记录
:param msg: 消息字符串
:param receiver: 接收人wxid或者群id
"""
try:
self.wcf.send_text(f"{msg}", receiver, "")
except Exception as e:
self.LOG.error(f"发送决斗消息失败: {e}")
def cleanup_perplexity_threads(self):
"""清理所有Perplexity线程"""
# 如果已初始化Perplexity实例调用其清理方法
perplexity_instance = self.get_perplexity_instance()
if perplexity_instance:
perplexity_instance.cleanup()
# 检查并等待决斗线程结束
if hasattr(self, 'duel_manager') and self.duel_manager.is_duel_running():
self.LOG.info("等待决斗线程结束...")
# 最多等待5秒
for i in range(5):
if not self.duel_manager.is_duel_running():
break
time.sleep(1)
if self.duel_manager.is_duel_running():
self.LOG.warning("决斗线程在退出时仍在运行")
else:
self.LOG.info("决斗线程已结束")
def cleanup(self):
"""清理所有资源,在程序退出前调用"""
@@ -485,7 +410,6 @@ class Robot(Job):
return self.chat_models[ChatType.PERPLEXITY.value]
return None
def _select_model_for_message(self, msg: WxMsg) -> None:
"""根据消息来源选择对应的AI模型
@@ -493,17 +417,17 @@ class Robot(Job):
"""
if not hasattr(self, 'chat_models') or not self.chat_models:
return # 没有可用模型,无需切换
# 获取消息来源ID
source_id = msg.roomid if msg.from_group() else msg.sender
# 检查配置
if not hasattr(self.config, 'GROUP_MODELS'):
# 没有配置,使用默认模型
if self.default_model_id in self.chat_models:
self.chat = self.chat_models[self.default_model_id]
return
# 群聊消息处理
if msg.from_group():
model_mappings = self.config.GROUP_MODELS.get('mapping', [])
@@ -536,24 +460,24 @@ class Robot(Job):
if self.default_model_id in self.chat_models:
self.chat = self.chat_models[self.default_model_id]
return
# 如果没有找到对应配置,使用默认模型
if self.default_model_id in self.chat_models:
self.chat = self.chat_models[self.default_model_id]
def _get_specific_history_limit(self, msg: WxMsg) -> int:
"""根据消息来源和配置,获取特定的历史消息数量限制
:param msg: 微信消息对象
:return: 历史消息数量限制如果没有特定配置则返回None
"""
if not hasattr(self.config, 'GROUP_MODELS'):
# 没有配置,使用当前模型默认值
return getattr(self.chat, 'max_history_messages', None)
# 获取消息来源ID
source_id = msg.roomid if msg.from_group() else msg.sender
# 确定查找的映射和字段名
if msg.from_group():
mappings = self.config.GROUP_MODELS.get('mapping', [])
@@ -561,7 +485,7 @@ class Robot(Job):
else:
mappings = self.config.GROUP_MODELS.get('private_mapping', [])
key_field = 'wxid'
# 在映射中查找特定配置
for mapping in mappings:
if mapping.get(key_field) == source_id:
@@ -574,7 +498,7 @@ class Robot(Job):
# 找到了配置但没有max_history使用模型默认值
self.LOG.debug(f"{source_id} 找到映射但无特定历史限制,使用模型默认值")
break
# 没有找到特定限制,使用当前模型的默认值
default_limit = getattr(self.chat, 'max_history_messages', None)
self.LOG.debug(f"未找到 {source_id} 的特定历史限制,使用模型默认值: {default_limit}")