feat: 重构为 Agent 架构并实现工具系统

This commit is contained in:
zihanjian
2026-02-04 17:35:08 +08:00
parent 10559d06f1
commit f0339f79ac
12 changed files with 1113 additions and 582 deletions

80
tools/__init__.py Normal file
View File

@@ -0,0 +1,80 @@
"""
工具系统 —— 让 LLM 在 Agent 循环中自主调用工具。
每个 Tool 提供 OpenAI function-calling 格式的 schema 和一个同步执行函数。
ToolRegistry 汇总所有工具,生成 tools 列表和统一的 tool_handler。
"""
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class Tool:
"""LLM 可调用的工具。"""
name: str
description: str
parameters: dict # JSON Schema
handler: Callable[..., str] = None # (ctx, **kwargs) -> str
def to_openai_schema(self) -> dict:
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
},
}
class ToolRegistry:
"""收集工具,为 Agent 循环提供 tools + tool_handler。"""
def __init__(self):
self._tools: Dict[str, Tool] = {}
def register(self, tool: Tool) -> None:
self._tools[tool.name] = tool
logger.info(f"注册工具: {tool.name}")
def get(self, name: str) -> Optional[Tool]:
return self._tools.get(name)
@property
def tools(self) -> Dict[str, Tool]:
return dict(self._tools)
def get_openai_tools(self) -> List[dict]:
"""返回所有工具的 OpenAI function-calling schema 列表。"""
return [t.to_openai_schema() for t in self._tools.values()]
def create_handler(self, ctx: Any) -> Callable[[str, dict], str]:
"""创建一个绑定了消息上下文的 tool_handler 函数。"""
registry = self._tools
def handler(tool_name: str, arguments: dict) -> str:
tool = registry.get(tool_name)
if not tool:
return json.dumps(
{"error": f"Unknown tool: {tool_name}"},
ensure_ascii=False,
)
try:
result = tool.handler(ctx, **arguments)
if not isinstance(result, str):
result = json.dumps(result, ensure_ascii=False)
return result
except Exception as e:
logger.error(f"工具 {tool_name} 执行失败: {e}", exc_info=True)
return json.dumps({"error": str(e)}, ensure_ascii=False)
return handler
# ── 全局工具注册表 ──────────────────────────────────────────
tool_registry = ToolRegistry()

189
tools/history.py Normal file
View File

