feat: support channel start in sub thread

This commit is contained in:
zhayujie
2026-02-13 12:38:52 +08:00
parent a24b26a1ef
commit 46945942e1
8 changed files with 382 additions and 43 deletions

162
app.py
View File

@@ -7,11 +7,152 @@ import time
from channel import channel_factory
from common import const
from config import load_config
from common.log import logger
from config import load_config, conf
from plugins import *
import threading
# Global channel manager for restart support
_channel_mgr = None
def get_channel_manager():
return _channel_mgr
class ChannelManager:
"""
Manage the lifecycle of a channel, supporting restart from sub-threads.
The channel.startup() runs in a daemon thread so that the main thread
remains available and a new channel can be started at any time.
"""
def __init__(self):
self._channel = None
self._channel_thread = None
self._lock = threading.Lock()
@property
def channel(self):
return self._channel
def start(self, channel_name: str, first_start: bool = False):
"""
Create and start a channel in a sub-thread.
If first_start is True, plugins and linkai client will also be initialized.
"""
with self._lock:
channel = channel_factory.create_channel(channel_name)
self._channel = channel
if first_start:
if channel_name in ["wx", "wxy", "terminal", "wechatmp", "web",
"wechatmp_service", "wechatcom_app", "wework",
const.FEISHU, const.DINGTALK]:
PluginManager().load_plugins()
if conf().get("use_linkai"):
try:
from common import linkai_client
threading.Thread(target=linkai_client.start, args=(channel, self), daemon=True).start()
except Exception as e:
pass
# Run channel.startup() in a daemon thread so we can restart later
self._channel_thread = threading.Thread(
target=self._run_channel, args=(channel,), daemon=True
)
self._channel_thread.start()
logger.info(f"[ChannelManager] Channel '{channel_name}' started in sub-thread")
def _run_channel(self, channel):
try:
channel.startup()
except Exception as e:
logger.error(f"[ChannelManager] Channel startup error: {e}")
logger.exception(e)
def stop(self):
"""
Stop the current channel. Since most channel startup() methods block
on an HTTP server or stream client, we stop by terminating the thread.
"""
with self._lock:
if self._channel is None:
return
channel_type = getattr(self._channel, 'channel_type', 'unknown')
logger.info(f"[ChannelManager] Stopping channel '{channel_type}'...")
# Try graceful stop if channel implements it
try:
if hasattr(self._channel, 'stop'):
self._channel.stop()
except Exception as e:
logger.warning(f"[ChannelManager] Error during channel stop: {e}")
self._channel = None
self._channel_thread = None
def restart(self, new_channel_name: str):
"""
Restart the channel with a new channel type.
Can be called from any thread (e.g. linkai config callback).
"""
logger.info(f"[ChannelManager] Restarting channel to '{new_channel_name}'...")
self.stop()
# Clear singleton cache so a fresh channel instance is created
_clear_singleton_cache(new_channel_name)
time.sleep(1) # Brief pause to allow resources to release
self.start(new_channel_name, first_start=False)
logger.info(f"[ChannelManager] Channel restarted to '{new_channel_name}' successfully")
def _clear_singleton_cache(channel_name: str):
"""
Clear the singleton cache for the channel class so that
a new instance can be created with updated config.
"""
cls_map = {
"wx": "channel.wechat.wechat_channel.WechatChannel",
"wxy": "channel.wechat.wechaty_channel.WechatyChannel",
"wcf": "channel.wechat.wcf_channel.WechatfChannel",
"web": "channel.web.web_channel.WebChannel",
"wechatmp": "channel.wechatmp.wechatmp_channel.WechatMPChannel",
"wechatmp_service": "channel.wechatmp.wechatmp_channel.WechatMPChannel",
"wechatcom_app": "channel.wechatcom.wechatcomapp_channel.WechatComAppChannel",
"wework": "channel.wework.wework_channel.WeworkChannel",
const.FEISHU: "channel.feishu.feishu_channel.FeiShuChanel",
const.DINGTALK: "channel.dingtalk.dingtalk_channel.DingTalkChanel",
}
module_path = cls_map.get(channel_name)
if not module_path:
return
# The singleton decorator stores instances in a closure dict keyed by class.
# We need to find the actual class and clear it from the closure.
try:
parts = module_path.rsplit(".", 1)
module_name, class_name = parts[0], parts[1]
import importlib
module = importlib.import_module(module_name)
# The module-level name is the wrapper function from @singleton
wrapper = getattr(module, class_name, None)
if wrapper and hasattr(wrapper, '__closure__') and wrapper.__closure__:
for cell in wrapper.__closure__:
try:
cell_contents = cell.cell_contents
if isinstance(cell_contents, dict):
cell_contents.clear()
logger.debug(f"[ChannelManager] Cleared singleton cache for {class_name}")
break
except ValueError:
pass
except Exception as e:
logger.warning(f"[ChannelManager] Failed to clear singleton cache: {e}")
def sigterm_handler_wrap(_signo):
old_handler = signal.getsignal(_signo)
@@ -25,22 +166,8 @@ def sigterm_handler_wrap(_signo):
signal.signal(_signo, func)
def start_channel(channel_name: str):
channel = channel_factory.create_channel(channel_name)
if channel_name in ["wx", "wxy", "terminal", "wechatmp", "web", "wechatmp_service", "wechatcom_app", "wework",
const.FEISHU, const.DINGTALK]:
PluginManager().load_plugins()
if conf().get("use_linkai"):
try:
from common import linkai_client
threading.Thread(target=linkai_client.start, args=(channel,)).start()
except Exception as e:
pass
channel.startup()
def run():
global _channel_mgr
try:
# load config
load_config()
@@ -58,7 +185,8 @@ def run():
if channel_name == "wxy":
os.environ["WECHATY_LOG"] = "warn"
start_channel(channel_name)
_channel_mgr = ChannelManager()
_channel_mgr.start(channel_name, first_start=True)
while True:
time.sleep(1)

