初始提交

This commit is contained in:
Zylan
2025-04-23 13:30:10 +08:00
commit db26c07bb3
49 changed files with 40973 additions and 0 deletions

View File

@@ -0,0 +1,856 @@
import logging
import re
import html
import time
import xml.etree.ElementTree as ET
from wcferry import WxMsg
class XmlProcessor:
"""处理微信消息XML解析的工具类"""
def __init__(self, logger=None):
    """Set up the XML processor.

    Args:
        logger: Logger to report through. Any falsy value (including the
            default ``None``) selects the logger named "XmlProcessor".
    """
    if logger:
        self.logger = logger
    else:
        self.logger = logging.getLogger("XmlProcessor")
def extract_quoted_message(self, msg: WxMsg) -> dict:
    """Extract new text, quoted text and card details from a group-chat message.

    Only text messages (type 1) and app messages (type 49) are inspected;
    every other type yields the default result unchanged.

    Args:
        msg: WeChat message object; ``type``, ``sender`` and ``content`` are read.

    Returns:
        dict with these keys:
            new_content: text newly written by the sender
            quoted_content: text of the quoted message
            quoted_sender: display name found in the quoted message
            media_type: media type label (defaults to "文本")
            has_quote: whether the message quotes another one
            is_card, card_type, card_title, card_description, card_url,
            card_appname, card_sourcedisplayname: details of a directly
                shared card
            quoted_is_card, quoted_card_type, quoted_card_title,
            quoted_card_description, quoted_card_url, quoted_card_appname,
            quoted_card_sourcedisplayname: the same details for a quoted card

        Any exception raised while parsing is logged and the dict filled in
        so far is returned.
    """
    result = {
        "new_content": "",
        "quoted_content": "",
        "quoted_sender": "",
        "media_type": "文本",
        "has_quote": False,
        "is_card": False,
        "card_type": "",
        "card_title": "",
        "card_description": "",
        "card_url": "",
        "card_appname": "",
        "card_sourcedisplayname": "",
        "quoted_is_card": False,
        "quoted_card_type": "",
        "quoted_card_title": "",
        "quoted_card_description": "",
        "quoted_card_url": "",
        "quoted_card_appname": "",
        "quoted_card_sourcedisplayname": ""
    }
    try:
        # Only plain text (type 1) and app messages (type 49) are handled.
        if msg.type != 0x01 and msg.type != 49:
            return result
        self.logger.info(f"处理群聊消息: 类型={msg.type}, 发送者={msg.sender}")

        # A quote is signalled either by type="57" on the <appmsg> tag or by
        # the presence of a <refermsg> element.
        is_quote_msg = False
        appmsg_type_match = re.search(r'<appmsg.*?type="(\d+)"', msg.content, re.DOTALL)
        if appmsg_type_match and appmsg_type_match.group(1) == "57":
            is_quote_msg = True
            self.logger.info("检测到引用类型消息 (type 57)")
        has_refermsg = "<refermsg>" in msg.content
        is_referring = is_quote_msg or has_refermsg

        if msg.type == 49:
            if not is_referring:
                # Not a quote: the outer <appmsg> is the shared card itself.
                card_details = self.extract_card_details(msg.content)
                result.update(card_details)
                if card_details["is_card"] and card_details["card_type"]:
                    result["media_type"] = card_details["card_type"]
            # For a quote the outer <appmsg> is only the container, so its
            # card fields are deliberately not copied into the result.

            # The sender's own text lives in the first <title>. DOTALL keeps
            # multi-line text from being missed; entities are decoded so the
            # value is comparable with the decoded card title below.
            title_match = re.search(r'<title>(.*?)</title>', msg.content, re.DOTALL)
            if title_match:
                title_text = html.unescape(title_match.group(1)).strip()
                if is_referring:
                    result["new_content"] = title_text
                    self.logger.info(f"引用消息中的新内容: {result['new_content']}")
                elif not (result["is_card"] and result["card_title"] == title_text):
                    # Skip the title when it merely repeats the card title.
                    result["new_content"] = title_text
                    self.logger.info(f"从title标签提取到用户新消息: {result['new_content']}")
        else:  # msg.type == 0x01, plain text
            if not is_referring:
                # The whole body is the user's text, even when it happens to
                # contain both "<" and ">" (e.g. "1 < 2 > 0").
                result["new_content"] = msg.content
            if "<" not in msg.content or ">" not in msg.content:
                # Nothing XML-like to analyse any further.
                return result

        if is_referring:
            result["has_quote"] = True
            refer_data = self.extract_refermsg(msg.content)
            result["quoted_sender"] = refer_data.get("sender", "")
            result["quoted_content"] = refer_data.get("content", "")

            # The decoded quoted payload may itself be a card.
            raw_content = refer_data.get("raw_content", "")
            if raw_content and "<appmsg" in raw_content:
                quoted_card_details = self.extract_card_details(raw_content)
                for field in (
                    "is_card",
                    "card_type",
                    "card_title",
                    "card_description",
                    "card_url",
                    "card_appname",
                    "card_sourcedisplayname",
                ):
                    result[f"quoted_{field}"] = quoted_card_details[field]
                # Use the card title when no quoted text was extracted.
                if not result["quoted_content"] and quoted_card_details["card_title"]:
                    result["quoted_content"] = quoted_card_details["card_title"]
                self.logger.info(f"成功从引用内容中提取卡片信息: {quoted_card_details['card_type']}")
            elif not result["quoted_content"]:
                # No card markup and no text yet: try the regex-only fallback.
                fallback_content = self.extract_quoted_fallback(msg.content)
                if fallback_content:
                    if fallback_content.startswith(("引用内容:", "相关内容:")):
                        result["quoted_content"] = fallback_content.split(":", 1)[1].strip()
                    else:
                        result["quoted_content"] = fallback_content

        # Final media type: own card type, then "quote of a card", then
        # whatever the marker scan of the raw content reports.
        if result["is_card"] and result["card_type"]:
            result["media_type"] = result["card_type"]
        elif is_referring and result["quoted_is_card"]:
            result["media_type"] = "引用消息"
        else:
            result["media_type"] = self.identify_message_type(msg.content)
        return result
    except Exception as e:
        # Best effort: report the problem and hand back what was collected.
        self.logger.error(f"处理群聊引用消息时出错: {e}")
        return result
