Files
Bubbles/function/func_summary.py
zihanjian aeb5282193 300 条
2025-10-28 11:26:18 +08:00

787 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import logging
import time
import datetime
import re
from collections import deque
# from threading import Lock # 不再需要锁使用SQLite的事务机制
import sqlite3 # 添加sqlite3模块
import os # 用于处理文件路径
from function.func_xml_process import XmlProcessor # 导入XmlProcessor
MAX_DB_HISTORY_LIMIT = 10000
SUMMARY_MESSAGE_LIMIT = 300
def _is_internal_tool_message(content: str) -> bool:
"""判断是否为内部工具日志消息,避免被当作正常聊天记录返回"""
if not content:
return False
markers = (
"[search_chat_history]",
"[fetch_chat_history_range]",
"[fetch_chat_history_time_window]",
"[History tool failed",
)
return any(marker in content for marker in markers)
class MessageSummary:
"""消息总结功能类 (使用SQLite持久化)
用于记录、管理和生成聊天历史消息的总结
"""
def __init__(self, max_history=MAX_DB_HISTORY_LIMIT, db_path="data/message_history.db"):
"""初始化消息总结功能
Args:
max_history: 每个聊天保存的最大消息数量
db_path: SQLite数据库文件路径
"""
self.LOG = logging.getLogger("MessageSummary")
try:
parsed_history_limit = int(max_history)
except (TypeError, ValueError):
parsed_history_limit = MAX_DB_HISTORY_LIMIT
if parsed_history_limit <= 0:
parsed_history_limit = MAX_DB_HISTORY_LIMIT
if parsed_history_limit > MAX_DB_HISTORY_LIMIT:
self.LOG.warning(
f"传入的 max_history={parsed_history_limit} 超过上限,将截断为 {MAX_DB_HISTORY_LIMIT}"
)
parsed_history_limit = MAX_DB_HISTORY_LIMIT
self.max_history = parsed_history_limit
self.db_path = db_path
# 实例化XML处理器用于提取引用消息
self.xml_processor = XmlProcessor(self.LOG)
try:
db_dir = os.path.dirname(self.db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir)
self.LOG.info(f"创建数据库目录: {db_dir}")
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
self.cursor = self.conn.cursor()
self.LOG.info(f"已连接到 SQLite 数据库: {self.db_path}")
# 检查并添加 sender_wxid 列 (如果不存在)
self.cursor.execute("PRAGMA table_info(messages)")
columns = [col[1] for col in self.cursor.fetchall()]
if 'sender_wxid' not in columns:
try:
self.cursor.execute("ALTER TABLE messages ADD COLUMN sender_wxid TEXT")
self.conn.commit()
self.LOG.info("已向 messages 表添加 sender_wxid 列")
except sqlite3.OperationalError as e:
# 如果表是空的,直接删除重建可能更简单
self.LOG.warning(f"添加 sender_wxid 列失败 ({e}),可能是因为表非空且有主键?尝试重建表。")
# 注意:这会丢失现有数据!
self.cursor.execute("DROP TABLE IF EXISTS messages")
self.conn.commit()
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id TEXT NOT NULL,
sender TEXT NOT NULL,
sender_wxid TEXT, -- 新增: 存储发送者wxid
content TEXT NOT NULL,
timestamp_float REAL NOT NULL,
timestamp_str TEXT NOT NULL -- 存储完整时间格式 YYYY-MM-DD HH:MM:SS
)
""")
self.cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_chat_time ON messages (chat_id, timestamp_float)
""")
# 新增 sender_wxid 索引 (可选如果经常需要按wxid查询)
self.cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_sender_wxid ON messages (sender_wxid)
""")
self.conn.commit() # 提交更改
self.LOG.info("消息表已准备就绪")
except sqlite3.Error as e:
self.LOG.error(f"数据库初始化失败: {e}")
raise ConnectionError(f"无法连接或初始化数据库: {e}") from e
except OSError as e:
self.LOG.error(f"创建数据库目录失败: {e}")
raise OSError(f"无法创建数据库目录: {e}") from e
def close_db(self):
"""关闭数据库连接"""
if hasattr(self, 'conn') and self.conn:
try:
self.conn.commit() # 确保所有更改都已保存
self.conn.close()
self.LOG.info("数据库连接已关闭")
except sqlite3.Error as e:
self.LOG.error(f"关闭数据库连接时出错: {e}")
def record_message(self, chat_id, sender_name, sender_wxid, content, timestamp=None):
"""记录单条消息到数据库
Args:
chat_id: 聊天ID群ID或用户ID
sender_name: 发送者名称
sender_wxid: 发送者wxid
content: 消息内容
timestamp: 外部提供的时间字符串(优先使用),否则生成
"""
try:
current_time_float = time.time()
if not timestamp:
# 默认使用完整时间格式
timestamp_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_time_float))
else:
# 如果传入的时间戳只有时分,转换为完整格式
if len(timestamp) <= 5: # 如果格式是 "HH:MM"
today = time.strftime("%Y-%m-%d", time.localtime(current_time_float))
timestamp_str = f"{today} {timestamp}:00" # 补上秒
elif len(timestamp) == 8 and timestamp.count(':') == 2: # 如果格式是 "HH:MM:SS"
today = time.strftime("%Y-%m-%d", time.localtime(current_time_float))
timestamp_str = f"{today} {timestamp}"
elif len(timestamp) == 16 and timestamp.count('-') == 2 and timestamp.count(':') == 1: # "YYYY-MM-DD HH:MM"
timestamp_str = f"{timestamp}:00" # 补上秒
else:
timestamp_str = timestamp # 假设是完整格式
# 插入新消息,包含 sender_wxid
self.cursor.execute("""
INSERT INTO messages (chat_id, sender, sender_wxid, content, timestamp_float, timestamp_str)
VALUES (?, ?, ?, ?, ?, ?)
""", (chat_id, sender_name, sender_wxid, content, current_time_float, timestamp_str))
# 删除超出 max_history 的旧消息
self.cursor.execute("""
DELETE FROM messages
WHERE chat_id = ? AND id NOT IN (
SELECT id
FROM messages
WHERE chat_id = ?
ORDER BY timestamp_float DESC
LIMIT ?
)
""", (chat_id, chat_id, self.max_history))
self.conn.commit() # 提交事务
except sqlite3.Error as e:
self.LOG.error(f"记录消息到数据库时出错: {e}")
try:
self.conn.rollback()
except:
pass
def clear_message_history(self, chat_id):
"""清除指定聊天的消息历史记录
Args:
chat_id: 聊天ID群ID或用户ID
Returns:
bool: 是否成功清除
"""
try:
self.cursor.execute("DELETE FROM messages WHERE chat_id = ?", (chat_id,))
rows_deleted = self.cursor.rowcount
self.conn.commit()
self.LOG.info(f"为 chat_id={chat_id} 清除了 {rows_deleted} 条历史消息")
return True
except sqlite3.Error as e:
self.LOG.error(f"清除消息历史时出错 (chat_id={chat_id}): {e}")
return False
def get_message_count(self, chat_id):
"""获取指定聊天的消息数量
Args:
chat_id: 聊天ID群ID或用户ID
Returns:
int: 消息数量
"""
try:
self.cursor.execute("SELECT COUNT(*) FROM messages WHERE chat_id = ?", (chat_id,))
result = self.cursor.fetchone()
return result[0] if result else 0
except sqlite3.Error as e:
self.LOG.error(f"获取消息数量时出错 (chat_id={chat_id}): {e}")
return 0
def get_messages(self, chat_id):
"""获取指定聊天的所有消息 (按时间升序)包含发送者wxid和完整时间戳
Args:
chat_id: 聊天ID群ID或用户ID
Returns:
list: 消息列表,格式为 [{"sender": ..., "sender_wxid": ..., "content": ..., "time": ...}]
"""
messages = []
try:
# 查询需要的字段,包括 sender_wxid 和 timestamp_str
self.cursor.execute("""
SELECT sender, sender_wxid, content, timestamp_str
FROM messages
WHERE chat_id = ?
ORDER BY timestamp_float ASC
LIMIT ?
""", (chat_id, self.max_history))
rows = self.cursor.fetchall()
# 将数据库行转换为期望的字典列表格式
for row in rows:
messages.append({
"sender": row[0],
"sender_wxid": row[1], # 添加 sender_wxid
"content": row[2],
"time": row[3] # 使用存储的完整 timestamp_str
})
except sqlite3.Error as e:
self.LOG.error(f"获取消息列表时出错 (chat_id={chat_id}): {e}")
return messages
def search_messages_with_context(
self,
chat_id,
keywords,
context_window=5,
max_groups=20,
exclude_recent=30
):
"""根据关键词搜索消息,返回包含前后上下文的结果
Args:
chat_id (str): 聊天ID群ID或用户ID
keywords (Union[str, list[str]]): 需要搜索的关键词或关键词列表
context_window (int): 每条匹配消息前后额外提供的消息数量
max_groups (int): 返回的最多结果组数(按时间倒序,优先最新消息)
exclude_recent (int): 跳过最近的若干条消息默认30条
Returns:
list[dict]: 搜索结果列表,每个元素包含匹配关键词、锚点消息及上下文消息
"""
if not keywords:
return []
if isinstance(keywords, str):
keywords = [keywords]
normalized_keywords = []
for kw in keywords:
if kw is None:
continue
kw_str = str(kw).strip()
if kw_str:
normalized_keywords.append((kw_str, kw_str.lower()))
if not normalized_keywords:
return []
try:
context_window = int(context_window)
except (TypeError, ValueError):
context_window = 5
context_window = max(0, min(context_window, 10)) # 限制上下文窗口大小,避免过长
try:
max_groups = int(max_groups)
except (TypeError, ValueError):
max_groups = 20
max_groups = max(1, min(max_groups, 20))
messages = self.get_messages(chat_id)
if not messages:
return []
try:
exclude_recent = int(exclude_recent)
except (TypeError, ValueError):
exclude_recent = 30
exclude_recent = max(0, exclude_recent)
results = []
total_messages = len(messages)
cutoff_index = total_messages - exclude_recent
if cutoff_index <= 0:
return []
used_indices = set()
for idx in range(cutoff_index - 1, -1, -1):
message = messages[idx]
content = message.get("content", "")
if not content:
continue
lower_content = content.lower()
matched_keywords = [
orig
for orig, lower in normalized_keywords
if lower in lower_content
]
if not matched_keywords:
continue
if idx in used_indices:
continue
start = max(0, idx - context_window)
end = min(total_messages, idx + context_window + 1)
segment_messages = []
formatted_lines = []
seen_lines = set()
for pos in range(start, end):
msg = messages[pos]
line = f"{msg.get('time')} {msg.get('sender')} {msg.get('content')}"
if line not in seen_lines:
seen_lines.add(line)
formatted_lines.append(line)
segment_messages.append({
"time": msg.get("time"),
"sender": msg.get("sender"),
"sender_wxid": msg.get("sender_wxid"),
"content": msg.get("content"),
"relative_offset": pos - idx,
"is_match": pos == idx
})
results.append({
"matched_keywords": matched_keywords,
"anchor_index": idx,
"anchor_time": message.get("time"),
"anchor_sender": message.get("sender"),
"anchor_sender_wxid": message.get("sender_wxid"),
"messages": segment_messages,
"formatted_messages": formatted_lines
})
for off in range(start, end):
used_indices.add(off)
if len(results) >= max_groups:
break
return results
def get_messages_by_reverse_range(
self,
chat_id,
start_offset,
end_offset,
max_messages_limit=500
):
"""按倒数范围获取消息
Args:
chat_id (str): 聊天ID群ID或用户ID
start_offset (int): 离最新消息的起始偏移(倒数 start_offset 条,必须 > 0
end_offset (int): 离最新消息的结束偏移(倒数 end_offset 条,必须 >= start_offset
max_messages_limit (int): 内部限制,防止一次返回过多消息
Returns:
dict: 包含请求范围信息和格式化消息行
"""
try:
start_offset = int(start_offset)
end_offset = int(end_offset)
except (TypeError, ValueError):
raise ValueError("start_offset 和 end_offset 必须是整数")
if start_offset <= 0 or end_offset <= 0:
raise ValueError("start_offset 和 end_offset 必须为正整数")
if start_offset > end_offset:
start_offset, end_offset = end_offset, start_offset
try:
max_messages_limit = int(max_messages_limit)
except (TypeError, ValueError):
max_messages_limit = 500
max_messages_limit = max(1, min(max_messages_limit, 1000))
messages = self.get_messages(chat_id)
total_messages = len(messages)
if total_messages == 0:
return {
"start_offset": start_offset,
"end_offset": end_offset,
"messages": [],
"returned_count": 0,
"total_messages": 0
}
start_offset = min(start_offset, total_messages)
end_offset = min(end_offset, total_messages)
start_index = max(total_messages - end_offset, 0)
end_index = min(total_messages - start_offset, total_messages - 1)
if end_index < start_index:
return {
"start_offset": start_offset,
"end_offset": end_offset,
"messages": [],
"returned_count": 0,
"total_messages": total_messages
}
selected = messages[start_index:end_index + 1]
if len(selected) > max_messages_limit:
selected = selected[-max_messages_limit:]
formatted_lines = [
f"{msg.get('time')} {msg.get('sender')} {msg.get('content')}"
for msg in selected
]
return {
"start_offset": start_offset,
"end_offset": end_offset,
"messages": formatted_lines,
"returned_count": len(formatted_lines),
"total_messages": total_messages
}
def _ai_summarize(self, messages, chat_model, chat_id):
"""使用AI模型生成消息总结
Args:
messages: 消息列表 (格式同 get_messages 返回值)
chat_model: AI聊天模型对象
chat_id: 聊天ID
Returns:
str: 消息总结
"""
if not messages:
return "没有可以总结的历史消息。"
formatted_msgs = []
for msg in messages:
# 使用新的时间格式和发送者
formatted_msgs.append(f"[{msg['time']}]{msg['sender']}: {msg['content']}")
summary_system_prompt = (
"你是泡泡,请仔细阅读并分析以下聊天记录,生成一简要的、结构清晰且抓住重点的摘要。\n\n"
"摘要格式要求:\n"
"1. 使用数字编号列表 (例如 1., 2., 3.) 来组织内容每个编号代表一个独立的主要讨论主题不要超过3个主题。\n"
"2. 在每个编号的主题下,写成一段不带格式的文字,每个主题单独成段并空行,需包含以下内容:\n"
" - 这个讨论的核心的简要描述。\n"
" - 该讨论的关键成员 (用括号 [用户名] 格式) 和他们的关键发言内容、成员之间的关键互动。\n"
" - 该讨论的讨论结果。\n"
"3. 总结需客观、精炼、简短精悍,直接呈现最核心且精简的事实,尽量不要添加额外的评论或分析,不要总结有关自己的事情。\n"
"4. 不要暴露出格式不要说核心是xxx参与者是xxx结果是xxx自然一点。\n"
)
prompt = "聊天记录如下:\n" + "\n".join(formatted_msgs)
try:
self.LOG.info(f"[Summary] chat_id={chat_id} 准备总结最近 {len(messages)} 条消息")
summary = chat_model.get_answer(
prompt,
f"summary_{chat_id}",
system_prompt_override=summary_system_prompt
)
if not summary:
return self._basic_summarize(messages)
return summary
except Exception as e:
self.LOG.error(f"使用AI生成总结失败: {e}")
return self._basic_summarize(messages)
def _basic_summarize(self, messages):
"""基本的消息总结逻辑不使用AI
Args:
messages: 消息列表 (格式同 get_messages 返回值)
Returns:
str: 消息总结
"""
if not messages:
return "没有可以总结的历史消息。"
res = ["以下是近期聊天记录摘要:\n"]
for msg in messages:
# 使用新的时间格式和发送者
res.append(f"[{msg['time']}]{msg['sender']}: {msg['content']}")
return "\n".join(res)
def summarize_messages(self, chat_id, chat_model=None):
"""生成消息总结
Args:
chat_id: 聊天ID群ID或用户ID
chat_model: AI聊天模型对象如果为None则使用基础总结
Returns:
str: 消息总结
"""
messages = self.get_messages(chat_id)
if messages:
messages = messages[-SUMMARY_MESSAGE_LIMIT:]
if not messages:
return "没有可以总结的历史消息。"
if chat_model:
if hasattr(chat_model, 'get_answer') and hasattr(chat_model, 'message_summary') and chat_model.message_summary:
return self._ai_summarize(messages, chat_model, chat_id)
else:
self.LOG.warning(
f"提供的 chat_model ({type(chat_model)}) 不支持基于数据库历史的总结或未正确初始化。将使用基础总结。"
)
return self._basic_summarize(messages)
else:
return self._basic_summarize(messages)
def process_message_from_wxmsg(self, msg, wcf, all_contacts, bot_wxid=None):
"""从微信消息对象中处理并记录与总结相关的文本消息
记录所有群聊和私聊的文本(1)和App/卡片(49)消息。
使用 XmlProcessor 提取用户实际输入的新内容或卡片标题。
Args:
msg: 微信消息对象(WxMsg)
wcf: 微信接口对象
all_contacts: 所有联系人字典
bot_wxid: 机器人自己的wxid (必须提供以正确记录 sender_wxid)
"""
if msg.type != 0x01 and msg.type != 49:
return
chat_id = msg.roomid if msg.from_group() else msg.sender
if not chat_id:
self.LOG.warning(f"无法确定消息的chat_id (msg.id={msg.id}), 跳过记录")
return
sender_wxid = msg.sender
if not sender_wxid:
# 理论上不应发生,但做个防护
self.LOG.error(f"消息 (id={msg.id}) 缺少 sender wxid无法记录")
return
# 确定发送者名称 (逻辑不变)
sender_name = ""
if msg.from_group():
sender_name = wcf.get_alias_in_chatroom(sender_wxid, chat_id)
if not sender_name:
sender_name = all_contacts.get(sender_wxid, sender_wxid)
else:
if bot_wxid and sender_wxid == bot_wxid:
sender_name = all_contacts.get(bot_wxid, "机器人")
else:
sender_name = all_contacts.get(sender_wxid, sender_wxid)
# 使用 XmlProcessor 提取消息详情 (逻辑不变)
extracted_data = None
try:
if msg.from_group():
extracted_data = self.xml_processor.extract_quoted_message(msg)
else:
extracted_data = self.xml_processor.extract_private_quoted_message(msg)
except Exception as e:
self.LOG.error(f"使用XmlProcessor提取消息内容时出错 (msg.id={msg.id}, type={msg.type}): {e}")
if msg.type == 0x01 and not ("<" in msg.content and ">" in msg.content):
content_to_record = msg.content.strip()
source_info = "来自 纯文本消息 (XML解析失败后备)"
self.LOG.warning(f"XML解析失败但记录纯文本消息: {content_to_record[:50]}...")
current_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 调用 record_message 时需要 sender_wxid
self.record_message(chat_id, sender_name, sender_wxid, content_to_record, current_time_str)
return
# 确定要记录的内容 (content_to_record) - 复用之前的逻辑
content_to_record = ""
source_info = "未知来源"
# 优先使用提取到的新内容 (来自回复或普通文本或<title>)
temp_new_content = extracted_data.get("new_content", "").strip()
if temp_new_content:
content_to_record = temp_new_content
source_info = "来自 new_content (回复/文本/标题)"
# 如果是引用类型消息,添加引用标记和引用内容的简略信息
if extracted_data.get("has_quote", False):
quoted_sender = extracted_data.get("quoted_sender", "")
quoted_content = extracted_data.get("quoted_content", "")
# 处理被引用内容
if quoted_content:
# 对较长的引用内容进行截断
max_quote_length = 30
if len(quoted_content) > max_quote_length:
quoted_content = quoted_content[:max_quote_length] + "..."
# 如果被引用的是卡片,则使用标准卡片格式
if extracted_data.get("quoted_is_card", False):
quoted_card_title = extracted_data.get("quoted_card_title", "")
quoted_card_type = extracted_data.get("quoted_card_type", "")
# 根据卡片类型确定内容类型
card_type = "卡片"
if "链接" in quoted_card_type or "消息" in quoted_card_type:
card_type = "链接"
elif "视频" in quoted_card_type or "音乐" in quoted_card_type:
card_type = "媒体"
elif "位置" in quoted_card_type:
card_type = "位置"
elif "图片" in quoted_card_type:
card_type = "图片"
elif "文件" in quoted_card_type:
card_type = "文件"
# 整个卡片内容包裹在【】中
quoted_content = f"{card_type}: {quoted_card_title}"
# 根据是否有被引用者信息构建引用前缀
if quoted_sender:
content_to_record = f"{content_to_record} 【回复 {quoted_sender}{quoted_content}"
else:
content_to_record = f"{content_to_record} 【回复:{quoted_content}"
# 其次,如果新内容为空,但这是一个卡片且有标题,则使用卡片标题
elif extracted_data.get("is_card") and extracted_data.get("card_title", "").strip():
card_title = extracted_data.get("card_title", "").strip()
card_description = extracted_data.get("card_description", "").strip()
card_type = extracted_data.get("card_type", "")
card_source = extracted_data.get("card_appname") or extracted_data.get("card_sourcedisplayname", "")
if "链接" in card_type or "消息" in card_type: content_type = "链接"
elif "视频" in card_type or "音乐" in card_type: content_type = "媒体"
elif "位置" in card_type: content_type = "位置"
elif "图片" in card_type: content_type = "图片"
elif "文件" in card_type: content_type = "文件"
else: content_type = "卡片"
card_content = f"{content_type}: {card_title}"
if card_description:
max_desc_length = 50
if len(card_description) > max_desc_length:
card_description = card_description[:max_desc_length] + "..."
card_content += f" - {card_description}"
if card_source:
card_content += f" (来自:{card_source})"
content_to_record = f"{card_content}"
source_info = "来自 卡片(标题+描述)"
# 普通文本消息的保底处理
elif msg.type == 0x01 and not ("<" in msg.content and ">" in msg.content): # 再次确认是纯文本
content_to_record = msg.content.strip() # 使用原始纯文本
source_info = "来自 纯文本消息"
# 如果最终没有提取到有效内容,则不记录 (逻辑不变)
if not content_to_record:
self.LOG.debug(f"未能提取到有效文本内容用于记录,跳过 (msg.id={msg.id}, type={msg.type}) - IsCard: {extracted_data.get('is_card', False)}, HasQuote: {extracted_data.get('has_quote', False)}")
return
# 获取当前时间字符串 (使用完整格式)
current_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.LOG.debug(f"记录消息 (来源: {source_info}, 类型: {'群聊' if msg.from_group() else '私聊'}): '[{current_time_str}]{sender_name}({sender_wxid}): {content_to_record}' (来自 msg.id={msg.id})")
# 调用 record_message 时传入 sender_wxid
self.record_message(chat_id, sender_name, sender_wxid, content_to_record, current_time_str)
@staticmethod
def _parse_datetime(dt_value):
"""解析多种常见时间格式"""
if isinstance(dt_value, datetime.datetime):
return dt_value
if not dt_value:
return None
candidates = [
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %H:%M",
"%Y/%m/%d %H:%M:%S",
"%Y/%m/%d %H:%M",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M"
]
dt_str = str(dt_value).strip()
for fmt in candidates:
try:
return datetime.datetime.strptime(dt_str, fmt)
except (ValueError, TypeError):
continue
return None
def get_messages_by_time_window(
self,
chat_id,
start_time,
end_time,
exclude_recent=30,
max_messages=500
):
"""根据时间窗口获取消息
Args:
chat_id (str): 聊天ID
start_time (Union[str, datetime]): 起始时间
end_time (Union[str, datetime]): 结束时间
exclude_recent (int): 跳过最新的消息数量
max_messages (int): 返回的最大消息数
Returns:
list[str]: 已格式化的消息行
"""
start_dt = self._parse_datetime(start_time)
end_dt = self._parse_datetime(end_time)
if not start_dt or not end_dt:
return []
try:
max_messages = int(max_messages)
except (TypeError, ValueError):
max_messages = 500
max_messages = max(1, min(max_messages, 500))
messages = self.get_messages(chat_id)
if not messages:
return []
total_messages = len(messages)
cutoff_index = total_messages - max(exclude_recent, 0)
if cutoff_index <= 0:
return []
collected = []
# 确保 start <= end
if start_dt > end_dt:
start_dt, end_dt = end_dt, start_dt
for idx in range(cutoff_index - 1, -1, -1):
msg = messages[idx]
content = msg.get("content")
if _is_internal_tool_message(content):
continue
time_str = msg.get("time")
dt = self._parse_datetime(time_str)
if not dt:
continue
if start_dt <= dt <= end_dt:
collected.append(f"{time_str} {msg.get('sender')} {content}")
if len(collected) >= max_messages:
break
collected.reverse()
return collected