View File

@@ -19,6 +19,12 @@ class Channel(object):
"""
raise NotImplementedError
def stop(self):
"""
stop channel gracefully, called before restart
"""
pass
def handle_text(self, msg):
"""
process received msg

View File

@@ -90,13 +90,9 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler):
dingtalk_client_secret = conf().get('dingtalk_client_secret')
def setup_logger(self):
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter('%(asctime)s %(name)-8s %(levelname)-8s %(message)s [%(filename)s:%(lineno)d]'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
# Suppress verbose logs from dingtalk_stream SDK
logging.getLogger("dingtalk_stream").setLevel(logging.WARNING)
return logging.getLogger("DingTalk")
def __init__(self):
super().__init__()
@@ -104,6 +100,7 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler):
self.logger = self.setup_logger()
# 历史消息id暂存用于幂等控制
self.receivedMsgs = ExpiredDict(conf().get("expires_in_seconds", 3600))
self._stream_client = None
logger.debug("[DingTalk] client_id={}, client_secret={} ".format(
self.dingtalk_client_id, self.dingtalk_client_secret))
# 无需群校验和前缀
@@ -119,9 +116,19 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler):
def startup(self):
credential = dingtalk_stream.Credential(self.dingtalk_client_id, self.dingtalk_client_secret)
client = dingtalk_stream.DingTalkStreamClient(credential)
self._stream_client = client
client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self)
logger.info("[DingTalk] ✅ Stream connected, ready to receive messages")
client.start_forever()
def stop(self):
if self._stream_client:
try:
self._stream_client.stop()
logger.info("[DingTalk] Stream client stopped")
except Exception as e:
logger.warning(f"[DingTalk] Error stopping stream client: {e}")
self._stream_client = None
def get_access_token(self):
"""

View File

@@ -12,6 +12,7 @@
"""
import json
import logging
import os
import ssl
import threading
@@ -32,6 +33,9 @@ from common.log import logger
from common.singleton import singleton
from config import conf
# Suppress verbose logs from Lark SDK
logging.getLogger("Lark").setLevel(logging.WARNING)
URL_VERIFICATION = "url_verification"
# 尝试导入飞书SDK,如果未安装则websocket模式不可用
@@ -56,6 +60,7 @@ class FeiShuChanel(ChatChannel):
super().__init__()
# 历史消息id暂存用于幂等控制
self.receivedMsgs = ExpiredDict(60 * 60 * 7.1)
self._http_server = None
logger.debug("[FeiShu] app_id={}, app_secret={}, verification_token={}, event_mode={}".format(
self.feishu_app_id, self.feishu_app_secret, self.feishu_token, self.feishu_event_mode))
# 无需群校验和前缀
@@ -73,6 +78,15 @@ class FeiShuChanel(ChatChannel):
else:
self._startup_webhook()
def stop(self):
if self._http_server:
try:
self._http_server.stop()
logger.info("[FeiShu] HTTP server stopped")
except Exception as e:
logger.warning(f"[FeiShu] Error stopping HTTP server: {e}")
self._http_server = None
def _startup_webhook(self):
"""启动HTTP服务器接收事件(webhook模式)"""
logger.debug("[FeiShu] Starting in webhook mode...")
@@ -81,7 +95,14 @@ class FeiShuChanel(ChatChannel):
)
app = web.application(urls, globals(), autoreload=False)
port = conf().get("feishu_port", 9891)
web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
func = web.httpserver.StaticMiddleware(app.wsgifunc())
func = web.httpserver.LogMiddleware(func)
server = web.httpserver.WSGIServer(("0.0.0.0", port), func)
self._http_server = server
try:
server.start()
except (KeyboardInterrupt, SystemExit):
server.stop()
def _startup_websocket(self):
"""启动长连接接收事件(websocket模式)"""
@@ -138,7 +159,7 @@ class FeiShuChanel(ChatChannel):
self.feishu_app_id,
self.feishu_app_secret,
event_handler=event_handler,
log_level=lark.LogLevel.DEBUG if conf().get("debug") else lark.LogLevel.INFO
log_level=lark.LogLevel.DEBUG if conf().get("debug") else lark.LogLevel.WARNING
)
logger.debug("[FeiShu] Websocket client starting...")

