diff --git a/agent/chat/service.py b/agent/chat/service.py index 23a121a..af50ce8 100644 --- a/agent/chat/service.py +++ b/agent/chat/service.py @@ -158,6 +158,7 @@ class ChatService: logger.info(f"[ChatService] Agent run completed: session={session_id}") + class _StreamState: """Mutable state shared between the event callback and the run method.""" diff --git a/agent/memory/__init__.py b/agent/memory/__init__.py index f638a9d..89f03c3 100644 --- a/agent/memory/__init__.py +++ b/agent/memory/__init__.py @@ -1,11 +1,21 @@ """ Memory module for AgentMesh -Provides long-term memory capabilities with hybrid search (vector + keyword) +Provides both long-term memory (vector/keyword search) and short-term +conversation history persistence (SQLite). """ from agent.memory.manager import MemoryManager from agent.memory.config import MemoryConfig, get_default_memory_config, set_global_memory_config from agent.memory.embedding import create_embedding_provider +from agent.memory.conversation_store import ConversationStore, get_conversation_store -__all__ = ['MemoryManager', 'MemoryConfig', 'get_default_memory_config', 'set_global_memory_config', 'create_embedding_provider'] +__all__ = [ + 'MemoryManager', + 'MemoryConfig', + 'get_default_memory_config', + 'set_global_memory_config', + 'create_embedding_provider', + 'ConversationStore', + 'get_conversation_store', +] diff --git a/agent/memory/conversation_store.py b/agent/memory/conversation_store.py new file mode 100644 index 0000000..98fc102 --- /dev/null +++ b/agent/memory/conversation_store.py @@ -0,0 +1,616 @@ +""" +Conversation history persistence using SQLite. + +Design: +- sessions table: per-session metadata (channel_type, last_active, msg_count) +- messages table: individual messages stored as JSON, append-only +- Pruning: age-based only (sessions not updated within N days are deleted) +- Thread-safe via a single in-process lock + +Storage path: ~/cow/sessions/conversations.db +""" + +from __future__ import annotations + +import json +import sqlite3 +import threading +import time +from pathlib import Path +from typing import Any, Dict, List, Optional + +from common.log import logger + + +# --------------------------------------------------------------------------- +# Schema +# --------------------------------------------------------------------------- + +_DDL = """ +CREATE TABLE IF NOT EXISTS sessions ( + session_id TEXT PRIMARY KEY, + channel_type TEXT NOT NULL DEFAULT '', + created_at INTEGER NOT NULL, + last_active INTEGER NOT NULL, + msg_count INTEGER NOT NULL DEFAULT 0 +); + +CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + seq INTEGER NOT NULL, + role TEXT NOT NULL, + content TEXT NOT NULL, + created_at INTEGER NOT NULL, + UNIQUE (session_id, seq) +); + +CREATE INDEX IF NOT EXISTS idx_messages_session + ON messages (session_id, seq); + +CREATE INDEX IF NOT EXISTS idx_sessions_last_active + ON sessions (last_active); +""" + +# Migration: add channel_type column to existing databases that predate it. +_MIGRATION_ADD_CHANNEL_TYPE = """ +ALTER TABLE sessions ADD COLUMN channel_type TEXT NOT NULL DEFAULT ''; +""" + +DEFAULT_MAX_AGE_DAYS: int = 30 + + +def _is_visible_user_message(content: Any) -> bool: + """ + Return True when a user-role message represents actual user input + (not an internal tool_result injected by the agent loop). + """ + if isinstance(content, str): + return bool(content.strip()) + if isinstance(content, list): + return any( + isinstance(b, dict) and b.get("type") == "text" + for b in content + ) + return False + + +def _extract_display_text(content: Any) -> str: + """ + Extract the human-readable text portion from a message content value. + Returns an empty string for tool_use / tool_result blocks. + """ + if isinstance(content, str): + return content.strip() + if isinstance(content, list): + parts = [ + b.get("text", "") + for b in content + if isinstance(b, dict) and b.get("type") == "text" + ] + return "\n".join(p for p in parts if p).strip() + return "" + + +def _extract_tool_calls(content: Any) -> List[Dict[str, Any]]: + """ + Extract tool_use blocks from an assistant message content. + Returns a list of {name, arguments} dicts (result filled in later). + """ + if not isinstance(content, list): + return [] + return [ + {"id": b.get("id", ""), "name": b.get("name", ""), "arguments": b.get("input", {})} + for b in content + if isinstance(b, dict) and b.get("type") == "tool_use" + ] + + +def _extract_tool_results(content: Any) -> Dict[str, str]: + """ + Extract tool_result blocks from a user message, keyed by tool_use_id. + """ + if not isinstance(content, list): + return {} + results = {} + for b in content: + if not isinstance(b, dict) or b.get("type") != "tool_result": + continue + tool_id = b.get("tool_use_id", "") + result_content = b.get("content", "") + if isinstance(result_content, list): + result_content = "\n".join( + rb.get("text", "") for rb in result_content + if isinstance(rb, dict) and rb.get("type") == "text" + ) + results[tool_id] = str(result_content) + return results + + +def _group_into_display_turns( + rows: List[tuple], +) -> List[Dict[str, Any]]: + """ + Convert raw (role, content_json, created_at) DB rows into display turns. + + One display turn = one visible user message + one merged assistant reply. + All intermediate assistant messages (those carrying tool_use) and the final + assistant text reply produced for the same user query are collapsed into a + single assistant turn, exactly matching the live SSE rendering where tools + and the final answer appear inside the same bubble. + + Grouping rules: + - A visible user message starts a new group. + - tool_result user messages are internal; their content is attached to the + matching tool_use entry via tool_use_id and they never become own turns. + - All assistant messages within a group are merged: + * tool_use blocks → tool_calls list (result filled from tool_results) + * text blocks → last non-empty text becomes the display content + """ + # ------------------------------------------------------------------ # + # Pass 1: split rows into groups, each starting with a visible user msg + # ------------------------------------------------------------------ # + # group = (user_row | None, [subsequent_rows]) + # user_row: (content, created_at) + groups: List[tuple] = [] + cur_user: Optional[tuple] = None + cur_rest: List[tuple] = [] + started = False + + for role, raw_content, created_at in rows: + try: + content = json.loads(raw_content) + except Exception: + content = raw_content + + if role == "user" and _is_visible_user_message(content): + if started: + groups.append((cur_user, cur_rest)) + cur_user = (content, created_at) + cur_rest = [] + started = True + else: + cur_rest.append((role, content, created_at)) + + if started: + groups.append((cur_user, cur_rest)) + + # ------------------------------------------------------------------ # + # Pass 2: build display turns from each group + # ------------------------------------------------------------------ # + turns: List[Dict[str, Any]] = [] + + for user_row, rest in groups: + # User turn + if user_row: + content, created_at = user_row + text = _extract_display_text(content) + if text: + turns.append({"role": "user", "content": text, "created_at": created_at}) + + # Collect all tool_calls and tool_results from the rest of the group + all_tool_calls: List[Dict[str, Any]] = [] + tool_results: Dict[str, str] = {} + final_text = "" + final_ts: Optional[int] = None + + for role, content, created_at in rest: + if role == "user": + tool_results.update(_extract_tool_results(content)) + elif role == "assistant": + tcs = _extract_tool_calls(content) + all_tool_calls.extend(tcs) + t = _extract_display_text(content) + if t: + final_text = t + final_ts = created_at + + # Attach tool results to their matching tool_call entries + for tc in all_tool_calls: + tc["result"] = tool_results.get(tc.get("id", ""), "") + + if final_text or all_tool_calls: + turns.append({ + "role": "assistant", + "content": final_text, + "tool_calls": all_tool_calls, + "created_at": final_ts or (user_row[1] if user_row else 0), + }) + + return turns + + +class ConversationStore: + """ + SQLite-backed store for per-session conversation history. + + Usage: + store = ConversationStore(db_path) + store.append_messages("user_123", new_messages, channel_type="feishu") + msgs = store.load_messages("user_123", max_turns=30) + """ + + def __init__(self, db_path: Path): + self._db_path = db_path + self._lock = threading.Lock() + self._init_db() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def load_messages( + self, + session_id: str, + max_turns: int = 30, + ) -> List[Dict[str, Any]]: + """ + Load the most recent messages for a session, for injection into the LLM. + + ALL message types (user text, assistant tool_use, tool_result) are returned + in their original JSON form so the LLM can reconstruct the full context. + + max_turns is a *visible-turn* count: we count only user messages whose + content is actual user text (not tool_result blocks). This prevents + tool-heavy sessions from exhausting the turn budget prematurely. + + Args: + session_id: Unique session identifier. + max_turns: Maximum number of visible user-assistant turns to keep. + + Returns: + Chronologically ordered list of message dicts (role, content). + """ + with self._lock: + conn = self._connect() + try: + rows = conn.execute( + """ + SELECT seq, role, content + FROM messages + WHERE session_id = ? + ORDER BY seq DESC + """, + (session_id,), + ).fetchall() + finally: + conn.close() + + if not rows: + return [] + + # Walk newest-to-oldest counting *visible* user turns (actual user text, + # not tool_result injections). Record the seq of every visible user + # message so we can find a clean cut point later. + visible_turn_seqs: List[int] = [] # newest first + for seq, role, raw_content in rows: + if role != "user": + continue + try: + content = json.loads(raw_content) + except Exception: + content = raw_content + if _is_visible_user_message(content): + visible_turn_seqs.append(seq) + + # Determine the seq of the oldest visible user message we want to keep. + # If the total turns fit within max_turns, keep everything. + if len(visible_turn_seqs) <= max_turns: + cutoff_seq = None # keep all + else: + # The Nth visible user message (0-indexed) is the oldest we keep. + cutoff_seq = visible_turn_seqs[max_turns - 1] + + # Build result in chronological order, starting from cutoff. + # IMPORTANT: we start exactly at cutoff_seq (the visible user message), + # never mid-group, so tool_use / tool_result pairs are always complete. + result = [] + for seq, role, raw_content in reversed(rows): + if cutoff_seq is not None and seq < cutoff_seq: + continue + try: + content = json.loads(raw_content) + except Exception: + content = raw_content + result.append({"role": role, "content": content}) + return result + + def append_messages( + self, + session_id: str, + messages: List[Dict[str, Any]], + channel_type: str = "", + ) -> None: + """ + Append new messages to a session's history. + + Seq numbers continue from the session's current maximum, so + concurrent callers on distinct sessions never collide. + + Args: + session_id: Unique session identifier. + messages: List of message dicts to append. + channel_type: Source channel (e.g. "feishu", "web", "wechat"). + Only written on session creation; ignored on update. + """ + if not messages: + return + + now = int(time.time()) + with self._lock: + conn = self._connect() + try: + with conn: + # Upsert session row. + # channel_type is set only on INSERT (first time); subsequent + # appends just update last_active to avoid overwriting the value. + conn.execute( + """ + INSERT INTO sessions + (session_id, channel_type, created_at, last_active, msg_count) + VALUES (?, ?, ?, ?, 0) + ON CONFLICT(session_id) DO UPDATE SET + last_active = excluded.last_active + """, + (session_id, channel_type, now, now), + ) + + # Determine starting seq for the new batch. + row = conn.execute( + "SELECT COALESCE(MAX(seq), -1) FROM messages WHERE session_id = ?", + (session_id,), + ).fetchone() + next_seq = row[0] + 1 + + for msg in messages: + role = msg.get("role", "") + content = json.dumps( + msg.get("content", ""), ensure_ascii=False + ) + conn.execute( + """ + INSERT OR IGNORE INTO messages + (session_id, seq, role, content, created_at) + VALUES (?, ?, ?, ?, ?) + """, + (session_id, next_seq, role, content, now), + ) + next_seq += 1 + + conn.execute( + """ + UPDATE sessions + SET msg_count = ( + SELECT COUNT(*) FROM messages WHERE session_id = ? + ) + WHERE session_id = ? + """, + (session_id, session_id), + ) + finally: + conn.close() + + def clear_session(self, session_id: str) -> None: + """Delete all messages and the session record for a given session_id.""" + with self._lock: + conn = self._connect() + try: + with conn: + conn.execute( + "DELETE FROM messages WHERE session_id = ?", (session_id,) + ) + conn.execute( + "DELETE FROM sessions WHERE session_id = ?", (session_id,) + ) + finally: + conn.close() + + def cleanup_old_sessions(self, max_age_days: Optional[int] = None) -> int: + """ + Delete sessions that have not been active within max_age_days. + + Args: + max_age_days: Override the default retention period. + + Returns: + Number of sessions deleted. + """ + try: + from config import conf + max_age = max_age_days or conf().get( + "conversation_max_age_days", DEFAULT_MAX_AGE_DAYS + ) + except Exception: + max_age = max_age_days or DEFAULT_MAX_AGE_DAYS + + cutoff = int(time.time()) - max_age * 86400 + deleted = 0 + + with self._lock: + conn = self._connect() + try: + with conn: + stale = conn.execute( + "SELECT session_id FROM sessions WHERE last_active < ?", + (cutoff,), + ).fetchall() + for (sid,) in stale: + conn.execute( + "DELETE FROM messages WHERE session_id = ?", (sid,) + ) + conn.execute( + "DELETE FROM sessions WHERE session_id = ?", (sid,) + ) + deleted += 1 + finally: + conn.close() + + if deleted: + logger.info(f"[ConversationStore] Pruned {deleted} expired sessions") + return deleted + + def load_history_page( + self, + session_id: str, + page: int = 1, + page_size: int = 20, + ) -> Dict[str, Any]: + """ + Load a page of conversation history for UI display, grouped into turns. + + Each "turn" maps to one of: + - A user message (role="user", content=str) + - An assistant message (role="assistant", content=str, + tool_calls=[{name, arguments, result}] when tools were used) + + Internal tool_result user messages are merged into the preceding + assistant entry's tool_calls list and never appear as standalone items. + + Pages are numbered from 1 (most recent). Messages within a page are + returned in chronological order. + + Returns: + { + "messages": [ + { + "role": "user" | "assistant", + "content": str, + "tool_calls": [...], # assistant only, may be [] + "created_at": int, + }, + ... + ], + "total": , + "page": , + "page_size": , + "has_more": bool, + } + """ + page = max(1, page) + with self._lock: + conn = self._connect() + try: + rows = conn.execute( + """ + SELECT role, content, created_at + FROM messages + WHERE session_id = ? + ORDER BY seq ASC + """, + (session_id,), + ).fetchall() + finally: + conn.close() + + visible = _group_into_display_turns(rows) + + total = len(visible) + offset = (page - 1) * page_size + page_items = list(reversed(visible))[offset: offset + page_size] + page_items = list(reversed(page_items)) + + return { + "messages": page_items, + "total": total, + "page": page, + "page_size": page_size, + "has_more": offset + page_size < total, + } + + def get_stats(self) -> Dict[str, Any]: + """Return basic stats keyed by channel_type, for monitoring.""" + with self._lock: + conn = self._connect() + try: + total_sessions = conn.execute( + "SELECT COUNT(*) FROM sessions" + ).fetchone()[0] + total_messages = conn.execute( + "SELECT COUNT(*) FROM messages" + ).fetchone()[0] + by_channel = conn.execute( + """ + SELECT channel_type, COUNT(*) as cnt + FROM sessions + GROUP BY channel_type + ORDER BY cnt DESC + """ + ).fetchall() + return { + "total_sessions": total_sessions, + "total_messages": total_messages, + "by_channel": {row[0] or "unknown": row[1] for row in by_channel}, + } + finally: + conn.close() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _init_db(self) -> None: + self._db_path.parent.mkdir(parents=True, exist_ok=True) + conn = self._connect() + try: + conn.executescript(_DDL) + conn.commit() + self._migrate(conn) + finally: + conn.close() + + def _migrate(self, conn: sqlite3.Connection) -> None: + """Apply incremental schema migrations on existing databases.""" + cols = { + row[1] + for row in conn.execute("PRAGMA table_info(sessions)").fetchall() + } + if "channel_type" not in cols: + try: + conn.execute(_MIGRATION_ADD_CHANNEL_TYPE) + conn.commit() + logger.info("[ConversationStore] Migrated: added channel_type column") + except Exception as e: + logger.warning(f"[ConversationStore] Migration failed: {e}") + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(str(self._db_path), timeout=10) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + return conn + + +# --------------------------------------------------------------------------- +# Singleton +# --------------------------------------------------------------------------- + +_store_instance: Optional[ConversationStore] = None +_store_lock = threading.Lock() + + +def get_conversation_store() -> ConversationStore: + """ + Return the process-wide ConversationStore singleton. + + Reuses the long-term memory database so the project stays with a single + SQLite file: ~/cow/memory/long-term/index.db + The conversation tables (sessions / messages) are separate from the + memory tables (memory_chunks / file_metadata) — no conflicts. + """ + global _store_instance + if _store_instance is not None: + return _store_instance + + with _store_lock: + if _store_instance is not None: + return _store_instance + + try: + from agent.memory.config import get_default_memory_config + db_path = get_default_memory_config().get_db_path() + except Exception: + from common.utils import expand_path + db_path = Path(expand_path("~/cow")) / "memory" / "long-term" / "index.db" + + _store_instance = ConversationStore(db_path) + logger.debug(f"[ConversationStore] Using shared DB at: {db_path}") + return _store_instance diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py index 6c46568..b2dae79 100644 --- a/bridge/agent_bridge.py +++ b/bridge/agent_bridge.py @@ -325,6 +325,10 @@ class AgentBridge: logger.warning(f"[AgentBridge] Failed to attach context to scheduler: {e}") break + # Record message count before execution so we can diff new messages + with agent.messages_lock: + pre_run_len = len(agent.messages) + try: # Use agent's run_stream method with event handler response = agent.run_stream( @@ -336,9 +340,16 @@ class AgentBridge: # Restore original tools if context and context.get("is_scheduled_task"): agent.tools = original_tools - + # Log execution summary event_handler.log_summary() + + # Persist new messages generated during this run + if session_id: + channel_type = (context.get("channel_type") or "") if context else "" + with agent.messages_lock: + new_messages = agent.messages[pre_run_len:] + self._persist_messages(session_id, list(new_messages), channel_type) # Check if there are files to send (from read tool) if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'): @@ -475,6 +486,32 @@ class AgentBridge: except Exception as e: logger.warning(f"[AgentBridge] Failed to migrate API keys: {e}") + def _persist_messages( + self, session_id: str, new_messages: list, channel_type: str = "" + ) -> None: + """ + Persist new messages to the conversation store after each agent run. + + Failures are logged but never propagate — they must not interrupt replies. + """ + if not new_messages: + return + try: + from config import conf + if not conf().get("conversation_persistence", True): + return + except Exception: + pass + try: + from agent.memory import get_conversation_store + get_conversation_store().append_messages( + session_id, new_messages, channel_type=channel_type + ) + except Exception as e: + logger.warning( + f"[AgentBridge] Failed to persist messages for session={session_id}: {e}" + ) + def clear_session(self, session_id: str): """ Clear a specific session's agent and conversation history diff --git a/bridge/agent_initializer.py b/bridge/agent_initializer.py index b9aae38..b481d83 100644 --- a/bridge/agent_initializer.py +++ b/bridge/agent_initializer.py @@ -118,8 +118,41 @@ class AgentInitializer: # Attach memory manager if memory_manager: agent.memory_manager = memory_manager - + + # Restore persisted conversation history for this session + if session_id: + self._restore_conversation_history(agent, session_id) + return agent + + def _restore_conversation_history(self, agent, session_id: str) -> None: + """ + Load persisted conversation messages from SQLite and inject them + into the agent's in-memory message list. + + Only runs when conversation persistence is enabled (default: True). + Respects agent_max_context_turns to limit how many turns are loaded. + """ + from config import conf + if not conf().get("conversation_persistence", True): + return + + try: + from agent.memory import get_conversation_store + store = get_conversation_store() + max_turns = conf().get("agent_max_context_turns", 30) + saved = store.load_messages(session_id, max_turns=max_turns) + if saved: + with agent.messages_lock: + agent.messages = saved + logger.info( + f"[AgentInitializer] Restored {len(saved)} messages for session={session_id}" + ) + except Exception as e: + logger.warning( + f"[AgentInitializer] Failed to restore conversation history for " + f"session={session_id}: {e}" + ) def _load_env_file(self): """Load environment variables from .env file""" diff --git a/channel/web/static/js/console.js b/channel/web/static/js/console.js index 56ad3e6..d4a0f35 100644 --- a/channel/web/static/js/console.js +++ b/channel/web/static/js/console.js @@ -232,19 +232,37 @@ function renderMarkdown(text) { // ===================================================================== // Chat Module // ===================================================================== -let sessionId = generateSessionId(); let isPolling = false; let loadingContainers = {}; let activeStreams = {}; // request_id -> EventSource let isComposing = false; let appConfig = { use_agent: false, title: 'CowAgent', subtitle: '' }; +const SESSION_ID_KEY = 'cow_session_id'; + function generateSessionId() { return 'session_' + ([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, c => (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16) ); } +// Restore session_id from localStorage so conversation history survives page refresh. +// A new id is only generated when the user explicitly starts a new chat. +function loadOrCreateSessionId() { + const stored = localStorage.getItem(SESSION_ID_KEY); + if (stored) return stored; + const fresh = generateSessionId(); + localStorage.setItem(SESSION_ID_KEY, fresh); + return fresh; +} + +let sessionId = loadOrCreateSessionId(); + +// ---- Conversation history state ---- +let historyPage = 0; // last page fetched (0 = nothing fetched yet) +let historyHasMore = false; +let historyLoading = false; + fetch('/config').then(r => r.json()).then(data => { if (data.status === 'success') { appConfig = data; @@ -257,7 +275,9 @@ fetch('/config').then(r => r.json()).then(data => { document.getElementById('cfg-max-steps').textContent = data.agent_max_steps || '--'; document.getElementById('cfg-channel').textContent = data.channel_type || '--'; } -}).catch(() => {}); + // Load conversation history after config is ready + loadHistory(1); +}).catch(() => { loadHistory(1); }); const chatInput = document.getElementById('chat-input'); const sendBtn = document.getElementById('send-btn'); @@ -530,7 +550,7 @@ function startPolling() { poll(); } -function addUserMessage(content, timestamp) { +function createUserMessageEl(content, timestamp) { const el = document.createElement('div'); el.className = 'flex justify-end px-4 sm:px-6 py-3'; el.innerHTML = ` @@ -541,28 +561,139 @@ function addUserMessage(content, timestamp) {
${formatTime(timestamp)}
`; + return el; +} + +function renderToolCallsHtml(toolCalls) { + if (!toolCalls || toolCalls.length === 0) return ''; + return toolCalls.map(tc => { + const argsStr = formatToolArgs(tc.arguments || {}); + const resultStr = tc.result ? escapeHtml(String(tc.result)) : ''; + const hasResult = !!resultStr; + return ` +
+
+ + ${escapeHtml(tc.name || '')} + +
+
+
+
Input
+
${argsStr}
+
+ ${hasResult ? ` +
+
Output
+
${resultStr}
+
` : ''} +
+
`; + }).join(''); +} + +function createBotMessageEl(content, timestamp, requestId, toolCalls) { + const el = document.createElement('div'); + el.className = 'flex gap-3 px-4 sm:px-6 py-3'; + if (requestId) el.dataset.requestId = requestId; + const toolsHtml = renderToolCallsHtml(toolCalls); + el.innerHTML = ` + CowAgent +
+
+ ${toolsHtml ? `
${toolsHtml}
` : ''} +
${renderMarkdown(content)}
+
+
${formatTime(timestamp)}
+
+ `; + applyHighlighting(el); + return el; +} + +function addUserMessage(content, timestamp) { + const el = createUserMessageEl(content, timestamp); messagesDiv.appendChild(el); scrollChatToBottom(); } function addBotMessage(content, timestamp, requestId) { - const el = document.createElement('div'); - el.className = 'flex gap-3 px-4 sm:px-6 py-3'; - if (requestId) el.dataset.requestId = requestId; - el.innerHTML = ` - CowAgent -
-
- ${renderMarkdown(content)} -
-
${formatTime(timestamp)}
-
- `; + const el = createBotMessageEl(content, timestamp, requestId); messagesDiv.appendChild(el); - applyHighlighting(el); scrollChatToBottom(); } +// Load conversation history from the server (page 1 = most recent messages). +// Subsequent pages prepend older messages when the user scrolls to the top. +function loadHistory(page) { + if (historyLoading) return; + historyLoading = true; + + fetch(`/api/history?session_id=${encodeURIComponent(sessionId)}&page=${page}&page_size=20`) + .then(r => r.json()) + .then(data => { + if (data.status !== 'success' || data.messages.length === 0) return; + + const prevScrollHeight = messagesDiv.scrollHeight; + const isFirstLoad = page === 1; + + // On first load, remove the welcome screen if history exists + if (isFirstLoad) { + const ws = document.getElementById('welcome-screen'); + if (ws) ws.remove(); + } + + // Build a fragment of history message elements in chronological order + const fragment = document.createDocumentFragment(); + + if (data.has_more && page > 1) { + // Keep the "load more" sentinel in place (inserted below) + } + + data.messages.forEach(msg => { + const hasContent = msg.content && msg.content.trim(); + const hasToolCalls = msg.role === 'assistant' && msg.tool_calls && msg.tool_calls.length > 0; + if (!hasContent && !hasToolCalls) return; + const ts = new Date(msg.created_at * 1000); + const el = msg.role === 'user' + ? createUserMessageEl(msg.content, ts) + : createBotMessageEl(msg.content || '', ts, null, msg.tool_calls); + fragment.appendChild(el); + }); + + // Prepend history above any existing messages + const sentinel = document.getElementById('history-load-more'); + const insertBefore = sentinel ? sentinel.nextSibling : messagesDiv.firstChild; + messagesDiv.insertBefore(fragment, insertBefore); + + // Manage the "load more" sentinel at the very top + if (data.has_more) { + if (!document.getElementById('history-load-more')) { + const btn = document.createElement('div'); + btn.id = 'history-load-more'; + btn.className = 'flex justify-center py-3'; + btn.innerHTML = ``; + messagesDiv.insertBefore(btn, messagesDiv.firstChild); + } + } else { + const sentinel = document.getElementById('history-load-more'); + if (sentinel) sentinel.remove(); + } + + historyHasMore = data.has_more; + historyPage = page; + + if (isFirstLoad) { + scrollChatToBottom(); + } else { + // Restore scroll position so loading older messages doesn't jump the view + messagesDiv.scrollTop = messagesDiv.scrollHeight - prevScrollHeight; + } + }) + .catch(() => {}) + .finally(() => { historyLoading = false; }); +} + function addLoadingIndicator() { const el = document.createElement('div'); el.className = 'flex gap-3 px-4 sm:px-6 py-3'; @@ -586,7 +717,9 @@ function newChat() { Object.values(activeStreams).forEach(es => { try { es.close(); } catch (_) {} }); activeStreams = {}; + // Generate a fresh session and persist it so the next page load also starts clean sessionId = generateSessionId(); + localStorage.setItem(SESSION_ID_KEY, sessionId); isPolling = false; loadingContainers = {}; messagesDiv.innerHTML = ''; diff --git a/channel/web/web_channel.py b/channel/web/web_channel.py index 690211e..68f604c 100644 --- a/channel/web/web_channel.py +++ b/channel/web/web_channel.py @@ -302,6 +302,7 @@ class WebChannel(ChatChannel): '/api/memory', 'MemoryHandler', '/api/memory/content', 'MemoryContentHandler', '/api/scheduler', 'SchedulerHandler', + '/api/history', 'HistoryHandler', '/api/logs', 'LogsHandler', '/assets/(.*)', 'AssetsHandler', ) @@ -471,6 +472,37 @@ class SchedulerHandler: return json.dumps({"status": "error", "message": str(e)}) +class HistoryHandler: + def GET(self): + """ + Return paginated conversation history for a session. + + Query params: + session_id (required) + page int, default 1 (1 = most recent messages) + page_size int, default 20 + """ + web.header('Content-Type', 'application/json; charset=utf-8') + web.header('Access-Control-Allow-Origin', '*') + try: + params = web.input(session_id='', page='1', page_size='20') + session_id = params.session_id.strip() + if not session_id: + return json.dumps({"status": "error", "message": "session_id required"}) + + from agent.memory import get_conversation_store + store = get_conversation_store() + result = store.load_history_page( + session_id=session_id, + page=int(params.page), + page_size=int(params.page_size), + ) + return json.dumps({"status": "success", **result}, ensure_ascii=False) + except Exception as e: + logger.error(f"[WebChannel] History API error: {e}") + return json.dumps({"status": "error", "message": str(e)}) + + class LogsHandler: def GET(self): """Stream the last N lines of run.log as SSE, then tail new lines."""