diff --git a/README.md b/README.md index 09952bd..358cd1f 100644 --- a/README.md +++ b/README.md @@ -608,10 +608,12 @@ API Key创建:在 [控制台](https://aistudio.google.com/app/apikey?hl=zh-cn) 以下对可接入通道的配置方式进行说明,应用通道代码在项目的 `channel/` 目录下。 +支持同时可接入多个通道,配置时可通过逗号进行分割,例如 `"channel_type": "feishu,dingtalk"`。 +
1. Web -项目启动后默认运行Web通道,配置如下: +项目启动后会默认运行Web控制台,配置如下: ```json { diff --git a/agent/protocol/agent_stream.py b/agent/protocol/agent_stream.py index d1b5283..31c9359 100644 --- a/agent/protocol/agent_stream.py +++ b/agent/protocol/agent_stream.py @@ -501,7 +501,7 @@ class AgentStreamExecutor: # Prepare messages messages = self._prepare_messages() - logger.debug(f"Sending {len(messages)} messages to LLM") + logger.info(f"Sending {len(messages)} messages to LLM") # Prepare tool definitions (OpenAI/Claude format) tools_schema = None diff --git a/app.py b/app.py index cc4dd33..3627208 100644 --- a/app.py +++ b/app.py @@ -13,7 +13,6 @@ from plugins import * import threading -# Global channel manager for restart support _channel_mgr = None @@ -21,92 +20,130 @@ def get_channel_manager(): return _channel_mgr +def _parse_channel_type(raw) -> list: + """ + Parse channel_type config value into a list of channel names. + Supports: + - single string: "feishu" + - comma-separated string: "feishu, dingtalk" + - list: ["feishu", "dingtalk"] + """ + if isinstance(raw, list): + return [ch.strip() for ch in raw if ch.strip()] + if isinstance(raw, str): + return [ch.strip() for ch in raw.split(",") if ch.strip()] + return [] + + class ChannelManager: """ - Manage the lifecycle of a channel, supporting restart from sub-threads. - The channel.startup() runs in a daemon thread so that the main thread - remains available and a new channel can be started at any time. + Manage the lifecycle of multiple channels running concurrently. + Each channel.startup() runs in its own daemon thread. + The web channel is started as default console unless explicitly disabled. """ def __init__(self): - self._channel = None - self._channel_thread = None + self._channels = {} # channel_name -> channel instance + self._threads = {} # channel_name -> thread + self._primary_channel = None self._lock = threading.Lock() @property def channel(self): - return self._channel + """Return the primary (first non-web) channel for backward compatibility.""" + return self._primary_channel - def start(self, channel_name: str, first_start: bool = False): + def get_channel(self, channel_name: str): + return self._channels.get(channel_name) + + def start(self, channel_names: list, first_start: bool = False): """ - Create and start a channel in a sub-thread. + Create and start one or more channels in sub-threads. If first_start is True, plugins and linkai client will also be initialized. """ with self._lock: - channel = channel_factory.create_channel(channel_name) - self._channel = channel + channels = [] + for name in channel_names: + ch = channel_factory.create_channel(name) + self._channels[name] = ch + channels.append((name, ch)) + if self._primary_channel is None and name != "web": + self._primary_channel = ch + + if self._primary_channel is None and channels: + self._primary_channel = channels[0][1] if first_start: - if channel_name in ["wx", "wxy", "terminal", "wechatmp", "web", - "wechatmp_service", "wechatcom_app", "wework", - const.FEISHU, const.DINGTALK]: - PluginManager().load_plugins() + PluginManager().load_plugins() if conf().get("use_linkai"): try: from common import cloud_client - threading.Thread(target=cloud_client.start, args=(channel, self), daemon=True).start() - except Exception as e: + threading.Thread( + target=cloud_client.start, + args=(self._primary_channel, self), + daemon=True, + ).start() + except Exception: pass - # Run channel.startup() in a daemon thread so we can restart later - self._channel_thread = threading.Thread( - target=self._run_channel, args=(channel,), daemon=True - ) - self._channel_thread.start() - logger.debug(f"[ChannelManager] Channel '{channel_name}' started in sub-thread") + # Start web console first so its logs print cleanly, + # then start remaining channels after a brief pause. + web_entry = None + other_entries = [] + for entry in channels: + if entry[0] == "web": + web_entry = entry + else: + other_entries.append(entry) - def _run_channel(self, channel): + ordered = ([web_entry] if web_entry else []) + other_entries + for i, (name, ch) in enumerate(ordered): + if i > 0 and name != "web": + time.sleep(0.1) + t = threading.Thread(target=self._run_channel, args=(name, ch), daemon=True) + self._threads[name] = t + t.start() + logger.debug(f"[ChannelManager] Channel '{name}' started in sub-thread") + + def _run_channel(self, name: str, channel): try: channel.startup() except Exception as e: - logger.error(f"[ChannelManager] Channel startup error: {e}") + logger.error(f"[ChannelManager] Channel '{name}' startup error: {e}") logger.exception(e) - def stop(self): + def stop(self, channel_name: str = None): """ - Stop the current channel. Since most channel startup() methods block - on an HTTP server or stream client, we stop by terminating the thread. + Stop channel(s). If channel_name is given, stop only that channel; + otherwise stop all channels. """ with self._lock: - if self._channel is None: - return - channel_type = getattr(self._channel, 'channel_type', 'unknown') - logger.info(f"[ChannelManager] Stopping channel '{channel_type}'...") - - # Try graceful stop if channel implements it - try: - if hasattr(self._channel, 'stop'): - self._channel.stop() - except Exception as e: - logger.warning(f"[ChannelManager] Error during channel stop: {e}") - - self._channel = None - self._channel_thread = None + names = [channel_name] if channel_name else list(self._channels.keys()) + for name in names: + ch = self._channels.pop(name, None) + self._threads.pop(name, None) + if ch is None: + continue + logger.info(f"[ChannelManager] Stopping channel '{name}'...") + try: + if hasattr(ch, 'stop'): + ch.stop() + except Exception as e: + logger.warning(f"[ChannelManager] Error during channel '{name}' stop: {e}") + if channel_name and self._primary_channel is self._channels.get(channel_name): + self._primary_channel = None def restart(self, new_channel_name: str): """ - Restart the channel with a new channel type. + Restart a single channel with a new channel type. Can be called from any thread (e.g. linkai config callback). """ logger.info(f"[ChannelManager] Restarting channel to '{new_channel_name}'...") - self.stop() - - # Clear singleton cache so a fresh channel instance is created + self.stop(new_channel_name) _clear_singleton_cache(new_channel_name) - - time.sleep(1) # Brief pause to allow resources to release - self.start(new_channel_name, first_start=False) + time.sleep(1) + self.start([new_channel_name], first_start=False) logger.info(f"[ChannelManager] Channel restarted to '{new_channel_name}' successfully") @@ -130,14 +167,11 @@ def _clear_singleton_cache(channel_name: str): module_path = cls_map.get(channel_name) if not module_path: return - # The singleton decorator stores instances in a closure dict keyed by class. - # We need to find the actual class and clear it from the closure. try: parts = module_path.rsplit(".", 1) module_name, class_name = parts[0], parts[1] import importlib module = importlib.import_module(module_name) - # The module-level name is the wrapper function from @singleton wrapper = getattr(module, class_name, None) if wrapper and hasattr(wrapper, '__closure__') and wrapper.__closure__: for cell in wrapper.__closure__: @@ -176,17 +210,28 @@ def run(): # kill signal sigterm_handler_wrap(signal.SIGTERM) - # create channel - channel_name = conf().get("channel_type", "wx") + # Parse channel_type into a list + raw_channel = conf().get("channel_type", "wx") if "--cmd" in sys.argv: - channel_name = "terminal" + channel_names = ["terminal"] + else: + channel_names = _parse_channel_type(raw_channel) + if not channel_names: + channel_names = ["wx"] - if channel_name == "wxy": + if "wxy" in channel_names: os.environ["WECHATY_LOG"] = "warn" + # Auto-start web console unless explicitly disabled + web_console_enabled = conf().get("web_console", True) + if web_console_enabled and "web" not in channel_names: + channel_names.append("web") + + logger.info(f"[App] Starting channels: {channel_names}") + _channel_mgr = ChannelManager() - _channel_mgr.start(channel_name, first_start=True) + _channel_mgr.start(channel_names, first_start=True) while True: time.sleep(1) diff --git a/bridge/agent_initializer.py b/bridge/agent_initializer.py index 5154eac..ebf539b 100644 --- a/bridge/agent_initializer.py +++ b/bridge/agent_initializer.py @@ -150,7 +150,7 @@ class AgentInitializer: if saved: with agent.messages_lock: agent.messages = saved - logger.info( + logger.debug( f"[AgentInitializer] Restored {len(saved)} messages " f"({restore_turns} turns cap) for session={session_id}" ) diff --git a/channel/chat_channel.py b/channel/chat_channel.py index af3607d..0f5f45e 100644 --- a/channel/chat_channel.py +++ b/channel/chat_channel.py @@ -24,11 +24,16 @@ handler_pool = ThreadPoolExecutor(max_workers=8) # 处理消息的线程池 class ChatChannel(Channel): name = None # 登录的用户名 user_id = None # 登录的用户id - futures = {} # 记录每个session_id提交到线程池的future对象, 用于重置会话时把没执行的future取消掉,正在执行的不会被取消 - sessions = {} # 用于控制并发,每个session_id同时只能有一个context在处理 - lock = threading.Lock() # 用于控制对sessions的访问 def __init__(self): + # Instance-level attributes so each channel subclass has its own + # independent session queue and lock. Previously these were class-level, + # which caused contexts from one channel (e.g. Feishu) to be consumed + # by another channel's consume() thread (e.g. Web), leading to errors + # like "No request_id found in context". + self.futures = {} + self.sessions = {} + self.lock = threading.Lock() _thread = threading.Thread(target=self.consume) _thread.setDaemon(True) _thread.start() diff --git a/channel/web/web_channel.py b/channel/web/web_channel.py index 68f604c..f167c70 100644 --- a/channel/web/web_channel.py +++ b/channel/web/web_channel.py @@ -1,9 +1,15 @@ -import sys import time -import web import json +import logging +import mimetypes +import os +import threading +import time import uuid from queue import Queue, Empty + +import web + from bridge.context import * from bridge.reply import Reply, ReplyType from channel.chat_channel import ChatChannel, check_prefix @@ -11,20 +17,17 @@ from channel.chat_message import ChatMessage from common.log import logger from common.singleton import singleton from config import conf -import os -import mimetypes -import threading -import logging + class WebMessage(ChatMessage): def __init__( - self, - msg_id, - content, - ctype=ContextType.TEXT, - from_user_id="User", - to_user_id="Chatgpt", - other_user_id="Chatgpt", + self, + msg_id, + content, + ctype=ContextType.TEXT, + from_user_id="User", + to_user_id="Chatgpt", + other_user_id="Chatgpt", ): self.msg_id = msg_id self.ctype = ctype @@ -38,7 +41,7 @@ class WebMessage(ChatMessage): class WebChannel(ChatChannel): NOT_SUPPORT_REPLYTYPE = [ReplyType.VOICE] _instance = None - + # def __new__(cls): # if cls._instance is None: # cls._instance = super(WebChannel, cls).__new__(cls) @@ -47,12 +50,11 @@ class WebChannel(ChatChannel): def __init__(self): super().__init__() self.msg_id_counter = 0 - self.session_queues = {} # session_id -> Queue (fallback polling) - self.request_to_session = {} # request_id -> session_id - self.sse_queues = {} # request_id -> Queue (SSE streaming) + self.session_queues = {} # session_id -> Queue (fallback polling) + self.request_to_session = {} # request_id -> session_id + self.sse_queues = {} # request_id -> Queue (SSE streaming) self._http_server = None - def _generate_msg_id(self): """生成唯一的消息ID""" self.msg_id_counter += 1 @@ -111,6 +113,7 @@ class WebChannel(ChatChannel): def _make_sse_callback(self, request_id: str): """Build an on_event callback that pushes agent stream events into the SSE queue.""" + def on_event(event: dict): if request_id not in self.sse_queues: return @@ -237,28 +240,28 @@ class WebChannel(ChatChannel): data = web.data() json_data = json.loads(data) session_id = json_data.get('session_id') - + if not session_id or session_id not in self.session_queues: return json.dumps({"status": "error", "message": "Invalid session ID"}) - + # 尝试从队列获取响应,不等待 try: # 使用peek而不是get,这样如果前端没有成功处理,下次还能获取到 response = self.session_queues[session_id].get(block=False) - + # 返回响应,包含请求ID以区分不同请求 return json.dumps({ - "status": "success", + "status": "success", "has_content": True, "content": response["content"], "request_id": response["request_id"], "timestamp": response["timestamp"] }) - + except Empty: # 没有新响应 return json.dumps({"status": "success", "has_content": False}) - + except Exception as e: logger.error(f"Error polling response: {e}") return json.dumps({"status": "error", "message": str(e)}) @@ -271,9 +274,10 @@ class WebChannel(ChatChannel): def startup(self): port = conf().get("web_port", 9899) - + # 打印可用渠道类型提示 - logger.info("[WebChannel] 当前channel为web,可修改 config.json 配置文件中的 channel_type 字段进行切换。全部可用类型为:") + logger.info( + "[WebChannel] 全部可用通道如下,可修改 config.json 配置文件中的 channel_type 字段进行切换,多个通道用逗号分隔:") logger.info("[WebChannel] 1. web - 网页") logger.info("[WebChannel] 2. terminal - 终端") logger.info("[WebChannel] 3. feishu - 飞书") @@ -281,16 +285,16 @@ class WebChannel(ChatChannel): logger.info("[WebChannel] 5. wechatcom_app - 企微自建应用") logger.info("[WebChannel] 6. wechatmp - 个人公众号") logger.info("[WebChannel] 7. wechatmp_service - 企业公众号") + logger.info("[WebChannel] ✅ Web控制台已运行") logger.info(f"[WebChannel] 🌐 本地访问: http://localhost:{port}") logger.info(f"[WebChannel] 🌍 服务器访问: http://YOUR_IP:{port} (请将YOUR_IP替换为服务器IP)") - logger.info("[WebChannel] ✅ Web对话网页已运行") - + # 确保静态文件目录存在 static_dir = os.path.join(os.path.dirname(__file__), 'static') if not os.path.exists(static_dir): os.makedirs(static_dir) logger.debug(f"[WebChannel] Created static directory: {static_dir}") - + urls = ( '/', 'RootHandler', '/message', 'MessageHandler', @@ -307,14 +311,14 @@ class WebChannel(ChatChannel): '/assets/(.*)', 'AssetsHandler', ) app = web.application(urls, globals(), autoreload=False) - + # 完全禁用web.py的HTTP日志输出 web.httpserver.LogMiddleware.log = lambda self, status, environ: None - + # 配置web.py的日志级别为ERROR logging.getLogger("web").setLevel(logging.ERROR) logging.getLogger("web.httpserver").setLevel(logging.ERROR) - + # Build WSGI app with middleware (same as runsimple but without print) func = web.httpserver.StaticMiddleware(app.wsgifunc()) func = web.httpserver.LogMiddleware(func) diff --git a/config.py b/config.py index b0af674..f50072e 100644 --- a/config.py +++ b/config.py @@ -160,7 +160,8 @@ available_setting = { # chatgpt指令自定义触发词 "clear_memory_commands": ["#清除记忆"], # 重置会话指令,必须以#开头 # channel配置 - "channel_type": "", # 通道类型,支持:{wx,wxy,terminal,wechatmp,wechatmp_service,wechatcom_app,dingtalk} + "channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wechatmp,wechatmp_service,wechatcom_app + "web_console": True, # 是否自动启动Web控制台(默认启动)。设为False可禁用 "subscribe_msg": "", # 订阅消息, 支持: wechatmp, wechatmp_service, wechatcom_app "debug": False, # 是否开启debug模式,开启后会打印更多日志 "appdata_dir": "", # 数据目录