feat: add support for WeChat integration via the wchatferry

This commit is contained in:
josephier
2025-02-14 00:25:09 +08:00
parent 436c038a2f
commit 323aebd1be
5 changed files with 292 additions and 0 deletions

1
.gitignore vendored
View File

@@ -14,6 +14,7 @@ tmp
plugins.json
itchat.pkl
*.log
logs/
user_datas.pkl
chatgpt_tool_hub/
plugins/**/

View File

@@ -18,6 +18,9 @@ def create_channel(channel_type) -> Channel:
elif channel_type == "wxy":
from channel.wechat.wechaty_channel import WechatyChannel
ch = WechatyChannel()
elif channel_type == "wcf":
from channel.wechat.wcf_channel import WechatfChannel
ch = WechatfChannel()
elif channel_type == "terminal":
from channel.terminal.terminal_channel import TerminalChannel
ch = TerminalChannel()

View File

@@ -0,0 +1,227 @@
# encoding:utf-8
"""
wechat channel
"""
import io
import json
import os
import threading
import time
from queue import Empty
from typing import Any
from bridge.context import *
from bridge.reply import *
from channel.chat_channel import ChatChannel
from channel.wechat.wcf_message import WechatfMessage
from common.log import logger
from common.singleton import singleton
from common.utils import *
from config import conf, get_appdata_dir
from wcferry import Wcf, WxMsg
@singleton
class WechatfChannel(ChatChannel):
NOT_SUPPORT_REPLYTYPE = []
def __init__(self):
super().__init__()
self.NOT_SUPPORT_REPLYTYPE = []
# 使用字典存储最近消息,用于去重
self.received_msgs = {}
# 初始化wcferry客户端
self.wcf = Wcf()
self.wxid = None # 登录后会被设置为当前登录用户的wxid
def startup(self):
"""
启动通道
"""
try:
# wcferry会自动唤起微信并登录
self.wxid = self.wcf.get_self_wxid()
self.name = self.wcf.get_user_info().get("name")
logger.info(f"微信登录成功当前用户ID: {self.wxid}, 用户名:{self.name}")
self.contact_cache = ContactCache(self.wcf)
self.contact_cache.update()
# 启动消息接收
self.wcf.enable_receiving_msg()
# 创建消息处理线程
t = threading.Thread(target=self._process_messages, name="WeChatThread", daemon=True)
t.start()
except Exception as e:
logger.error(f"微信通道启动失败: {e}")
raise e
def _process_messages(self):
"""
处理消息队列
"""
while True:
try:
msg = self.wcf.get_msg()
if msg:
self._handle_message(msg)
except Empty:
continue
except Exception as e:
logger.error(f"处理消息失败: {e}")
continue
def _handle_message(self, msg: WxMsg):
"""
处理单条消息
"""
try:
# 构造消息对象
cmsg = WechatfMessage(self, msg)
# 消息去重
if cmsg.msg_id in self.received_msgs:
return
self.received_msgs[cmsg.msg_id] = time.time()
# 清理过期消息ID
self._clean_expired_msgs()
logger.debug(f"收到消息: {msg}")
context = self._compose_context(cmsg.ctype, cmsg.content,
isgroup=cmsg.is_group,
msg=cmsg)
if context:
self.produce(context)
except Exception as e:
logger.error(f"处理消息失败: {e}")
def _clean_expired_msgs(self, expire_time: float = 60):
"""
清理过期的消息ID
"""
now = time.time()
for msg_id in list(self.received_msgs.keys()):
if now - self.received_msgs[msg_id] > expire_time:
del self.received_msgs[msg_id]
def send(self, reply: Reply, context: Context):
"""
发送消息
"""
receiver = context["receiver"]
if not receiver:
logger.error("receiver is empty")
return
try:
if reply.type == ReplyType.TEXT:
# 处理@信息
at_list = []
if context.get("isgroup"):
if context["msg"].actual_user_id:
at_list = [context["msg"].actual_user_id]
at_str = ",".join(at_list) if at_list else ""
self.wcf.send_text(reply.content, receiver, at_str)
elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
self.wcf.send_text(reply.content, receiver)
elif reply.type == ReplyType.IMAGE_URL:
# 下载图片并发送
img_path = self._download_image(reply.content)
if img_path:
self.wcf.send_image(img_path, receiver)
os.remove(img_path) # 清理临时文件
elif reply.type == ReplyType.IMAGE:
# 直接发送本地图片
if isinstance(reply.content, str):
self.wcf.send_image(reply.content, receiver)
else:
# 处理二进制图片数据
tmp_path = os.path.join(get_appdata_dir(), f"tmp_{int(time.time())}.png")
with open(tmp_path, "wb") as f:
f.write(reply.content.getvalue())
self.wcf.send_image(tmp_path, receiver)
os.remove(tmp_path)
elif reply.type == ReplyType.FILE:
if isinstance(reply.content, str):
self.wcf.send_file(reply.content, receiver)
else:
# 处理二进制文件数据
tmp_path = os.path.join(get_appdata_dir(), f"tmp_{int(time.time())}")
with open(tmp_path, "wb") as f:
f.write(reply.content.getvalue())
self.wcf.send_file(tmp_path, receiver)
os.remove(tmp_path)
else:
logger.error(f"暂不支持的消息类型: {reply.type}")
except Exception as e:
logger.error(f"发送消息失败: {e}")
def _download_image(self, url: str) -> str:
"""
下载图片到临时文件
"""
try:
import requests
tmp_path = os.path.join(get_appdata_dir(), f"tmp_{int(time.time())}.png")
r = requests.get(url, stream=True)
if r.status_code == 200:
with open(tmp_path, "wb") as f:
for chunk in r.iter_content(chunk_size=1024):
f.write(chunk)
return tmp_path
except Exception as e:
logger.error(f"下载图片失败: {e}")
return None
def close(self):
"""
关闭通道
"""
try:
self.wcf.cleanup()
except Exception as e:
logger.error(f"关闭通道失败: {e}")
class ContactCache:
def __init__(self, wcf):
"""
wcf: 一个 wcfferry.client.Wcf 实例
"""
self.wcf = wcf
self._contact_map = {} # 形如 {wxid: {完整联系人信息}}
def update(self):
"""
更新缓存:调用 get_contacts()
再把 wcf.contacts 构建成 {wxid: {完整信息}} 的字典
"""
self.wcf.get_contacts()
self._contact_map.clear()
for item in self.wcf.contacts:
wxid = item.get('wxid')
if wxid: # 确保有 wxid 字段
self._contact_map[wxid] = item
def get_contact(self, wxid: str) -> dict:
"""
返回该 wxid 对应的完整联系人 dict
如果没找到就返回 None
"""
return self._contact_map.get(wxid)
def get_name_by_wxid(self, wxid: str) -> str:
"""
通过wxid获取成员/群名称
"""
contact = self.get_contact(wxid)
if contact:
return contact.get('name', '')
return ''