def extract_private_quoted_message(self, msg: WxMsg) -> dict:
    """Extract new text, quoted text and card details from a private-chat message.

    Only text messages (type 1) and app messages (type 49) are inspected;
    every other type yields the default result unchanged.

    Args:
        msg: WeChat message object; ``type``, ``sender`` and ``content`` are read.

    Returns:
        dict with these keys:
            new_content: text newly written by the sender
            quoted_content: text of the quoted message
            quoted_sender: display name found in the quoted message
            media_type: media type label (defaults to "文本")
            has_quote: whether the message quotes another one
            is_card, card_type, card_title, card_description, card_url,
            card_appname, card_sourcedisplayname: details of a directly
                shared card
            quoted_is_card, quoted_card_type, quoted_card_title,
            quoted_card_description, quoted_card_url, quoted_card_appname,
            quoted_card_sourcedisplayname: the same details for a quoted card

        Any exception raised while parsing is logged and the dict filled in
        so far is returned.
    """
    result = {
        "new_content": "",
        "quoted_content": "",
        "quoted_sender": "",
        "media_type": "文本",
        "has_quote": False,
        "is_card": False,
        "card_type": "",
        "card_title": "",
        "card_description": "",
        "card_url": "",
        "card_appname": "",
        "card_sourcedisplayname": "",
        "quoted_is_card": False,
        "quoted_card_type": "",
        "quoted_card_title": "",
        "quoted_card_description": "",
        "quoted_card_url": "",
        "quoted_card_appname": "",
        "quoted_card_sourcedisplayname": ""
    }
    try:
        # Only plain text (type 1) and app messages (type 49) are handled.
        if msg.type != 0x01 and msg.type != 49:
            return result
        self.logger.info(f"处理私聊消息: 类型={msg.type}, 发送者={msg.sender}")

        # A quote is signalled either by type="57" on the <appmsg> tag or by
        # the presence of a <refermsg> element.
        is_quote_msg = False
        appmsg_type_match = re.search(r'<appmsg.*?type="(\d+)"', msg.content, re.DOTALL)
        if appmsg_type_match and appmsg_type_match.group(1) == "57":
            is_quote_msg = True
            self.logger.info("检测到引用类型消息 (type 57)")
        has_refermsg = "<refermsg>" in msg.content
        is_referring = is_quote_msg or has_refermsg

        if msg.type == 49:
            if not is_referring:
                # Not a quote: the outer <appmsg> is the shared card itself.
                card_details = self.extract_card_details(msg.content)
                result.update(card_details)
                if card_details["is_card"] and card_details["card_type"]:
                    result["media_type"] = card_details["card_type"]
            # For a quote the outer <appmsg> is only the container, so its
            # card fields are deliberately not copied into the result.

            # The sender's own text lives in the first <title>. DOTALL keeps
            # multi-line text from being missed; entities are decoded so the
            # value is comparable with the decoded card title below.
            title_match = re.search(r'<title>(.*?)</title>', msg.content, re.DOTALL)
            if title_match:
                title_text = html.unescape(title_match.group(1)).strip()
                if is_referring:
                    result["new_content"] = title_text
                    self.logger.info(f"引用消息中的新内容: {result['new_content']}")
                elif not (result["is_card"] and result["card_title"] == title_text):
                    # Skip the title when it merely repeats the card title.
                    result["new_content"] = title_text
                    self.logger.info(f"从title标签提取到用户新消息: {result['new_content']}")
        else:  # msg.type == 0x01, plain text
            if not is_referring:
                # The whole body is the user's text, even when it happens to
                # contain both "<" and ">" (e.g. "1 < 2 > 0").
                result["new_content"] = msg.content
            if "<" not in msg.content or ">" not in msg.content:
                # Nothing XML-like to analyse any further.
                return result

        if is_referring:
            result["has_quote"] = True
            refer_data = self.extract_private_refermsg(msg.content)
            result["quoted_sender"] = refer_data.get("sender", "")
            result["quoted_content"] = refer_data.get("content", "")

            # The decoded quoted payload may itself be a card.
            raw_content = refer_data.get("raw_content", "")
            if raw_content and "<appmsg" in raw_content:
                quoted_card_details = self.extract_card_details(raw_content)
                for field in (
                    "is_card",
                    "card_type",
                    "card_title",
                    "card_description",
                    "card_url",
                    "card_appname",
                    "card_sourcedisplayname",
                ):
                    result[f"quoted_{field}"] = quoted_card_details[field]
                # Use the card title when no quoted text was extracted.
                if not result["quoted_content"] and quoted_card_details["card_title"]:
                    result["quoted_content"] = quoted_card_details["card_title"]
                self.logger.info(f"成功从引用内容中提取卡片信息: {quoted_card_details['card_type']}")
            elif not result["quoted_content"]:
                # No card markup and no text yet: try the regex-only fallback.
                fallback_content = self.extract_quoted_fallback(msg.content)
                if fallback_content:
                    if fallback_content.startswith(("引用内容:", "相关内容:")):
                        result["quoted_content"] = fallback_content.split(":", 1)[1].strip()
                    else:
                        result["quoted_content"] = fallback_content

        # Final media type: own card type, then "quote of a card", then
        # whatever the marker scan of the raw content reports.
        if result["is_card"] and result["card_type"]:
            result["media_type"] = result["card_type"]
        elif is_referring and result["quoted_is_card"]:
            result["media_type"] = "引用消息"
        else:
            result["media_type"] = self.identify_private_message_type(msg.content)
        return result
    except Exception as e:
        # Best effort: report the problem and hand back what was collected.
        self.logger.error(f"处理私聊引用消息时出错: {e}")
        return result
