diff --git a/agent/chat/__init__.py b/agent/chat/__init__.py new file mode 100644 index 0000000..69a90a9 --- /dev/null +++ b/agent/chat/__init__.py @@ -0,0 +1,3 @@ +from agent.chat.service import ChatService + +__all__ = ["ChatService"] diff --git a/agent/chat/service.py b/agent/chat/service.py new file mode 100644 index 0000000..23a121a --- /dev/null +++ b/agent/chat/service.py @@ -0,0 +1,168 @@ +""" +ChatService - Wraps the Agent stream execution to produce CHAT protocol chunks. + +Translates agent events (message_update, message_end, tool_execution_end, etc.) +into the CHAT socket protocol format (content chunks with segment_id, tool_calls chunks). +""" + +import time +from typing import Callable, Optional + +from common.log import logger + + +class ChatService: + """ + High-level service that runs an Agent for a given query and streams + the results as CHAT protocol chunks via a callback. + + Usage: + svc = ChatService(agent_bridge) + svc.run(query, session_id, send_chunk_fn) + """ + + def __init__(self, agent_bridge): + """ + :param agent_bridge: AgentBridge instance (manages agent lifecycle) + """ + self.agent_bridge = agent_bridge + + def run(self, query: str, session_id: str, send_chunk_fn: Callable[[dict], None]): + """ + Run the agent for *query* and stream results back via *send_chunk_fn*. + + The method blocks until the agent finishes. After it returns the SDK + will automatically send the final (streaming=false) message. + + :param query: user query text + :param session_id: session identifier for agent isolation + :param send_chunk_fn: callable(chunk_data: dict) to send a streaming chunk + """ + agent = self.agent_bridge.get_agent(session_id=session_id) + if agent is None: + raise RuntimeError("Failed to initialise agent for the session") + + # State shared between the event callback and this method + state = _StreamState() + + def on_event(event: dict): + """Translate agent events into CHAT protocol chunks.""" + event_type = event.get("type") + data = event.get("data", {}) + + if event_type == "message_update": + # Incremental text delta + delta = data.get("delta", "") + if delta: + send_chunk_fn({ + "chunk_type": "content", + "delta": delta, + "segment_id": state.segment_id, + }) + + elif event_type == "message_end": + # A content segment finished. + tool_calls = data.get("tool_calls", []) + if tool_calls: + # After tool_calls are executed the next content will be + # a new segment; collect tool results until turn_end. + state.pending_tool_results = [] + + elif event_type == "tool_execution_end": + tool_name = data.get("tool_name", "") + arguments = data.get("arguments", {}) + result = data.get("result", "") + status = data.get("status", "unknown") + execution_time = data.get("execution_time", 0) + elapsed_str = f"{execution_time:.2f}s" + + # Serialise result to string if needed + if not isinstance(result, str): + import json + try: + result = json.dumps(result, ensure_ascii=False) + except Exception: + result = str(result) + + tool_info = { + "name": tool_name, + "arguments": arguments, + "result": result, + "status": status, + "elapsed": elapsed_str, + } + + if state.pending_tool_results is not None: + state.pending_tool_results.append(tool_info) + + elif event_type == "turn_end": + has_tool_calls = data.get("has_tool_calls", False) + if has_tool_calls and state.pending_tool_results: + # Flush collected tool results as a single tool_calls chunk + send_chunk_fn({ + "chunk_type": "tool_calls", + "tool_calls": state.pending_tool_results, + }) + state.pending_tool_results = None + # Next content belongs to a new segment + state.segment_id += 1 + + # Run the agent with our event callback --------------------------- + logger.info(f"[ChatService] Starting agent run: session={session_id}, query={query[:80]}") + + from config import conf + max_context_turns = conf().get("agent_max_context_turns", 30) + + # Get full system prompt with skills + full_system_prompt = agent.get_full_system_prompt() + + # Create a copy of messages for this execution + with agent.messages_lock: + messages_copy = agent.messages.copy() + original_length = len(agent.messages) + + from agent.protocol.agent_stream import AgentStreamExecutor + + executor = AgentStreamExecutor( + agent=agent, + model=agent.model, + system_prompt=full_system_prompt, + tools=agent.tools, + max_turns=agent.max_steps, + on_event=on_event, + messages=messages_copy, + max_context_turns=max_context_turns, + ) + + try: + response = executor.run_stream(query) + except Exception: + # If executor cleared messages (context overflow), sync back + if len(executor.messages) == 0: + with agent.messages_lock: + agent.messages.clear() + logger.info("[ChatService] Cleared agent message history after executor recovery") + raise + + # Append only the NEW messages from this execution (thread-safe) + with agent.messages_lock: + new_messages = executor.messages[original_length:] + agent.messages.extend(new_messages) + + # Store executor reference for files_to_send access + agent.stream_executor = executor + + # Execute post-process tools + agent._execute_post_process_tools() + + logger.info(f"[ChatService] Agent run completed: session={session_id}") + + +class _StreamState: + """Mutable state shared between the event callback and the run method.""" + + def __init__(self): + self.segment_id: int = 0 + # None means we are not accumulating tool results right now. + # A list means we are in the middle of a tool-execution phase. + self.pending_tool_results: Optional[list] = None diff --git a/bridge/agent_event_handler.py b/bridge/agent_event_handler.py index 17a0920..8bc7f62 100644 --- a/bridge/agent_event_handler.py +++ b/bridge/agent_event_handler.py @@ -74,7 +74,7 @@ class AgentEventHandler: # Only send thinking process if followed by tool calls if tool_calls: if self.current_thinking.strip(): - logger.debug(f"💭 {self.current_thinking.strip()[:200]}{'...' if len(self.current_thinking) > 200 else ''}") + logger.info(f"💭 {self.current_thinking.strip()[:200]}{'...' if len(self.current_thinking) > 200 else ''}") # Send thinking process to channel self._send_to_channel(f"{self.current_thinking.strip()}") else: diff --git a/common/cloud_client.py b/common/cloud_client.py index c3dd1b8..de3af47 100644 --- a/common/cloud_client.py +++ b/common/cloud_client.py @@ -28,6 +28,7 @@ class CloudClient(LinkAIClient): self.channel_mgr = None self._skill_service = None self._memory_service = None + self._chat_service = None @property def skill_service(self): @@ -61,6 +62,20 @@ class CloudClient(LinkAIClient): logger.error(f"[CloudClient] Failed to init MemoryService: {e}") return self._memory_service + @property + def chat_service(self): + """Lazy-init ChatService (requires AgentBridge via Bridge singleton).""" + if self._chat_service is None: + try: + from agent.chat.service import ChatService + from bridge.bridge import Bridge + agent_bridge = Bridge().get_agent_bridge() + self._chat_service = ChatService(agent_bridge) + logger.debug("[CloudClient] ChatService initialised") + except Exception as e: + logger.error(f"[CloudClient] Failed to init ChatService: {e}") + return self._chat_service + # ------------------------------------------------------------------ # message push callback # ------------------------------------------------------------------ @@ -223,6 +238,28 @@ class CloudClient(LinkAIClient): return svc.dispatch(action, payload) + # ------------------------------------------------------------------ + # chat callback + # ------------------------------------------------------------------ + def on_chat(self, data: dict, send_chunk_fn): + """ + Handle CHAT messages from the cloud console. + Runs the agent in streaming mode and sends chunks back via send_chunk_fn. + + :param data: message data with 'action' and 'payload' (query, session_id) + :param send_chunk_fn: callable(chunk_data: dict) to send one streaming chunk + """ + payload = data.get("payload", {}) + query = payload.get("query", "") + session_id = payload.get("session_id", "cloud_console") + logger.info(f"[CloudClient] on_chat: session={session_id}, query={query[:80]}") + + svc = self.chat_service + if svc is None: + raise RuntimeError("ChatService not available") + + svc.run(query=query, session_id=session_id, send_chunk_fn=send_chunk_fn) + # ------------------------------------------------------------------ # channel restart helpers # ------------------------------------------------------------------