diff --git a/channel/wechat/wechat_channel.py b/channel/wechat/wechat_channel.py index a571805..672d5d7 100644 --- a/channel/wechat/wechat_channel.py +++ b/channel/wechat/wechat_channel.py @@ -11,9 +11,9 @@ from channel.channel import Channel from concurrent.futures import ThreadPoolExecutor from common.log import logger from common import const -from config import channel_conf_val, channel_conf +from config import channel_conf_val import requests -from urllib.parse import urlencode, quote +from urllib.parse import urlencode from common.sensitive_word import SensitiveWord @@ -67,7 +67,7 @@ class WechatChannel(Channel): # 调用敏感词检测函数 if sw.process_text(content): - self.send('请注意文明用语', from_user_id) + self.send('请检查您的输入是否有违规内容', from_user_id) return match_prefix = self.check_prefix(content, channel_conf_val(const.WECHAT, 'single_chat_prefix')) @@ -113,13 +113,16 @@ class WechatChannel(Channel): elif len(content_list) == 2: content = content_list[1] - # 调用敏感词检测函数 - if sw.process_text(content): - self.send('请注意文明用语', group_id) - return + match_prefix = (msg['IsAt'] and not channel_conf_val(const.WECHAT, "group_at_off", False)) or self.check_prefix(origin_content, channel_conf_val(const.WECHAT, 'group_chat_prefix')) or self.check_contain(origin_content, channel_conf_val(const.WECHAT, 'group_chat_keyword')) + # 如果在群里被at了 或 触发机器人关键字,则调用敏感词检测函数 + if match_prefix is True: + if sw.process_text(content): + self.send('请检查您的输入是否有违规内容', group_id) + return + group_white_list = channel_conf_val(const.WECHAT, 'group_name_white_list') if ('ALL_GROUP' in group_white_list or group_name in group_white_list or self.check_contain(group_name, channel_conf_val(const.WECHAT, 'group_name_keyword_white_list'))) and match_prefix: @@ -218,4 +221,3 @@ handler_single_msg() 函数和 handler_group_msg() 函数分别用于处理接 整体上来说,这段代码实现了一个简单的微信机器人,并且具有较好的可扩展性,可以通过增加不同的处理函数或者修改匹配规则等方式来实现更为丰富的功能。 ''' - diff --git a/common/sensitive_word.py b/common/sensitive_word.py index 24f0e83..4369eb3 100644 --- a/common/sensitive_word.py +++ b/common/sensitive_word.py @@ -27,45 +27,58 @@ class SensitiveWord: 获取百度云接口的 access token :return: str access token - """ - url = "https://aip.baidubce.com/oauth/2.0/token" - params = { - "grant_type": "client_credentials", - "client_id": self.config["common"]["client_id"], - "client_secret": self.config["common"]["client_secret"] - } - response = requests.post(url, params=params) - response_json = response.json() - - access_token = response_json.get("access_token") - - if not access_token: - raise ValueError(f"获取 access_token 失败: {response_json.get('error_description')}") - #print(f"Access token: {access_token}") # 输出访问令牌以进行调试 + """ + + #检测敏感词配置是否存在 + if self.config is not None and self.config.get("common") is not None and self.config["common"].get("type") is not None: - return access_token + url = "https://aip.baidubce.com/oauth/2.0/token" + params = { + "grant_type": "client_credentials", + "client_id": self.config["common"]["client_id"], + "client_secret": self.config["common"]["client_secret"] + } + response = requests.post(url, params=params) + response_json = response.json() + + access_token = response_json.get("access_token") + + if not access_token: + raise ValueError(f"获取 access_token 失败: {response_json.get('error_description')}") + + #print(f"Access token: {access_token}") # 输出访问令牌以进行调试 + return access_token + else: + print("百度云接口配置不存在") def process_text(self, text): - url = "https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/v2/user_defined" # 填写 API 请求地址 - access_token = self.get_access_token() - headers = {"content-type": "application/x-www-form-urlencoded"} - params = { - "text": text.encode("utf-8"), - "access_token": access_token - } - response = requests.post(url, data=params, headers=headers) - if response.status_code != 200: - raise ValueError(f"Failed to check sensitive words: {response.json().get('error_msg')}") + #检测敏感词配置是否存在 + if self.config is not None and self.config.get("common") is not None and self.config["common"].get("type") is not None: + #存在则执行正常检测流程 + url = "https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/v2/user_defined" # API 请求地址 + access_token = self.get_access_token() + headers = {"content-type": "application/x-www-form-urlencoded"} + params = { + "text": text.encode("utf-8"), + "access_token": access_token + } + response = requests.post(url, data=params, headers=headers) - conclusion_type = response.json().get("conclusionType") + if response.status_code != 200: + raise ValueError(f"无法连接到接口,请检查你的网络: {response.json().get('error_msg')}") + + conclusion_type = response.json().get("conclusionType") - print(response.json()) # 输出完整的 API 响应结果 + print(response.json()) # 输出完整的 API 响应结果 - if conclusion_type in [1, None]: - return False + if conclusion_type in [1, None]: + return False + else: + return True + #不存在则直接返回无敏感词 else: - return True + return False