diff --git a/.gitignore b/.gitignore index 6520724..06b35da 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,7 @@ device.json go-cqhttp logs/ session.token +*venv +common/test_sensitive_word.py +.vscode/launch.json +sensitive_words.txt diff --git a/channel/wechat/wechat_channel.py b/channel/wechat/wechat_channel.py index ea3f3ff..672d5d7 100644 --- a/channel/wechat/wechat_channel.py +++ b/channel/wechat/wechat_channel.py @@ -3,6 +3,7 @@ """ wechat channel """ + import itchat import json from itchat.content import * @@ -10,13 +11,24 @@ from channel.channel import Channel from concurrent.futures import ThreadPoolExecutor from common.log import logger from common import const -from config import channel_conf_val, channel_conf +from config import channel_conf_val import requests +from urllib.parse import urlencode + +from common.sensitive_word import SensitiveWord + import io + thread_pool = ThreadPoolExecutor(max_workers=8) + +sw = SensitiveWord() + +# ... + + @itchat.msg_register(TEXT) def handler_single_msg(msg): WechatChannel().handle(msg) @@ -29,6 +41,9 @@ def handler_group_msg(msg): return None + + + class WechatChannel(Channel): def __init__(self): pass @@ -40,12 +55,21 @@ class WechatChannel(Channel): # start message listener itchat.run() + + + def handle(self, msg): logger.debug("[WX]receive msg: " + json.dumps(msg, ensure_ascii=False)) from_user_id = msg['FromUserName'] to_user_id = msg['ToUserName'] # 接收人id other_user_id = msg['User']['UserName'] # 对手方id content = msg['Text'] + + # 调用敏感词检测函数 + if sw.process_text(content): + self.send('请检查您的输入是否有违规内容', from_user_id) + return + match_prefix = self.check_prefix(content, channel_conf_val(const.WECHAT, 'single_chat_prefix')) if from_user_id == other_user_id and match_prefix is not None: # 好友向自己发送消息 @@ -79,7 +103,7 @@ class WechatChannel(Channel): group_name = msg['User'].get('NickName', None) group_id = msg['User'].get('UserName', None) if not group_name: - return "" + return None origin_content = msg['Content'] content = msg['Content'] content_list = content.split(' ', 1) @@ -89,16 +113,27 @@ class WechatChannel(Channel): elif len(content_list) == 2: content = content_list[1] - match_prefix = (msg['IsAt'] and not channel_conf_val(const.WECHAT, "group_at_off", False)) or self.check_prefix(origin_content, channel_conf_val(const.WECHAT, 'group_chat_prefix')) \ - or self.check_contain(origin_content, channel_conf_val(const.WECHAT, 'group_chat_keyword')) + + + match_prefix = (msg['IsAt'] and not channel_conf_val(const.WECHAT, "group_at_off", False)) or self.check_prefix(origin_content, channel_conf_val(const.WECHAT, 'group_chat_prefix')) or self.check_contain(origin_content, channel_conf_val(const.WECHAT, 'group_chat_keyword')) + + # 如果在群里被at了 或 触发机器人关键字,则调用敏感词检测函数 + if match_prefix is True: + if sw.process_text(content): + self.send('请检查您的输入是否有违规内容', group_id) + return + group_white_list = channel_conf_val(const.WECHAT, 'group_name_white_list') + if ('ALL_GROUP' in group_white_list or group_name in group_white_list or self.check_contain(group_name, channel_conf_val(const.WECHAT, 'group_name_keyword_white_list'))) and match_prefix: + img_match_prefix = self.check_prefix(content, channel_conf_val(const.WECHAT, 'image_create_prefix')) if img_match_prefix: content = content.split(img_match_prefix, 1)[1].strip() thread_pool.submit(self._do_send_img, content, group_id) else: thread_pool.submit(self._do_send_group, content, msg) + return None def send(self, msg, receiver): logger.info('[WX] sendMsg={}, receiver={}'.format(msg, receiver)) @@ -164,3 +199,25 @@ class WechatChannel(Channel): if content.find(ky) != -1: return True return None + + +''' +这是一个基于itchat库的微信机器人实现,支持单聊和群聊消息的自动回复和图片发送等功能。代码中使用了线程池技术和异步回调函数等方式来提高程序的性能和并发处理能力。 + +其中,WechatChannel 类实现了 Channel 接口,并定义了一些额外的方法,如发送消息、检测敏感词汇、处理单聊和群聊消息等。 + +send() 函数用于向指定用户发送文本消息; +_do_send() 函数用于处理接收到的文本消息并回复相应的内容; +_do_send_img() 函数用于处理接收到的图片消息并发送相应的图片内容; +_do_send_group() 函数用于处理接收到的群组消息并回复相应的内容; + +check_prefix() 函数用于检查消息是否以指定前缀开头; +check_contain() 函数用于检查消息是否包含指定关键字。 + +handler_single_msg() 函数和 handler_group_msg() 函数分别用于处理接收到的单聊和群聊消息,并回复相应的内容。 + +在handle() 函数中,先根据消息类型和内容进行分类和处理,然后利用线程池并发处理多个消息,提高程序的处理效率。 + +整体上来说,这段代码实现了一个简单的微信机器人,并且具有较好的可扩展性,可以通过增加不同的处理函数或者修改匹配规则等方式来实现更为丰富的功能。 +''' + diff --git a/common/sensitive_word.py b/common/sensitive_word.py new file mode 100644 index 0000000..ce4012a --- /dev/null +++ b/common/sensitive_word.py @@ -0,0 +1,84 @@ +import requests +import json +import os +import config + +class SensitiveWord: + def __init__(self): + # 读取配置文件 + try: + self.config = config.load_config() # 加载配置文件 + #print(self.config) # 输出配置文件内容以进行调试 + except Exception as e: + print(e) # 打印错误信息 + + + print(self.config) + + # 设置请求 URL + self.url = "https://aip.baidubce.com/rest/2.0/antispam/v2/spam" + + # 获取 access token + self.access_token = self.get_access_token() + + def get_access_token(self): + """ + 获取百度云接口的 access token + + :return: str access token + + """ + + #检测敏感词配置是否存在 + if self.config is not None and "common" in self.config and "type" in self.config["common"] and self.config["common"]["type"]: + + url = "https://aip.baidubce.com/oauth/2.0/token" + params = { + "grant_type": "client_credentials", + "client_id": self.config["common"]["client_id"], + "client_secret": self.config["common"]["client_secret"] + } + response = requests.post(url, params=params) + response_json = response.json() + + access_token = response_json.get("access_token") + + if not access_token: + raise ValueError(f"获取 access_token 失败: {response_json.get('error_description')}") + + print(f"Access token: {access_token}") # 输出访问令牌以进行调试 + return access_token + else: + print("百度云接口配置不存在") + print(self.config) + + + def process_text(self, text): + + #检测敏感词配置是否存在 + if self.config is not None and "common" in self.config and "type" in self.config["common"] and self.config["common"]["type"]: + #存在则执行正常检测流程 + url = "https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/v2/user_defined" # API 请求地址 + access_token = self.get_access_token() + headers = {"content-type": "application/x-www-form-urlencoded"} + params = { + "text": text.encode("utf-8"), + "access_token": access_token + } + response = requests.post(url, data=params, headers=headers) + + if response.status_code != 200: + raise ValueError(f"无法连接到接口,请检查你的网络: {response.json().get('error_msg')}") + + conclusion_type = response.json().get("conclusionType") + + + print(response.json()) # 输出完整的 API 响应结果 + + if conclusion_type in [1, None]: + return False + else: + return True + #不存在则直接返回无敏感词 + else: + return False diff --git a/config.py b/config.py index 755421a..9509d95 100644 --- a/config.py +++ b/config.py @@ -15,6 +15,9 @@ def load_config(): config_str = read_file(config_path) # 将json字符串反序列化为dict类型 config = json.loads(config_str) + print("载入环节" ) + print(config) + return config def get_root(): return os.path.dirname(os.path.abspath( __file__ ))