From f1c04bc60d352f0cb89b5928d6cdec0bcb705348 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Thu, 5 Mar 2026 15:55:16 +0800 Subject: [PATCH] feat: improve channel connection stability --- app.py | 49 +++++- channel/channel.py | 26 +++ channel/chat_channel.py | 1 + channel/dingtalk/dingtalk_channel.py | 91 +++++++++- channel/feishu/feishu_channel.py | 15 +- common/cloud_client.py | 239 ++++++++++++++++++++++----- 6 files changed, 367 insertions(+), 54 deletions(-) diff --git a/app.py b/app.py index c3315c1..073aae3 100644 --- a/app.py +++ b/app.py @@ -47,6 +47,7 @@ class ChannelManager: self._threads = {} # channel_name -> thread self._primary_channel = None self._lock = threading.Lock() + self.cloud_mode = False # set to True when cloud client is active @property def channel(self): @@ -65,6 +66,7 @@ class ChannelManager: channels = [] for name in channel_names: ch = channel_factory.create_channel(name) + ch.cloud_mode = self.cloud_mode self._channels[name] = ch channels.append((name, ch)) if self._primary_channel is None and name != "web": @@ -136,13 +138,22 @@ class ChannelManager: self._interrupt_thread(th, name) continue logger.info(f"[ChannelManager] Stopping channel '{name}'...") - try: - if hasattr(ch, 'stop'): + graceful = False + if hasattr(ch, 'stop'): + try: ch.stop() - except Exception as e: - logger.warning(f"[ChannelManager] Error during channel '{name}' stop: {e}") + graceful = True + except Exception as e: + logger.warning(f"[ChannelManager] Error during channel '{name}' stop: {e}") if th and th.is_alive(): - self._interrupt_thread(th, name) + th.join(timeout=5) + if th.is_alive(): + if graceful: + logger.info(f"[ChannelManager] Channel '{name}' thread still alive after stop(), " + "leaving daemon thread to finish on its own") + else: + logger.warning(f"[ChannelManager] Channel '{name}' thread did not exit in 5s, forcing interrupt") + self._interrupt_thread(th, name) @staticmethod def _interrupt_thread(th: threading.Thread, name: str): @@ -175,6 +186,34 @@ class ChannelManager: self.start([new_channel_name], first_start=False) logger.info(f"[ChannelManager] Channel restarted to '{new_channel_name}' successfully") + def add_channel(self, channel_name: str): + """ + Dynamically add and start a new channel. + If the channel is already running, restart it instead. + """ + with self._lock: + if channel_name in self._channels: + logger.info(f"[ChannelManager] Channel '{channel_name}' already exists, restarting") + if self._channels.get(channel_name): + self.restart(channel_name) + return + logger.info(f"[ChannelManager] Adding channel '{channel_name}'...") + _clear_singleton_cache(channel_name) + self.start([channel_name], first_start=False) + logger.info(f"[ChannelManager] Channel '{channel_name}' added successfully") + + def remove_channel(self, channel_name: str): + """ + Dynamically stop and remove a running channel. + """ + with self._lock: + if channel_name not in self._channels: + logger.warning(f"[ChannelManager] Channel '{channel_name}' not found, nothing to remove") + return + logger.info(f"[ChannelManager] Removing channel '{channel_name}'...") + self.stop(channel_name) + logger.info(f"[ChannelManager] Channel '{channel_name}' removed successfully") + def _clear_singleton_cache(channel_name: str): """ diff --git a/channel/channel.py b/channel/channel.py index f01189e..385451b 100644 --- a/channel/channel.py +++ b/channel/channel.py @@ -13,12 +13,38 @@ class Channel(object): channel_type = "" NOT_SUPPORT_REPLYTYPE = [ReplyType.VOICE, ReplyType.IMAGE] + def __init__(self): + import threading + self._startup_event = threading.Event() + self._startup_error = None + self.cloud_mode = False # set to True by ChannelManager when running with cloud client + def startup(self): """ init channel """ raise NotImplementedError + def report_startup_success(self): + self._startup_error = None + self._startup_event.set() + + def report_startup_error(self, error: str): + self._startup_error = error + self._startup_event.set() + + def wait_startup(self, timeout: float = 3) -> (bool, str): + """ + Wait for channel startup result. + Returns (success: bool, error_msg: str). + """ + ready = self._startup_event.wait(timeout=timeout) + if not ready: + return True, "" + if self._startup_error: + return False, self._startup_error + return True, "" + def stop(self): """ stop channel gracefully, called before restart diff --git a/channel/chat_channel.py b/channel/chat_channel.py index 4c8494f..c2ee238 100644 --- a/channel/chat_channel.py +++ b/channel/chat_channel.py @@ -26,6 +26,7 @@ class ChatChannel(Channel): user_id = None # 登录的用户id def __init__(self): + super().__init__() # Instance-level attributes so each channel subclass has its own # independent session queue and lock. Previously these were class-level, # which caused contexts from one channel (e.g. Feishu) to be consumed diff --git a/channel/dingtalk/dingtalk_channel.py b/channel/dingtalk/dingtalk_channel.py index 31f7701..d572e35 100644 --- a/channel/dingtalk/dingtalk_channel.py +++ b/channel/dingtalk/dingtalk_channel.py @@ -115,6 +115,35 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): # Robot code cache (extracted from incoming messages) self._robot_code = None + def _open_connection(self, client): + """ + Open a DingTalk stream connection directly, bypassing SDK's internal error-swallowing. + Returns (connection_dict, error_str). On success error_str is empty; on failure + connection_dict is None and error_str contains a human-readable message. + """ + try: + resp = requests.post( + "https://api.dingtalk.com/v1.0/gateway/connections/open", + headers={"Content-Type": "application/json", "Accept": "application/json"}, + json={ + "clientId": client.credential.client_id, + "clientSecret": client.credential.client_secret, + "subscriptions": [{"type": "CALLBACK", + "topic": dingtalk_stream.chatbot.ChatbotMessage.TOPIC}], + "ua": "dingtalk-sdk-python/cow", + "localIp": "", + }, + timeout=10, + ) + body = resp.json() + if not resp.ok: + code = body.get("code", resp.status_code) + message = body.get("message", resp.reason) + return None, f"open connection failed: [{code}] {message}" + return body, "" + except Exception as e: + return None, f"open connection failed: {e}" + def startup(self): import asyncio self.dingtalk_client_id = conf().get('dingtalk_client_id') @@ -125,34 +154,80 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): self._stream_client = client client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self) logger.info("[DingTalk] ✅ Stream client initialized, ready to receive messages") + + # Run the connection loop ourselves instead of delegating to client.start(), + # so we can get detailed error messages and respond to stop() quickly. + import urllib.parse as _urlparse + import websockets as _ws + import json as _json + client.pre_start() _first_connect = True while self._running: + # Open connection using our own request so we get detailed error info. + connection, err_msg = self._open_connection(client) + + if connection is None: + if _first_connect: + logger.warning(f"[DingTalk] {err_msg}") + self.report_startup_error(err_msg) + _first_connect = False + else: + logger.warning(f"[DingTalk] {err_msg}, retrying in 10s...") + + # Interruptible sleep: checks _running every 100ms. + for _ in range(100): + if not self._running: + break + time.sleep(0.1) + continue + + if _first_connect: + logger.info("[DingTalk] ✅ Connected to DingTalk stream") + self.report_startup_success() + _first_connect = False + else: + logger.info("[DingTalk] Reconnected to DingTalk stream") + + # Run the WebSocket session in an asyncio loop. + uri = '%s?ticket=%s' % ( + connection['endpoint'], + _urlparse.quote_plus(connection['ticket']) + ) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) self._event_loop = loop try: - if not _first_connect: - logger.info("[DingTalk] Reconnecting...") - _first_connect = False - loop.run_until_complete(client.start()) + async def _session(): + async with _ws.connect(uri) as websocket: + client.websocket = websocket + async for raw_message in websocket: + json_message = _json.loads(raw_message) + result = await client.route_message(json_message) + if result == dingtalk_stream.DingTalkStreamClient.TAG_DISCONNECT: + break + + loop.run_until_complete(_session()) except (KeyboardInterrupt, SystemExit): - logger.info("[DingTalk] Startup loop received stop signal, exiting") + logger.info("[DingTalk] Session loop received stop signal, exiting") break except Exception as e: if not self._running: break - logger.warning(f"[DingTalk] Stream connection error: {e}, reconnecting in 3s...") - time.sleep(3) + logger.warning(f"[DingTalk] Stream session error: {e}, reconnecting in 3s...") + for _ in range(30): + if not self._running: + break + time.sleep(0.1) finally: self._event_loop = None try: loop.close() except Exception: pass + logger.info("[DingTalk] Startup loop exited") def stop(self): - import asyncio logger.info("[DingTalk] stop() called, setting _running=False") self._running = False loop = self._event_loop diff --git a/channel/feishu/feishu_channel.py b/channel/feishu/feishu_channel.py index 4b8a300..b53886c 100644 --- a/channel/feishu/feishu_channel.py +++ b/channel/feishu/feishu_channel.py @@ -169,10 +169,20 @@ class FeiShuChanel(ChatChannel): context.verify_mode = ssl.CERT_NONE return context - # Give this thread its own event loop so lark SDK can call run_until_complete + # lark_oapi.ws.client captures the event loop at module-import time as a module- + # level global variable. When a previous ws thread is force-killed via ctypes its + # loop may still be marked as "running", which causes the next ws_client.start() + # call (in this new thread) to raise "This event loop is already running". + # Fix: replace the module-level loop with a brand-new, idle loop before starting. loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) + try: + import lark_oapi.ws.client as _lark_ws_client_mod + _lark_ws_client_mod.loop = loop + except Exception: + pass + startup_error = None for attempt in range(2): try: if attempt == 1: @@ -202,8 +212,11 @@ class FeiShuChanel(ChatChannel): logger.warning(f"[FeiShu] SSL error: {error_msg}, retrying...") continue logger.error(f"[FeiShu] Websocket client error: {e}", exc_info=True) + startup_error = error_msg ssl_module.create_default_context = original_create_default_context break + if startup_error: + self.report_startup_error(startup_error) try: loop.close() except Exception: diff --git a/common/cloud_client.py b/common/cloud_client.py index de3af47..75bb588 100644 --- a/common/cloud_client.py +++ b/common/cloud_client.py @@ -20,6 +20,18 @@ import os chat_client: LinkAIClient +CHANNEL_ACTIONS = {"channel_create", "channel_update", "channel_delete"} + +# channelType -> config key mapping for app credentials +CREDENTIAL_MAP = { + "feishu": ("feishu_app_id", "feishu_app_secret"), + "dingtalk": ("dingtalk_client_id", "dingtalk_client_secret"), + "wechatmp": ("wechatmp_app_id", "wechatmp_app_secret"), + "wechatmp_service": ("wechatmp_app_id", "wechatmp_app_secret"), + "wechatcom_app": ("wechatcomapp_agent_id", "wechatcomapp_secret"), +} + + class CloudClient(LinkAIClient): def __init__(self, api_key: str, channel, host: str = ""): super().__init__(api_key, host) @@ -96,6 +108,12 @@ class CloudClient(LinkAIClient): if not self.client_id: return logger.info(f"[CloudClient] Loading remote config: {config}") + + action = config.get("action") + if action in CHANNEL_ACTIONS: + self._dispatch_channel_action(action, config.get("data", {})) + return + if config.get("enabled") != "Y": return @@ -123,50 +141,17 @@ class CloudClient(LinkAIClient): if config.get("model"): local_config["model"] = config.get("model") - # Channel configuration + # Channel configuration (legacy single-channel path) if config.get("channelType"): if local_config.get("channel_type") != config.get("channelType"): local_config["channel_type"] = config.get("channelType") need_restart_channel = True - # Channel-specific app credentials + # Channel-specific app credentials (legacy single-channel path) current_channel_type = local_config.get("channel_type", "") - - if config.get("app_id") is not None: - if current_channel_type == "feishu": - if local_config.get("feishu_app_id") != config.get("app_id"): - local_config["feishu_app_id"] = config.get("app_id") - need_restart_channel = True - elif current_channel_type == "dingtalk": - if local_config.get("dingtalk_client_id") != config.get("app_id"): - local_config["dingtalk_client_id"] = config.get("app_id") - need_restart_channel = True - elif current_channel_type in ("wechatmp", "wechatmp_service"): - if local_config.get("wechatmp_app_id") != config.get("app_id"): - local_config["wechatmp_app_id"] = config.get("app_id") - need_restart_channel = True - elif current_channel_type == "wechatcom_app": - if local_config.get("wechatcomapp_agent_id") != config.get("app_id"): - local_config["wechatcomapp_agent_id"] = config.get("app_id") - need_restart_channel = True - - if config.get("app_secret"): - if current_channel_type == "feishu": - if local_config.get("feishu_app_secret") != config.get("app_secret"): - local_config["feishu_app_secret"] = config.get("app_secret") - need_restart_channel = True - elif current_channel_type == "dingtalk": - if local_config.get("dingtalk_client_secret") != config.get("app_secret"): - local_config["dingtalk_client_secret"] = config.get("app_secret") - need_restart_channel = True - elif current_channel_type in ("wechatmp", "wechatmp_service"): - if local_config.get("wechatmp_app_secret") != config.get("app_secret"): - local_config["wechatmp_app_secret"] = config.get("app_secret") - need_restart_channel = True - elif current_channel_type == "wechatcom_app": - if local_config.get("wechatcomapp_secret") != config.get("app_secret"): - local_config["wechatcomapp_secret"] = config.get("app_secret") - need_restart_channel = True + if self._set_channel_credentials(local_config, current_channel_type, + config.get("app_id"), config.get("app_secret")): + need_restart_channel = True if config.get("admin_password"): if not pconf("Godcmd"): @@ -190,12 +175,169 @@ class CloudClient(LinkAIClient): if pconf("linkai")["midjourney"]: pconf("linkai")["midjourney"]["use_image_create_prefix"] = False - # Save configuration to config.json file self._save_config_to_file(local_config) if need_restart_channel: self._restart_channel(local_config.get("channel_type", "")) + # ------------------------------------------------------------------ + # channel CRUD operations + # ------------------------------------------------------------------ + def _dispatch_channel_action(self, action: str, data: dict): + channel_type = data.get("channelType") + if not channel_type: + logger.warning(f"[CloudClient] Channel action '{action}' missing channelType, data={data}") + return + logger.info(f"[CloudClient] Channel action: {action}, channelType={channel_type}") + + if action == "channel_create": + self._handle_channel_create(channel_type, data) + elif action == "channel_update": + self._handle_channel_update(channel_type, data) + elif action == "channel_delete": + self._handle_channel_delete(channel_type, data) + + def _handle_channel_create(self, channel_type: str, data: dict): + local_config = conf() + self._set_channel_credentials(local_config, channel_type, + data.get("appId"), data.get("appSecret")) + self._add_channel_type(local_config, channel_type) + self._save_config_to_file(local_config) + + if self.channel_mgr: + threading.Thread( + target=self._do_add_channel, args=(channel_type,), daemon=True + ).start() + + def _handle_channel_update(self, channel_type: str, data: dict): + local_config = conf() + enabled = data.get("enabled", "Y") + + self._set_channel_credentials(local_config, channel_type, + data.get("appId"), data.get("appSecret")) + if enabled == "N": + self._remove_channel_type(local_config, channel_type) + else: + # Ensure channel_type is persisted even if this channel was not + # previously listed (e.g. update used as implicit create). + self._add_channel_type(local_config, channel_type) + self._save_config_to_file(local_config) + + if not self.channel_mgr: + return + + if enabled == "N": + threading.Thread( + target=self._do_remove_channel, args=(channel_type,), daemon=True + ).start() + else: + threading.Thread( + target=self._do_restart_channel, args=(self.channel_mgr, channel_type), daemon=True + ).start() + + def _handle_channel_delete(self, channel_type: str, data: dict): + local_config = conf() + self._clear_channel_credentials(local_config, channel_type) + self._remove_channel_type(local_config, channel_type) + self._save_config_to_file(local_config) + + if self.channel_mgr: + threading.Thread( + target=self._do_remove_channel, args=(channel_type,), daemon=True + ).start() + + # ------------------------------------------------------------------ + # channel credentials helpers + # ------------------------------------------------------------------ + @staticmethod + def _set_channel_credentials(local_config: dict, channel_type: str, + app_id, app_secret) -> bool: + """ + Write app_id / app_secret into the correct config keys for *channel_type*. + Returns True if any value actually changed. + """ + cred = CREDENTIAL_MAP.get(channel_type) + if not cred: + return False + id_key, secret_key = cred + changed = False + if app_id is not None and local_config.get(id_key) != app_id: + local_config[id_key] = app_id + changed = True + if app_secret is not None and local_config.get(secret_key) != app_secret: + local_config[secret_key] = app_secret + changed = True + return changed + + @staticmethod + def _clear_channel_credentials(local_config: dict, channel_type: str): + cred = CREDENTIAL_MAP.get(channel_type) + if not cred: + return + id_key, secret_key = cred + local_config.pop(id_key, None) + local_config.pop(secret_key, None) + + # ------------------------------------------------------------------ + # channel_type list helpers + # ------------------------------------------------------------------ + @staticmethod + def _parse_channel_types(local_config: dict) -> list: + raw = local_config.get("channel_type", "") + if isinstance(raw, list): + return [ch.strip() for ch in raw if ch.strip()] + if isinstance(raw, str): + return [ch.strip() for ch in raw.split(",") if ch.strip()] + return [] + + @staticmethod + def _add_channel_type(local_config: dict, channel_type: str): + types = CloudClient._parse_channel_types(local_config) + if channel_type not in types: + types.append(channel_type) + local_config["channel_type"] = ", ".join(types) + + @staticmethod + def _remove_channel_type(local_config: dict, channel_type: str): + types = CloudClient._parse_channel_types(local_config) + if channel_type in types: + types.remove(channel_type) + local_config["channel_type"] = ", ".join(types) + + # ------------------------------------------------------------------ + # channel manager thread helpers + # ------------------------------------------------------------------ + def _do_add_channel(self, channel_type: str): + try: + self.channel_mgr.add_channel(channel_type) + logger.info(f"[CloudClient] Channel '{channel_type}' added successfully") + except Exception as e: + logger.error(f"[CloudClient] Failed to add channel '{channel_type}': {e}") + self.send_channel_status(channel_type, "error", str(e)) + return + self._report_channel_startup(channel_type) + + def _do_remove_channel(self, channel_type: str): + try: + self.channel_mgr.remove_channel(channel_type) + logger.info(f"[CloudClient] Channel '{channel_type}' removed successfully") + except Exception as e: + logger.error(f"[CloudClient] Failed to remove channel '{channel_type}': {e}") + + def _report_channel_startup(self, channel_type: str): + """Wait for channel startup result and report to cloud.""" + ch = self.channel_mgr.get_channel(channel_type) + if not ch: + self.send_channel_status(channel_type, "error", "channel instance not found") + return + success, error = ch.wait_startup(timeout=3) + if success: + logger.info(f"[CloudClient] Channel '{channel_type}' connected, reporting status") + self.send_channel_status(channel_type, "connected") + else: + logger.warning(f"[CloudClient] Channel '{channel_type}' startup failed: {error}") + self.send_channel_status(channel_type, "error", error) + # ------------------------------------------------------------------ # skill callback # ------------------------------------------------------------------ @@ -279,13 +421,15 @@ class CloudClient(LinkAIClient): """ try: mgr.restart(new_channel_type) - # Update the client's channel reference if mgr.channel: self.channel = mgr.channel self.client_type = mgr.channel.channel_type logger.info(f"[CloudClient] Channel reference updated to '{new_channel_type}'") except Exception as e: logger.error(f"[CloudClient] Channel restart failed: {e}") + self.send_channel_status(new_channel_type, "error", str(e)) + return + self._report_channel_startup(new_channel_type) # ------------------------------------------------------------------ # config persistence @@ -322,6 +466,21 @@ def start(channel, channel_mgr=None): time.sleep(1.5) if chat_client.client_id: logger.info("[CloudClient] Console: https://link-ai.tech/console/clients") + if channel_mgr: + channel_mgr.cloud_mode = True + threading.Thread(target=_report_existing_channels, args=(chat_client, channel_mgr), daemon=True).start() + + +def _report_existing_channels(client: CloudClient, mgr): + """Report status for all channels that were started before cloud client connected.""" + try: + for name, ch in list(mgr._channels.items()): + if name == "web": + continue + ch.cloud_mode = True + client._report_channel_startup(name) + except Exception as e: + logger.warning(f"[CloudClient] Failed to report existing channel status: {e}") def _build_config():