mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-03-02 08:09:43 +08:00
Merge pull request #2678 from zhayujie/feat-multi-channel
feat: support multi-channel
This commit is contained in:
@@ -608,10 +608,12 @@ API Key创建:在 [控制台](https://aistudio.google.com/app/apikey?hl=zh-cn)
|
||||
|
||||
以下对可接入通道的配置方式进行说明,应用通道代码在项目的 `channel/` 目录下。
|
||||
|
||||
支持同时可接入多个通道,配置时可通过逗号进行分割,例如 `"channel_type": "feishu,dingtalk"`。
|
||||
|
||||
<details>
|
||||
<summary>1. Web</summary>
|
||||
|
||||
项目启动后默认运行Web通道,配置如下:
|
||||
项目启动后会默认运行Web控制台,配置如下:
|
||||
|
||||
```json
|
||||
{
|
||||
|
||||
@@ -501,7 +501,7 @@ class AgentStreamExecutor:
|
||||
|
||||
# Prepare messages
|
||||
messages = self._prepare_messages()
|
||||
logger.debug(f"Sending {len(messages)} messages to LLM")
|
||||
logger.info(f"Sending {len(messages)} messages to LLM")
|
||||
|
||||
# Prepare tool definitions (OpenAI/Claude format)
|
||||
tools_schema = None
|
||||
|
||||
159
app.py
159
app.py
@@ -13,7 +13,6 @@ from plugins import *
|
||||
import threading
|
||||
|
||||
|
||||
# Global channel manager for restart support
|
||||
_channel_mgr = None
|
||||
|
||||
|
||||
@@ -21,92 +20,130 @@ def get_channel_manager():
|
||||
return _channel_mgr
|
||||
|
||||
|
||||
def _parse_channel_type(raw) -> list:
|
||||
"""
|
||||
Parse channel_type config value into a list of channel names.
|
||||
Supports:
|
||||
- single string: "feishu"
|
||||
- comma-separated string: "feishu, dingtalk"
|
||||
- list: ["feishu", "dingtalk"]
|
||||
"""
|
||||
if isinstance(raw, list):
|
||||
return [ch.strip() for ch in raw if ch.strip()]
|
||||
if isinstance(raw, str):
|
||||
return [ch.strip() for ch in raw.split(",") if ch.strip()]
|
||||
return []
|
||||
|
||||
|
||||
class ChannelManager:
|
||||
"""
|
||||
Manage the lifecycle of a channel, supporting restart from sub-threads.
|
||||
The channel.startup() runs in a daemon thread so that the main thread
|
||||
remains available and a new channel can be started at any time.
|
||||
Manage the lifecycle of multiple channels running concurrently.
|
||||
Each channel.startup() runs in its own daemon thread.
|
||||
The web channel is started as default console unless explicitly disabled.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._channel = None
|
||||
self._channel_thread = None
|
||||
self._channels = {} # channel_name -> channel instance
|
||||
self._threads = {} # channel_name -> thread
|
||||
self._primary_channel = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def channel(self):
|
||||
return self._channel
|
||||
"""Return the primary (first non-web) channel for backward compatibility."""
|
||||
return self._primary_channel
|
||||
|
||||
def start(self, channel_name: str, first_start: bool = False):
|
||||
def get_channel(self, channel_name: str):
|
||||
return self._channels.get(channel_name)
|
||||
|
||||
def start(self, channel_names: list, first_start: bool = False):
|
||||
"""
|
||||
Create and start a channel in a sub-thread.
|
||||
Create and start one or more channels in sub-threads.
|
||||
If first_start is True, plugins and linkai client will also be initialized.
|
||||
"""
|
||||
with self._lock:
|
||||
channel = channel_factory.create_channel(channel_name)
|
||||
self._channel = channel
|
||||
channels = []
|
||||
for name in channel_names:
|
||||
ch = channel_factory.create_channel(name)
|
||||
self._channels[name] = ch
|
||||
channels.append((name, ch))
|
||||
if self._primary_channel is None and name != "web":
|
||||
self._primary_channel = ch
|
||||
|
||||
if self._primary_channel is None and channels:
|
||||
self._primary_channel = channels[0][1]
|
||||
|
||||
if first_start:
|
||||
if channel_name in ["wx", "wxy", "terminal", "wechatmp", "web",
|
||||
"wechatmp_service", "wechatcom_app", "wework",
|
||||
const.FEISHU, const.DINGTALK]:
|
||||
PluginManager().load_plugins()
|
||||
PluginManager().load_plugins()
|
||||
|
||||
if conf().get("use_linkai"):
|
||||
try:
|
||||
from common import cloud_client
|
||||
threading.Thread(target=cloud_client.start, args=(channel, self), daemon=True).start()
|
||||
except Exception as e:
|
||||
threading.Thread(
|
||||
target=cloud_client.start,
|
||||
args=(self._primary_channel, self),
|
||||
daemon=True,
|
||||
).start()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Run channel.startup() in a daemon thread so we can restart later
|
||||
self._channel_thread = threading.Thread(
|
||||
target=self._run_channel, args=(channel,), daemon=True
|
||||
)
|
||||
self._channel_thread.start()
|
||||
logger.debug(f"[ChannelManager] Channel '{channel_name}' started in sub-thread")
|
||||
# Start web console first so its logs print cleanly,
|
||||
# then start remaining channels after a brief pause.
|
||||
web_entry = None
|
||||
other_entries = []
|
||||
for entry in channels:
|
||||
if entry[0] == "web":
|
||||
web_entry = entry
|
||||
else:
|
||||
other_entries.append(entry)
|
||||
|
||||
def _run_channel(self, channel):
|
||||
ordered = ([web_entry] if web_entry else []) + other_entries
|
||||
for i, (name, ch) in enumerate(ordered):
|
||||
if i > 0 and name != "web":
|
||||
time.sleep(0.1)
|
||||
t = threading.Thread(target=self._run_channel, args=(name, ch), daemon=True)
|
||||
self._threads[name] = t
|
||||
t.start()
|
||||
logger.debug(f"[ChannelManager] Channel '{name}' started in sub-thread")
|
||||
|
||||
def _run_channel(self, name: str, channel):
|
||||
try:
|
||||
channel.startup()
|
||||
except Exception as e:
|
||||
logger.error(f"[ChannelManager] Channel startup error: {e}")
|
||||
logger.error(f"[ChannelManager] Channel '{name}' startup error: {e}")
|
||||
logger.exception(e)
|
||||
|
||||
def stop(self):
|
||||
def stop(self, channel_name: str = None):
|
||||
"""
|
||||
Stop the current channel. Since most channel startup() methods block
|
||||
on an HTTP server or stream client, we stop by terminating the thread.
|
||||
Stop channel(s). If channel_name is given, stop only that channel;
|
||||
otherwise stop all channels.
|
||||
"""
|
||||
with self._lock:
|
||||
if self._channel is None:
|
||||
return
|
||||
channel_type = getattr(self._channel, 'channel_type', 'unknown')
|
||||
logger.info(f"[ChannelManager] Stopping channel '{channel_type}'...")
|
||||
|
||||
# Try graceful stop if channel implements it
|
||||
try:
|
||||
if hasattr(self._channel, 'stop'):
|
||||
self._channel.stop()
|
||||
except Exception as e:
|
||||
logger.warning(f"[ChannelManager] Error during channel stop: {e}")
|
||||
|
||||
self._channel = None
|
||||
self._channel_thread = None
|
||||
names = [channel_name] if channel_name else list(self._channels.keys())
|
||||
for name in names:
|
||||
ch = self._channels.pop(name, None)
|
||||
self._threads.pop(name, None)
|
||||
if ch is None:
|
||||
continue
|
||||
logger.info(f"[ChannelManager] Stopping channel '{name}'...")
|
||||
try:
|
||||
if hasattr(ch, 'stop'):
|
||||
ch.stop()
|
||||
except Exception as e:
|
||||
logger.warning(f"[ChannelManager] Error during channel '{name}' stop: {e}")
|
||||
if channel_name and self._primary_channel is self._channels.get(channel_name):
|
||||
self._primary_channel = None
|
||||
|
||||
def restart(self, new_channel_name: str):
|
||||
"""
|
||||
Restart the channel with a new channel type.
|
||||
Restart a single channel with a new channel type.
|
||||
Can be called from any thread (e.g. linkai config callback).
|
||||
"""
|
||||
logger.info(f"[ChannelManager] Restarting channel to '{new_channel_name}'...")
|
||||
self.stop()
|
||||
|
||||
# Clear singleton cache so a fresh channel instance is created
|
||||
self.stop(new_channel_name)
|
||||
_clear_singleton_cache(new_channel_name)
|
||||
|
||||
time.sleep(1) # Brief pause to allow resources to release
|
||||
self.start(new_channel_name, first_start=False)
|
||||
time.sleep(1)
|
||||
self.start([new_channel_name], first_start=False)
|
||||
logger.info(f"[ChannelManager] Channel restarted to '{new_channel_name}' successfully")
|
||||
|
||||
|
||||
@@ -130,14 +167,11 @@ def _clear_singleton_cache(channel_name: str):
|
||||
module_path = cls_map.get(channel_name)
|
||||
if not module_path:
|
||||
return
|
||||
# The singleton decorator stores instances in a closure dict keyed by class.
|
||||
# We need to find the actual class and clear it from the closure.
|
||||
try:
|
||||
parts = module_path.rsplit(".", 1)
|
||||
module_name, class_name = parts[0], parts[1]
|
||||
import importlib
|
||||
module = importlib.import_module(module_name)
|
||||
# The module-level name is the wrapper function from @singleton
|
||||
wrapper = getattr(module, class_name, None)
|
||||
if wrapper and hasattr(wrapper, '__closure__') and wrapper.__closure__:
|
||||
for cell in wrapper.__closure__:
|
||||
@@ -176,17 +210,28 @@ def run():
|
||||
# kill signal
|
||||
sigterm_handler_wrap(signal.SIGTERM)
|
||||
|
||||
# create channel
|
||||
channel_name = conf().get("channel_type", "wx")
|
||||
# Parse channel_type into a list
|
||||
raw_channel = conf().get("channel_type", "wx")
|
||||
|
||||
if "--cmd" in sys.argv:
|
||||
channel_name = "terminal"
|
||||
channel_names = ["terminal"]
|
||||
else:
|
||||
channel_names = _parse_channel_type(raw_channel)
|
||||
if not channel_names:
|
||||
channel_names = ["wx"]
|
||||
|
||||
if channel_name == "wxy":
|
||||
if "wxy" in channel_names:
|
||||
os.environ["WECHATY_LOG"] = "warn"
|
||||
|
||||
# Auto-start web console unless explicitly disabled
|
||||
web_console_enabled = conf().get("web_console", True)
|
||||
if web_console_enabled and "web" not in channel_names:
|
||||
channel_names.append("web")
|
||||
|
||||
logger.info(f"[App] Starting channels: {channel_names}")
|
||||
|
||||
_channel_mgr = ChannelManager()
|
||||
_channel_mgr.start(channel_name, first_start=True)
|
||||
_channel_mgr.start(channel_names, first_start=True)
|
||||
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
@@ -150,7 +150,7 @@ class AgentInitializer:
|
||||
if saved:
|
||||
with agent.messages_lock:
|
||||
agent.messages = saved
|
||||
logger.info(
|
||||
logger.debug(
|
||||
f"[AgentInitializer] Restored {len(saved)} messages "
|
||||
f"({restore_turns} turns cap) for session={session_id}"
|
||||
)
|
||||
|
||||
@@ -24,11 +24,16 @@ handler_pool = ThreadPoolExecutor(max_workers=8) # 处理消息的线程池
|
||||
class ChatChannel(Channel):
|
||||
name = None # 登录的用户名
|
||||
user_id = None # 登录的用户id
|
||||
futures = {} # 记录每个session_id提交到线程池的future对象, 用于重置会话时把没执行的future取消掉,正在执行的不会被取消
|
||||
sessions = {} # 用于控制并发,每个session_id同时只能有一个context在处理
|
||||
lock = threading.Lock() # 用于控制对sessions的访问
|
||||
|
||||
def __init__(self):
|
||||
# Instance-level attributes so each channel subclass has its own
|
||||
# independent session queue and lock. Previously these were class-level,
|
||||
# which caused contexts from one channel (e.g. Feishu) to be consumed
|
||||
# by another channel's consume() thread (e.g. Web), leading to errors
|
||||
# like "No request_id found in context".
|
||||
self.futures = {}
|
||||
self.sessions = {}
|
||||
self.lock = threading.Lock()
|
||||
_thread = threading.Thread(target=self.consume)
|
||||
_thread.setDaemon(True)
|
||||
_thread.start()
|
||||
|
||||
@@ -1,9 +1,15 @@
|
||||
import sys
|
||||
import time
|
||||
import web
|
||||
import json
|
||||
import logging
|
||||
import mimetypes
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from queue import Queue, Empty
|
||||
|
||||
import web
|
||||
|
||||
from bridge.context import *
|
||||
from bridge.reply import Reply, ReplyType
|
||||
from channel.chat_channel import ChatChannel, check_prefix
|
||||
@@ -11,20 +17,17 @@ from channel.chat_message import ChatMessage
|
||||
from common.log import logger
|
||||
from common.singleton import singleton
|
||||
from config import conf
|
||||
import os
|
||||
import mimetypes
|
||||
import threading
|
||||
import logging
|
||||
|
||||
|
||||
class WebMessage(ChatMessage):
|
||||
def __init__(
|
||||
self,
|
||||
msg_id,
|
||||
content,
|
||||
ctype=ContextType.TEXT,
|
||||
from_user_id="User",
|
||||
to_user_id="Chatgpt",
|
||||
other_user_id="Chatgpt",
|
||||
self,
|
||||
msg_id,
|
||||
content,
|
||||
ctype=ContextType.TEXT,
|
||||
from_user_id="User",
|
||||
to_user_id="Chatgpt",
|
||||
other_user_id="Chatgpt",
|
||||
):
|
||||
self.msg_id = msg_id
|
||||
self.ctype = ctype
|
||||
@@ -38,7 +41,7 @@ class WebMessage(ChatMessage):
|
||||
class WebChannel(ChatChannel):
|
||||
NOT_SUPPORT_REPLYTYPE = [ReplyType.VOICE]
|
||||
_instance = None
|
||||
|
||||
|
||||
# def __new__(cls):
|
||||
# if cls._instance is None:
|
||||
# cls._instance = super(WebChannel, cls).__new__(cls)
|
||||
@@ -47,12 +50,11 @@ class WebChannel(ChatChannel):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.msg_id_counter = 0
|
||||
self.session_queues = {} # session_id -> Queue (fallback polling)
|
||||
self.request_to_session = {} # request_id -> session_id
|
||||
self.sse_queues = {} # request_id -> Queue (SSE streaming)
|
||||
self.session_queues = {} # session_id -> Queue (fallback polling)
|
||||
self.request_to_session = {} # request_id -> session_id
|
||||
self.sse_queues = {} # request_id -> Queue (SSE streaming)
|
||||
self._http_server = None
|
||||
|
||||
|
||||
def _generate_msg_id(self):
|
||||
"""生成唯一的消息ID"""
|
||||
self.msg_id_counter += 1
|
||||
@@ -111,6 +113,7 @@ class WebChannel(ChatChannel):
|
||||
|
||||
def _make_sse_callback(self, request_id: str):
|
||||
"""Build an on_event callback that pushes agent stream events into the SSE queue."""
|
||||
|
||||
def on_event(event: dict):
|
||||
if request_id not in self.sse_queues:
|
||||
return
|
||||
@@ -237,28 +240,28 @@ class WebChannel(ChatChannel):
|
||||
data = web.data()
|
||||
json_data = json.loads(data)
|
||||
session_id = json_data.get('session_id')
|
||||
|
||||
|
||||
if not session_id or session_id not in self.session_queues:
|
||||
return json.dumps({"status": "error", "message": "Invalid session ID"})
|
||||
|
||||
|
||||
# 尝试从队列获取响应,不等待
|
||||
try:
|
||||
# 使用peek而不是get,这样如果前端没有成功处理,下次还能获取到
|
||||
response = self.session_queues[session_id].get(block=False)
|
||||
|
||||
|
||||
# 返回响应,包含请求ID以区分不同请求
|
||||
return json.dumps({
|
||||
"status": "success",
|
||||
"status": "success",
|
||||
"has_content": True,
|
||||
"content": response["content"],
|
||||
"request_id": response["request_id"],
|
||||
"timestamp": response["timestamp"]
|
||||
})
|
||||
|
||||
|
||||
except Empty:
|
||||
# 没有新响应
|
||||
return json.dumps({"status": "success", "has_content": False})
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error polling response: {e}")
|
||||
return json.dumps({"status": "error", "message": str(e)})
|
||||
@@ -271,9 +274,10 @@ class WebChannel(ChatChannel):
|
||||
|
||||
def startup(self):
|
||||
port = conf().get("web_port", 9899)
|
||||
|
||||
|
||||
# 打印可用渠道类型提示
|
||||
logger.info("[WebChannel] 当前channel为web,可修改 config.json 配置文件中的 channel_type 字段进行切换。全部可用类型为:")
|
||||
logger.info(
|
||||
"[WebChannel] 全部可用通道如下,可修改 config.json 配置文件中的 channel_type 字段进行切换,多个通道用逗号分隔:")
|
||||
logger.info("[WebChannel] 1. web - 网页")
|
||||
logger.info("[WebChannel] 2. terminal - 终端")
|
||||
logger.info("[WebChannel] 3. feishu - 飞书")
|
||||
@@ -281,16 +285,16 @@ class WebChannel(ChatChannel):
|
||||
logger.info("[WebChannel] 5. wechatcom_app - 企微自建应用")
|
||||
logger.info("[WebChannel] 6. wechatmp - 个人公众号")
|
||||
logger.info("[WebChannel] 7. wechatmp_service - 企业公众号")
|
||||
logger.info("[WebChannel] ✅ Web控制台已运行")
|
||||
logger.info(f"[WebChannel] 🌐 本地访问: http://localhost:{port}")
|
||||
logger.info(f"[WebChannel] 🌍 服务器访问: http://YOUR_IP:{port} (请将YOUR_IP替换为服务器IP)")
|
||||
logger.info("[WebChannel] ✅ Web对话网页已运行")
|
||||
|
||||
|
||||
# 确保静态文件目录存在
|
||||
static_dir = os.path.join(os.path.dirname(__file__), 'static')
|
||||
if not os.path.exists(static_dir):
|
||||
os.makedirs(static_dir)
|
||||
logger.debug(f"[WebChannel] Created static directory: {static_dir}")
|
||||
|
||||
|
||||
urls = (
|
||||
'/', 'RootHandler',
|
||||
'/message', 'MessageHandler',
|
||||
@@ -307,14 +311,14 @@ class WebChannel(ChatChannel):
|
||||
'/assets/(.*)', 'AssetsHandler',
|
||||
)
|
||||
app = web.application(urls, globals(), autoreload=False)
|
||||
|
||||
|
||||
# 完全禁用web.py的HTTP日志输出
|
||||
web.httpserver.LogMiddleware.log = lambda self, status, environ: None
|
||||
|
||||
|
||||
# 配置web.py的日志级别为ERROR
|
||||
logging.getLogger("web").setLevel(logging.ERROR)
|
||||
logging.getLogger("web.httpserver").setLevel(logging.ERROR)
|
||||
|
||||
|
||||
# Build WSGI app with middleware (same as runsimple but without print)
|
||||
func = web.httpserver.StaticMiddleware(app.wsgifunc())
|
||||
func = web.httpserver.LogMiddleware(func)
|
||||
|
||||
@@ -160,7 +160,8 @@ available_setting = {
|
||||
# chatgpt指令自定义触发词
|
||||
"clear_memory_commands": ["#清除记忆"], # 重置会话指令,必须以#开头
|
||||
# channel配置
|
||||
"channel_type": "", # 通道类型,支持:{wx,wxy,terminal,wechatmp,wechatmp_service,wechatcom_app,dingtalk}
|
||||
"channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wechatmp,wechatmp_service,wechatcom_app
|
||||
"web_console": True, # 是否自动启动Web控制台(默认启动)。设为False可禁用
|
||||
"subscribe_msg": "", # 订阅消息, 支持: wechatmp, wechatmp_service, wechatcom_app
|
||||
"debug": False, # 是否开启debug模式,开启后会打印更多日志
|
||||
"appdata_dir": "", # 数据目录
|
||||
|
||||
Reference in New Issue
Block a user