diff --git a/app.py b/app.py index 022fdd7..97b58a1 100644 --- a/app.py +++ b/app.py @@ -7,11 +7,152 @@ import time from channel import channel_factory from common import const -from config import load_config +from common.log import logger +from config import load_config, conf from plugins import * import threading +# Global channel manager for restart support +_channel_mgr = None + + +def get_channel_manager(): + return _channel_mgr + + +class ChannelManager: + """ + Manage the lifecycle of a channel, supporting restart from sub-threads. + The channel.startup() runs in a daemon thread so that the main thread + remains available and a new channel can be started at any time. + """ + + def __init__(self): + self._channel = None + self._channel_thread = None + self._lock = threading.Lock() + + @property + def channel(self): + return self._channel + + def start(self, channel_name: str, first_start: bool = False): + """ + Create and start a channel in a sub-thread. + If first_start is True, plugins and linkai client will also be initialized. + """ + with self._lock: + channel = channel_factory.create_channel(channel_name) + self._channel = channel + + if first_start: + if channel_name in ["wx", "wxy", "terminal", "wechatmp", "web", + "wechatmp_service", "wechatcom_app", "wework", + const.FEISHU, const.DINGTALK]: + PluginManager().load_plugins() + + if conf().get("use_linkai"): + try: + from common import linkai_client + threading.Thread(target=linkai_client.start, args=(channel, self), daemon=True).start() + except Exception as e: + pass + + # Run channel.startup() in a daemon thread so we can restart later + self._channel_thread = threading.Thread( + target=self._run_channel, args=(channel,), daemon=True + ) + self._channel_thread.start() + logger.info(f"[ChannelManager] Channel '{channel_name}' started in sub-thread") + + def _run_channel(self, channel): + try: + channel.startup() + except Exception as e: + logger.error(f"[ChannelManager] Channel startup error: {e}") + logger.exception(e) + + def stop(self): + """ + Stop the current channel. Since most channel startup() methods block + on an HTTP server or stream client, we stop by terminating the thread. + """ + with self._lock: + if self._channel is None: + return + channel_type = getattr(self._channel, 'channel_type', 'unknown') + logger.info(f"[ChannelManager] Stopping channel '{channel_type}'...") + + # Try graceful stop if channel implements it + try: + if hasattr(self._channel, 'stop'): + self._channel.stop() + except Exception as e: + logger.warning(f"[ChannelManager] Error during channel stop: {e}") + + self._channel = None + self._channel_thread = None + + def restart(self, new_channel_name: str): + """ + Restart the channel with a new channel type. + Can be called from any thread (e.g. linkai config callback). + """ + logger.info(f"[ChannelManager] Restarting channel to '{new_channel_name}'...") + self.stop() + + # Clear singleton cache so a fresh channel instance is created + _clear_singleton_cache(new_channel_name) + + time.sleep(1) # Brief pause to allow resources to release + self.start(new_channel_name, first_start=False) + logger.info(f"[ChannelManager] Channel restarted to '{new_channel_name}' successfully") + + +def _clear_singleton_cache(channel_name: str): + """ + Clear the singleton cache for the channel class so that + a new instance can be created with updated config. + """ + cls_map = { + "wx": "channel.wechat.wechat_channel.WechatChannel", + "wxy": "channel.wechat.wechaty_channel.WechatyChannel", + "wcf": "channel.wechat.wcf_channel.WechatfChannel", + "web": "channel.web.web_channel.WebChannel", + "wechatmp": "channel.wechatmp.wechatmp_channel.WechatMPChannel", + "wechatmp_service": "channel.wechatmp.wechatmp_channel.WechatMPChannel", + "wechatcom_app": "channel.wechatcom.wechatcomapp_channel.WechatComAppChannel", + "wework": "channel.wework.wework_channel.WeworkChannel", + const.FEISHU: "channel.feishu.feishu_channel.FeiShuChanel", + const.DINGTALK: "channel.dingtalk.dingtalk_channel.DingTalkChanel", + } + module_path = cls_map.get(channel_name) + if not module_path: + return + # The singleton decorator stores instances in a closure dict keyed by class. + # We need to find the actual class and clear it from the closure. + try: + parts = module_path.rsplit(".", 1) + module_name, class_name = parts[0], parts[1] + import importlib + module = importlib.import_module(module_name) + # The module-level name is the wrapper function from @singleton + wrapper = getattr(module, class_name, None) + if wrapper and hasattr(wrapper, '__closure__') and wrapper.__closure__: + for cell in wrapper.__closure__: + try: + cell_contents = cell.cell_contents + if isinstance(cell_contents, dict): + cell_contents.clear() + logger.debug(f"[ChannelManager] Cleared singleton cache for {class_name}") + break + except ValueError: + pass + except Exception as e: + logger.warning(f"[ChannelManager] Failed to clear singleton cache: {e}") + + def sigterm_handler_wrap(_signo): old_handler = signal.getsignal(_signo) @@ -25,22 +166,8 @@ def sigterm_handler_wrap(_signo): signal.signal(_signo, func) -def start_channel(channel_name: str): - channel = channel_factory.create_channel(channel_name) - if channel_name in ["wx", "wxy", "terminal", "wechatmp", "web", "wechatmp_service", "wechatcom_app", "wework", - const.FEISHU, const.DINGTALK]: - PluginManager().load_plugins() - - if conf().get("use_linkai"): - try: - from common import linkai_client - threading.Thread(target=linkai_client.start, args=(channel,)).start() - except Exception as e: - pass - channel.startup() - - def run(): + global _channel_mgr try: # load config load_config() @@ -58,7 +185,8 @@ def run(): if channel_name == "wxy": os.environ["WECHATY_LOG"] = "warn" - start_channel(channel_name) + _channel_mgr = ChannelManager() + _channel_mgr.start(channel_name, first_start=True) while True: time.sleep(1) diff --git a/channel/channel.py b/channel/channel.py index 08799c6..42d613f 100644 --- a/channel/channel.py +++ b/channel/channel.py @@ -19,6 +19,12 @@ class Channel(object): """ raise NotImplementedError + def stop(self): + """ + stop channel gracefully, called before restart + """ + pass + def handle_text(self, msg): """ process received msg diff --git a/channel/dingtalk/dingtalk_channel.py b/channel/dingtalk/dingtalk_channel.py index 0094f56..4c41d3b 100644 --- a/channel/dingtalk/dingtalk_channel.py +++ b/channel/dingtalk/dingtalk_channel.py @@ -90,13 +90,9 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): dingtalk_client_secret = conf().get('dingtalk_client_secret') def setup_logger(self): - logger = logging.getLogger() - handler = logging.StreamHandler() - handler.setFormatter( - logging.Formatter('%(asctime)s %(name)-8s %(levelname)-8s %(message)s [%(filename)s:%(lineno)d]')) - logger.addHandler(handler) - logger.setLevel(logging.INFO) - return logger + # Suppress verbose logs from dingtalk_stream SDK + logging.getLogger("dingtalk_stream").setLevel(logging.WARNING) + return logging.getLogger("DingTalk") def __init__(self): super().__init__() @@ -104,6 +100,7 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): self.logger = self.setup_logger() # 历史消息id暂存,用于幂等控制 self.receivedMsgs = ExpiredDict(conf().get("expires_in_seconds", 3600)) + self._stream_client = None logger.debug("[DingTalk] client_id={}, client_secret={} ".format( self.dingtalk_client_id, self.dingtalk_client_secret)) # 无需群校验和前缀 @@ -119,9 +116,19 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): def startup(self): credential = dingtalk_stream.Credential(self.dingtalk_client_id, self.dingtalk_client_secret) client = dingtalk_stream.DingTalkStreamClient(credential) + self._stream_client = client client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self) logger.info("[DingTalk] ✅ Stream connected, ready to receive messages") client.start_forever() + + def stop(self): + if self._stream_client: + try: + self._stream_client.stop() + logger.info("[DingTalk] Stream client stopped") + except Exception as e: + logger.warning(f"[DingTalk] Error stopping stream client: {e}") + self._stream_client = None def get_access_token(self): """ diff --git a/channel/feishu/feishu_channel.py b/channel/feishu/feishu_channel.py index f2d6c4b..e6a8ff7 100644 --- a/channel/feishu/feishu_channel.py +++ b/channel/feishu/feishu_channel.py @@ -12,6 +12,7 @@ """ import json +import logging import os import ssl import threading @@ -32,6 +33,9 @@ from common.log import logger from common.singleton import singleton from config import conf +# Suppress verbose logs from Lark SDK +logging.getLogger("Lark").setLevel(logging.WARNING) + URL_VERIFICATION = "url_verification" # 尝试导入飞书SDK,如果未安装则websocket模式不可用 @@ -56,6 +60,7 @@ class FeiShuChanel(ChatChannel): super().__init__() # 历史消息id暂存,用于幂等控制 self.receivedMsgs = ExpiredDict(60 * 60 * 7.1) + self._http_server = None logger.debug("[FeiShu] app_id={}, app_secret={}, verification_token={}, event_mode={}".format( self.feishu_app_id, self.feishu_app_secret, self.feishu_token, self.feishu_event_mode)) # 无需群校验和前缀 @@ -73,6 +78,15 @@ class FeiShuChanel(ChatChannel): else: self._startup_webhook() + def stop(self): + if self._http_server: + try: + self._http_server.stop() + logger.info("[FeiShu] HTTP server stopped") + except Exception as e: + logger.warning(f"[FeiShu] Error stopping HTTP server: {e}") + self._http_server = None + def _startup_webhook(self): """启动HTTP服务器接收事件(webhook模式)""" logger.debug("[FeiShu] Starting in webhook mode...") @@ -81,7 +95,14 @@ class FeiShuChanel(ChatChannel): ) app = web.application(urls, globals(), autoreload=False) port = conf().get("feishu_port", 9891) - web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port)) + func = web.httpserver.StaticMiddleware(app.wsgifunc()) + func = web.httpserver.LogMiddleware(func) + server = web.httpserver.WSGIServer(("0.0.0.0", port), func) + self._http_server = server + try: + server.start() + except (KeyboardInterrupt, SystemExit): + server.stop() def _startup_websocket(self): """启动长连接接收事件(websocket模式)""" @@ -138,7 +159,7 @@ class FeiShuChanel(ChatChannel): self.feishu_app_id, self.feishu_app_secret, event_handler=event_handler, - log_level=lark.LogLevel.DEBUG if conf().get("debug") else lark.LogLevel.INFO + log_level=lark.LogLevel.DEBUG if conf().get("debug") else lark.LogLevel.WARNING ) logger.debug("[FeiShu] Websocket client starting...") diff --git a/channel/web/web_channel.py b/channel/web/web_channel.py index 88c04fd..43c245e 100644 --- a/channel/web/web_channel.py +++ b/channel/web/web_channel.py @@ -50,6 +50,7 @@ class WebChannel(ChatChannel): self.msg_id_counter = 0 # 添加消息ID计数器 self.session_queues = {} # 存储session_id到队列的映射 self.request_to_session = {} # 存储request_id到session_id的映射 + self._http_server = None def _generate_msg_id(self): @@ -235,13 +236,24 @@ class WebChannel(ChatChannel): logging.getLogger("web").setLevel(logging.ERROR) logging.getLogger("web.httpserver").setLevel(logging.ERROR) - # 抑制 web.py 默认的服务器启动消息 - old_stdout = sys.stdout - sys.stdout = io.StringIO() + # Build WSGI app with middleware (same as runsimple but without print) + func = web.httpserver.StaticMiddleware(app.wsgifunc()) + func = web.httpserver.LogMiddleware(func) + server = web.httpserver.WSGIServer(("0.0.0.0", port), func) + self._http_server = server try: - web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port)) - finally: - sys.stdout = old_stdout + server.start() + except (KeyboardInterrupt, SystemExit): + server.stop() + + def stop(self): + if self._http_server: + try: + self._http_server.stop() + logger.info("[WebChannel] HTTP server stopped") + except Exception as e: + logger.warning(f"[WebChannel] Error stopping HTTP server: {e}") + self._http_server = None class RootHandler: diff --git a/channel/wechatcom/wechatcomapp_channel.py b/channel/wechatcom/wechatcomapp_channel.py index 262702a..3321f54 100644 --- a/channel/wechatcom/wechatcomapp_channel.py +++ b/channel/wechatcom/wechatcomapp_channel.py @@ -36,6 +36,7 @@ class WechatComAppChannel(ChatChannel): self.agent_id = conf().get("wechatcomapp_agent_id") self.token = conf().get("wechatcomapp_token") self.aes_key = conf().get("wechatcomapp_aes_key") + self._http_server = None logger.info( "[wechatcom] Initializing WeCom app channel, corp_id: {}, agent_id: {}".format(self.corp_id, self.agent_id) ) @@ -51,13 +52,24 @@ class WechatComAppChannel(ChatChannel): logger.info("[wechatcom] 📡 Listening on http://0.0.0.0:{}/wxcomapp/".format(port)) logger.info("[wechatcom] 🤖 Ready to receive messages") - # Suppress web.py's default server startup message - old_stdout = sys.stdout - sys.stdout = io.StringIO() + # Build WSGI app with middleware (same as runsimple but without print) + func = web.httpserver.StaticMiddleware(app.wsgifunc()) + func = web.httpserver.LogMiddleware(func) + server = web.httpserver.WSGIServer(("0.0.0.0", port), func) + self._http_server = server try: - web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port)) - finally: - sys.stdout = old_stdout + server.start() + except (KeyboardInterrupt, SystemExit): + server.stop() + + def stop(self): + if self._http_server: + try: + self._http_server.stop() + logger.info("[wechatcom] HTTP server stopped") + except Exception as e: + logger.warning(f"[wechatcom] Error stopping HTTP server: {e}") + self._http_server = None def send(self, reply: Reply, context: Context): receiver = context["receiver"] diff --git a/channel/wechatmp/wechatmp_channel.py b/channel/wechatmp/wechatmp_channel.py index e0a07dd..c066f28 100644 --- a/channel/wechatmp/wechatmp_channel.py +++ b/channel/wechatmp/wechatmp_channel.py @@ -41,6 +41,7 @@ class WechatMPChannel(ChatChannel): super().__init__() self.passive_reply = passive_reply self.NOT_SUPPORT_REPLYTYPE = [] + self._http_server = None appid = conf().get("wechatmp_app_id") secret = conf().get("wechatmp_app_secret") token = conf().get("wechatmp_token") @@ -69,7 +70,23 @@ class WechatMPChannel(ChatChannel): urls = ("/wx", "channel.wechatmp.active_reply.Query") app = web.application(urls, globals(), autoreload=False) port = conf().get("wechatmp_port", 8080) - web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port)) + func = web.httpserver.StaticMiddleware(app.wsgifunc()) + func = web.httpserver.LogMiddleware(func) + server = web.httpserver.WSGIServer(("0.0.0.0", port), func) + self._http_server = server + try: + server.start() + except (KeyboardInterrupt, SystemExit): + server.stop() + + def stop(self): + if self._http_server: + try: + self._http_server.stop() + logger.info("[wechatmp] HTTP server stopped") + except Exception as e: + logger.warning(f"[wechatmp] Error stopping HTTP server: {e}") + self._http_server = None def start_loop(self, loop): asyncio.set_event_loop(loop) diff --git a/common/linkai_client.py b/common/linkai_client.py index 16c7049..7ee5e8a 100644 --- a/common/linkai_client.py +++ b/common/linkai_client.py @@ -2,9 +2,12 @@ from bridge.context import Context, ContextType from bridge.reply import Reply, ReplyType from common.log import logger from linkai import LinkAIClient, PushMsg -from config import conf, pconf, plugin_config, available_setting, write_plugin_config +from config import conf, pconf, plugin_config, available_setting, write_plugin_config, get_root from plugins import PluginManager +import threading import time +import json +import os chat_client: LinkAIClient @@ -15,6 +18,7 @@ class ChatClient(LinkAIClient): super().__init__(api_key, host) self.channel = channel self.client_type = channel.channel_type + self.channel_mgr = None def on_message(self, push_msg: PushMsg): session_id = push_msg.session_id @@ -34,9 +38,12 @@ class ChatClient(LinkAIClient): return local_config = conf() + need_restart_channel = False + for key in config.keys(): if key in available_setting and config.get(key) is not None: local_config[key] = config.get(key) + # 语音配置 reply_voice_mode = config.get("reply_voice_mode") if reply_voice_mode: @@ -50,6 +57,55 @@ class ChatClient(LinkAIClient): local_config["always_reply_voice"] = False local_config["voice_reply_voice"] = False + # Model configuration + if config.get("model"): + local_config["model"] = config.get("model") + + # Channel configuration + if config.get("channelType"): + if local_config.get("channel_type") != config.get("channelType"): + local_config["channel_type"] = config.get("channelType") + need_restart_channel = True + + # Channel-specific app credentials + current_channel_type = local_config.get("channel_type", "") + + if config.get("app_id") is not None: + if current_channel_type == "feishu": + if local_config.get("feishu_app_id") != config.get("app_id"): + local_config["feishu_app_id"] = config.get("app_id") + need_restart_channel = True + elif current_channel_type == "dingtalk": + if local_config.get("dingtalk_client_id") != config.get("app_id"): + local_config["dingtalk_client_id"] = config.get("app_id") + need_restart_channel = True + elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service": + if local_config.get("wechatmp_app_id") != config.get("app_id"): + local_config["wechatmp_app_id"] = config.get("app_id") + need_restart_channel = True + elif current_channel_type == "wechatcom_app": + if local_config.get("wechatcomapp_agent_id") != config.get("app_id"): + local_config["wechatcomapp_agent_id"] = config.get("app_id") + need_restart_channel = True + + if config.get("app_secret"): + if current_channel_type == "feishu": + if local_config.get("feishu_app_secret") != config.get("app_secret"): + local_config["feishu_app_secret"] = config.get("app_secret") + need_restart_channel = True + elif current_channel_type == "dingtalk": + if local_config.get("dingtalk_client_secret") != config.get("app_secret"): + local_config["dingtalk_client_secret"] = config.get("app_secret") + need_restart_channel = True + elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service": + if local_config.get("wechatmp_app_secret") != config.get("app_secret"): + local_config["wechatmp_app_secret"] = config.get("app_secret") + need_restart_channel = True + elif current_channel_type == "wechatcom_app": + if local_config.get("wechatcomapp_secret") != config.get("app_secret"): + local_config["wechatcomapp_secret"] = config.get("app_secret") + need_restart_channel = True + if config.get("admin_password"): if not pconf("Godcmd"): write_plugin_config({"Godcmd": {"password": config.get("admin_password"), "admin_users": []} }) @@ -71,11 +127,67 @@ class ChatClient(LinkAIClient): elif config.get("text_to_image") and config.get("text_to_image") in ["dall-e-2", "dall-e-3"]: if pconf("linkai")["midjourney"]: pconf("linkai")["midjourney"]["use_image_create_prefix"] = False + + # Save configuration to config.json file + self._save_config_to_file(local_config) + + if need_restart_channel: + self._restart_channel(local_config.get("channel_type", "")) + + def _restart_channel(self, new_channel_type: str): + """ + Restart the channel via ChannelManager when channel type changes. + """ + if self.channel_mgr: + logger.info(f"[LinkAI] Restarting channel to '{new_channel_type}'...") + threading.Thread(target=self._do_restart_channel, args=(self.channel_mgr, new_channel_type), daemon=True).start() + else: + logger.warning("[LinkAI] ChannelManager not available, please restart the application manually") + + def _do_restart_channel(self, mgr, new_channel_type: str): + """ + Perform the channel restart in a separate thread to avoid blocking the config callback. + """ + try: + mgr.restart(new_channel_type) + # Update the linkai client's channel reference + if mgr.channel: + self.channel = mgr.channel + self.client_type = mgr.channel.channel_type + logger.info(f"[LinkAI] Channel reference updated to '{new_channel_type}'") + except Exception as e: + logger.error(f"[LinkAI] Channel restart failed: {e}") + + def _save_config_to_file(self, local_config: dict): + """ + Save configuration to config.json file + """ + try: + config_path = os.path.join(get_root(), "config.json") + if not os.path.exists(config_path): + logger.warning(f"[LinkAI] config.json not found at {config_path}, skip saving") + return + + # Read current config file + with open(config_path, "r", encoding="utf-8") as f: + file_config = json.load(f) + + # Update file config with memory config + file_config.update(dict(local_config)) + + # Write back to file + with open(config_path, "w", encoding="utf-8") as f: + json.dump(file_config, f, indent=4, ensure_ascii=False) + + logger.info("[LinkAI] Configuration saved to config.json successfully") + except Exception as e: + logger.error(f"[LinkAI] Failed to save configuration to config.json: {e}") -def start(channel): +def start(channel, channel_mgr=None): global chat_client - chat_client = ChatClient(api_key=conf().get("linkai_api_key"), host="", channel=channel) + chat_client = ChatClient(api_key=conf().get("linkai_api_key"), channel=channel) + chat_client.channel_mgr = channel_mgr chat_client.config = _build_config() chat_client.start() time.sleep(1.5) @@ -97,14 +209,38 @@ def _build_config(): "nick_name_black_list": local_conf.get("nick_name_black_list"), "speech_recognition": "Y" if local_conf.get("speech_recognition") else "N", "text_to_image": local_conf.get("text_to_image"), - "image_create_prefix": local_conf.get("image_create_prefix") + "image_create_prefix": local_conf.get("image_create_prefix"), + "model": local_conf.get("model"), + "agent_max_context_turns": local_conf.get("agent_max_context_turns"), + "agent_max_context_tokens": local_conf.get("agent_max_context_tokens"), + "agent_max_steps": local_conf.get("agent_max_steps"), + "channelType": local_conf.get("channel_type") } + if local_conf.get("always_reply_voice"): config["reply_voice_mode"] = "always_reply_voice" elif local_conf.get("voice_reply_voice"): config["reply_voice_mode"] = "voice_reply_voice" + if pconf("linkai"): config["group_app_map"] = pconf("linkai").get("group_app_map") + if plugin_config.get("Godcmd"): config["admin_password"] = plugin_config.get("Godcmd").get("password") + + # Add channel-specific app credentials + current_channel_type = local_conf.get("channel_type", "") + if current_channel_type == "feishu": + config["app_id"] = local_conf.get("feishu_app_id") + config["app_secret"] = local_conf.get("feishu_app_secret") + elif current_channel_type == "dingtalk": + config["app_id"] = local_conf.get("dingtalk_client_id") + config["app_secret"] = local_conf.get("dingtalk_client_secret") + elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service": + config["app_id"] = local_conf.get("wechatmp_app_id") + config["app_secret"] = local_conf.get("wechatmp_app_secret") + elif current_channel_type == "wechatcom_app": + config["app_id"] = local_conf.get("wechatcomapp_agent_id") + config["app_secret"] = local_conf.get("wechatcomapp_secret") + return config