Merge pull request #115 from DBinK/sensitive_word

使用百度AI内容审核平台的文本审核接口 实现用户输入敏感词检测
This commit is contained in:
zhayujie
2023-03-15 00:09:12 +08:00
committed by GitHub
4 changed files with 152 additions and 4 deletions

4
.gitignore vendored
View File

@@ -10,3 +10,7 @@ device.json
go-cqhttp
logs/
session.token
*venv
common/test_sensitive_word.py
.vscode/launch.json
sensitive_words.txt

View File

@@ -3,6 +3,7 @@
"""
wechat channel
"""
import itchat
import json
from itchat.content import *
@@ -10,13 +11,24 @@ from channel.channel import Channel
from concurrent.futures import ThreadPoolExecutor
from common.log import logger
from common import const
from config import channel_conf_val, channel_conf
from config import channel_conf_val
import requests
from urllib.parse import urlencode
from common.sensitive_word import SensitiveWord
import io
thread_pool = ThreadPoolExecutor(max_workers=8)
sw = SensitiveWord()
# ...
@itchat.msg_register(TEXT)
def handler_single_msg(msg):
WechatChannel().handle(msg)
@@ -29,6 +41,9 @@ def handler_group_msg(msg):
return None
class WechatChannel(Channel):
def __init__(self):
pass
@@ -40,12 +55,21 @@ class WechatChannel(Channel):
# start message listener
itchat.run()
def handle(self, msg):
logger.debug("[WX]receive msg: " + json.dumps(msg, ensure_ascii=False))
from_user_id = msg['FromUserName']
to_user_id = msg['ToUserName'] # 接收人id
other_user_id = msg['User']['UserName'] # 对手方id
content = msg['Text']
# 调用敏感词检测函数
if sw.process_text(content):
self.send('请检查您的输入是否有违规内容', from_user_id)
return
match_prefix = self.check_prefix(content, channel_conf_val(const.WECHAT, 'single_chat_prefix'))
if from_user_id == other_user_id and match_prefix is not None:
# 好友向自己发送消息
@@ -79,7 +103,7 @@ class WechatChannel(Channel):
group_name = msg['User'].get('NickName', None)
group_id = msg['User'].get('UserName', None)
if not group_name:
return ""
return None
origin_content = msg['Content']
content = msg['Content']
content_list = content.split(' ', 1)
@@ -89,16 +113,27 @@ class WechatChannel(Channel):
elif len(content_list) == 2:
content = content_list[1]
match_prefix = (msg['IsAt'] and not channel_conf_val(const.WECHAT, "group_at_off", False)) or self.check_prefix(origin_content, channel_conf_val(const.WECHAT, 'group_chat_prefix')) \
or self.check_contain(origin_content, channel_conf_val(const.WECHAT, 'group_chat_keyword'))
match_prefix = (msg['IsAt'] and not channel_conf_val(const.WECHAT, "group_at_off", False)) or self.check_prefix(origin_content, channel_conf_val(const.WECHAT, 'group_chat_prefix')) or self.check_contain(origin_content, channel_conf_val(const.WECHAT, 'group_chat_keyword'))
# 如果在群里被at了 或 触发机器人关键字,则调用敏感词检测函数
if match_prefix is True:
if sw.process_text(content):
self.send('请检查您的输入是否有违规内容', group_id)
return
group_white_list = channel_conf_val(const.WECHAT, 'group_name_white_list')
if ('ALL_GROUP' in group_white_list or group_name in group_white_list or self.check_contain(group_name, channel_conf_val(const.WECHAT, 'group_name_keyword_white_list'))) and match_prefix:
img_match_prefix = self.check_prefix(content, channel_conf_val(const.WECHAT, 'image_create_prefix'))
if img_match_prefix:
content = content.split(img_match_prefix, 1)[1].strip()
thread_pool.submit(self._do_send_img, content, group_id)
else:
thread_pool.submit(self._do_send_group, content, msg)
return None
def send(self, msg, receiver):
logger.info('[WX] sendMsg={}, receiver={}'.format(msg, receiver))
@@ -164,3 +199,25 @@ class WechatChannel(Channel):
if content.find(ky) != -1:
return True
return None
'''
这是一个基于 itchat 库的微信机器人实现,支持单聊和群聊消息的自动回复和图片发送等功能。代码中使用了线程池技术和异步回调函数等方式来提高程序的性能和并发处理能力。
其中,WechatChannel 类实现了 Channel 接口,并定义了一些额外的方法,如发送消息、检测敏感词汇、处理单聊和群聊消息等。
send() 函数用于向指定用户发送文本消息;
_do_send() 函数用于处理接收到的文本消息并回复相应的内容;
_do_send_img() 函数用于处理接收到的图片消息并发送相应的图片内容;
_do_send_group() 函数用于处理接收到的群组消息并回复相应的内容;
check_prefix() 函数用于检查消息是否以指定前缀开头;
check_contain() 函数用于检查消息是否包含指定关键字。
handler_single_msg() 函数和 handler_group_msg() 函数分别用于处理接收到的单聊和群聊消息,并回复相应的内容。
在handle() 函数中,先根据消息类型和内容进行分类和处理,然后利用线程池并发处理多个消息,提高程序的处理效率。
整体上来说,这段代码实现了一个简单的微信机器人,并且具有较好的可扩展性,可以通过增加不同的处理函数或者修改匹配规则等方式来实现更为丰富的功能。
'''