def extract_refermsg(self, content: str) -> dict:
    """Pull sender and quoted text out of a group-chat <refermsg> element.

    The element is located with regular expressions rather than a full XML
    parse.

    Args:
        content: Message content.

    Returns:
        dict with keys:
            sender: stripped text of <displayname>
            content: quoted text with literal tags removed, whitespace
                collapsed and HTML entities decoded
            raw_content: the <content> payload with HTML entities decoded,
                kept for further parsing
        Fields that cannot be found stay "". Any exception is logged and
        the fields collected so far are returned.
    """
    parsed = {"sender": "", "content": "", "raw_content": ""}
    try:
        refer_block = re.search(r'<refermsg>(.*?)</refermsg>', content, re.DOTALL)
        if refer_block is None:
            return parsed
        inner_xml = refer_block.group(1)

        name_hit = re.search(r'<displayname>(.*?)</displayname>', inner_xml, re.DOTALL)
        if name_hit is not None:
            parsed["sender"] = name_hit.group(1).strip()

        body_hit = re.search(r'<content>(.*?)</content>', inner_xml, re.DOTALL)
        if body_hit is not None:
            payload = body_hit.group(1)
            # Decoded payload (possibly entity-encoded XML) for later parsing.
            parsed["raw_content"] = html.unescape(payload)
            # NOTE(review): tags are removed before entities are decoded, so
            # entity-encoded markup reappears in "content" after decoding.
            without_tags = re.sub(r'<.*?>', '', payload)
            single_spaced = re.sub(r'\s+', ' ', without_tags).strip()
            parsed["content"] = html.unescape(single_spaced)
    except Exception as e:
        self.logger.error(f"提取群聊refermsg内容时出错: {e}")
    return parsed
