From c0702c8b368dbbede0372809f8a0e70aee05b440 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Mon, 23 Feb 2026 22:19:50 +0800 Subject: [PATCH] feat: web channel stream chat --- bridge/agent_event_handler.py | 10 +- channel/channel.py | 5 +- channel/web/static/css/console.css | 140 +++++++++++++++++++++ channel/web/static/js/console.js | 191 ++++++++++++++++++++++++++++- channel/web/web_channel.py | 164 +++++++++++++++++++------ 5 files changed, 461 insertions(+), 49 deletions(-) diff --git a/bridge/agent_event_handler.py b/bridge/agent_event_handler.py index 8bc7f62..b04c77b 100644 --- a/bridge/agent_event_handler.py +++ b/bridge/agent_event_handler.py @@ -94,15 +94,15 @@ class AgentEventHandler: def _send_to_channel(self, message): """ - Try to send message to channel - - Args: - message: Message to send + Try to send intermediate message to channel. + Skipped in SSE mode because thinking text is already streamed via on_event. """ + if self.context and self.context.get("on_event"): + return + if self.channel: try: from bridge.reply import Reply, ReplyType - # Create a Reply object for the message reply = Reply(ReplyType.TEXT, message) self.channel._send(reply, self.context) except Exception as e: diff --git a/channel/channel.py b/channel/channel.py index 42d613f..f01189e 100644 --- a/channel/channel.py +++ b/channel/channel.py @@ -57,11 +57,14 @@ class Channel(object): if context and "channel_type" not in context: context["channel_type"] = self.channel_type + # Read on_event callback injected by the channel (e.g. web SSE) + on_event = context.get("on_event") if context else None + # Use agent bridge to handle the query return Bridge().fetch_agent_reply( query=query, context=context, - on_event=None, + on_event=on_event, clear_history=False ) except Exception as e: diff --git a/channel/web/static/css/console.css b/channel/web/static/css/console.css index 3775b5d..2390569 100644 --- a/channel/web/static/css/console.css +++ b/channel/web/static/css/console.css @@ -82,6 +82,146 @@ .msg-content hr { border: none; height: 1px; background: #e2e8f0; margin: 1.2em 0; } .dark .msg-content hr { background: rgba(255,255,255,0.1); } +/* SSE Streaming cursor */ +@keyframes blink { 0%, 100% { opacity: 1; } 50% { opacity: 0; } } +.sse-streaming::after { + content: '▋'; + display: inline-block; + margin-left: 2px; + color: #4ABE6E; + animation: blink 0.9s step-end infinite; + font-size: 0.85em; + vertical-align: middle; +} + +/* Agent steps (thinking summaries + tool indicators) */ +.agent-steps:empty { display: none; } +.agent-steps:not(:empty) { + margin-bottom: 0.625rem; + padding-bottom: 0.5rem; + border-bottom: 1px dashed rgba(0, 0, 0, 0.08); +} +.dark .agent-steps:not(:empty) { border-bottom-color: rgba(255, 255, 255, 0.08); } + +.agent-step { + font-size: 0.75rem; + line-height: 1.4; + color: #94a3b8; + margin-bottom: 0.25rem; +} +.agent-step:last-child { margin-bottom: 0; } + +/* Thinking step - collapsible */ +.agent-thinking-step .thinking-header { + display: flex; + align-items: center; + gap: 0.375rem; + cursor: pointer; + user-select: none; +} +.agent-thinking-step .thinking-header.no-toggle { cursor: default; } +.agent-thinking-step .thinking-header:not(.no-toggle):hover { color: #64748b; } +.dark .agent-thinking-step .thinking-header:not(.no-toggle):hover { color: #cbd5e1; } +.agent-thinking-step .thinking-header i:first-child { font-size: 0.625rem; margin-top: 1px; } +.agent-thinking-step .thinking-chevron { + font-size: 0.5rem; + margin-left: auto; + transition: transform 0.2s ease; + opacity: 0.5; +} +.agent-thinking-step.expanded .thinking-chevron { transform: rotate(90deg); } +.agent-thinking-step .thinking-full { + display: none; + margin-top: 0.375rem; + margin-left: 1rem; + padding: 0.5rem; + background: rgba(0, 0, 0, 0.02); + border-radius: 6px; + border: 1px solid rgba(0, 0, 0, 0.04); + font-size: 0.75rem; + line-height: 1.5; + color: #94a3b8; + max-height: 200px; + overflow-y: auto; +} +.dark .agent-thinking-step .thinking-full { + background: rgba(255, 255, 255, 0.02); + border-color: rgba(255, 255, 255, 0.04); +} +.agent-thinking-step.expanded .thinking-full { display: block; } +.agent-thinking-step .thinking-full p { margin: 0.25em 0; } +.agent-thinking-step .thinking-full p:first-child { margin-top: 0; } +.agent-thinking-step .thinking-full p:last-child { margin-bottom: 0; } + +/* Tool step - collapsible */ +.agent-tool-step .tool-header { + display: flex; + align-items: center; + gap: 0.375rem; + cursor: pointer; + user-select: none; + padding: 1px 0; + border-radius: 4px; +} +.agent-tool-step .tool-header:hover { color: #64748b; } +.dark .agent-tool-step .tool-header:hover { color: #cbd5e1; } +.agent-tool-step .tool-icon { font-size: 0.625rem; } +.agent-tool-step .tool-chevron { + font-size: 0.5rem; + margin-left: auto; + transition: transform 0.2s ease; + opacity: 0.5; +} +.agent-tool-step.expanded .tool-chevron { transform: rotate(90deg); } +.agent-tool-step .tool-time { + font-size: 0.65rem; + opacity: 0.6; + margin-left: 0.25rem; +} + +/* Tool detail panel */ +.agent-tool-step .tool-detail { + display: none; + margin-top: 0.375rem; + margin-left: 1rem; + padding: 0.5rem; + background: rgba(0, 0, 0, 0.02); + border-radius: 6px; + border: 1px solid rgba(0, 0, 0, 0.04); +} +.dark .agent-tool-step .tool-detail { + background: rgba(255, 255, 255, 0.02); + border-color: rgba(255, 255, 255, 0.04); +} +.agent-tool-step.expanded .tool-detail { display: block; } +.tool-detail-section { margin-bottom: 0.375rem; } +.tool-detail-section:last-child { margin-bottom: 0; } +.tool-detail-label { + font-size: 0.625rem; + font-weight: 600; + text-transform: uppercase; + letter-spacing: 0.05em; + opacity: 0.6; + margin-bottom: 0.125rem; +} +.tool-detail-content { + font-family: 'JetBrains Mono', 'Fira Code', Consolas, monospace; + font-size: 0.7rem; + line-height: 1.5; + white-space: pre-wrap; + word-break: break-all; + max-height: 200px; + overflow-y: auto; + margin: 0; + padding: 0.25rem 0; + background: transparent; + color: inherit; +} +.tool-error-text { color: #f87171; } + +/* Tool failed state */ +.agent-tool-step.tool-failed .tool-name { color: #f87171; } + /* Chat Input */ #chat-input { resize: none; height: 42px; max-height: 180px; diff --git a/channel/web/static/js/console.js b/channel/web/static/js/console.js index 7746160..8b326cb 100644 --- a/channel/web/static/js/console.js +++ b/channel/web/static/js/console.js @@ -225,6 +225,7 @@ function renderMarkdown(text) { let sessionId = generateSessionId(); let isPolling = false; let loadingContainers = {}; +let activeStreams = {}; // request_id -> EventSource let isComposing = false; let appConfig = { use_agent: false, title: 'CowAgent', subtitle: '' }; @@ -310,13 +311,17 @@ function sendMessage() { fetch('/message', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ session_id: sessionId, message: text, timestamp: timestamp.toISOString() }) + body: JSON.stringify({ session_id: sessionId, message: text, stream: true, timestamp: timestamp.toISOString() }) }) .then(r => r.json()) .then(data => { if (data.status === 'success') { - loadingContainers[data.request_id] = loadingEl; - if (!isPolling) startPolling(); + if (data.stream) { + startSSE(data.request_id, loadingEl, timestamp); + } else { + loadingContainers[data.request_id] = loadingEl; + if (!isPolling) startPolling(); + } } else { loadingEl.remove(); addBotMessage(t('error_send'), new Date()); @@ -328,6 +333,163 @@ function sendMessage() { }); } +function startSSE(requestId, loadingEl, timestamp) { + const es = new EventSource(`/stream?request_id=${encodeURIComponent(requestId)}`); + activeStreams[requestId] = es; + + let botEl = null; + let stepsEl = null; // .agent-steps (thinking summaries + tool indicators) + let contentEl = null; // .answer-content (final streaming answer) + let accumulatedText = ''; + let currentToolEl = null; + + function ensureBotEl() { + if (botEl) return; + if (loadingEl) { loadingEl.remove(); loadingEl = null; } + botEl = document.createElement('div'); + botEl.className = 'flex gap-3 px-4 sm:px-6 py-3'; + botEl.dataset.requestId = requestId; + botEl.innerHTML = ` + CowAgent +
+
+
+
+
+
${formatTime(timestamp)}
+
+ `; + messagesDiv.appendChild(botEl); + stepsEl = botEl.querySelector('.agent-steps'); + contentEl = botEl.querySelector('.answer-content'); + } + + es.onmessage = function(e) { + let item; + try { item = JSON.parse(e.data); } catch (_) { return; } + + if (item.type === 'delta') { + ensureBotEl(); + accumulatedText += item.content; + contentEl.innerHTML = renderMarkdown(accumulatedText); + scrollChatToBottom(); + + } else if (item.type === 'tool_start') { + ensureBotEl(); + + // Save current thinking as a collapsible step + if (accumulatedText.trim()) { + const fullText = accumulatedText.trim(); + const oneLine = fullText.replace(/\n+/g, ' '); + const needsTruncate = oneLine.length > 80; + const stepEl = document.createElement('div'); + stepEl.className = 'agent-step agent-thinking-step' + (needsTruncate ? '' : ' no-expand'); + if (needsTruncate) { + const truncated = oneLine.substring(0, 80) + '…'; + stepEl.innerHTML = ` +
+ + ${escapeHtml(truncated)} + +
+
${renderMarkdown(fullText)}
`; + } else { + stepEl.innerHTML = ` +
+ + ${escapeHtml(oneLine)} +
`; + } + stepsEl.appendChild(stepEl); + } + accumulatedText = ''; + contentEl.innerHTML = ''; + + // Add tool execution indicator (collapsible) + currentToolEl = document.createElement('div'); + currentToolEl.className = 'agent-step agent-tool-step'; + const argsStr = formatToolArgs(item.arguments || {}); + currentToolEl.innerHTML = ` +
+ + ${item.tool} + +
+
+
+
Input
+
${argsStr}
+
+
+
`; + stepsEl.appendChild(currentToolEl); + + scrollChatToBottom(); + + } else if (item.type === 'tool_end') { + if (currentToolEl) { + const isError = item.status !== 'success'; + const icon = currentToolEl.querySelector('.tool-icon'); + icon.className = isError + ? 'fas fa-times text-red-400 flex-shrink-0 tool-icon' + : 'fas fa-check text-primary-400 flex-shrink-0 tool-icon'; + + // Show execution time + const nameEl = currentToolEl.querySelector('.tool-name'); + if (item.execution_time !== undefined) { + nameEl.innerHTML += ` ${item.execution_time}s`; + } + + // Fill output section + const outputSection = currentToolEl.querySelector('.tool-output-section'); + if (outputSection && item.result) { + outputSection.innerHTML = ` +
${isError ? 'Error' : 'Output'}
+
${escapeHtml(String(item.result))}
`; + } + + if (isError) currentToolEl.classList.add('tool-failed'); + currentToolEl = null; + } + + } else if (item.type === 'done') { + es.close(); + delete activeStreams[requestId]; + + const finalText = item.content || accumulatedText; + + if (!botEl && finalText) { + if (loadingEl) { loadingEl.remove(); loadingEl = null; } + addBotMessage(finalText, new Date((item.timestamp || Date.now() / 1000) * 1000), requestId); + } else if (botEl) { + contentEl.classList.remove('sse-streaming'); + if (finalText) contentEl.innerHTML = renderMarkdown(finalText); + applyHighlighting(botEl); + } + scrollChatToBottom(); + + } else if (item.type === 'error') { + es.close(); + delete activeStreams[requestId]; + if (loadingEl) { loadingEl.remove(); loadingEl = null; } + addBotMessage(t('error_send'), new Date()); + } + }; + + es.onerror = function() { + es.close(); + delete activeStreams[requestId]; + if (loadingEl) { loadingEl.remove(); loadingEl = null; } + if (!botEl) { + addBotMessage(t('error_send'), new Date()); + } else if (accumulatedText) { + contentEl.classList.remove('sse-streaming'); + contentEl.innerHTML = renderMarkdown(accumulatedText); + applyHighlighting(botEl); + } + }; +} + function startPolling() { if (isPolling) return; isPolling = true; @@ -379,7 +541,7 @@ function addBotMessage(content, timestamp, requestId) { el.className = 'flex gap-3 px-4 sm:px-6 py-3'; if (requestId) el.dataset.requestId = requestId; el.innerHTML = ` - CowAgent + CowAgent
${renderMarkdown(content)} @@ -396,7 +558,7 @@ function addLoadingIndicator() { const el = document.createElement('div'); el.className = 'flex gap-3 px-4 sm:px-6 py-3'; el.innerHTML = ` - CowAgent + CowAgent
@@ -411,6 +573,10 @@ function addLoadingIndicator() { } function newChat() { + // Close all active SSE connections for the current session + Object.values(activeStreams).forEach(es => { try { es.close(); } catch (_) {} }); + activeStreams = {}; + sessionId = generateSessionId(); isPolling = false; loadingContainers = {}; @@ -473,6 +639,21 @@ function formatTime(date) { return date.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' }); } +function escapeHtml(str) { + const div = document.createElement('div'); + div.appendChild(document.createTextNode(str)); + return div.innerHTML; +} + +function formatToolArgs(args) { + if (!args || Object.keys(args).length === 0) return '(none)'; + try { + return escapeHtml(JSON.stringify(args, null, 2)); + } catch (_) { + return escapeHtml(String(args)); + } +} + function scrollChatToBottom() { messagesDiv.scrollTop = messagesDiv.scrollHeight; } diff --git a/channel/web/web_channel.py b/channel/web/web_channel.py index 2ba25ee..6c70836 100644 --- a/channel/web/web_channel.py +++ b/channel/web/web_channel.py @@ -3,7 +3,6 @@ import time import web import json import uuid -import io from queue import Queue, Empty from bridge.context import * from bridge.reply import Reply, ReplyType @@ -13,7 +12,7 @@ from common.log import logger from common.singleton import singleton from config import conf import os -import mimetypes # 添加这行来处理MIME类型 +import mimetypes import threading import logging @@ -47,9 +46,10 @@ class WebChannel(ChatChannel): def __init__(self): super().__init__() - self.msg_id_counter = 0 # 添加消息ID计数器 - self.session_queues = {} # 存储session_id到队列的映射 - self.request_to_session = {} # 存储request_id到session_id的映射 + self.msg_id_counter = 0 + self.session_queues = {} # session_id -> Queue (fallback polling) + self.request_to_session = {} # request_id -> session_id + self.sse_queues = {} # request_id -> Queue (SSE streaming) self._http_server = None @@ -71,22 +71,30 @@ class WebChannel(ChatChannel): if reply.type == ReplyType.IMAGE_URL: time.sleep(0.5) - # 获取请求ID和会话ID request_id = context.get("request_id", None) - if not request_id: logger.error("No request_id found in context, cannot send message") return - - # 通过request_id获取session_id + session_id = self.request_to_session.get(request_id) if not session_id: logger.error(f"No session_id found for request {request_id}") return - - # 检查是否有会话队列 + + # SSE mode: push done event to SSE queue + if request_id in self.sse_queues: + content = reply.content if reply.content is not None else "" + self.sse_queues[request_id].put({ + "type": "done", + "content": content, + "request_id": request_id, + "timestamp": time.time() + }) + logger.debug(f"SSE done sent for request {request_id}") + return + + # Fallback: polling mode if session_id in self.session_queues: - # 创建响应数据,包含请求ID以区分不同请求的响应 response_data = { "type": str(reply.type), "content": reply.content, @@ -94,69 +102,133 @@ class WebChannel(ChatChannel): "request_id": request_id } self.session_queues[session_id].put(response_data) - logger.debug(f"Response sent to queue for session {session_id}, request {request_id}") + logger.debug(f"Response sent to poll queue for session {session_id}, request {request_id}") else: logger.warning(f"No response queue found for session {session_id}, response dropped") - + except Exception as e: logger.error(f"Error in send method: {e}") + def _make_sse_callback(self, request_id: str): + """Build an on_event callback that pushes agent stream events into the SSE queue.""" + def on_event(event: dict): + if request_id not in self.sse_queues: + return + q = self.sse_queues[request_id] + event_type = event.get("type") + data = event.get("data", {}) + + if event_type == "message_update": + delta = data.get("delta", "") + if delta: + q.put({"type": "delta", "content": delta}) + + elif event_type == "tool_execution_start": + tool_name = data.get("tool_name", "tool") + arguments = data.get("arguments", {}) + q.put({"type": "tool_start", "tool": tool_name, "arguments": arguments}) + + elif event_type == "tool_execution_end": + tool_name = data.get("tool_name", "tool") + status = data.get("status", "success") + result = data.get("result", "") + exec_time = data.get("execution_time", 0) + # Truncate long results to avoid huge SSE payloads + result_str = str(result) + if len(result_str) > 2000: + result_str = result_str[:2000] + "…" + q.put({ + "type": "tool_end", + "tool": tool_name, + "status": status, + "result": result_str, + "execution_time": round(exec_time, 2) + }) + + return on_event + def post_message(self): """ Handle incoming messages from users via POST request. Returns a request_id for tracking this specific request. """ try: - data = web.data() # 获取原始POST数据 + data = web.data() json_data = json.loads(data) session_id = json_data.get('session_id', f'session_{int(time.time())}') prompt = json_data.get('message', '') - - # 生成请求ID + use_sse = json_data.get('stream', True) + request_id = self._generate_request_id() - - # 将请求ID与会话ID关联 self.request_to_session[request_id] = session_id - - # 确保会话队列存在 + if session_id not in self.session_queues: self.session_queues[session_id] = Queue() - - # Web channel 不需要前缀,确保消息能通过前缀检查 + + if use_sse: + self.sse_queues[request_id] = Queue() + trigger_prefixs = conf().get("single_chat_prefix", [""]) if check_prefix(prompt, trigger_prefixs) is None: - # 如果没有匹配到前缀,给消息加上第一个前缀 if trigger_prefixs: prompt = trigger_prefixs[0] + prompt logger.debug(f"[WebChannel] Added prefix to message: {prompt}") - - # 创建消息对象 + msg = WebMessage(self._generate_msg_id(), prompt) - msg.from_user_id = session_id # 使用会话ID作为用户ID - - # 创建上下文,明确指定 isgroup=False + msg.from_user_id = session_id + context = self._compose_context(ContextType.TEXT, prompt, msg=msg, isgroup=False) - - # 检查 context 是否为 None(可能被插件过滤等) + if context is None: logger.warning(f"[WebChannel] Context is None for session {session_id}, message may be filtered") + if request_id in self.sse_queues: + del self.sse_queues[request_id] return json.dumps({"status": "error", "message": "Message was filtered"}) - # 覆盖必要的字段(_compose_context 会设置默认值,但我们需要使用实际的 session_id) context["session_id"] = session_id context["receiver"] = session_id context["request_id"] = request_id - - # 异步处理消息 - 只传递上下文 + + if use_sse: + context["on_event"] = self._make_sse_callback(request_id) + threading.Thread(target=self.produce, args=(context,)).start() - - # 返回请求ID - return json.dumps({"status": "success", "request_id": request_id}) - + + return json.dumps({"status": "success", "request_id": request_id, "stream": use_sse}) + except Exception as e: logger.error(f"Error processing message: {e}") return json.dumps({"status": "error", "message": str(e)}) + def stream_response(self, request_id: str): + """ + SSE generator for a given request_id. + Yields UTF-8 encoded bytes to avoid WSGI Latin-1 mangling. + """ + if request_id not in self.sse_queues: + yield b"data: {\"type\": \"error\", \"message\": \"invalid request_id\"}\n\n" + return + + q = self.sse_queues[request_id] + timeout = 300 # 5 minutes max + deadline = time.time() + timeout + + try: + while time.time() < deadline: + try: + item = q.get(timeout=1) + except Empty: + yield b": keepalive\n\n" + continue + + payload = json.dumps(item, ensure_ascii=False) + yield f"data: {payload}\n\n".encode("utf-8") + + if item.get("type") == "done": + break + finally: + self.sse_queues.pop(request_id, None) + def poll_response(self): """ Poll for responses using the session_id. @@ -223,6 +295,7 @@ class WebChannel(ChatChannel): '/', 'RootHandler', '/message', 'MessageHandler', '/poll', 'PollHandler', + '/stream', 'StreamHandler', '/chat', 'ChatHandler', '/config', 'ConfigHandler', '/assets/(.*)', 'AssetsHandler', @@ -272,6 +345,21 @@ class PollHandler: return WebChannel().poll_response() +class StreamHandler: + def GET(self): + params = web.input(request_id='') + request_id = params.request_id + if not request_id: + raise web.badrequest() + + web.header('Content-Type', 'text/event-stream; charset=utf-8') + web.header('Cache-Control', 'no-cache') + web.header('X-Accel-Buffering', 'no') + web.header('Access-Control-Allow-Origin', '*') + + return WebChannel().stream_response(request_id) + + class ChatHandler: def GET(self): # 正常返回聊天页面