diff --git a/agent/protocol/agent_stream.py b/agent/protocol/agent_stream.py index 31c9359..117d6f9 100644 --- a/agent/protocol/agent_stream.py +++ b/agent/protocol/agent_stream.py @@ -636,11 +636,16 @@ class AgentStreamExecutor: ]) # Check if error is message format error (incomplete tool_use/tool_result pairs) - # This happens when previous conversation had tool failures + # This happens when previous conversation had tool failures or context trimming + # broke tool_use/tool_result pairs. is_message_format_error = any(keyword in error_str_lower for keyword in [ 'tool_use', 'tool_result', 'without', 'immediately after', - 'corresponding', 'must have', 'each' - ]) and 'status: 400' in error_str_lower + 'corresponding', 'must have', 'each', + 'tool_call_id', 'is not found', 'tool_calls', + 'must be a response to a preceeding message' + ]) and ('400' in error_str_lower or 'status: 400' in error_str_lower + or 'invalid_request' in error_str_lower + or 'invalidparameter' in error_str_lower) if is_context_overflow or is_message_format_error: error_type = "context overflow" if is_context_overflow else "message format error" @@ -659,9 +664,10 @@ class AgentStreamExecutor: ) # Aggressive trim didn't help or this is a message format error - # -> clear everything + # -> clear everything and also purge DB to prevent reload of dirty data logger.warning("🔄 Clearing conversation history to recover") self.messages.clear() + self._clear_session_db() if is_context_overflow: raise Exception( "抱歉,对话历史过长导致上下文溢出。我已清空历史记录,请重新描述你的需求。" @@ -907,23 +913,127 @@ class AgentStreamExecutor: def _validate_and_fix_messages(self): """ Validate message history and fix incomplete tool_use/tool_result pairs. - Claude API requires each tool_use to have a corresponding tool_result immediately after. + + All LLM APIs (OpenAI, Claude, Moonshot, DashScope) require: + 1. Each tool_use in an assistant message must have a matching tool_result + in the immediately following user message. + 2. 
Each tool_result in a user message must reference a tool_use_id that + exists in the preceding assistant message. + + This method performs a full scan and removes any messages that would + cause a 400 error due to broken tool_use/tool_result pairing. """ if not self.messages: return - - # Check last message for incomplete tool_use - if len(self.messages) > 0: + + # Pass 1: remove trailing incomplete tool_use (assistant with tool_use + # but no following tool_result) + while self.messages: last_msg = self.messages[-1] if last_msg.get("role") == "assistant": - # Check if assistant message has tool_use blocks content = last_msg.get("content", []) - if isinstance(content, list): - has_tool_use = any(block.get("type") == "tool_use" for block in content) - if has_tool_use: - # This is incomplete - remove it - logger.warning(f"⚠️ Removing incomplete tool_use message from history") - self.messages.pop() + if isinstance(content, list) and any( + isinstance(b, dict) and b.get("type") == "tool_use" + for b in content + ): + logger.warning("⚠️ Removing trailing incomplete tool_use assistant message") + self.messages.pop() + continue + break + + # Pass 2: full scan for orphaned tool_result and missing tool_result + removed = 0 + i = 0 + while i < len(self.messages): + msg = self.messages[i] + role = msg.get("role") + content = msg.get("content", []) + + if role == "assistant" and isinstance(content, list): + tool_use_ids = { + b.get("id") + for b in content + if isinstance(b, dict) and b.get("type") == "tool_use" and b.get("id") + } + if tool_use_ids: + # There must be a following user message with matching tool_results + next_idx = i + 1 + if next_idx >= len(self.messages): + # No following message at all — remove + logger.warning(f"⚠️ Removing assistant tool_use at index {i} (no following tool_result)") + self.messages.pop(i) + removed += 1 + continue + + next_msg = self.messages[next_idx] + next_content = next_msg.get("content", []) + if next_msg.get("role") != "user" or not 
isinstance(next_content, list): + # Next message is not a user message with tool_results + logger.warning(f"⚠️ Removing assistant tool_use at index {i} (next message is not tool_result)") + self.messages.pop(i) + removed += 1 + continue + + result_ids = { + b.get("tool_use_id") + for b in next_content + if isinstance(b, dict) and b.get("type") == "tool_result" + } + if not tool_use_ids.issubset(result_ids): + # Some tool_use ids have no matching result — remove both + logger.warning( + f"⚠️ Removing mismatched tool_use/result pair at index {i},{next_idx} " + f"(use_ids={tool_use_ids}, result_ids={result_ids})" + ) + self.messages.pop(next_idx) + self.messages.pop(i) + removed += 2 + continue + + elif role == "user" and isinstance(content, list): + has_tool_results = any( + isinstance(b, dict) and b.get("type") == "tool_result" + for b in content + ) + if has_tool_results: + # Check that the preceding message is an assistant with matching tool_use + if i == 0: + logger.warning(f"⚠️ Removing orphaned tool_result at index {i} (no preceding assistant)") + self.messages.pop(i) + removed += 1 + continue + + prev_msg = self.messages[i - 1] + prev_content = prev_msg.get("content", []) + if prev_msg.get("role") != "assistant" or not isinstance(prev_content, list): + logger.warning(f"⚠️ Removing orphaned tool_result at index {i} (prev is not assistant)") + self.messages.pop(i) + removed += 1 + continue + + prev_use_ids = { + b.get("id") + for b in prev_content + if isinstance(b, dict) and b.get("type") == "tool_use" and b.get("id") + } + result_ids = { + b.get("tool_use_id") + for b in content + if isinstance(b, dict) and b.get("type") == "tool_result" + } + if not result_ids.issubset(prev_use_ids): + logger.warning( + f"⚠️ Removing orphaned tool_result at index {i} " + f"(result_ids={result_ids} not in prev use_ids={prev_use_ids})" + ) + self.messages.pop(i) + removed += 1 + continue + + i += 1 + + if removed > 0: + logger.info(f"🔧 Message validation: removed {removed} 
broken message(s)") def _identify_complete_turns(self) -> List[Dict]: """ @@ -946,24 +1056,30 @@ class AgentStreamExecutor: content = msg.get('content', []) if role == 'user': - # 检查是否是用户查询(不是工具结果) + # Determine if this is a real user query (not a tool_result injection + # or an internal hint message injected by the agent loop). is_user_query = False + has_tool_result = False if isinstance(content, list): - is_user_query = any( - block.get('type') == 'text' - for block in content - if isinstance(block, dict) + has_text = any( + isinstance(block, dict) and block.get('type') == 'text' + for block in content ) + has_tool_result = any( + isinstance(block, dict) and block.get('type') == 'tool_result' + for block in content + ) + # A message with tool_result is always internal, even if it + # also contains text blocks (shouldn't happen, but be safe). + is_user_query = has_text and not has_tool_result elif isinstance(content, str): is_user_query = True if is_user_query: - # 开始新轮次 if current_turn['messages']: turns.append(current_turn) current_turn = {'messages': [msg]} else: - # 工具结果,属于当前轮次 current_turn['messages'].append(msg) else: # AI 回复,属于当前轮次 @@ -1252,6 +1368,24 @@ class AgentStreamExecutor: f"~{current_tokens + system_tokens} -> ~{accumulated_tokens + system_tokens} tokens)" ) + def _clear_session_db(self): + """ + Clear the current session's persisted messages from SQLite DB. + + This prevents dirty data (broken tool_use/tool_result pairs) from being + reloaded on the next request or after a restart. 
+ """ + try: + session_id = getattr(self.agent, '_current_session_id', None) + if not session_id: + return + from agent.memory import get_conversation_store + store = get_conversation_store() + store.clear_session(session_id) + logger.info(f"🗑️ Cleared dirty session data from DB: {session_id}") + except Exception as e: + logger.warning(f"Failed to clear session DB: {e}") + def _prepare_messages(self) -> List[Dict[str, Any]]: """ Prepare messages to send to LLM diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py index d7401ef..a233852 100644 --- a/bridge/agent_bridge.py +++ b/bridge/agent_bridge.py @@ -332,9 +332,10 @@ class AgentBridge: Returns: Reply object """ + session_id = None + agent = None try: # Extract session_id from context for user isolation - session_id = None if context: session_id = context.kwargs.get("session_id") or context.get("session_id") @@ -371,6 +372,9 @@ class AgentBridge: if context and hasattr(agent, 'model'): agent.model.channel_type = context.get("channel_type", "") + # Store session_id on agent so executor can clear DB on fatal errors + agent._current_session_id = session_id + # Record message count before execution so we can diff new messages with agent.messages_lock: pre_run_len = len(agent.messages) @@ -395,7 +399,17 @@ class AgentBridge: channel_type = (context.get("channel_type") or "") if context else "" with agent.messages_lock: new_messages = agent.messages[pre_run_len:] - self._persist_messages(session_id, list(new_messages), channel_type) + if new_messages: + self._persist_messages(session_id, list(new_messages), channel_type) + elif pre_run_len > 0 and len(agent.messages) == 0: + # Agent cleared its messages (recovery from format error / overflow) + # Also clear the DB to prevent reloading dirty data + try: + from agent.memory import get_conversation_store + get_conversation_store().clear_session(session_id) + logger.info(f"[AgentBridge] Cleared DB for recovered session: {session_id}") + except Exception as e: + 
logger.warning(f"[AgentBridge] Failed to clear DB after recovery: {e}") # Check if there are files to send (from read tool) if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'): @@ -415,6 +429,18 @@ class AgentBridge: except Exception as e: logger.error(f"Agent reply error: {e}") + # If the agent cleared its messages due to format error / overflow, + # also purge the DB so the next request starts clean. + if session_id and agent: + try: + with agent.messages_lock: + msg_count = len(agent.messages) + if msg_count == 0: + from agent.memory import get_conversation_store + get_conversation_store().clear_session(session_id) + logger.info(f"[AgentBridge] Cleared DB for session after error: {session_id}") + except Exception as db_err: + logger.warning(f"[AgentBridge] Failed to clear DB after error: {db_err}") return Reply(ReplyType.ERROR, f"Agent error: {str(e)}") def _create_file_reply(self, file_info: dict, text_response: str, context: Context = None) -> Reply: diff --git a/docs/en/intro/architecture.mdx b/docs/en/intro/architecture.mdx index 3cdf619..cb1c7d7 100644 --- a/docs/en/intro/architecture.mdx +++ b/docs/en/intro/architecture.mdx @@ -33,18 +33,24 @@ When Agent mode is enabled, CowAgent runs as an autonomous agent with the follow ## Workspace Directory Structure -The Agent workspace is located at `~/.cow` by default and stores system prompts, memory files, and skill files: +The Agent workspace is located at `~/cow` by default and stores system prompts, memory files, and skill files: ``` -~/.cow/ +~/cow/ ├── system.md # Agent system prompt ├── user.md # User profile ├── memory/ # Long-term memory storage │ ├── core.md # Core memory │ └── daily/ # Daily memory -├── skills/ # Custom skills -│ ├── skill-1/ -│ └── skill-2/ +└── skills/ # Custom skills + ├── skill-1/ + └── skill-2/ +``` + +Secret keys are stored separately in the `~/.cow` directory for security: + +``` +~/.cow/ └── .env # Secret keys for skills ``` @@ -55,7 +61,7 @@ 
Configure Agent mode parameters in `config.json`: ```json { "agent": true, - "agent_workspace": "~/.cow", + "agent_workspace": "~/cow", "agent_max_context_tokens": 40000, "agent_max_context_turns": 30, "agent_max_steps": 15 @@ -65,7 +71,7 @@ Configure Agent mode parameters in `config.json`: | Parameter | Description | Default | | --- | --- | --- | | `agent` | Enable Agent mode | `true` | -| `agent_workspace` | Workspace path | `~/.cow` | +| `agent_workspace` | Workspace path | `~/cow` | | `agent_max_context_tokens` | Max context tokens | `40000` | | `agent_max_context_turns` | Max context turns | `30` | | `agent_max_steps` | Max decision steps per task | `15` | diff --git a/docs/intro/architecture.mdx b/docs/intro/architecture.mdx index 67aff37..deae645 100644 --- a/docs/intro/architecture.mdx +++ b/docs/intro/architecture.mdx @@ -33,18 +33,24 @@ CowAgent 的整体架构由以下核心模块组成: ## 工作空间 -Agent 的工作空间默认位于 `~/.cow` 目录,用于存储系统提示词、记忆文件、技能文件等: +Agent 的工作空间默认位于 `~/cow` 目录,用于存储系统提示词、记忆文件、技能文件等: ``` -~/.cow/ +~/cow/ ├── system.md # Agent system prompt ├── user.md # User profile ├── memory/ # Long-term memory storage │ ├── core.md # Core memory │ └── daily/ # Daily memory -├── skills/ # Custom skills -│ ├── skill-1/ -│ └── skill-2/ +└── skills/ # Custom skills + ├── skill-1/ + └── skill-2/ +``` + +密钥文件单独存储在 `~/.cow` 目录(出于安全考虑): + +``` +~/.cow/ └── .env # Secret keys for skills ``` @@ -55,7 +61,7 @@ Agent 的工作空间默认位于 `~/.cow` 目录,用于存储系统提示词 ```json { "agent": true, - "agent_workspace": "~/.cow", + "agent_workspace": "~/cow", "agent_max_context_tokens": 40000, "agent_max_context_turns": 30, "agent_max_steps": 15 @@ -65,7 +71,7 @@ Agent 的工作空间默认位于 `~/.cow` 目录,用于存储系统提示词 | 参数 | 说明 | 默认值 | | --- | --- | --- | | `agent` | 是否启用 Agent 模式 | `true` | -| `agent_workspace` | 工作空间路径 | `~/.cow` | +| `agent_workspace` | 工作空间路径 | `~/cow` | | `agent_max_context_tokens` | 最大上下文 token 数 | `40000` | | `agent_max_context_turns` | 最大上下文记忆轮次 | `30` | | `agent_max_steps` | 单次任务最大决策步数 | `15` |