def extract_private_refermsg(self, content: str) -> dict:
    """Pull sender and quoted text out of a private-chat <refermsg> element.

    The element is located with regular expressions rather than a full XML
    parse.

    Args:
        content: Message content.

    Returns:
        dict with keys:
            sender: stripped text of <displayname>
            content: quoted text with literal tags removed, whitespace
                collapsed and HTML entities decoded
            raw_content: the <content> payload with HTML entities decoded,
                kept for further parsing
        Fields that cannot be found stay "". Any exception is logged and
        the fields collected so far are returned.
    """
    extracted = {"sender": "", "content": "", "raw_content": ""}
    try:
        refer_hit = re.search(r'<refermsg>(.*?)</refermsg>', content, re.DOTALL)
        if not refer_hit:
            return extracted
        refer_xml = refer_hit.group(1)

        sender_hit = re.search(r'<displayname>(.*?)</displayname>', refer_xml, re.DOTALL)
        if sender_hit:
            extracted["sender"] = sender_hit.group(1).strip()

        quoted_hit = re.search(r'<content>(.*?)</content>', refer_xml, re.DOTALL)
        if quoted_hit:
            quoted_raw = quoted_hit.group(1)
            # Decoded payload (possibly entity-encoded XML) for later parsing.
            extracted["raw_content"] = html.unescape(quoted_raw)
            # NOTE(review): tags are removed before entities are decoded, so
            # entity-encoded markup reappears in "content" after decoding.
            display_text = re.sub(r'\s+', ' ', re.sub(r'<.*?>', '', quoted_raw)).strip()
            extracted["content"] = html.unescape(display_text)
    except Exception as e:
        self.logger.error(f"提取私聊refermsg内容时出错: {e}")
    return extracted
def identify_message_type(self, content: str) -> str:
    """Classify the media type of a group-chat message by marker lookup.

    Args:
        content: Message content.

    Returns:
        str: Label of the first marker found in ``content``; "文本" when no
        marker is present or when the lookup raises.
    """
    # Tested in exactly this order; the first marker contained in the text
    # wins. A marker only matches when ``type`` is the first attribute of
    # the <appmsg> tag.
    # NOTE(review): the labels for types 4, 5 and 6 differ from the table in
    # get_card_type_name - confirm which mapping is intended.
    ordered_markers = (
        ('<appmsg type="2"', "图片"),
        ('<appmsg type="5"', "文件"),
        ('<appmsg type="4"', "链接分享"),
        ('<appmsg type="3"', "音频"),
        ('<appmsg type="6"', "视频"),
        ('<appmsg type="8"', "动画表情"),
        ('<appmsg type="1"', "文本卡片"),
        ('<appmsg type="7"', "位置分享"),
        ('<appmsg type="17"', "实时位置分享"),
        ('<appmsg type="19"', "频道消息"),
        ('<appmsg type="33"', "小程序"),
        ('<appmsg type="57"', "引用消息"),
    )
    try:
        for marker, label in ordered_markers:
            if marker in content:
                return label
        return "文本"
    except Exception as e:
        self.logger.error(f"识别消息类型时出错: {e}")
        return "文本"