@@ -0,0 +1,189 @@
"""聊天历史查询工具 —— 从 handlers.py 的内联定义中提取而来。
支持三种查询模式:
keywords — 关键词模糊搜索
range — 按倒序偏移取连续消息
time — 按时间窗口取消息
"""
import json
from tools import Tool, tool_registry
DEFAULT_VISIBLE_LIMIT = 30
def _handle_lookup_chat_history(ctx, mode: str = "", keywords: list = None,
start_offset: int = None, end_offset: int = None,
start_time: str = None, end_time: str = None,
**_) -> str:
message_summary = getattr(ctx.robot, "message_summary", None) if ctx.robot else None
if not message_summary:
return json.dumps({"error": "消息历史功能不可用"}, ensure_ascii=False)
chat_id = ctx.get_receiver()
visible_limit = DEFAULT_VISIBLE_LIMIT
raw = getattr(ctx, "specific_max_history", None)
if raw is not None:
try:
visible_limit = int(raw)
except (TypeError, ValueError):
pass
# 推断模式
mode = (mode or "").strip().lower()
if not mode:
if start_time and end_time:
mode = "time"
elif start_offset is not None and end_offset is not None:
mode = "range"
else:
mode = "keywords"
# ── keywords ────────────────────────────────────────────
if mode == "keywords":
if isinstance(keywords, str):
keywords = [keywords]
elif not isinstance(keywords, list):
keywords = []
cleaned = []
seen = set()
for kw in keywords:
if kw is None:
continue
s = str(kw).strip()
if s and (len(s) > 1 or s.isdigit()):
low = s.lower()
if low not in seen:
seen.add(low)
cleaned.append(s)
if not cleaned:
return json.dumps({"error": "未提供有效关键词", "results": []}, ensure_ascii=False)
search_results = message_summary.search_messages_with_context(
chat_id=chat_id,
keywords=cleaned,
context_window=10,
max_groups=20,
exclude_recent=visible_limit,
)
segments = []
lines_seen = set()
for seg in search_results:
formatted = [l for l in seg.get("formatted_messages", []) if l not in lines_seen]
lines_seen.update(formatted)
if formatted:
segments.append({
"matched_keywords": seg.get("matched_keywords", []),
"messages": formatted,
})
payload = {"segments": segments, "returned_groups": len(segments), "keywords": cleaned}
if not segments:
payload["notice"] = "未找到匹配的消息。"
return json.dumps(payload, ensure_ascii=False)
# ── range ───────────────────────────────────────────────
if mode == "range":
if start_offset is None or end_offset is None:
return json.dumps({"error": "range 模式需要 start_offset 和 end_offset"}, ensure_ascii=False)
try:
start_offset, end_offset = int(start_offset), int(end_offset)
except (TypeError, ValueError):
return json.dumps({"error": "start_offset 和 end_offset 必须是整数"}, ensure_ascii=False)
if start_offset <= visible_limit or end_offset <= visible_limit:
return json.dumps(
{"error": f"偏移量必须大于 {visible_limit} 以排除当前可见消息"},
ensure_ascii=False,
)
if start_offset > end_offset:
start_offset, end_offset = end_offset, start_offset
result = message_summary.get_messages_by_reverse_range(
chat_id=chat_id, start_offset=start_offset, end_offset=end_offset,
)
payload = {
"start_offset": result.get("start_offset"),
"end_offset": result.get("end_offset"),
"messages": result.get("messages", []),
"returned_count": result.get("returned_count", 0),
"total_messages": result.get("total_messages", 0),
}
if payload["returned_count"] == 0:
payload["notice"] = "请求范围内没有消息。"
return json.dumps(payload, ensure_ascii=False)
# ── time ────────────────────────────────────────────────
if mode == "time":
if not start_time or not end_time:
return json.dumps({"error": "time 模式需要 start_time 和 end_time"}, ensure_ascii=False)
time_lines = message_summary.get_messages_by_time_window(
chat_id=chat_id, start_time=start_time, end_time=end_time,
)
payload = {
"start_time": start_time,
"end_time": end_time,
"messages": time_lines,
"returned_count": len(time_lines),
}
if not time_lines:
payload["notice"] = "该时间范围内没有消息。"
return json.dumps(payload, ensure_ascii=False)
return json.dumps({"error": f"不支持的模式: {mode}"}, ensure_ascii=False)
# ── 注册 ────────────────────────────────────────────────────
tool_registry.register(Tool(
name="lookup_chat_history",
description=(
"查询聊天历史记录。你当前只能看到最近的消息,调用此工具可以回溯更早的上下文。"
"支持三种模式:\n"
"1. mode=\"keywords\" — 用关键词模糊搜索历史消息,返回匹配片段及上下文。"
" 需要 keywords 数组2-4 个关键词)。\n"
"2. mode=\"range\" — 按倒序偏移获取连续消息块。"
" 需要 start_offset 和 end_offset均需大于当前可见消息数\n"
"3. mode=\"time\" — 按时间窗口获取消息。"
" 需要 start_time 和 end_time格式如 2025-05-01 08:00\n"
"可多次调用,例如先用 keywords 找到锚点,再用 range/time 扩展上下文。"
),
parameters={
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": ["keywords", "range", "time"],
"description": "查询模式",
},
"keywords": {
"type": "array",
"items": {"type": "string"},
"description": "mode=keywords 时的搜索关键词",
},
"start_offset": {
"type": "integer",
"description": "mode=range 时的起始偏移(从最新消息倒数)",
},
"end_offset": {
"type": "integer",
"description": "mode=range 时的结束偏移",
},
"start_time": {
"type": "string",
"description": "mode=time 时的开始时间 (YYYY-MM-DD HH:MM)",
},
"end_time": {
"type": "string",
"description": "mode=time 时的结束时间 (YYYY-MM-DD HH:MM)",
},
},
"additionalProperties": False,
},
handler=_handle_lookup_chat_history,
))