84
common/sensitive_word.py Normal file
View File

@@ -0,0 +1,84 @@
import requests
import json
import os
import config
class SensitiveWord:
    """Sensitive-word checker backed by Baidu's text-censoring HTTP API.

    The checker is considered enabled only when the loaded configuration has
    a truthy ``config["common"]["type"]`` entry; in that case
    ``config["common"]["client_id"]`` and ``config["common"]["client_secret"]``
    are used to obtain an OAuth access token. When it is disabled (or the
    configuration could not be loaded) every text is reported as clean.
    """

    # Seconds to wait for a Baidu endpoint; without a timeout a stalled
    # connection would block the calling message handler indefinitely.
    REQUEST_TIMEOUT = 10

    # Baidu error codes signalling an invalid / expired access token.
    _TOKEN_ERROR_CODES = (110, 111)

    def __init__(self):
        """Load the configuration and fetch the initial access token.

        Raises:
            ValueError: if the checker is enabled but no token could be
                obtained (propagated from ``get_access_token``).
        """
        try:
            self.config = config.load_config()
        except Exception as e:
            # Best effort: a broken/missing config disables the checker
            # instead of leaving ``self.config`` undefined.
            print(e)
            self.config = None
        # Kept for backward compatibility; not used by the methods below.
        self.url = "https://aip.baidubce.com/rest/2.0/antispam/v2/spam"
        self.access_token = self.get_access_token()

    def _enabled(self):
        """Return True when the config turns the sensitive-word check on."""
        cfg = getattr(self, "config", None)
        if not cfg:
            return False
        common = cfg.get("common") or {}
        return bool(common.get("type"))

    def get_access_token(self):
        """Request an access token from Baidu's OAuth endpoint.

        :return: the access token string, or ``None`` when the checker is
            not configured.
        :raises ValueError: if the response carries no ``access_token``.
        """
        if not self._enabled():
            print("百度云接口配置不存在")
            return None

        common = self.config["common"]
        url = "https://aip.baidubce.com/oauth/2.0/token"
        params = {
            "grant_type": "client_credentials",
            "client_id": common["client_id"],
            "client_secret": common["client_secret"],
        }
        response = requests.post(url, params=params, timeout=self.REQUEST_TIMEOUT)
        response_json = response.json()
        access_token = response_json.get("access_token")
        if not access_token:
            raise ValueError(f"获取 access_token 失败: {response_json.get('error_description')}")
        # The token is a credential, so it is deliberately not printed.
        return access_token

    def _censor(self, text):
        """POST ``text`` to the censoring endpoint and return the parsed JSON.

        :raises ValueError: if the endpoint answers with a non-200 status.
        """
        url = "https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/v2/user_defined"
        headers = {"content-type": "application/x-www-form-urlencoded"}
        params = {
            "text": text.encode("utf-8"),
            "access_token": self.access_token,
        }
        response = requests.post(
            url, data=params, headers=headers, timeout=self.REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            # The error body is not guaranteed to be JSON; fall back to the
            # raw text so the original failure is still reported.
            try:
                detail = response.json().get("error_msg")
            except ValueError:
                detail = response.text
            raise ValueError(f"无法连接到接口,请检查你的网络: {detail}")
        return response.json()

    def process_text(self, text):
        """Tell whether ``text`` is flagged by the censoring service.

        :param text: the user input to check.
        :return: ``False`` when the checker is disabled, when the service's
            ``conclusionType`` is ``1``, or when the response has no
            ``conclusionType``; ``True`` otherwise.
        :raises ValueError: if the service cannot be reached successfully or
            a token cannot be obtained.
        """
        if not self._enabled():
            return False

        # Reuse the token fetched at construction time instead of issuing a
        # new OAuth request for every incoming message.
        if not getattr(self, "access_token", None):
            self.access_token = self.get_access_token()

        result = self._censor(text)
        if result.get("error_code") in self._TOKEN_ERROR_CODES:
            # The cached token was rejected: refresh it once and retry.
            self.access_token = self.get_access_token()
            result = self._censor(text)

        print(result)  # full API response, kept as a debugging aid
        return result.get("conclusionType") not in (1, None)

View File

@@ -15,6 +15,9 @@ def load_config():
config_str = read_file(config_path)
# 将json字符串反序列化为dict类型
config = json.loads(config_str)
print("载入环节" )
print(config)
return config
def get_root():
    """Return the absolute path of the directory containing this module."""
    module_path = os.path.abspath(__file__)
    root_dir = os.path.dirname(module_path)
    return root_dir