def identify_private_message_type(self, content: str) -> str:
    """Classify the media type of a private-chat message by marker lookup.

    Args:
        content: Message content.

    Returns:
        str: Label of the first marker found in ``content``; "文本" when no
        marker is present or when the lookup raises.
    """
    # Priority order matters: the first marker contained in the text wins.
    # A marker only matches when ``type`` is the first attribute of the
    # <appmsg> tag.
    label_by_marker = (
        ('<appmsg type="2"', "图片"),
        ('<appmsg type="5"', "文件"),
        ('<appmsg type="4"', "链接分享"),
        ('<appmsg type="3"', "音频"),
        ('<appmsg type="6"', "视频"),
        ('<appmsg type="8"', "动画表情"),
        ('<appmsg type="1"', "文本卡片"),
        ('<appmsg type="7"', "位置分享"),
        ('<appmsg type="17"', "实时位置分享"),
        ('<appmsg type="19"', "频道消息"),
        ('<appmsg type="33"', "小程序"),
        ('<appmsg type="57"', "引用消息"),
    )
    try:
        return next(
            (label for needle, label in label_by_marker if needle in content),
            "文本",
        )
    except Exception as e:
        self.logger.error(f"识别消息类型时出错: {e}")
        return "文本"
def extract_quoted_fallback(self, content: str) -> str:
    """Regex-only fallback for recovering quoted text from a message.

    Two strategies are tried in order:
      1. the body of the first <content>...</content> element;
      2. the text that follows a "引用" or "回复" keyword and the next ":",
         up to the next "<" or the end of the input.

    In both cases whitespace runs are collapsed to single spaces and HTML
    entities are decoded; strategy 1 also removes literal tags.

    Args:
        content: Raw message content.

    Returns:
        str: The recovered text, or "" when nothing matches or an error
        occurs (errors are logged).
    """
    try:
        content_match = re.search(r'<content>(.*?)</content>', content, re.DOTALL)
        if content_match:
            extracted = re.sub(r'<.*?>', '', content_match.group(1))
            extracted = re.sub(r'\s+', ' ', extracted).strip()
            return html.unescape(extracted)

        # The keywords must be an alternation: a character class such as
        # [引用|回复] would match any single one of those characters and could
        # anchor the capture on unrelated text.
        keyword_match = re.search(r'(?:引用|回复).*?[:](.*?)(?:<|$)', content, re.DOTALL)
        if keyword_match:
            # The capture stops before the first "<", so it holds no tags.
            text = re.sub(r'\s+', ' ', keyword_match.group(1)).strip()
            return html.unescape(text)
        return ""
    except Exception as e:
        self.logger.error(f"后备提取引用内容时出错: {e}")
        return ""