View File

@@ -50,6 +50,7 @@ class WebChannel(ChatChannel):
self.msg_id_counter = 0 # 添加消息ID计数器
self.session_queues = {} # 存储session_id到队列的映射
self.request_to_session = {} # 存储request_id到session_id的映射
self._http_server = None
def _generate_msg_id(self):
@@ -235,13 +236,24 @@ class WebChannel(ChatChannel):
logging.getLogger("web").setLevel(logging.ERROR)
logging.getLogger("web.httpserver").setLevel(logging.ERROR)
# 抑制 web.py 默认的服务器启动消息
old_stdout = sys.stdout
sys.stdout = io.StringIO()
# Build WSGI app with middleware (same as runsimple but without print)
func = web.httpserver.StaticMiddleware(app.wsgifunc())
func = web.httpserver.LogMiddleware(func)
server = web.httpserver.WSGIServer(("0.0.0.0", port), func)
self._http_server = server
try:
web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
finally:
sys.stdout = old_stdout
server.start()
except (KeyboardInterrupt, SystemExit):
server.stop()
def stop(self):
if self._http_server:
try:
self._http_server.stop()
logger.info("[WebChannel] HTTP server stopped")
except Exception as e:
logger.warning(f"[WebChannel] Error stopping HTTP server: {e}")
self._http_server = None
class RootHandler:

View File

@@ -36,6 +36,7 @@ class WechatComAppChannel(ChatChannel):
self.agent_id = conf().get("wechatcomapp_agent_id")
self.token = conf().get("wechatcomapp_token")
self.aes_key = conf().get("wechatcomapp_aes_key")
self._http_server = None
logger.info(
"[wechatcom] Initializing WeCom app channel, corp_id: {}, agent_id: {}".format(self.corp_id, self.agent_id)
)
@@ -51,13 +52,24 @@ class WechatComAppChannel(ChatChannel):
logger.info("[wechatcom] 📡 Listening on http://0.0.0.0:{}/wxcomapp/".format(port))
logger.info("[wechatcom] 🤖 Ready to receive messages")
# Suppress web.py's default server startup message
old_stdout = sys.stdout
sys.stdout = io.StringIO()
# Build WSGI app with middleware (same as runsimple but without print)
func = web.httpserver.StaticMiddleware(app.wsgifunc())
func = web.httpserver.LogMiddleware(func)
server = web.httpserver.WSGIServer(("0.0.0.0", port), func)
self._http_server = server
try:
web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
finally:
sys.stdout = old_stdout
server.start()
except (KeyboardInterrupt, SystemExit):
server.stop()
def stop(self):
if self._http_server:
try:
self._http_server.stop()
logger.info("[wechatcom] HTTP server stopped")
except Exception as e:
logger.warning(f"[wechatcom] Error stopping HTTP server: {e}")
self._http_server = None
def send(self, reply: Reply, context: Context):
receiver = context["receiver"]

View File