View File

@@ -0,0 +1,58 @@
# encoding:utf-8
"""
wechat channel message
"""
from bridge.context import ContextType
from channel.chat_message import ChatMessage
from common.log import logger
from wcferry import WxMsg
class WechatfMessage(ChatMessage):
"""
微信消息封装类
"""
def __init__(self, channel, wcf_msg: WxMsg, is_group=False):
"""
初始化消息对象
:param wcf_msg: wcferry消息对象
:param is_group: 是否是群消息
"""
super().__init__(wcf_msg)
self.msg_id = wcf_msg.id
self.create_time = wcf_msg.ts # 使用消息时间戳
self.is_group = is_group or wcf_msg._is_group
self.wxid = channel.wxid
self.name = channel.name
# 解析消息类型
if wcf_msg.is_text():
self.ctype = ContextType.TEXT
self.content = wcf_msg.content
else:
raise NotImplementedError(f"Unsupported message type: {wcf_msg.type}")
# 设置发送者和接收者信息
self.from_user_id = self.wxid if wcf_msg.sender == self.wxid else wcf_msg.sender
self.from_user_nickname = self.name if wcf_msg.sender == self.wxid else channel.contact_cache.get_name_by_wxid(wcf_msg.sender)
self.to_user_id = self.wxid
self.to_user_nickname = self.name
self.other_user_id = wcf_msg.sender
self.other_user_nickname = channel.contact_cache.get_name_by_wxid(wcf_msg.sender)
# 群消息特殊处理
if self.is_group:
self.other_user_id = wcf_msg.roomid
self.other_user_nickname = channel.contact_cache.get_name_by_wxid(wcf_msg.roomid)
self.actual_user_id = wcf_msg.sender
self.actual_user_nickname = channel.wcf.get_alias_in_chatroom(wcf_msg.sender, wcf_msg.roomid)
if not self.actual_user_nickname: # 群聊获取不到企微号成员昵称,这里尝试从联系人缓存去获取
self.actual_user_nickname = channel.contact_cache.get_name_by_wxid(wcf_msg.sender)
self.room_id = wcf_msg.roomid
self.is_at = wcf_msg.is_at(self.wxid) # 是否被@当前登录用户
# 判断是否是自己发送的消息
self.my_msg = wcf_msg.from_self()

View File

@@ -44,3 +44,6 @@ zhipuai>=2.0.1
# tongyi qwen new sdk
dashscope
# wechatferry
wcferry==39.3.3.2