def extract_card_details(self, content: str) -> dict:
    """Extract card (<appmsg>) details from a message's XML content.

    The <appmsg> element is cut out of ``content`` with a regular expression
    and parsed on its own with ElementTree, so problems elsewhere in the
    message cannot break the parse. When the fragment cannot be parsed, the
    card type and title are recovered with regular expressions instead.

    Args:
        content: Message content (XML string).

    Returns:
        dict with keys is_card, card_type, card_title, card_description,
        card_url, card_appname and card_sourcedisplayname. is_card defaults
        to False and every other field to "". Errors are logged, never
        raised.
    """
    result = {
        "is_card": False,
        "card_type": "",
        "card_title": "",
        "card_description": "",
        "card_url": "",
        "card_appname": "",
        "card_sourcedisplayname": ""
    }
    try:
        # 1. Isolate the <appmsg> element: opening tag (for its attributes)
        #    plus everything up to the first closing tag.
        element_match = re.search(
            r'(<appmsg[^>]*>)(.*?)</appmsg>', content, re.DOTALL | re.IGNORECASE
        )
        if element_match:
            appmsg_xml_str = f"{element_match.group(1)}{element_match.group(2)}</appmsg>"
        else:
            # No closing tag: accept a lone or self-closing <appmsg ...> tag.
            open_tag_match = re.search(r'(<appmsg[^>]*>)', content, re.IGNORECASE)
            if not open_tag_match:
                # Without any <appmsg ...> tag there is no card to describe.
                self.logger.debug("未找到 <appmsg> 标签")
                return result
            appmsg_xml_str = open_tag_match.group(1)
            # Counts as a card even though inner details may be unavailable.
            result["is_card"] = True

        # 2. Parse the isolated fragment.
        try:
            appmsg_root = ET.XML(appmsg_xml_str)
            result["is_card"] = True

            # 3. Card type: the ``type`` attribute wins, a <type> child is
            #    the fallback.
            card_type_num = appmsg_root.get('type', '')
            if card_type_num:
                result["card_type"] = self.get_card_type_name(card_type_num)
            else:
                type_node = appmsg_root.find('./type')
                if type_node is not None and type_node.text:
                    result["card_type"] = self.get_card_type_name(type_node.text.strip())

            # 4. Title.
            # NOTE(review): ElementTree already decodes XML entities, so the
            # html.unescape calls below decode a second time - presumably to
            # cope with double-encoded payloads; confirm.
            title = appmsg_root.findtext('./title', default='').strip()
            if title:
                result["card_title"] = html.unescape(title)

            # 5. Description, with any embedded markup removed.
            description = appmsg_root.findtext('./des', default='').strip()
            if description:
                cleaned_desc = re.sub(r'<.*?>', '', description)
                result["card_description"] = html.unescape(cleaned_desc)

            # 6. Link.
            url = appmsg_root.findtext('./url', default='').strip()
            if url:
                result["card_url"] = html.unescape(url)

            # 7. Source application: <appinfo><appname> first, then
            #    <sourcedisplayname>, then an <appname> directly under the
            #    root.
            appname_node = appmsg_root.find('./appinfo/appname')
            if appname_node is not None and appname_node.text:
                result["card_appname"] = html.unescape(appname_node.text.strip())

            sourcedisplayname_node = appmsg_root.find('./sourcedisplayname')
            if sourcedisplayname_node is not None and sourcedisplayname_node.text:
                result["card_sourcedisplayname"] = html.unescape(
                    sourcedisplayname_node.text.strip()
                )
                if not result["card_appname"]:
                    result["card_appname"] = result["card_sourcedisplayname"]

            if not result["card_appname"]:
                appname_direct = appmsg_root.findtext('./appname', default='').strip()
                if appname_direct:
                    result["card_appname"] = html.unescape(appname_direct)

            self.logger.debug(f"ElementTree 解析结果: type={result['card_type']}, title={result['card_title']}, desc_len={len(result['card_description'])}, url_len={len(result['card_url'])}, app={result['card_appname']}, source={result['card_sourcedisplayname']}")
        except ET.ParseError as e:
            self.logger.error(f"使用 ElementTree 解析 <appmsg> 时出错: {e}\nXML 内容片段: {appmsg_xml_str[:500]}...", exc_info=True)
            # An <appmsg> tag was found, so still report a card.
            if not result["is_card"] and ('<appmsg' in content or '<msg>' in content):
                result["is_card"] = True
            # Best-effort recovery. As in the parsed path, the ``type``
            # attribute of the opening tag takes precedence over a <type>
            # child element.
            attr_type_match = re.match(
                r'<appmsg[^>]*?\stype="(\d+)"', appmsg_xml_str, re.IGNORECASE
            )
            type_match_fallback = attr_type_match or re.search(r'<type>(\d+)</type>', content)
            title_match_fallback = re.search(r'<title>(.*?)</title>', content, re.DOTALL)
            if type_match_fallback:
                result["card_type"] = self.get_card_type_name(type_match_fallback.group(1))
            if title_match_fallback:
                result["card_title"] = html.unescape(title_match_fallback.group(1).strip())
            self.logger.warning("ElementTree 解析失败,已尝试正则后备提取基础信息")
    except Exception as e:
        self.logger.error(f"提取卡片详情时发生意外错误: {e}", exc_info=True)
        # Still report a card when the content at least looks like one.
        if not result["is_card"] and ('<appmsg' in content or '<msg>' in content):
            result["is_card"] = True
    return result
