fix: incomplete historical session messages

This commit is contained in:
zhayujie
2026-02-28 15:03:33 +08:00
parent fccfa92d7e
commit b788a3dd4e
4 changed files with 210 additions and 38 deletions

View File

@@ -636,11 +636,16 @@ class AgentStreamExecutor:
])
# Check if error is message format error (incomplete tool_use/tool_result pairs)
# This happens when previous conversation had tool failures
# This happens when previous conversation had tool failures or context trimming
# broke tool_use/tool_result pairs.
is_message_format_error = any(keyword in error_str_lower for keyword in [
'tool_use', 'tool_result', 'without', 'immediately after',
'corresponding', 'must have', 'each'
]) and 'status: 400' in error_str_lower
'corresponding', 'must have', 'each',
'tool_call_id', 'is not found', 'tool_calls',
'must be a response to a preceeding message'
]) and ('400' in error_str_lower or 'status: 400' in error_str_lower
or 'invalid_request' in error_str_lower
or 'invalidparameter' in error_str_lower)
if is_context_overflow or is_message_format_error:
error_type = "context overflow" if is_context_overflow else "message format error"
@@ -659,9 +664,10 @@ class AgentStreamExecutor:
)
# Aggressive trim didn't help or this is a message format error
# -> clear everything
# -> clear everything and also purge DB to prevent reload of dirty data
logger.warning("🔄 Clearing conversation history to recover")
self.messages.clear()
self._clear_session_db()
if is_context_overflow:
raise Exception(
"抱歉,对话历史过长导致上下文溢出。我已清空历史记录,请重新描述你的需求。"
@@ -907,23 +913,127 @@ class AgentStreamExecutor:
def _validate_and_fix_messages(self):
"""
Validate message history and fix incomplete tool_use/tool_result pairs.
Claude API requires each tool_use to have a corresponding tool_result immediately after.
All LLM APIs (OpenAI, Claude, Moonshot, DashScope) require:
1. Each tool_use in an assistant message must have a matching tool_result
in the immediately following user message.
2. Each tool_result in a user message must reference a tool_use_id that
exists in the preceding assistant message.
This method performs a full scan and removes any messages that would
cause a 400 error due to broken tool_use/tool_result pairing.
"""
if not self.messages:
return
# Check last message for incomplete tool_use
if len(self.messages) > 0:
# Pass 1: remove trailing incomplete tool_use (assistant with tool_use
# but no following tool_result)
while self.messages:
last_msg = self.messages[-1]
if last_msg.get("role") == "assistant":
# Check if assistant message has tool_use blocks
content = last_msg.get("content", [])
if isinstance(content, list):
has_tool_use = any(block.get("type") == "tool_use" for block in content)
if has_tool_use:
# This is incomplete - remove it
logger.warning(f"⚠️ Removing incomplete tool_use message from history")
self.messages.pop()
if isinstance(content, list) and any(
isinstance(b, dict) and b.get("type") == "tool_use"
for b in content
):
logger.warning("⚠️ Removing trailing incomplete tool_use assistant message")
self.messages.pop()
continue
break
# Pass 2: full scan for orphaned tool_result and missing tool_result
removed = 0
i = 0
while i < len(self.messages):
msg = self.messages[i]
role = msg.get("role")
content = msg.get("content", [])
if role == "assistant" and isinstance(content, list):
tool_use_ids = {
b.get("id")
for b in content
if isinstance(b, dict) and b.get("type") == "tool_use" and b.get("id")
}
if tool_use_ids:
# There must be a following user message with matching tool_results
next_idx = i + 1
if next_idx >= len(self.messages):
# No following message at all — remove
logger.warning(f"⚠️ Removing assistant tool_use at index {i} (no following tool_result)")
self.messages.pop(i)
removed += 1
continue
next_msg = self.messages[next_idx]
next_content = next_msg.get("content", [])
if next_msg.get("role") != "user" or not isinstance(next_content, list):
# Next message is not a user message with tool_results
logger.warning(f"⚠️ Removing assistant tool_use at index {i} (next message is not tool_result)")
self.messages.pop(i)
removed += 1
continue
result_ids = {
b.get("tool_use_id")
for b in next_content
if isinstance(b, dict) and b.get("type") == "tool_result"
}
if not tool_use_ids.issubset(result_ids):
# Some tool_use ids have no matching result — remove both
logger.warning(
f"⚠️ Removing mismatched tool_use/result pair at index {i},{next_idx} "
f"(use_ids={tool_use_ids}, result_ids={result_ids})"
)
self.messages.pop(next_idx)
self.messages.pop(i)
removed += 2
continue
elif role == "user" and isinstance(content, list):
has_tool_results = any(
isinstance(b, dict) and b.get("type") == "tool_result"
for b in content
)
if has_tool_results:
# Check that the preceding message is an assistant with matching tool_use
if i == 0:
logger.warning(f"⚠️ Removing orphaned tool_result at index {i} (no preceding assistant)")
self.messages.pop(i)
removed += 1
continue
prev_msg = self.messages[i - 1]
prev_content = prev_msg.get("content", [])
if prev_msg.get("role") != "assistant" or not isinstance(prev_content, list):
logger.warning(f"⚠️ Removing orphaned tool_result at index {i} (prev is not assistant)")
self.messages.pop(i)
removed += 1
continue
prev_use_ids = {
b.get("id")
for b in prev_content
if isinstance(b, dict) and b.get("type") == "tool_use" and b.get("id")
}
result_ids = {
b.get("tool_use_id")
for b in content
if isinstance(b, dict) and b.get("type") == "tool_result"
}
if not result_ids.issubset(prev_use_ids):
logger.warning(
f"⚠️ Removing orphaned tool_result at index {i} "
f"(result_ids={result_ids} not in prev use_ids={prev_use_ids})"
)
self.messages.pop(i)
removed += 1
continue
i += 1
if removed > 0:
logger.info(f"🔧 Message validation: removed {removed} broken message(s)")
def _identify_complete_turns(self) -> List[Dict]:
"""
@@ -946,24 +1056,30 @@ class AgentStreamExecutor:
content = msg.get('content', [])
if role == 'user':
# 检查是否是用户查询(不是工具结果)
# Determine if this is a real user query (not a tool_result injection
# or an internal hint message injected by the agent loop).
is_user_query = False
has_tool_result = False
if isinstance(content, list):
is_user_query = any(
block.get('type') == 'text'
for block in content
if isinstance(block, dict)
has_text = any(
isinstance(block, dict) and block.get('type') == 'text'
for block in content
)
has_tool_result = any(
isinstance(block, dict) and block.get('type') == 'tool_result'
for block in content
)
# A message with tool_result is always internal, even if it
# also contains text blocks (shouldn't happen, but be safe).
is_user_query = has_text and not has_tool_result
elif isinstance(content, str):
is_user_query = True
if is_user_query:
# 开始新轮次
if current_turn['messages']:
turns.append(current_turn)
current_turn = {'messages': [msg]}
else:
# 工具结果,属于当前轮次
current_turn['messages'].append(msg)
else:
# AI 回复,属于当前轮次
@@ -1252,6 +1368,24 @@ class AgentStreamExecutor:
f"~{current_tokens + system_tokens} -> ~{accumulated_tokens + system_tokens} tokens)"
)
def _clear_session_db(self):
"""
Clear the current session's persisted messages from SQLite DB.
This prevents dirty data (broken tool_use/tool_result pairs) from being
reloaded on the next request or after a restart.
"""
try:
session_id = getattr(self.agent, '_current_session_id', None)
if not session_id:
return
from agent.memory import get_conversation_store
store = get_conversation_store()
store.clear_session(session_id)
logger.info(f"🗑️ Cleared dirty session data from DB: {session_id}")
except Exception as e:
logger.warning(f"Failed to clear session DB: {e}")
def _prepare_messages(self) -> List[Dict[str, Any]]:
"""
Prepare messages to send to LLM

View File

@@ -332,9 +332,10 @@ class AgentBridge:
Returns:
Reply object
"""
session_id = None
agent = None
try:
# Extract session_id from context for user isolation
session_id = None
if context:
session_id = context.kwargs.get("session_id") or context.get("session_id")
@@ -371,6 +372,9 @@ class AgentBridge:
if context and hasattr(agent, 'model'):
agent.model.channel_type = context.get("channel_type", "")
# Store session_id on agent so executor can clear DB on fatal errors
agent._current_session_id = session_id
# Record message count before execution so we can diff new messages
with agent.messages_lock:
pre_run_len = len(agent.messages)
@@ -395,7 +399,17 @@ class AgentBridge:
channel_type = (context.get("channel_type") or "") if context else ""
with agent.messages_lock:
new_messages = agent.messages[pre_run_len:]
self._persist_messages(session_id, list(new_messages), channel_type)
if new_messages:
self._persist_messages(session_id, list(new_messages), channel_type)
elif pre_run_len > 0 and len(agent.messages) == 0:
# Agent cleared its messages (recovery from format error / overflow)
# Also clear the DB to prevent reloading dirty data
try:
from agent.memory import get_conversation_store
get_conversation_store().clear_session(session_id)
logger.info(f"[AgentBridge] Cleared DB for recovered session: {session_id}")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to clear DB after recovery: {e}")
# Check if there are files to send (from read tool)
if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'):
@@ -415,6 +429,18 @@ class AgentBridge:
except Exception as e:
logger.error(f"Agent reply error: {e}")
# If the agent cleared its messages due to format error / overflow,
# also purge the DB so the next request starts clean.
if session_id and agent:
try:
with agent.messages_lock:
msg_count = len(agent.messages)
if msg_count == 0:
from agent.memory import get_conversation_store
get_conversation_store().clear_session(session_id)
logger.info(f"[AgentBridge] Cleared DB for session after error: {session_id}")
except Exception as db_err:
logger.warning(f"[AgentBridge] Failed to clear DB after error: {db_err}")
return Reply(ReplyType.ERROR, f"Agent error: {str(e)}")
def _create_file_reply(self, file_info: dict, text_response: str, context: Context = None) -> Reply:

View File

@@ -33,18 +33,24 @@ When Agent mode is enabled, CowAgent runs as an autonomous agent with the follow
## Workspace Directory Structure
The Agent workspace is located at `~/.cow` by default and stores system prompts, memory files, and skill files:
The Agent workspace is located at `~/cow` by default and stores system prompts, memory files, and skill files:
```
~/.cow/
~/cow/
├── system.md # Agent system prompt
├── user.md # User profile
├── memory/ # Long-term memory storage
│ ├── core.md # Core memory
│ └── daily/ # Daily memory
── skills/ # Custom skills
├── skill-1/
└── skill-2/
── skills/ # Custom skills
├── skill-1/
└── skill-2/
```
Secret keys are stored separately in `~/.cow` directory for security:
```
~/.cow/
└── .env # Secret keys for skills
```
@@ -55,7 +61,7 @@ Configure Agent mode parameters in `config.json`:
```json
{
"agent": true,
"agent_workspace": "~/.cow",
"agent_workspace": "~/cow",
"agent_max_context_tokens": 40000,
"agent_max_context_turns": 30,
"agent_max_steps": 15
@@ -65,7 +71,7 @@ Configure Agent mode parameters in `config.json`:
| Parameter | Description | Default |
| --- | --- | --- |
| `agent` | Enable Agent mode | `true` |
| `agent_workspace` | Workspace path | `~/.cow` |
| `agent_workspace` | Workspace path | `~/cow` |
| `agent_max_context_tokens` | Max context tokens | `40000` |
| `agent_max_context_turns` | Max context turns | `30` |
| `agent_max_steps` | Max decision steps per task | `15` |

View File

@@ -33,18 +33,24 @@ CowAgent 的整体架构由以下核心模块组成:
## 工作空间
Agent 的工作空间默认位于 `~/.cow` 目录,用于存储系统提示词、记忆文件、技能文件等:
Agent 的工作空间默认位于 `~/cow` 目录,用于存储系统提示词、记忆文件、技能文件等:
```
~/.cow/
~/cow/
├── system.md # Agent system prompt
├── user.md # User profile
├── memory/ # Long-term memory storage
│ ├── core.md # Core memory
│ └── daily/ # Daily memory
── skills/ # Custom skills
├── skill-1/
└── skill-2/
── skills/ # Custom skills
├── skill-1/
└── skill-2/
```
秘钥文件单独存储在 `~/.cow` 目录(出于安全考虑):
```
~/.cow/
└── .env # Secret keys for skills
```
@@ -55,7 +61,7 @@ Agent 的工作空间默认位于 `~/.cow` 目录,用于存储系统提示词
```json
{
"agent": true,
"agent_workspace": "~/.cow",
"agent_workspace": "~/cow",
"agent_max_context_tokens": 40000,
"agent_max_context_turns": 30,
"agent_max_steps": 15
@@ -65,7 +71,7 @@ Agent 的工作空间默认位于 `~/.cow` 目录,用于存储系统提示词
| 参数 | 说明 | 默认值 |
| --- | --- | --- |
| `agent` | 是否启用 Agent 模式 | `true` |
| `agent_workspace` | 工作空间路径 | `~/.cow` |
| `agent_workspace` | 工作空间路径 | `~/cow` |
| `agent_max_context_tokens` | 最大上下文 token 数 | `40000` |
| `agent_max_context_turns` | 最大上下文记忆轮次 | `30` |
| `agent_max_steps` | 单次任务最大决策步数 | `15` |