@@ -41,6 +41,7 @@ class WechatMPChannel(ChatChannel):
super().__init__()
self.passive_reply = passive_reply
self.NOT_SUPPORT_REPLYTYPE = []
self._http_server = None
appid = conf().get("wechatmp_app_id")
secret = conf().get("wechatmp_app_secret")
token = conf().get("wechatmp_token")
@@ -69,7 +70,23 @@ class WechatMPChannel(ChatChannel):
urls = ("/wx", "channel.wechatmp.active_reply.Query")
app = web.application(urls, globals(), autoreload=False)
port = conf().get("wechatmp_port", 8080)
web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
func = web.httpserver.StaticMiddleware(app.wsgifunc())
func = web.httpserver.LogMiddleware(func)
server = web.httpserver.WSGIServer(("0.0.0.0", port), func)
self._http_server = server
try:
server.start()
except (KeyboardInterrupt, SystemExit):
server.stop()
def stop(self):
if self._http_server:
try:
self._http_server.stop()
logger.info("[wechatmp] HTTP server stopped")
except Exception as e:
logger.warning(f"[wechatmp] Error stopping HTTP server: {e}")
self._http_server = None
def start_loop(self, loop):
asyncio.set_event_loop(loop)

View File

@@ -2,9 +2,12 @@ from bridge.context import Context, ContextType
from bridge.reply import Reply, ReplyType
from common.log import logger
from linkai import LinkAIClient, PushMsg
from config import conf, pconf, plugin_config, available_setting, write_plugin_config
from config import conf, pconf, plugin_config, available_setting, write_plugin_config, get_root
from plugins import PluginManager
import threading
import time
import json
import os
chat_client: LinkAIClient
@@ -15,6 +18,7 @@ class ChatClient(LinkAIClient):
super().__init__(api_key, host)
self.channel = channel
self.client_type = channel.channel_type
self.channel_mgr = None
def on_message(self, push_msg: PushMsg):
session_id = push_msg.session_id
@@ -34,9 +38,12 @@ class ChatClient(LinkAIClient):
return
local_config = conf()
need_restart_channel = False
for key in config.keys():
if key in available_setting and config.get(key) is not None:
local_config[key] = config.get(key)
# 语音配置
reply_voice_mode = config.get("reply_voice_mode")
if reply_voice_mode:
@@ -50,6 +57,55 @@ class ChatClient(LinkAIClient):
local_config["always_reply_voice"] = False
local_config["voice_reply_voice"] = False
# Model configuration
if config.get("model"):
local_config["model"] = config.get("model")
# Channel configuration
if config.get("channelType"):
if local_config.get("channel_type") != config.get("channelType"):
local_config["channel_type"] = config.get("channelType")
need_restart_channel = True
# Channel-specific app credentials
current_channel_type = local_config.get("channel_type", "")
if config.get("app_id") is not None:
if current_channel_type == "feishu":
if local_config.get("feishu_app_id") != config.get("app_id"):
local_config["feishu_app_id"] = config.get("app_id")
need_restart_channel = True
elif current_channel_type == "dingtalk":
if local_config.get("dingtalk_client_id") != config.get("app_id"):
local_config["dingtalk_client_id"] = config.get("app_id")
need_restart_channel = True
elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service":
if local_config.get("wechatmp_app_id") != config.get("app_id"):
local_config["wechatmp_app_id"] = config.get("app_id")
need_restart_channel = True
elif current_channel_type == "wechatcom_app":
if local_config.get("wechatcomapp_agent_id") != config.get("app_id"):
local_config["wechatcomapp_agent_id"] = config.get("app_id")
need_restart_channel = True
if config.get("app_secret"):
if current_channel_type == "feishu":
if local_config.get("feishu_app_secret") != config.get("app_secret"):
local_config["feishu_app_secret"] = config.get("app_secret")
need_restart_channel = True
elif current_channel_type == "dingtalk":
if local_config.get("dingtalk_client_secret") != config.get("app_secret"):
local_config["dingtalk_client_secret"] = config.get("app_secret")
need_restart_channel = True
elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service":
if local_config.get("wechatmp_app_secret") != config.get("app_secret"):
local_config["wechatmp_app_secret"] = config.get("app_secret")
need_restart_channel = True
elif current_channel_type == "wechatcom_app":
if local_config.get("wechatcomapp_secret") != config.get("app_secret"):
local_config["wechatcomapp_secret"] = config.get("app_secret")
need_restart_channel = True
if config.get("admin_password"):
if not pconf("Godcmd"):
write_plugin_config({"Godcmd": {"password": config.get("admin_password"), "admin_users": []} })
@@ -71,11 +127,67 @@ class ChatClient(LinkAIClient):
elif config.get("text_to_image") and config.get("text_to_image") in ["dall-e-2", "dall-e-3"]:
if pconf("linkai")["midjourney"]:
pconf("linkai")["midjourney"]["use_image_create_prefix"] = False
# Save configuration to config.json file
self._save_config_to_file(local_config)
if need_restart_channel:
self._restart_channel(local_config.get("channel_type", ""))
def _restart_channel(self, new_channel_type: str):
"""
Restart the channel via ChannelManager when channel type changes.
"""
if self.channel_mgr:
logger.info(f"[LinkAI] Restarting channel to '{new_channel_type}'...")
threading.Thread(target=self._do_restart_channel, args=(self.channel_mgr, new_channel_type), daemon=True).start()
else:
logger.warning("[LinkAI] ChannelManager not available, please restart the application manually")
def _do_restart_channel(self, mgr, new_channel_type: str):
"""
Perform the channel restart in a separate thread to avoid blocking the config callback.
"""
try:
mgr.restart(new_channel_type)
# Update the linkai client's channel reference
if mgr.channel:
self.channel = mgr.channel
self.client_type = mgr.channel.channel_type
logger.info(f"[LinkAI] Channel reference updated to '{new_channel_type}'")
except Exception as e:
logger.error(f"[LinkAI] Channel restart failed: {e}")
def _save_config_to_file(self, local_config: dict):
"""
Save configuration to config.json file
"""
try:
config_path = os.path.join(get_root(), "config.json")
if not os.path.exists(config_path):
logger.warning(f"[LinkAI] config.json not found at {config_path}, skip saving")
return
# Read current config file
with open(config_path, "r", encoding="utf-8") as f:
file_config = json.load(f)
# Update file config with memory config
file_config.update(dict(local_config))
# Write back to file
with open(config_path, "w", encoding="utf-8") as f:
json.dump(file_config, f, indent=4, ensure_ascii=False)
logger.info("[LinkAI] Configuration saved to config.json successfully")
except Exception as e:
logger.error(f"[LinkAI] Failed to save configuration to config.json: {e}")
def start(channel):
def start(channel, channel_mgr=None):
global chat_client
chat_client = ChatClient(api_key=conf().get("linkai_api_key"), host="", channel=channel)
chat_client = ChatClient(api_key=conf().get("linkai_api_key"), channel=channel)
chat_client.channel_mgr = channel_mgr
chat_client.config = _build_config()
chat_client.start()
time.sleep(1.5)
@@ -97,14 +209,38 @@ def _build_config():
"nick_name_black_list": local_conf.get("nick_name_black_list"),
"speech_recognition": "Y" if local_conf.get("speech_recognition") else "N",
"text_to_image": local_conf.get("text_to_image"),
"image_create_prefix": local_conf.get("image_create_prefix")
"image_create_prefix": local_conf.get("image_create_prefix"),
"model": local_conf.get("model"),
"agent_max_context_turns": local_conf.get("agent_max_context_turns"),
"agent_max_context_tokens": local_conf.get("agent_max_context_tokens"),
"agent_max_steps": local_conf.get("agent_max_steps"),
"channelType": local_conf.get("channel_type")
}
if local_conf.get("always_reply_voice"):
config["reply_voice_mode"] = "always_reply_voice"
elif local_conf.get("voice_reply_voice"):
config["reply_voice_mode"] = "voice_reply_voice"
if pconf("linkai"):
config["group_app_map"] = pconf("linkai").get("group_app_map")
if plugin_config.get("Godcmd"):
config["admin_password"] = plugin_config.get("Godcmd").get("password")
# Add channel-specific app credentials
current_channel_type = local_conf.get("channel_type", "")
if current_channel_type == "feishu":
config["app_id"] = local_conf.get("feishu_app_id")
config["app_secret"] = local_conf.get("feishu_app_secret")
elif current_channel_type == "dingtalk":
config["app_id"] = local_conf.get("dingtalk_client_id")
config["app_secret"] = local_conf.get("dingtalk_client_secret")
elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service":
config["app_id"] = local_conf.get("wechatmp_app_id")
config["app_secret"] = local_conf.get("wechatmp_app_secret")
elif current_channel_type == "wechatcom_app":
config["app_id"] = local_conf.get("wechatcomapp_agent_id")
config["app_secret"] = local_conf.get("wechatcomapp_secret")
return config