mirror of
https://github.com/Zippland/Bubbles.git
synced 2026-02-08 01:02:07 +08:00
212 lines
7.8 KiB
Python
212 lines
7.8 KiB
Python
import re
|
||
import json
|
||
import logging
|
||
from typing import Dict, Callable, Optional, Any, Tuple
|
||
from dataclasses import dataclass, field
|
||
from .context import MessageContext
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
@dataclass
|
||
class AIFunction:
|
||
"""AI可调用的功能定义"""
|
||
name: str # 功能唯一标识名
|
||
handler: Callable # 处理函数
|
||
description: str # 功能描述(给AI看的)
|
||
examples: list[str] = field(default_factory=list) # 示例用法
|
||
params_description: str = "" # 参数说明
|
||
|
||
class AIRouter:
|
||
"""AI智能路由器"""
|
||
|
||
def __init__(self):
|
||
self.functions: Dict[str, AIFunction] = {}
|
||
self.logger = logger
|
||
|
||
def register(self, name: str, description: str, examples: list[str] = None, params_description: str = ""):
|
||
"""
|
||
装饰器:注册一个功能到AI路由器
|
||
|
||
@ai_router.register(
|
||
name="weather_query",
|
||
description="查询指定城市的天气预报",
|
||
examples=["北京天气怎么样", "查一下上海的天气", "明天深圳会下雨吗"],
|
||
params_description="城市名称"
|
||
)
|
||
def handle_weather(ctx: MessageContext, params: str) -> bool:
|
||
# 实现天气查询逻辑
|
||
pass
|
||
"""
|
||
def decorator(func: Callable) -> Callable:
|
||
ai_func = AIFunction(
|
||
name=name,
|
||
handler=func,
|
||
description=description,
|
||
examples=examples or [],
|
||
params_description=params_description
|
||
)
|
||
self.functions[name] = ai_func
|
||
self.logger.info(f"AI路由器注册功能: {name} - {description}")
|
||
return func
|
||
|
||
return decorator
|
||
|
||
def _build_ai_prompt(self) -> str:
|
||
"""构建给AI的系统提示词,包含所有可用功能的信息"""
|
||
prompt = """你是一个智能路由助手。根据用户的输入,判断用户的意图并返回JSON格式的响应。
|
||
|
||
可用的功能列表:
|
||
"""
|
||
for name, func in self.functions.items():
|
||
prompt += f"\n- {name}: {func.description}"
|
||
if func.params_description:
|
||
prompt += f"\n 参数: {func.params_description}"
|
||
if func.examples:
|
||
prompt += f"\n 示例: {', '.join(func.examples[:3])}"
|
||
prompt += "\n"
|
||
|
||
prompt += """
|
||
分析用户输入,严格按照以下格式返回JSON:
|
||
|
||
如果用户需要使用上述功能之一,返回:
|
||
{
|
||
"action_type": "function",
|
||
"function_name": "上述功能列表中的功能名",
|
||
"params": "从用户输入中提取的参数"
|
||
}
|
||
|
||
如果用户只是聊天或者不匹配任何功能,返回:
|
||
{
|
||
"action_type": "chat"
|
||
}
|
||
|
||
示例:
|
||
- 用户输入"北京天气怎么样" -> {"action_type": "function", "function_name": "weather_query", "params": "北京"}
|
||
- 用户输入"看看新闻" -> {"action_type": "function", "function_name": "news_query", "params": ""}
|
||
- 用户输入"你好" -> {"action_type": "chat"}
|
||
- 用户输入"查一下Python教程" -> {"action_type": "function", "function_name": "perplexity_search", "params": "Python教程"}
|
||
|
||
重要:
|
||
1. action_type 只能是 "function" 或 "chat"
|
||
2. 只返回JSON,无需其他解释
|
||
3. function_name 必须完全匹配上述功能列表中的名称
|
||
"""
|
||
return prompt
|
||
|
||
def route(self, ctx: MessageContext) -> Tuple[bool, Optional[Dict[str, Any]]]:
|
||
"""
|
||
AI路由决策
|
||
|
||
返回: (是否处理成功, AI决策结果)
|
||
"""
|
||
print(f"[AI路由器] route方法被调用")
|
||
|
||
if not ctx.text:
|
||
print("[AI路由器] ctx.text为空,返回False")
|
||
return False, None
|
||
|
||
# 获取AI模型
|
||
chat_model = getattr(ctx, 'chat', None)
|
||
if not chat_model:
|
||
chat_model = getattr(ctx.robot, 'chat', None) if ctx.robot else None
|
||
|
||
if not chat_model:
|
||
print("[AI路由器] 无可用的AI模型")
|
||
self.logger.error("AI路由器:无可用的AI模型")
|
||
return False, None
|
||
|
||
print(f"[AI路由器] 找到AI模型: {type(chat_model)}")
|
||
|
||
try:
|
||
# 构建系统提示词
|
||
system_prompt = self._build_ai_prompt()
|
||
print(f"[AI路由器] 已构建系统提示词,长度: {len(system_prompt)}")
|
||
|
||
# 让AI分析用户意图
|
||
user_input = f"用户输入:{ctx.text}"
|
||
print(f"[AI路由器] 准备调用AI分析意图: {user_input}")
|
||
|
||
ai_response = chat_model.get_answer(
|
||
user_input,
|
||
wxid=ctx.get_receiver(),
|
||
system_prompt_override=system_prompt
|
||
)
|
||
|
||
print(f"[AI路由器] AI响应: {ai_response}")
|
||
|
||
# 解析AI返回的JSON
|
||
json_match = re.search(r'\{.*\}', ai_response, re.DOTALL)
|
||
if not json_match:
|
||
self.logger.warning(f"AI路由器:无法从AI响应中提取JSON - {ai_response}")
|
||
return False, None
|
||
|
||
decision = json.loads(json_match.group(0))
|
||
|
||
# 验证决策格式
|
||
action_type = decision.get("action_type")
|
||
if action_type not in ["chat", "function"]:
|
||
self.logger.warning(f"AI路由器:未知的action_type - {action_type}")
|
||
return False, None
|
||
|
||
# 如果是功能调用,验证功能名
|
||
if action_type == "function":
|
||
function_name = decision.get("function_name")
|
||
if function_name not in self.functions:
|
||
self.logger.warning(f"AI路由器:未知的功能名 - {function_name}")
|
||
return False, None
|
||
|
||
self.logger.info(f"AI路由决策: {decision}")
|
||
return True, decision
|
||
|
||
except json.JSONDecodeError as e:
|
||
self.logger.error(f"AI路由器:解析JSON失败 - {e}")
|
||
return False, None
|
||
except Exception as e:
|
||
self.logger.error(f"AI路由器:处理异常 - {e}")
|
||
return False, None
|
||
|
||
def dispatch(self, ctx: MessageContext) -> bool:
|
||
"""
|
||
执行AI路由分发
|
||
|
||
返回: 是否成功处理
|
||
"""
|
||
print(f"[AI路由器] dispatch被调用,消息内容: {ctx.text}")
|
||
|
||
# 获取AI路由决策
|
||
success, decision = self.route(ctx)
|
||
print(f"[AI路由器] route返回 - success: {success}, decision: {decision}")
|
||
|
||
if not success or not decision:
|
||
print("[AI路由器] route失败或无决策,返回False")
|
||
return False
|
||
|
||
action_type = decision.get("action_type")
|
||
|
||
# 如果是聊天,返回False让后续处理器处理
|
||
if action_type == "chat":
|
||
self.logger.info("AI路由器:识别为聊天意图,交给聊天处理器")
|
||
return False
|
||
|
||
# 如果是功能调用
|
||
if action_type == "function":
|
||
function_name = decision.get("function_name")
|
||
params = decision.get("params", "")
|
||
|
||
func = self.functions.get(function_name)
|
||
if not func:
|
||
self.logger.error(f"AI路由器:功能 {function_name} 未找到")
|
||
return False
|
||
|
||
try:
|
||
self.logger.info(f"AI路由器:调用功能 {function_name},参数: {params}")
|
||
result = func.handler(ctx, params)
|
||
return result
|
||
except Exception as e:
|
||
self.logger.error(f"AI路由器:执行功能 {function_name} 出错 - {e}")
|
||
return False
|
||
|
||
return False
|
||
|
||
# 创建全局AI路由器实例
|
||
ai_router = AIRouter() |