diff --git a/app.py b/app.py index dd18203..c3315c1 100644 --- a/app.py +++ b/app.py @@ -118,22 +118,51 @@ class ChannelManager: Stop channel(s). If channel_name is given, stop only that channel; otherwise stop all channels. """ + # Pop under lock, then stop outside lock to avoid deadlock with self._lock: names = [channel_name] if channel_name else list(self._channels.keys()) + to_stop = [] for name in names: ch = self._channels.pop(name, None) - self._threads.pop(name, None) - if ch is None: - continue - logger.info(f"[ChannelManager] Stopping channel '{name}'...") - try: - if hasattr(ch, 'stop'): - ch.stop() - except Exception as e: - logger.warning(f"[ChannelManager] Error during channel '{name}' stop: {e}") + th = self._threads.pop(name, None) + to_stop.append((name, ch, th)) if channel_name and self._primary_channel is self._channels.get(channel_name): self._primary_channel = None + for name, ch, th in to_stop: + if ch is None: + logger.warning(f"[ChannelManager] Channel '{name}' not found in managed channels") + if th and th.is_alive(): + self._interrupt_thread(th, name) + continue + logger.info(f"[ChannelManager] Stopping channel '{name}'...") + try: + if hasattr(ch, 'stop'): + ch.stop() + except Exception as e: + logger.warning(f"[ChannelManager] Error during channel '{name}' stop: {e}") + if th and th.is_alive(): + self._interrupt_thread(th, name) + + @staticmethod + def _interrupt_thread(th: threading.Thread, name: str): + """Raise SystemExit in target thread to break blocking loops like start_forever.""" + import ctypes + try: + tid = th.ident + if tid is None: + return + res = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_ulong(tid), ctypes.py_object(SystemExit) + ) + if res == 1: + logger.info(f"[ChannelManager] Interrupted thread for channel '{name}'") + elif res > 1: + ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), None) + logger.warning(f"[ChannelManager] Failed to interrupt thread for channel '{name}'") + except Exception as e: + logger.warning(f"[ChannelManager] Thread interrupt error for '{name}': {e}") + def restart(self, new_channel_name: str): """ Restart a single channel with a new channel type. diff --git a/bridge/agent_initializer.py b/bridge/agent_initializer.py index f85357d..62df5c8 100644 --- a/bridge/agent_initializer.py +++ b/bridge/agent_initializer.py @@ -145,7 +145,7 @@ class AgentInitializer: # after a restart. The full max_turns budget is reserved for the # live conversation that follows. max_turns = conf().get("agent_max_context_turns", 30) - restore_turns = min(6, max(1, max_turns // 3)) + restore_turns = max(4, max_turns // 5) saved = store.load_messages(session_id, max_turns=restore_turns) if saved: with agent.messages_lock: diff --git a/channel/dingtalk/dingtalk_channel.py b/channel/dingtalk/dingtalk_channel.py index 4c41d3b..31f7701 100644 --- a/channel/dingtalk/dingtalk_channel.py +++ b/channel/dingtalk/dingtalk_channel.py @@ -101,6 +101,8 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): # 历史消息id暂存,用于幂等控制 self.receivedMsgs = ExpiredDict(conf().get("expires_in_seconds", 3600)) self._stream_client = None + self._running = False + self._event_loop = None logger.debug("[DingTalk] client_id={}, client_secret={} ".format( self.dingtalk_client_id, self.dingtalk_client_secret)) # 无需群校验和前缀 @@ -114,21 +116,54 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): self._robot_code = None def startup(self): + import asyncio + self.dingtalk_client_id = conf().get('dingtalk_client_id') + self.dingtalk_client_secret = conf().get('dingtalk_client_secret') + self._running = True credential = dingtalk_stream.Credential(self.dingtalk_client_id, self.dingtalk_client_secret) client = dingtalk_stream.DingTalkStreamClient(credential) self._stream_client = client client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self) - logger.info("[DingTalk] ✅ Stream connected, ready to receive messages") - client.start_forever() + logger.info("[DingTalk] ✅ Stream client initialized, ready to receive messages") + _first_connect = True + while self._running: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + self._event_loop = loop + try: + if not _first_connect: + logger.info("[DingTalk] Reconnecting...") + _first_connect = False + loop.run_until_complete(client.start()) + except (KeyboardInterrupt, SystemExit): + logger.info("[DingTalk] Startup loop received stop signal, exiting") + break + except Exception as e: + if not self._running: + break + logger.warning(f"[DingTalk] Stream connection error: {e}, reconnecting in 3s...") + time.sleep(3) + finally: + self._event_loop = None + try: + loop.close() + except Exception: + pass + logger.info("[DingTalk] Startup loop exited") def stop(self): - if self._stream_client: + import asyncio + logger.info("[DingTalk] stop() called, setting _running=False") + self._running = False + loop = self._event_loop + if loop and not loop.is_closed(): try: - self._stream_client.stop() - logger.info("[DingTalk] Stream client stopped") + loop.call_soon_threadsafe(loop.stop) + logger.info("[DingTalk] Sent stop signal to event loop") except Exception as e: - logger.warning(f"[DingTalk] Error stopping stream client: {e}") - self._stream_client = None + logger.warning(f"[DingTalk] Error stopping event loop: {e}") + self._stream_client = None + logger.info("[DingTalk] stop() completed") def get_access_token(self): """ @@ -465,23 +500,21 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): async def process(self, callback: dingtalk_stream.CallbackMessage): try: incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data) - + # 缓存 robot_code,用于后续图片下载 if hasattr(incoming_message, 'robot_code'): self._robot_code_cache = incoming_message.robot_code - - # Debug: 打印完整的 event 数据 - logger.debug(f"[DingTalk] ===== Incoming Message Debug =====") - logger.debug(f"[DingTalk] callback.data keys: {callback.data.keys() if hasattr(callback.data, 'keys') else 'N/A'}") - logger.debug(f"[DingTalk] incoming_message attributes: {dir(incoming_message)}") - logger.debug(f"[DingTalk] robot_code: {getattr(incoming_message, 'robot_code', 'N/A')}") - logger.debug(f"[DingTalk] chatbot_corp_id: {getattr(incoming_message, 'chatbot_corp_id', 'N/A')}") - logger.debug(f"[DingTalk] chatbot_user_id: {getattr(incoming_message, 'chatbot_user_id', 'N/A')}") - logger.debug(f"[DingTalk] conversation_id: {getattr(incoming_message, 'conversation_id', 'N/A')}") - logger.debug(f"[DingTalk] Raw callback.data: {callback.data}") - logger.debug(f"[DingTalk] =====================================") - - image_download_handler = self # 传入方法所在的类实例 + + # Filter out stale messages from before channel startup (offline backlog) + create_at = getattr(incoming_message, 'create_at', None) + if create_at: + msg_age_s = time.time() - int(create_at) / 1000 + if msg_age_s > 60: + logger.warning(f"[DingTalk] stale msg filtered (age={msg_age_s:.0f}s), " + f"msg_id={getattr(incoming_message, 'message_id', 'N/A')}") + return AckMessage.STATUS_OK, 'OK' + + image_download_handler = self dingtalk_msg = DingTalkMessage(incoming_message, image_download_handler) if dingtalk_msg.is_group: @@ -490,8 +523,7 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): self.handle_single(dingtalk_msg) return AckMessage.STATUS_OK, 'OK' except Exception as e: - logger.error(f"[DingTalk] process error: {e}") - logger.exception(e) # 打印完整堆栈跟踪 + logger.error(f"[DingTalk] process error: {e}", exc_info=True) return AckMessage.STATUS_SYSTEM_EXCEPTION, 'ERROR' @time_checker diff --git a/channel/feishu/feishu_channel.py b/channel/feishu/feishu_channel.py index b9f4908..4b8a300 100644 --- a/channel/feishu/feishu_channel.py +++ b/channel/feishu/feishu_channel.py @@ -61,6 +61,8 @@ class FeiShuChanel(ChatChannel): # 历史消息id暂存,用于幂等控制 self.receivedMsgs = ExpiredDict(60 * 60 * 7.1) self._http_server = None + self._ws_client = None + self._ws_thread = None logger.debug("[FeiShu] app_id={}, app_secret={}, verification_token={}, event_mode={}".format( self.feishu_app_id, self.feishu_app_secret, self.feishu_token, self.feishu_event_mode)) # 无需群校验和前缀 @@ -73,12 +75,37 @@ class FeiShuChanel(ChatChannel): raise Exception("lark_oapi not installed") def startup(self): + self.feishu_app_id = conf().get('feishu_app_id') + self.feishu_app_secret = conf().get('feishu_app_secret') + self.feishu_token = conf().get('feishu_token') + self.feishu_event_mode = conf().get('feishu_event_mode', 'websocket') if self.feishu_event_mode == 'websocket': self._startup_websocket() else: self._startup_webhook() def stop(self): + import ctypes + logger.info("[FeiShu] stop() called") + ws_client = self._ws_client + self._ws_client = None + ws_thread = self._ws_thread + self._ws_thread = None + # Interrupt the ws thread first so its blocking start() unblocks + if ws_thread and ws_thread.is_alive(): + try: + tid = ws_thread.ident + if tid: + res = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_ulong(tid), ctypes.py_object(SystemExit) + ) + if res == 1: + logger.info("[FeiShu] Interrupted ws thread via ctypes") + elif res > 1: + ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), None) + except Exception as e: + logger.warning(f"[FeiShu] Error interrupting ws thread: {e}") + # lark.ws.Client has no stop() method; thread interruption above is sufficient if self._http_server: try: self._http_server.stop() @@ -86,6 +113,7 @@ class FeiShuChanel(ChatChannel): except Exception as e: logger.warning(f"[FeiShu] Error stopping HTTP server: {e}") self._http_server = None + logger.info("[FeiShu] stop() completed") def _startup_webhook(self): """启动HTTP服务器接收事件(webhook模式)""" @@ -129,29 +157,26 @@ class FeiShuChanel(ChatChannel): .register_p2_im_message_receive_v1(handle_message_event) \ .build() - # 尝试连接,如果遇到SSL错误则自动禁用证书验证 def start_client_with_retry(): - """启动websocket客户端,自动处理SSL证书错误""" - # 全局禁用SSL证书验证(在导入lark_oapi之前设置) + """Run ws client in this thread with its own event loop to avoid conflicts.""" + import asyncio import ssl as ssl_module - - # 保存原始的SSL上下文创建方法 original_create_default_context = ssl_module.create_default_context def create_unverified_context(*args, **kwargs): - """创建一个不验证证书的SSL上下文""" context = original_create_default_context(*args, **kwargs) context.check_hostname = False context.verify_mode = ssl.CERT_NONE return context - # 尝试正常连接,如果失败则禁用SSL验证 + # Give this thread its own event loop so lark SDK can call run_until_complete + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + for attempt in range(2): try: if attempt == 1: - # 第二次尝试:禁用SSL验证 - logger.warning("[FeiShu] SSL certificate verification disabled due to certificate error. " - "This may happen when using corporate proxy or self-signed certificates.") + logger.warning("[FeiShu] Retrying with SSL verification disabled...") ssl_module.create_default_context = create_unverified_context ssl_module._create_unverified_context = create_unverified_context @@ -161,37 +186,34 @@ class FeiShuChanel(ChatChannel): event_handler=event_handler, log_level=lark.LogLevel.WARNING ) - + self._ws_client = ws_client logger.debug("[FeiShu] Websocket client starting...") ws_client.start() - # 如果成功启动,跳出循环 break + except (SystemExit, KeyboardInterrupt): + logger.info("[FeiShu] Websocket thread received stop signal") + break except Exception as e: error_msg = str(e) - # 检查是否是SSL证书验证错误 - is_ssl_error = "CERTIFICATE_VERIFY_FAILED" in error_msg or "certificate verify failed" in error_msg.lower() - + is_ssl_error = ("CERTIFICATE_VERIFY_FAILED" in error_msg + or "certificate verify failed" in error_msg.lower()) if is_ssl_error and attempt == 0: - # 第一次遇到SSL错误,记录日志并继续循环(下次会禁用验证) - logger.warning(f"[FeiShu] SSL certificate verification failed: {error_msg}") - logger.info("[FeiShu] Retrying connection with SSL verification disabled...") + logger.warning(f"[FeiShu] SSL error: {error_msg}, retrying...") continue - else: - # 其他错误或禁用验证后仍失败,抛出异常 - logger.error(f"[FeiShu] Websocket client error: {e}", exc_info=True) - # 恢复原始方法 - ssl_module.create_default_context = original_create_default_context - raise + logger.error(f"[FeiShu] Websocket client error: {e}", exc_info=True) + ssl_module.create_default_context = original_create_default_context + break + try: + loop.close() + except Exception: + pass + logger.info("[FeiShu] Websocket thread exited") - # 注意:不恢复原始方法,因为ws_client.start()会持续运行 - - # 在新线程中启动客户端,避免阻塞主线程 ws_thread = threading.Thread(target=start_client_with_retry, daemon=True) + self._ws_thread = ws_thread ws_thread.start() - - # 保持主线程运行 - logger.info("[FeiShu] ✅ Websocket connected, ready to receive messages") + logger.info("[FeiShu] ✅ Websocket thread started, ready to receive messages") ws_thread.join() def _handle_message_event(self, event: dict): @@ -212,6 +234,15 @@ class FeiShuChanel(ChatChannel): return self.receivedMsgs[msg_id] = True + # Filter out stale messages from before channel startup (offline backlog) + import time as _time + create_time_ms = msg.get("create_time") + if create_time_ms: + msg_age_s = _time.time() - int(create_time_ms) / 1000 + if msg_age_s > 60: + logger.warning(f"[FeiShu] stale msg filtered (age={msg_age_s:.0f}s), msg_id={msg_id}") + return + is_group = False chat_type = msg.get("chat_type") diff --git a/channel/web/chat.html b/channel/web/chat.html index a9aabea..886a857 100644 --- a/channel/web/chat.html +++ b/channel/web/chat.html @@ -427,14 +427,35 @@
View, enable, or disable agent skills
-Loading skills...
-Skills will be displayed here after loading
+Loading skills...
+Skills will be displayed here after loading
+View and manage messaging channels
+ + @@ -582,6 +610,32 @@ + + +