def get_card_type_name(self, type_num: str) -> str:
    """Translate a card type number into its display name.

    Args:
        type_num: Type number as a string, e.g. "5".

    Returns:
        str: The matching name, or "未知类型(<type_num>)" when the number is
        not in the table.
    """
    known_names = dict((
        ("1", "文本卡片"),
        ("2", "图片"),
        ("3", "音频"),
        ("4", "视频"),
        ("5", "链接"),
        ("6", "文件"),
        ("7", "位置"),
        ("8", "表情动画"),
        ("17", "实时位置"),
        ("19", "频道消息"),
        ("33", "小程序"),
        ("36", "转账"),
        ("50", "视频号"),
        ("51", "直播间"),
        ("57", "引用消息"),
        ("62", "视频号直播"),
        ("63", "视频号商品"),
        ("87", "群收款"),
        ("88", "语音通话"),
    ))
    try:
        return known_names[type_num]
    except KeyError:
        return f"未知类型({type_num})"
def format_message_for_ai(self, msg_data: dict, sender_name: str) -> str:
    """Render extracted message data as the text handed to the AI.

    Sections, in order: the sender's new text (prefixed with the current
    local time as HH:MM), the details of a directly shared card, and the
    quoted message (as card details or as plain text). When none of these
    produces output, a one-line placeholder is returned instead.

    Args:
        msg_data: Extracted message data, keyed like the result of
            extract_quoted_message.
        sender_name: Name shown for the sender.

    Returns:
        str: The sections joined by blank lines.
    """
    def clip(text, limit):
        # Keep at most ``limit`` characters, the last three being "...".
        if len(text) > limit:
            return text[:limit - 3] + "..."
        return text

    def add_card_lines(prefix, lines):
        # Append the non-empty card fields stored under ``prefix`` to lines.
        kind = msg_data[f"{prefix}card_type"]
        if kind:
            lines.append(f"类型: {kind}")
        heading = msg_data[f"{prefix}card_title"]
        if heading:
            lines.append(f"标题: {heading}")
        summary = msg_data[f"{prefix}card_description"]
        if summary:
            lines.append(f"描述: {clip(summary, 100)}")
        origin = msg_data[f"{prefix}card_appname"] or msg_data[f"{prefix}card_sourcedisplayname"]
        if origin:
            lines.append(f"来源: {origin}")
        link = msg_data[f"{prefix}card_url"]
        if link:
            lines.append(f"链接: {clip(link, 80)}")
        return lines

    stamp = time.strftime("%H:%M", time.localtime())
    sections = []

    # The sender's own text.
    if msg_data["new_content"]:
        sections.append(f"[{stamp}] {sender_name}: {msg_data['new_content']}")

    # A card shared directly (not via a quote); skipped when only the
    # heading line would be present.
    if msg_data["is_card"] and not msg_data["has_quote"]:
        own_card = add_card_lines("", ["[卡片信息]"])
        if len(own_card) > 1:
            sections.append("\n".join(own_card))

    # The quoted message, either as card details or as plain text.
    if msg_data["has_quote"]:
        quote_heading = "[用户引用]"
        if msg_data["quoted_sender"]:
            quote_heading += f" {msg_data['quoted_sender']}"
        if msg_data["quoted_is_card"]:
            sections.append("\n".join(add_card_lines("quoted_", [quote_heading])))
        elif msg_data["quoted_content"]:
            sections.append(f"{quote_heading}: {msg_data['quoted_content']}")

    # Nothing so far: mention the media type, or use a generic line.
    if not sections and msg_data["media_type"] and msg_data["media_type"] != "文本":
        sections.append(f"[{stamp}] {sender_name} 发送了 [{msg_data['media_type']}]")
    if not sections:
        sections.append(f"[{stamp}] {sender_name} 发送了消息")
    return "\n\n".join(sections)