feat: add feishu websocket mode

This commit is contained in:
zhayujie
2026-01-31 12:32:41 +08:00
parent e3350d5bec
commit f044fb8b47
6 changed files with 356 additions and 84 deletions

167
channel/feishu/README.md Normal file
View File

@@ -0,0 +1,167 @@
# 飞书Channel使用说明
飞书Channel支持两种事件接收模式可以根据部署环境灵活选择。
## 模式对比
| 模式 | 适用场景 | 优点 | 缺点 |
|------|---------|------|------|
| **webhook** | 生产环境 | 稳定可靠,官方推荐 | 需要公网IP或域名 |
| **websocket** | 本地开发 | 无需公网IP开发便捷 | 需要额外依赖 |
## 配置说明
### 基础配置
`config.json` 中添加以下配置:
```json
{
"channel_type": "feishu",
"feishu_app_id": "cli_xxxxx",
"feishu_app_secret": "your_app_secret",
"feishu_token": "your_verification_token",
"feishu_bot_name": "你的机器人名称",
"feishu_event_mode": "webhook",
"feishu_port": 9891
}
```
### 配置项说明
- `feishu_app_id`: 飞书应用的App ID
- `feishu_app_secret`: 飞书应用的App Secret
- `feishu_token`: 事件订阅的Verification Token
- `feishu_bot_name`: 机器人名称(用于群聊@判断)
- `feishu_event_mode`: 事件接收模式,可选值:
- `"websocket"`: 长连接模式(默认)
- `"webhook"`: HTTP服务器模式
- `feishu_port`: webhook模式下的HTTP服务端口(默认9891)
## 模式一: Webhook模式(推荐生产环境)
### 1. 配置
```json
{
"feishu_event_mode": "webhook",
"feishu_port": 9891
}
```
### 2. 启动服务
```bash
python3 app.py
```
服务将在 `http://0.0.0.0:9891` 启动。
### 3. 配置飞书应用
1. 登录[飞书开放平台](https://open.feishu.cn/)
2. 进入应用详情 -> 事件订阅
3. 选择 **将事件发送至开发者服务器**
4. 填写请求地址: `http://your-domain:9891/`
5. 添加事件: `im.message.receive_v1` (接收消息v2.0)
6. 保存配置
### 4. 注意事项
- 需要有公网IP或域名
- 确保防火墙开放对应端口
- 建议使用HTTPS(需要配置反向代理)
## 模式二: WebSocket模式(推荐本地开发)
### 1. 安装依赖
```bash
pip install lark-oapi
```
### 2. 配置
```json
{
"feishu_event_mode": "websocket"
}
```
### 3. 启动服务
```bash
python3 app.py
```
程序将自动建立与飞书开放平台的长连接。
### 4. 配置飞书应用
1. 登录[飞书开放平台](https://open.feishu.cn/)
2. 进入应用详情 -> 事件订阅
3. 选择 **使用长连接接收事件**
4. 添加事件: `im.message.receive_v1` (接收消息v2.0)
5. 保存配置
### 5. 注意事项
- 无需公网IP
- 需要能访问公网(建立WebSocket连接)
- 每个应用最多50个连接
- 集群模式下消息随机分发到一个客户端
## 平滑迁移
从webhook模式切换到websocket模式(或反向切换):
1. 修改 `config.json` 中的 `feishu_event_mode`
2. 如果切换到websocket模式安装 `lark-oapi` 依赖
3. 重启服务
4. 在飞书开放平台修改事件订阅方式
**重要**: 同一时间只能使用一种模式,否则会导致消息重复接收。
## 消息去重机制
两种模式都使用相同的消息去重机制:
- 使用 `ExpiredDict` 存储已处理的消息ID
- 过期时间: 7.1小时
- 确保消息不会重复处理
## 故障排查
### WebSocket模式连接失败
```
[FeiShu] lark_oapi not installed
```
**解决**: 安装依赖 `pip install lark-oapi`
### Webhook模式端口被占用
```
Address already in use
```
**解决**: 修改 `feishu_port` 配置或关闭占用端口的进程
### 收不到消息
1. 检查飞书应用的事件订阅配置
2. 确认已添加 `im.message.receive_v1` 事件
3. 检查应用权限: 需要 `im:message` 权限
4. 查看日志中的错误信息
## 开发建议
- **本地开发**: 使用websocket模式快速迭代
- **测试环境**: 可以使用webhook模式 + 内网穿透工具(如ngrok)
- **生产环境**: 使用webhook模式配置正式域名和HTTPS
## 参考文档
- [飞书开放平台 - 事件订阅](https://open.feishu.cn/document/ukTMukTMukTM/uUTNz4SN1MjL1UzM)
- [飞书SDK - Python](https://github.com/larksuite/oapi-sdk-python)

View File

@@ -1,48 +1,80 @@
"""
飞书通道接入
支持两种事件接收模式:
1. webhook模式: 通过HTTP服务器接收事件(需要公网IP)
2. websocket模式: 通过长连接接收事件(本地开发友好)
通过配置项 feishu_event_mode 选择模式: "webhook""websocket"
@author Saboteur7
@Date 2023/11/19
"""
import json
import os
import threading
# -*- coding=utf-8 -*-
import uuid
import requests
import web
from channel.feishu.feishu_message import FeishuMessage
from bridge.context import Context
from bridge.context import ContextType
from bridge.reply import Reply, ReplyType
from channel.chat_channel import ChatChannel, check_prefix
from channel.feishu.feishu_message import FeishuMessage
from common import utils
from common.expired_dict import ExpiredDict
from common.log import logger
from common.singleton import singleton
from config import conf
from common.expired_dict import ExpiredDict
from bridge.context import ContextType
from channel.chat_channel import ChatChannel, check_prefix
from common import utils
import json
import os
URL_VERIFICATION = "url_verification"
# 尝试导入飞书SDK,如果未安装则websocket模式不可用
try:
import lark_oapi as lark
LARK_SDK_AVAILABLE = True
except ImportError:
LARK_SDK_AVAILABLE = False
logger.warning(
"[FeiShu] lark_oapi not installed, websocket mode is not available. Install with: pip install lark-oapi")
@singleton
class FeiShuChanel(ChatChannel):
feishu_app_id = conf().get('feishu_app_id')
feishu_app_secret = conf().get('feishu_app_secret')
feishu_token = conf().get('feishu_token')
feishu_event_mode = conf().get('feishu_event_mode', 'websocket') # webhook 或 websocket
def __init__(self):
super().__init__()
# 历史消息id暂存用于幂等控制
self.receivedMsgs = ExpiredDict(60 * 60 * 7.1)
logger.info("[FeiShu] app_id={}, app_secret={} verification_token={}".format(
self.feishu_app_id, self.feishu_app_secret, self.feishu_token))
logger.info("[FeiShu] app_id={}, app_secret={}, verification_token={}, event_mode={}".format(
self.feishu_app_id, self.feishu_app_secret, self.feishu_token, self.feishu_event_mode))
# 无需群校验和前缀
conf()["group_name_white_list"] = ["ALL_GROUP"]
conf()["single_chat_prefix"] = [""]
# 验证配置
if self.feishu_event_mode == 'websocket' and not LARK_SDK_AVAILABLE:
logger.error("[FeiShu] websocket mode requires lark_oapi. Please install: pip install lark-oapi")
raise Exception("lark_oapi not installed")
def startup(self):
if self.feishu_event_mode == 'websocket':
self._startup_websocket()
else:
self._startup_webhook()
def _startup_webhook(self):
"""启动HTTP服务器接收事件(webhook模式)"""
logger.info("[FeiShu] Starting in webhook mode...")
urls = (
'/', 'channel.feishu.feishu_channel.FeishuController'
)
@@ -50,6 +82,109 @@ class FeiShuChanel(ChatChannel):
port = conf().get("feishu_port", 9891)
web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
def _startup_websocket(self):
"""启动长连接接收事件(websocket模式)"""
logger.info("[FeiShu] Starting in websocket mode...")
# 创建事件处理器
def handle_message_event(data: lark.im.v1.P2ImMessageReceiveV1) -> None:
"""处理接收消息事件 v2.0"""
try:
logger.debug(f"[FeiShu] websocket receive event: {lark.JSON.marshal(data, indent=2)}")
# 转换为标准的event格式
event_dict = json.loads(lark.JSON.marshal(data))
event = event_dict.get("event", {})
# 处理消息
self._handle_message_event(event)
except Exception as e:
logger.error(f"[FeiShu] websocket handle message error: {e}", exc_info=True)
# 构建事件分发器
event_handler = lark.EventDispatcherHandler.builder("", "") \
.register_p2_im_message_receive_v1(handle_message_event) \
.build()
# 创建长连接客户端
ws_client = lark.ws.Client(
self.feishu_app_id,
self.feishu_app_secret,
event_handler=event_handler,
log_level=lark.LogLevel.DEBUG if conf().get("debug") else lark.LogLevel.INFO
)
# 在新线程中启动客户端,避免阻塞主线程
def start_client():
try:
logger.info("[FeiShu] Websocket client starting...")
ws_client.start()
except Exception as e:
logger.error(f"[FeiShu] Websocket client error: {e}", exc_info=True)
ws_thread = threading.Thread(target=start_client, daemon=True)
ws_thread.start()
# 保持主线程运行
logger.info("[FeiShu] Websocket mode started, waiting for events...")
ws_thread.join()
def _handle_message_event(self, event: dict):
"""
处理消息事件的核心逻辑
webhook和websocket模式共用此方法
"""
if not event.get("message") or not event.get("sender"):
logger.warning(f"[FeiShu] invalid message, event={event}")
return
msg = event.get("message")
# 幂等判断
msg_id = msg.get("message_id")
if self.receivedMsgs.get(msg_id):
logger.warning(f"[FeiShu] repeat msg filtered, msg_id={msg_id}")
return
self.receivedMsgs[msg_id] = True
is_group = False
chat_type = msg.get("chat_type")
if chat_type == "group":
if not msg.get("mentions") and msg.get("message_type") == "text":
# 群聊中未@不响应
return
if msg.get("mentions") and msg.get("mentions")[0].get("name") != conf().get("feishu_bot_name") and msg.get(
"message_type") == "text":
# 不是@机器人,不响应
return
# 群聊
is_group = True
receive_id_type = "chat_id"
elif chat_type == "p2p":
receive_id_type = "open_id"
else:
logger.warning("[FeiShu] message ignore")
return
# 构造飞书消息对象
feishu_msg = FeishuMessage(event, is_group=is_group, access_token=self.fetch_access_token())
if not feishu_msg:
return
context = self._compose_context(
feishu_msg.ctype,
feishu_msg.content,
isgroup=is_group,
msg=feishu_msg,
receive_id_type=receive_id_type,
no_need_at=True
)
if context:
self.produce(context)
logger.info(f"[FeiShu] query={feishu_msg.content}, type={feishu_msg.ctype}")
def send(self, reply: Reply, context: Context):
msg = context.get("msg")
is_group = context["isgroup"]
@@ -143,9 +278,39 @@ class FeiShuChanel(ChatChannel):
os.remove(temp_name)
return upload_response.json().get("data").get("image_key")
def _compose_context(self, ctype: ContextType, content, **kwargs):
context = Context(ctype, content)
context.kwargs = kwargs
if "origin_ctype" not in context:
context["origin_ctype"] = ctype
cmsg = context["msg"]
context["session_id"] = cmsg.from_user_id
context["receiver"] = cmsg.other_user_id
if ctype == ContextType.TEXT:
# 1.文本请求
# 图片生成处理
img_match_prefix = check_prefix(content, conf().get("image_create_prefix"))
if img_match_prefix:
content = content.replace(img_match_prefix, "", 1)
context.type = ContextType.IMAGE_CREATE
else:
context.type = ContextType.TEXT
context.content = content.strip()
elif context.type == ContextType.VOICE:
# 2.语音请求
if "desire_rtype" not in context and conf().get("voice_reply_voice"):
context["desire_rtype"] = ReplyType.VOICE
return context
class FeishuController:
"""
HTTP服务器控制器用于webhook模式
"""
# 类常量
FAILED_MSG = '{"success": false}'
SUCCESS_MSG = '{"success": true}'
@@ -175,80 +340,10 @@ class FeishuController:
# 处理消息事件
event = request.get("event")
if header.get("event_type") == self.MESSAGE_RECEIVE_TYPE and event:
if not event.get("message") or not event.get("sender"):
logger.warning(f"[FeiShu] invalid message, msg={request}")
return self.FAILED_MSG
msg = event.get("message")
channel._handle_message_event(event)
# 幂等判断
if channel.receivedMsgs.get(msg.get("message_id")):
logger.warning(f"[FeiShu] repeat msg filtered, event_id={header.get('event_id')}")
return self.SUCCESS_MSG
channel.receivedMsgs[msg.get("message_id")] = True
is_group = False
chat_type = msg.get("chat_type")
if chat_type == "group":
if not msg.get("mentions") and msg.get("message_type") == "text":
# 群聊中未@不响应
return self.SUCCESS_MSG
if msg.get("mentions")[0].get("name") != conf().get("feishu_bot_name") and msg.get("message_type") == "text":
# 不是@机器人,不响应
return self.SUCCESS_MSG
# 群聊
is_group = True
receive_id_type = "chat_id"
elif chat_type == "p2p":
receive_id_type = "open_id"
else:
logger.warning("[FeiShu] message ignore")
return self.SUCCESS_MSG
# 构造飞书消息对象
feishu_msg = FeishuMessage(event, is_group=is_group, access_token=channel.fetch_access_token())
if not feishu_msg:
return self.SUCCESS_MSG
context = self._compose_context(
feishu_msg.ctype,
feishu_msg.content,
isgroup=is_group,
msg=feishu_msg,
receive_id_type=receive_id_type,
no_need_at=True
)
if context:
channel.produce(context)
logger.info(f"[FeiShu] query={feishu_msg.content}, type={feishu_msg.ctype}")
return self.SUCCESS_MSG
except Exception as e:
logger.error(e)
return self.FAILED_MSG
def _compose_context(self, ctype: ContextType, content, **kwargs):
context = Context(ctype, content)
context.kwargs = kwargs
if "origin_ctype" not in context:
context["origin_ctype"] = ctype
cmsg = context["msg"]
context["session_id"] = cmsg.from_user_id
context["receiver"] = cmsg.other_user_id
if ctype == ContextType.TEXT:
# 1.文本请求
# 图片生成处理
img_match_prefix = check_prefix(content, conf().get("image_create_prefix"))
if img_match_prefix:
content = content.replace(img_match_prefix, "", 1)
context.type = ContextType.IMAGE_CREATE
else:
context.type = ContextType.TEXT
context.content = content.strip()
elif context.type == ContextType.VOICE:
# 2.语音请求
if "desire_rtype" not in context and conf().get("voice_reply_voice"):
context["desire_rtype"] = ReplyType.VOICE
return context

View File

@@ -17,7 +17,7 @@
"@bot"
],
"group_name_white_list": [
"ChatGPT测试群",
"Agent测试群",
"ChatGPT测试群2"
],
"image_create_prefix": [

View File

@@ -148,6 +148,7 @@ available_setting = {
"feishu_app_secret": "", # 飞书机器人APP secret
"feishu_token": "", # 飞书 verification token
"feishu_bot_name": "", # 飞书机器人的名字
"feishu_event_mode": "websocket", # 飞书事件接收模式: webhook(HTTP服务器) 或 websocket(长连接)
# 钉钉配置
"dingtalk_client_id": "", # 钉钉机器人Client ID
"dingtalk_client_secret": "", # 钉钉机器人Client Secret
@@ -199,12 +200,14 @@ class Config(dict):
self.user_datas = {}
def __getitem__(self, key):
if key not in available_setting:
# 跳过以下划线开头的注释字段
if not key.startswith("_") and key not in available_setting:
raise Exception("key {} not in available_setting".format(key))
return super().__getitem__(key)
def __setitem__(self, key, value):
if key not in available_setting:
# 跳过以下划线开头的注释字段
if not key.startswith("_") and key not in available_setting:
raise Exception("key {} not in available_setting".format(key))
return super().__setitem__(key, value)
@@ -286,6 +289,9 @@ def load_config():
# Some online deployment platforms (e.g. Railway) deploy project from github directly. So you shouldn't put your secrets like api key in a config file, instead use environment variables to override the default config.
for name, value in os.environ.items():
name = name.lower()
# 跳过以下划线开头的注释字段
if name.startswith("_"):
continue
if name in available_setting:
logger.info("[INIT] override config by environ args: {}={}".format(name, value))
try:

View File

@@ -15,7 +15,7 @@ elevenlabs==1.0.3 # elevenlabs TTS
#install plugin
dulwich
# wechatmp && wechatcom
# wechatmp && wechatcom && feishu
web.py
wechatpy
@@ -27,6 +27,7 @@ websocket-client==1.2.0
# claude bot
curl_cffi
# claude API
anthropic==0.25.0

View File

@@ -9,3 +9,6 @@ pre-commit
web.py
linkai>=0.0.6.0
agentmesh-sdk>=0.1.3
# feishu websocket mode
lark-oapi