164
tools/reminder.py Normal file
View File

@@ -0,0 +1,164 @@
"""提醒工具 —— 创建 / 查看 / 删除提醒。
LLM 直接传入结构化参数,不再需要二级路由或二次 AI 解析。
"""
import json
from datetime import datetime
from tools import Tool, tool_registry
# ── 创建提醒 ────────────────────────────────────────────────
def _handle_reminder_create(ctx, type: str = "once", time: str = "",
content: str = "", weekday: int = None, **_) -> str:
if not hasattr(ctx.robot, "reminder_manager"):
return json.dumps({"error": "提醒管理器未初始化"}, ensure_ascii=False)
if not time or not content:
return json.dumps({"error": "缺少必要字段: time 和 content"}, ensure_ascii=False)
if len(content.strip()) < 2:
return json.dumps({"error": "提醒内容太短"}, ensure_ascii=False)
# 校验时间格式
if type == "once":
parsed_dt = None
for fmt in ("%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"):
try:
parsed_dt = datetime.strptime(time, fmt)
break
except ValueError:
continue
if not parsed_dt:
return json.dumps({"error": f"once 类型时间格式应为 YYYY-MM-DD HH:MM收到: {time}"}, ensure_ascii=False)
if parsed_dt < datetime.now():
return json.dumps({"error": f"时间 {time} 已过去,请使用未来的时间"}, ensure_ascii=False)
time = parsed_dt.strftime("%Y-%m-%d %H:%M")
elif type in ("daily", "weekly"):
parsed_time = None
for fmt in ("%H:%M", "%H:%M:%S"):
try:
parsed_time = datetime.strptime(time, fmt)
break
except ValueError:
continue
if not parsed_time:
return json.dumps({"error": f"daily/weekly 类型时间格式应为 HH:MM收到: {time}"}, ensure_ascii=False)
time = parsed_time.strftime("%H:%M")
else:
return json.dumps({"error": f"不支持的提醒类型: {type}"}, ensure_ascii=False)
if type == "weekly":
if weekday is None or not (isinstance(weekday, int) and 0 <= weekday <= 6):
return json.dumps({"error": "weekly 类型需要 weekday 参数 (0=周一 … 6=周日)"}, ensure_ascii=False)
data = {"type": type, "time": time, "content": content, "extra": {}}
if weekday is not None:
data["weekday"] = weekday
roomid = ctx.msg.roomid if ctx.is_group else None
success, result = ctx.robot.reminder_manager.add_reminder(ctx.msg.sender, data, roomid=roomid)
if success:
type_label = {"once": "一次性", "daily": "每日", "weekly": "每周"}.get(type, type)
return json.dumps({"success": True, "id": result,
"message": f"已创建{type_label}提醒: {time} - {content}"}, ensure_ascii=False)
return json.dumps({"success": False, "error": result}, ensure_ascii=False)
# ── 查看提醒 ────────────────────────────────────────────────
def _handle_reminder_list(ctx, **_) -> str:
if not hasattr(ctx.robot, "reminder_manager"):
return json.dumps({"error": "提醒管理器未初始化"}, ensure_ascii=False)
reminders = ctx.robot.reminder_manager.list_reminders(ctx.msg.sender)
if not reminders:
return json.dumps({"reminders": [], "message": "当前没有任何提醒"}, ensure_ascii=False)
return json.dumps({"reminders": reminders, "count": len(reminders)}, ensure_ascii=False)
# ── 删除提醒 ────────────────────────────────────────────────
def _handle_reminder_delete(ctx, reminder_id: str = "", delete_all: bool = False, **_) -> str:
if not hasattr(ctx.robot, "reminder_manager"):
return json.dumps({"error": "提醒管理器未初始化"}, ensure_ascii=False)
if delete_all:
success, message, count = ctx.robot.reminder_manager.delete_all_reminders(ctx.msg.sender)
return json.dumps({"success": success, "message": message, "deleted_count": count}, ensure_ascii=False)
if not reminder_id:
return json.dumps({"error": "请提供 reminder_id或设置 delete_all=true 删除全部"}, ensure_ascii=False)
success, message = ctx.robot.reminder_manager.delete_reminder(ctx.msg.sender, reminder_id)
return json.dumps({"success": success, "message": message}, ensure_ascii=False)
# ── 注册 ────────────────────────────────────────────────────
tool_registry.register(Tool(
name="reminder_create",
description=(
"创建提醒。支持 once(一次性)、daily(每日)、weekly(每周) 三种类型。"
"当前时间已在对话上下文中提供,请据此计算目标时间。"
),
parameters={
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": ["once", "daily", "weekly"],
"description": "提醒类型",
},
"time": {
"type": "string",
"description": "once → YYYY-MM-DD HH:MMdaily/weekly → HH:MM",
},
"content": {
"type": "string",
"description": "提醒内容",
},
"weekday": {
"type": "integer",
"description": "仅 weekly 需要。0=周一 … 6=周日",
},
},
"required": ["type", "time", "content"],
"additionalProperties": False,
},
handler=_handle_reminder_create,
))
tool_registry.register(Tool(
name="reminder_list",
description="查看当前用户的所有提醒列表。",
parameters={"type": "object", "properties": {}, "additionalProperties": False},
handler=_handle_reminder_list,
))
tool_registry.register(Tool(
name="reminder_delete",
description=(
"删除提醒。需要先调用 reminder_list 获取 ID再用 reminder_id 精确删除;"
"或设置 delete_all=true 一次性删除全部。"
),
parameters={
"type": "object",
"properties": {
"reminder_id": {
"type": "string",
"description": "要删除的提醒完整 ID",
},
"delete_all": {
"type": "boolean",
"description": "是否删除该用户全部提醒",
},
},
"additionalProperties": False,
},
handler=_handle_reminder_delete,
))

61
tools/web_search.py Normal file
View File

@@ -0,0 +1,61 @@
"""网络搜索工具 —— 通过 Perplexity 联网搜索。
直接调用 perplexity.get_answer() 获取同步结果,
结果回传给 LLM 做综合回答,而非直接发送给用户。
"""
import json
import re
from tools import Tool, tool_registry
def _handle_web_search(ctx, query: str = "", deep_research: bool = False, **_) -> str:
if not query:
return json.dumps({"error": "请提供搜索关键词"}, ensure_ascii=False)
perplexity_instance = getattr(ctx.robot, "perplexity", None)
if not perplexity_instance:
return json.dumps({"error": "Perplexity 搜索功能不可用,未配置或未初始化"}, ensure_ascii=False)
try:
chat_id = ctx.get_receiver()
response = perplexity_instance.get_answer(query, chat_id, deep_research=deep_research)
if not response:
return json.dumps({"error": "搜索无结果"}, ensure_ascii=False)
# 清理 <think> 标签reasoning 模型可能返回)
cleaned = re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL).strip()
if not cleaned:
cleaned = response
return json.dumps({"result": cleaned}, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": f"搜索失败: {e}"}, ensure_ascii=False)
tool_registry.register(Tool(
name="web_search",
description=(
"在网络上搜索信息。用于回答需要最新数据、实时信息或你不确定的事实性问题。"
"deep_research 仅在问题非常复杂、需要深度研究时才开启。"
),
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词或问题",
},
"deep_research": {
"type": "boolean",
"description": "是否启用深度研究模式(耗时较长,仅用于复杂问题)",
},
},
"required": ["query"],
"additionalProperties": False,
},
handler=_handle_web_search,
))