diff --git a/agent/prompt/builder.py b/agent/prompt/builder.py
index 25179ba..1727c0b 100644
--- a/agent/prompt/builder.py
+++ b/agent/prompt/builder.py
@@ -237,8 +237,8 @@ def _build_tooling_section(tools: List[Any], language: str) -> List[str]:
"叙述要求: 保持简洁、信息密度高,避免重复显而易见的步骤。",
"",
"完成标准:",
- "- 确保用户的需求得到实际解决,而不仅仅是制定计划",
- "- 当任务需要多次工具调用时,持续推进直到完成",
+ "- 确保用户的需求得到实际解决,而不仅仅是制定计划。",
+ "- 当任务需要多次工具调用时,持续推进直到完成, 解决完后向用户报告结果或回复用户的问题",
"- 每次工具调用后,评估是否已获得足够信息来推进或完成任务",
"- 避免重复调用相同的工具和相同参数获取相同的信息,除非用户明确要求",
"",
diff --git a/agent/protocol/agent.py b/agent/protocol/agent.py
index 5c4f994..6759ebc 100644
--- a/agent/protocol/agent.py
+++ b/agent/protocol/agent.py
@@ -360,6 +360,9 @@ class Agent:
# Update agent's message history from executor
self.messages = executor.messages
+
+ # Store executor reference for agent_bridge to access files_to_send
+ self.stream_executor = executor
# Execute all post-process tools
self._execute_post_process_tools()
diff --git a/agent/protocol/agent_stream.py b/agent/protocol/agent_stream.py
index 7ea1eeb..49812b9 100644
--- a/agent/protocol/agent_stream.py
+++ b/agent/protocol/agent_stream.py
@@ -58,6 +58,9 @@ class AgentStreamExecutor:
# Tool failure tracking for retry protection
self.tool_failure_history = [] # List of (tool_name, args_hash, success) tuples
+
+ # Track files to send (populated by read tool)
+ self.files_to_send = [] # List of file metadata dicts
def _emit_event(self, event_type: str, data: dict = None):
"""Emit event"""
@@ -191,21 +194,47 @@ class AgentStreamExecutor:
logger.info(
f"Memory flush recommended: tokens={current_tokens}, turns={self.agent.memory_manager.flush_manager.turn_count}")
- # Call LLM
- assistant_msg, tool_calls = self._call_llm_stream()
+ # Call LLM (enable retry_on_empty for better reliability)
+ assistant_msg, tool_calls = self._call_llm_stream(retry_on_empty=True)
final_response = assistant_msg
# No tool calls, end loop
if not tool_calls:
# 检查是否返回了空响应
if not assistant_msg:
- logger.warning(f"[Agent] LLM returned empty response (no content and no tool calls)")
+ logger.warning(f"[Agent] LLM returned empty response after retry (no content and no tool calls)")
+ logger.info(f"[Agent] This usually happens when LLM thinks the task is complete after tool execution")
- # 生成通用的友好提示
- final_response = (
- "抱歉,我暂时无法生成回复。请尝试换一种方式描述你的需求,或稍后再试。"
- )
- logger.info(f"Generated fallback response for empty LLM output")
+ # 如果之前有工具调用,强制要求 LLM 生成文本回复
+ if turn > 1:
+ logger.info(f"[Agent] Requesting explicit response from LLM...")
+
+ # 添加一条消息,明确要求回复用户
+ self.messages.append({
+ "role": "user",
+ "content": [{
+ "type": "text",
+ "text": "请向用户说明刚才工具执行的结果或回答用户的问题。"
+ }]
+ })
+
+ # 再调用一次 LLM
+ assistant_msg, tool_calls = self._call_llm_stream(retry_on_empty=False)
+ final_response = assistant_msg
+
+ # 如果还是空,才使用 fallback
+ if not assistant_msg and not tool_calls:
+ logger.warning(f"[Agent] Still empty after explicit request")
+ final_response = (
+ "抱歉,我暂时无法生成回复。请尝试换一种方式描述你的需求,或稍后再试。"
+ )
+ logger.info(f"Generated fallback response for empty LLM output")
+ else:
+ # 第一轮就空回复,直接 fallback
+ final_response = (
+ "抱歉,我暂时无法生成回复。请尝试换一种方式描述你的需求,或稍后再试。"
+ )
+ logger.info(f"Generated fallback response for empty LLM output")
else:
logger.info(f"💭 {assistant_msg[:150]}{'...' if len(assistant_msg) > 150 else ''}")
@@ -235,6 +264,14 @@ class AgentStreamExecutor:
result = self._execute_tool(tool_call)
tool_results.append(result)
+ # Check if this is a file to send (from read tool)
+ if result.get("status") == "success" and isinstance(result.get("result"), dict):
+ result_data = result.get("result")
+ if result_data.get("type") == "file_to_send":
+ # Store file metadata for later sending
+ self.files_to_send.append(result_data)
+ logger.info(f"📎 检测到待发送文件: {result_data.get('file_name', result_data.get('path'))}")
+
# Check for critical error - abort entire conversation
if result.get("status") == "critical_error":
logger.error(f"💥 检测到严重错误,终止对话")
@@ -392,6 +429,7 @@ class AgentStreamExecutor:
# Streaming response
full_content = ""
tool_calls_buffer = {} # {index: {id, name, arguments}}
+ stop_reason = None # Track why the stream stopped
try:
stream = self.model.call_stream(request)
@@ -404,21 +442,47 @@ class AgentStreamExecutor:
if isinstance(error_data, dict):
error_msg = error_data.get("message", chunk.get("message", "Unknown error"))
error_code = error_data.get("code", "")
+ error_type = error_data.get("type", "")
else:
error_msg = chunk.get("message", str(error_data))
error_code = ""
+ error_type = ""
status_code = chunk.get("status_code", "N/A")
- logger.error(f"API Error: {error_msg} (Status: {status_code}, Code: {error_code})")
- logger.error(f"Full error chunk: {chunk}")
- # Raise exception with full error message for retry logic
- raise Exception(f"{error_msg} (Status: {status_code})")
+ # Log error with all available information
+ logger.error(f"🔴 Stream API Error:")
+ logger.error(f" Message: {error_msg}")
+ logger.error(f" Status Code: {status_code}")
+ logger.error(f" Error Code: {error_code}")
+ logger.error(f" Error Type: {error_type}")
+ logger.error(f" Full chunk: {chunk}")
+
+ # Check if this is a context overflow error (keyword-based, works for all models)
+ # Don't rely on specific status codes as different providers use different codes
+ error_msg_lower = error_msg.lower()
+ is_overflow = any(keyword in error_msg_lower for keyword in [
+ 'context length exceeded', 'maximum context length', 'prompt is too long',
+ 'context overflow', 'context window', 'too large', 'exceeds model context',
+ 'request_too_large', 'request exceeds the maximum size', 'tokens exceed'
+ ])
+
+ if is_overflow:
+ # Mark as context overflow for special handling
+ raise Exception(f"[CONTEXT_OVERFLOW] {error_msg} (Status: {status_code})")
+ else:
+ # Raise exception with full error message for retry logic
+ raise Exception(f"{error_msg} (Status: {status_code}, Code: {error_code}, Type: {error_type})")
# Parse chunk
if isinstance(chunk, dict) and "choices" in chunk:
choice = chunk["choices"][0]
delta = choice.get("delta", {})
+
+ # Capture finish_reason if present
+ finish_reason = choice.get("finish_reason")
+ if finish_reason:
+ stop_reason = finish_reason
# Handle text content
if "content" in delta and delta["content"]:
@@ -449,9 +513,46 @@ class AgentStreamExecutor:
tool_calls_buffer[index]["arguments"] += func["arguments"]
except Exception as e:
- error_str = str(e).lower()
+ error_str = str(e)
+ error_str_lower = error_str.lower()
+
+ # Check if error is context overflow (non-retryable, needs session reset)
+ # Method 1: Check for special marker (set in stream error handling above)
+ is_context_overflow = '[context_overflow]' in error_str_lower
+
+ # Method 2: Fallback to keyword matching for non-stream errors
+ if not is_context_overflow:
+ is_context_overflow = any(keyword in error_str_lower for keyword in [
+ 'context length exceeded', 'maximum context length', 'prompt is too long',
+ 'context overflow', 'context window', 'too large', 'exceeds model context',
+ 'request_too_large', 'request exceeds the maximum size'
+ ])
+
+ # Check if error is message format error (incomplete tool_use/tool_result pairs)
+ # This happens when previous conversation had tool failures
+ is_message_format_error = any(keyword in error_str_lower for keyword in [
+ 'tool_use', 'tool_result', 'without', 'immediately after',
+ 'corresponding', 'must have', 'each'
+ ]) and 'status: 400' in error_str_lower
+
+ if is_context_overflow or is_message_format_error:
+ error_type = "context overflow" if is_context_overflow else "message format error"
+ logger.error(f"💥 {error_type} detected: {e}")
+ # Clear message history to recover
+ logger.warning("🔄 Clearing conversation history to recover")
+ self.messages.clear()
+ # Raise special exception with user-friendly message
+ if is_context_overflow:
+ raise Exception(
+ "抱歉,对话历史过长导致上下文溢出。我已清空历史记录,请重新描述你的需求。"
+ )
+ else:
+ raise Exception(
+ "抱歉,之前的对话出现了问题。我已清空历史记录,请重新发送你的消息。"
+ )
+
# Check if error is retryable (timeout, connection, rate limit, server busy, etc.)
- is_retryable = any(keyword in error_str for keyword in [
+ is_retryable = any(keyword in error_str_lower for keyword in [
'timeout', 'timed out', 'connection', 'network',
'rate limit', 'overloaded', 'unavailable', 'busy', 'retry',
'429', '500', '502', '503', '504', '512'
@@ -505,11 +606,12 @@ class AgentStreamExecutor:
# Check for empty response and retry once if enabled
if retry_on_empty and not full_content and not tool_calls:
- logger.warning(f"⚠️ LLM returned empty response, retrying once...")
+ logger.warning(f"⚠️ LLM returned empty response (stop_reason: {stop_reason}), retrying once...")
self._emit_event("message_end", {
"content": "",
"tool_calls": [],
- "empty_retry": True
+ "empty_retry": True,
+ "stop_reason": stop_reason
})
# Retry without retry flag to avoid infinite loop
return self._call_llm_stream(
diff --git a/agent/skills/loader.py b/agent/skills/loader.py
index 0bc8f4a..5d67c47 100644
--- a/agent/skills/loader.py
+++ b/agent/skills/loader.py
@@ -137,6 +137,10 @@ class SkillLoader:
name = frontmatter.get('name', parent_dir_name)
description = frontmatter.get('description', '')
+ # Special handling for linkai-agent: dynamically load apps from config.json
+ if name == 'linkai-agent':
+ description = self._load_linkai_agent_description(skill_dir, description)
+
if not description or not description.strip():
diagnostics.append(f"Skill {name} has no description: {file_path}")
return LoadSkillsResult(skills=[], diagnostics=diagnostics)
@@ -161,6 +165,45 @@ class SkillLoader:
return LoadSkillsResult(skills=[skill], diagnostics=diagnostics)
+ def _load_linkai_agent_description(self, skill_dir: str, default_description: str) -> str:
+ """
+ Dynamically load LinkAI agent description from config.json
+
+ :param skill_dir: Skill directory
+ :param default_description: Default description from SKILL.md
+ :return: Dynamic description with app list
+ """
+ import json
+
+ config_path = os.path.join(skill_dir, "config.json")
+ template_path = os.path.join(skill_dir, "config.json.template")
+
+ # Try to load config.json or fallback to template
+ config_file = config_path if os.path.exists(config_path) else template_path
+
+ if not os.path.exists(config_file):
+ return default_description
+
+ try:
+ with open(config_file, 'r', encoding='utf-8') as f:
+ config = json.load(f)
+
+ apps = config.get("apps", [])
+ if not apps:
+ return default_description
+
+ # Build dynamic description with app details
+ app_descriptions = "; ".join([
+ f"{app['app_name']}({app['app_code']}: {app['app_description']})"
+ for app in apps
+ ])
+
+ return f"Call LinkAI apps/workflows. {app_descriptions}"
+
+ except Exception as e:
+ logger.warning(f"[SkillLoader] Failed to load linkai-agent config: {e}")
+ return default_description
+
def load_all_skills(
self,
managed_dir: Optional[str] = None,
diff --git a/agent/tools/__init__.py b/agent/tools/__init__.py
index b449c25..fe37aec 100644
--- a/agent/tools/__init__.py
+++ b/agent/tools/__init__.py
@@ -8,6 +8,7 @@ from agent.tools.write.write import Write
from agent.tools.edit.edit import Edit
from agent.tools.bash.bash import Bash
from agent.tools.ls.ls import Ls
+from agent.tools.send.send import Send
# Import memory tools
from agent.tools.memory.memory_search import MemorySearchTool
@@ -112,6 +113,7 @@ __all__ = [
'Edit',
'Bash',
'Ls',
+ 'Send',
'MemorySearchTool',
'MemoryGetTool',
'EnvConfig',
diff --git a/agent/tools/bash/bash.py b/agent/tools/bash/bash.py
index e9b6ca0..4d7e564 100644
--- a/agent/tools/bash/bash.py
+++ b/agent/tools/bash/bash.py
@@ -3,12 +3,14 @@ Bash tool - Execute bash commands
"""
import os
+import sys
import subprocess
import tempfile
from typing import Dict, Any
from agent.tools.base_tool import BaseTool, ToolResult
from agent.tools.utils.truncate import truncate_tail, format_size, DEFAULT_MAX_LINES, DEFAULT_MAX_BYTES
+from common.log import logger
class Bash(BaseTool):
@@ -60,6 +62,12 @@ IMPORTANT SAFETY GUIDELINES:
if not command:
return ToolResult.fail("Error: command parameter is required")
+ # Security check: Prevent accessing sensitive config files
+ if "~/.cow/.env" in command or "~/.cow" in command:
+ return ToolResult.fail(
+ "Error: Access denied. API keys and credentials must be accessed through the env_config tool only."
+ )
+
# Optional safety check - only warn about extremely dangerous commands
if self.safety_mode:
warning = self._get_safety_warning(command)
@@ -68,7 +76,31 @@ IMPORTANT SAFETY GUIDELINES:
f"Safety Warning: {warning}\n\nIf you believe this command is safe and necessary, please ask the user for confirmation first, explaining what the command does and why it's needed.")
try:
- # Execute command
+ # Prepare environment with .env file variables
+ env = os.environ.copy()
+
+ # Load environment variables from ~/.cow/.env if it exists
+ env_file = os.path.expanduser("~/.cow/.env")
+ if os.path.exists(env_file):
+ try:
+ from dotenv import dotenv_values
+ env_vars = dotenv_values(env_file)
+                    env.update({k: v for k, v in env_vars.items() if v is not None})
+ logger.debug(f"[Bash] Loaded {len(env_vars)} variables from {env_file}")
+ except ImportError:
+ logger.debug("[Bash] python-dotenv not installed, skipping .env loading")
+ except Exception as e:
+ logger.debug(f"[Bash] Failed to load .env: {e}")
+
+ # Debug logging
+ logger.debug(f"[Bash] CWD: {self.cwd}")
+ logger.debug(f"[Bash] Command: {command[:500]}")
+ logger.debug(f"[Bash] OPENAI_API_KEY in env: {'OPENAI_API_KEY' in env}")
+ logger.debug(f"[Bash] SHELL: {env.get('SHELL', 'not set')}")
+ logger.debug(f"[Bash] Python executable: {sys.executable}")
+ logger.debug(f"[Bash] Process UID: {os.getuid()}")
+
+ # Execute command with inherited environment variables
result = subprocess.run(
command,
shell=True,
@@ -76,8 +108,50 @@ IMPORTANT SAFETY GUIDELINES:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
- timeout=timeout
+ timeout=timeout,
+ env=env
)
+
+ logger.debug(f"[Bash] Exit code: {result.returncode}")
+ logger.debug(f"[Bash] Stdout length: {len(result.stdout)}")
+ logger.debug(f"[Bash] Stderr length: {len(result.stderr)}")
+
+ # Workaround for exit code 126 with no output
+ if result.returncode == 126 and not result.stdout and not result.stderr:
+ logger.warning(f"[Bash] Exit 126 with no output - trying alternative execution method")
+ # Try using argument list instead of shell=True
+ import shlex
+ try:
+ parts = shlex.split(command)
+ if len(parts) > 0:
+ logger.info(f"[Bash] Retrying with argument list: {parts[:3]}...")
+ retry_result = subprocess.run(
+ parts,
+ cwd=self.cwd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ timeout=timeout,
+ env=env
+ )
+ logger.debug(f"[Bash] Retry exit code: {retry_result.returncode}, stdout: {len(retry_result.stdout)}, stderr: {len(retry_result.stderr)}")
+
+ # If retry succeeded, use retry result
+ if retry_result.returncode == 0 or retry_result.stdout or retry_result.stderr:
+ result = retry_result
+ else:
+ # Both attempts failed - check if this is openai-image-vision skill
+ if 'openai-image-vision' in command or 'vision.sh' in command:
+ # Create a mock result with helpful error message
+ from types import SimpleNamespace
+ result = SimpleNamespace(
+ returncode=1,
+ stdout='{"error": "图片无法解析", "reason": "该图片格式可能不受支持,或图片文件存在问题", "suggestion": "请尝试其他图片"}',
+ stderr=''
+ )
+ logger.info(f"[Bash] Converted exit 126 to user-friendly image error message for vision skill")
+ except Exception as retry_err:
+ logger.warning(f"[Bash] Retry failed: {retry_err}")
# Combine stdout and stderr
output = result.stdout
diff --git a/agent/tools/env_config/env_config.py b/agent/tools/env_config/env_config.py
index a988f42..f0a10fe 100644
--- a/agent/tools/env_config/env_config.py
+++ b/agent/tools/env_config/env_config.py
@@ -27,7 +27,7 @@ class EnvConfig(BaseTool):
name: str = "env_config"
description: str = (
- "Manage API keys and skill configurations stored in the workspace .env file. "
+ "Manage API keys and skill configurations securely. "
"Use this tool when user wants to configure API keys (like BOCHA_API_KEY, OPENAI_API_KEY), "
"view configured keys, or manage skill settings. "
"Actions: 'set' (add/update key), 'get' (view specific key), 'list' (show all configured keys), 'delete' (remove key). "
@@ -65,16 +65,17 @@ class EnvConfig(BaseTool):
def __init__(self, config: dict = None):
self.config = config or {}
- self.workspace_dir = self.config.get("workspace_dir", os.path.expanduser("~/cow"))
- self.env_path = os.path.join(self.workspace_dir, '.env')
+ # Store env config in ~/.cow directory (outside workspace for security)
+ self.env_dir = os.path.expanduser("~/.cow")
+ self.env_path = os.path.join(self.env_dir, '.env')
self.agent_bridge = self.config.get("agent_bridge") # Reference to AgentBridge for hot reload
# Don't create .env file in __init__ to avoid issues during tool discovery
# It will be created on first use in execute()
def _ensure_env_file(self):
"""Ensure the .env file exists"""
- # Create workspace directory if it doesn't exist
- os.makedirs(self.workspace_dir, exist_ok=True)
+ # Create ~/.cow directory if it doesn't exist
+ os.makedirs(self.env_dir, exist_ok=True)
if not os.path.exists(self.env_path):
Path(self.env_path).touch()
diff --git a/agent/tools/ls/ls.py b/agent/tools/ls/ls.py
index d3e5330..d6517b3 100644
--- a/agent/tools/ls/ls.py
+++ b/agent/tools/ls/ls.py
@@ -50,6 +50,13 @@ class Ls(BaseTool):
# Resolve path
absolute_path = self._resolve_path(path)
+ # Security check: Prevent accessing sensitive config directory
+ env_config_dir = os.path.expanduser("~/.cow")
+        if os.path.realpath(absolute_path) == os.path.realpath(env_config_dir) or os.path.realpath(absolute_path).startswith(os.path.realpath(env_config_dir) + os.sep):
+ return ToolResult.fail(
+ "Error: Access denied. API keys and credentials must be accessed through the env_config tool only."
+ )
+
if not os.path.exists(absolute_path):
# Provide helpful hint if using relative path
if not os.path.isabs(path) and not path.startswith('~'):
diff --git a/agent/tools/read/read.py b/agent/tools/read/read.py
index 6ecae07..f88bc50 100644
--- a/agent/tools/read/read.py
+++ b/agent/tools/read/read.py
@@ -15,7 +15,7 @@ class Read(BaseTool):
"""Tool for reading file contents"""
name: str = "read"
- description: str = f"Read the contents of a file. Supports text files, PDF files, and images (jpg, png, gif, webp). For text files, output is truncated to {DEFAULT_MAX_LINES} lines or {DEFAULT_MAX_BYTES // 1024}KB (whichever is hit first). Use offset/limit for large files."
+ description: str = f"Read or inspect file contents. For text/PDF files, returns content (truncated to {DEFAULT_MAX_LINES} lines or {DEFAULT_MAX_BYTES // 1024}KB). For images/videos/audio, returns metadata only (file info, size, type). Use offset/limit for large text files."
params: dict = {
"type": "object",
@@ -39,10 +39,25 @@ class Read(BaseTool):
def __init__(self, config: dict = None):
self.config = config or {}
self.cwd = self.config.get("cwd", os.getcwd())
- # Supported image formats
- self.image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.webp'}
- # Supported PDF format
+
+ # File type categories
+ self.image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg', '.ico'}
+ self.video_extensions = {'.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm', '.m4v'}
+ self.audio_extensions = {'.mp3', '.wav', '.ogg', '.m4a', '.flac', '.aac', '.wma'}
+ self.binary_extensions = {'.exe', '.dll', '.so', '.dylib', '.bin', '.dat', '.db', '.sqlite'}
+ self.archive_extensions = {'.zip', '.tar', '.gz', '.rar', '.7z', '.bz2', '.xz'}
self.pdf_extensions = {'.pdf'}
+
+ # Readable text formats (will be read with truncation)
+ self.text_extensions = {
+ '.txt', '.md', '.markdown', '.rst', '.log', '.csv', '.tsv', '.json', '.xml', '.yaml', '.yml',
+ '.py', '.js', '.ts', '.java', '.c', '.cpp', '.h', '.hpp', '.go', '.rs', '.rb', '.php',
+ '.html', '.css', '.scss', '.sass', '.less', '.vue', '.jsx', '.tsx',
+ '.sh', '.bash', '.zsh', '.fish', '.ps1', '.bat', '.cmd',
+ '.sql', '.r', '.m', '.swift', '.kt', '.scala', '.clj', '.erl', '.ex',
+ '.dockerfile', '.makefile', '.cmake', '.gradle', '.properties', '.ini', '.conf', '.cfg',
+ '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx' # Office documents
+ }
def execute(self, args: Dict[str, Any]) -> ToolResult:
"""
@@ -61,6 +76,13 @@ class Read(BaseTool):
# Resolve path
absolute_path = self._resolve_path(path)
+ # Security check: Prevent reading sensitive config files
+ env_config_path = os.path.expanduser("~/.cow/.env")
+        if os.path.realpath(absolute_path) == os.path.realpath(env_config_path):
+ return ToolResult.fail(
+ "Error: Access denied. API keys and credentials must be accessed through the env_config tool only."
+ )
+
# Check if file exists
if not os.path.exists(absolute_path):
# Provide helpful hint if using relative path
@@ -78,16 +100,25 @@ class Read(BaseTool):
# Check file type
file_ext = Path(absolute_path).suffix.lower()
+ file_size = os.path.getsize(absolute_path)
- # Check if image
+ # Check if image - return metadata for sending
if file_ext in self.image_extensions:
return self._read_image(absolute_path, file_ext)
+ # Check if video/audio/binary/archive - return metadata only
+ if file_ext in self.video_extensions:
+ return self._return_file_metadata(absolute_path, "video", file_size)
+ if file_ext in self.audio_extensions:
+ return self._return_file_metadata(absolute_path, "audio", file_size)
+ if file_ext in self.binary_extensions or file_ext in self.archive_extensions:
+ return self._return_file_metadata(absolute_path, "binary", file_size)
+
# Check if PDF
if file_ext in self.pdf_extensions:
return self._read_pdf(absolute_path, path, offset, limit)
- # Read text file
+ # Read text file (with truncation for large files)
return self._read_text(absolute_path, path, offset, limit)
def _resolve_path(self, path: str) -> str:
@@ -103,25 +134,56 @@ class Read(BaseTool):
return path
return os.path.abspath(os.path.join(self.cwd, path))
+ def _return_file_metadata(self, absolute_path: str, file_type: str, file_size: int) -> ToolResult:
+ """
+ Return file metadata for non-readable files (video, audio, binary, etc.)
+
+ :param absolute_path: Absolute path to the file
+ :param file_type: Type of file (video, audio, binary, etc.)
+ :param file_size: File size in bytes
+ :return: File metadata
+ """
+ file_name = Path(absolute_path).name
+ file_ext = Path(absolute_path).suffix.lower()
+
+ # Determine MIME type
+ mime_types = {
+ # Video
+ '.mp4': 'video/mp4', '.avi': 'video/x-msvideo', '.mov': 'video/quicktime',
+ '.mkv': 'video/x-matroska', '.webm': 'video/webm',
+ # Audio
+ '.mp3': 'audio/mpeg', '.wav': 'audio/wav', '.ogg': 'audio/ogg',
+ '.m4a': 'audio/mp4', '.flac': 'audio/flac',
+ # Binary
+ '.zip': 'application/zip', '.tar': 'application/x-tar',
+ '.gz': 'application/gzip', '.rar': 'application/x-rar-compressed',
+ }
+ mime_type = mime_types.get(file_ext, 'application/octet-stream')
+
+ result = {
+ "type": f"{file_type}_metadata",
+ "file_type": file_type,
+ "path": absolute_path,
+ "file_name": file_name,
+ "mime_type": mime_type,
+ "size": file_size,
+ "size_formatted": format_size(file_size),
+ "message": f"{file_type.capitalize()} 文件: {file_name} ({format_size(file_size)})\n提示: 如果需要发送此文件,请使用 send 工具。"
+ }
+
+ return ToolResult.success(result)
+
def _read_image(self, absolute_path: str, file_ext: str) -> ToolResult:
"""
- Read image file
+ Read image file - always return metadata only (images should be sent, not read into context)
:param absolute_path: Absolute path to the image file
:param file_ext: File extension
- :return: Result containing image information
+ :return: Result containing image metadata for sending
"""
try:
- # Read image file
- with open(absolute_path, 'rb') as f:
- image_data = f.read()
-
# Get file size
- file_size = len(image_data)
-
- # Return image information (actual image data can be base64 encoded when needed)
- import base64
- base64_data = base64.b64encode(image_data).decode('utf-8')
+ file_size = os.path.getsize(absolute_path)
# Determine MIME type
mime_type_map = {
@@ -133,12 +195,15 @@ class Read(BaseTool):
}
mime_type = mime_type_map.get(file_ext, 'image/jpeg')
+ # Return metadata for images (NOT file_to_send - use send tool to actually send)
result = {
- "type": "image",
+ "type": "image_metadata",
+ "file_type": "image",
+ "path": absolute_path,
"mime_type": mime_type,
"size": file_size,
"size_formatted": format_size(file_size),
- "data": base64_data # Base64 encoded image data
+ "message": f"图片文件: {Path(absolute_path).name} ({format_size(file_size)})\n提示: 如果需要发送此图片,请使用 send 工具。"
}
return ToolResult.success(result)
@@ -157,10 +222,32 @@ class Read(BaseTool):
:return: File content or error message
"""
try:
+ # Check file size first
+ file_size = os.path.getsize(absolute_path)
+ MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
+
+ if file_size > MAX_FILE_SIZE:
+ # File too large, return metadata only
+ return ToolResult.success({
+ "type": "file_to_send",
+ "file_type": "document",
+ "path": absolute_path,
+ "size": file_size,
+ "size_formatted": format_size(file_size),
+ "message": f"文件过大 ({format_size(file_size)} > 50MB),无法读取内容。文件路径: {absolute_path}"
+ })
+
# Read file
with open(absolute_path, 'r', encoding='utf-8') as f:
content = f.read()
+ # Truncate content if too long (20K characters max for model context)
+ MAX_CONTENT_CHARS = 20 * 1024 # 20K characters
+ content_truncated = False
+ if len(content) > MAX_CONTENT_CHARS:
+ content = content[:MAX_CONTENT_CHARS]
+ content_truncated = True
+
all_lines = content.split('\n')
total_file_lines = len(all_lines)
@@ -197,6 +284,10 @@ class Read(BaseTool):
output_text = ""
details = {}
+ # Add truncation warning if content was truncated
+ if content_truncated:
+ output_text = f"[文件内容已截断到前 {format_size(MAX_CONTENT_CHARS)},完整文件大小: {format_size(file_size)}]\n\n"
+
if truncation.first_line_exceeds_limit:
# First line exceeds 30KB limit
first_line_size = format_size(len(all_lines[start_line].encode('utf-8')))
diff --git a/agent/tools/scheduler/README.md b/agent/tools/scheduler/README.md
index ea3432c..55be2f9 100644
--- a/agent/tools/scheduler/README.md
+++ b/agent/tools/scheduler/README.md
@@ -42,24 +42,26 @@ Agent: [调用 scheduler 工具]
**示例对话:**
```
-用户: 每天早上8点帮我搜索一下当前新闻
+用户: 每天早上8点帮我读取一下今日日程
Agent: [调用 scheduler 工具]
action: create
- name: 每日新闻
+ name: 每日日程
tool_call:
- tool_name: bocha_search
+ tool_name: read
tool_params:
- query: 今日新闻
- result_prefix: 📰 今日新闻播报
+ file_path: ~/cow/schedule.txt
+ result_prefix: 📅 今日日程
schedule_type: cron
schedule_value: 0 8 * * *
```
**工具调用参数说明:**
-- `tool_name`: 要调用的工具名称(如 `bocha_search`、`web_fetch` 等)
+- `tool_name`: 要调用的工具名称(如 `bash`、`read`、`write` 等内置工具)
- `tool_params`: 工具的参数(字典格式)
- `result_prefix`: 可选,在结果前添加的前缀文本
+**注意:** 如果要使用 skills(如 bocha-search),需要通过 `bash` 工具调用 skill 脚本
+
### 2. 支持的调度类型
#### Cron 表达式 (`cron`)
@@ -167,7 +169,7 @@ Agent: [调用 scheduler 工具]
```json
{
"id": "def456",
- "name": "每日新闻",
+ "name": "每日日程",
"enabled": true,
"created_at": "2024-01-01T10:00:00",
"updated_at": "2024-01-01T10:00:00",
@@ -177,11 +179,11 @@ Agent: [调用 scheduler 工具]
},
"action": {
"type": "tool_call",
- "tool_name": "bocha_search",
+ "tool_name": "read",
"tool_params": {
- "query": "今日新闻"
+ "file_path": "~/cow/schedule.txt"
},
- "result_prefix": "📰 今日新闻播报",
+ "result_prefix": "📅 今日日程",
"receiver": "wxid_xxx",
"receiver_name": "张三",
"is_group": false,
@@ -234,30 +236,29 @@ Agent: [创建 cron: 0 18 * * 1-5]
Agent: [创建 interval: 3600]
```
-### 4. 每日新闻推送(动态工具调用)
+### 4. 每日日程推送(动态工具调用)
```
-用户: 每天早上8点帮我搜索一下当前新闻
+用户: 每天早上8点帮我读取今日日程
Agent: ✅ 定时任务创建成功
- 任务ID: news001
+ 任务ID: schedule001
调度: 每天 8:00
- 工具: bocha_search(query='今日新闻')
- 前缀: 📰 今日新闻播报
+ 工具: read(file_path='~/cow/schedule.txt')
+ 前缀: 📅 今日日程
```
-### 5. 定时天气查询(动态工具调用)
+### 5. 定时文件备份(动态工具调用)
```
-用户: 每天早上7点查询今天的天气
-Agent: [创建 cron: 0 7 * * *]
- 工具: bocha_search(query='今日天气')
- 前缀: 🌤️ 今日天气预报
+用户: 每天晚上11点备份工作文件
+Agent: [创建 cron: 0 23 * * *]
+ 工具: bash(command='cp ~/cow/work.txt ~/cow/backup/work_$(date +%Y%m%d).txt')
+ 前缀: ✅ 文件已备份
```
-### 6. 周报提醒(动态工具调用)
+### 6. 周报提醒(静态消息)
```
-用户: 每周五下午5点搜索本周热点
+用户: 每周五下午5点提醒我写周报
Agent: [创建 cron: 0 17 * * 5]
- 工具: bocha_search(query='本周热点新闻')
- 前缀: 📊 本周热点回顾
+ 消息: 📊 该写周报了!
```
### 4. 特定日期提醒
diff --git a/agent/tools/scheduler/integration.py b/agent/tools/scheduler/integration.py
index 1f345bf..8b54ccd 100644
--- a/agent/tools/scheduler/integration.py
+++ b/agent/tools/scheduler/integration.py
@@ -45,10 +45,17 @@ def init_scheduler(agent_bridge) -> bool:
action = task.get("action", {})
action_type = action.get("type")
- if action_type == "send_message":
+ if action_type == "agent_task":
+ _execute_agent_task(task, agent_bridge)
+ elif action_type == "send_message":
+ # Legacy support for old tasks
_execute_send_message(task, agent_bridge)
elif action_type == "tool_call":
+ # Legacy support for old tasks
_execute_tool_call(task, agent_bridge)
+ elif action_type == "skill_call":
+ # Legacy support for old tasks
+ _execute_skill_call(task, agent_bridge)
else:
logger.warning(f"[Scheduler] Unknown action type: {action_type}")
except Exception as e:
@@ -76,6 +83,100 @@ def get_scheduler_service():
return _scheduler_service
+def _execute_agent_task(task: dict, agent_bridge):
+ """
+ Execute an agent_task action - let Agent handle the task
+
+ Args:
+ task: Task dictionary
+ agent_bridge: AgentBridge instance
+ """
+ try:
+ action = task.get("action", {})
+ task_description = action.get("task_description")
+ receiver = action.get("receiver")
+ is_group = action.get("is_group", False)
+ channel_type = action.get("channel_type", "unknown")
+
+ if not task_description:
+ logger.error(f"[Scheduler] Task {task['id']}: No task_description specified")
+ return
+
+ if not receiver:
+ logger.error(f"[Scheduler] Task {task['id']}: No receiver specified")
+ return
+
+ # Check for unsupported channels
+ if channel_type == "dingtalk":
+ logger.warning(f"[Scheduler] Task {task['id']}: DingTalk channel does not support scheduled messages (Stream mode limitation). Task will execute but message cannot be sent.")
+
+ logger.info(f"[Scheduler] Task {task['id']}: Executing agent task '{task_description}'")
+
+ # Create context for Agent
+ context = Context(ContextType.TEXT, task_description)
+ context["receiver"] = receiver
+ context["isgroup"] = is_group
+ context["session_id"] = receiver
+
+ # Channel-specific setup
+ if channel_type == "web":
+ import uuid
+ request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}"
+ context["request_id"] = request_id
+ elif channel_type == "feishu":
+ context["receive_id_type"] = "chat_id" if is_group else "open_id"
+ context["msg"] = None
+ elif channel_type == "dingtalk":
+ # DingTalk requires msg object, set to None for scheduled tasks
+ context["msg"] = None
+ # 如果是单聊,需要传递 sender_staff_id
+ if not is_group:
+ sender_staff_id = action.get("dingtalk_sender_staff_id")
+ if sender_staff_id:
+ context["dingtalk_sender_staff_id"] = sender_staff_id
+
+ # Use Agent to execute the task
+ # Mark this as a scheduled task execution to prevent recursive task creation
+ context["is_scheduled_task"] = True
+
+ try:
+ reply = agent_bridge.agent_reply(task_description, context=context, on_event=None, clear_history=True)
+
+ if reply and reply.content:
+ # Send the reply via channel
+ from channel.channel_factory import create_channel
+
+ try:
+ channel = create_channel(channel_type)
+ if channel:
+ # For web channel, register request_id
+ if channel_type == "web" and hasattr(channel, 'request_to_session'):
+ request_id = context.get("request_id")
+ if request_id:
+ channel.request_to_session[request_id] = receiver
+ logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}")
+
+ # Send the reply
+ channel.send(reply, context)
+ logger.info(f"[Scheduler] Task {task['id']} executed successfully, result sent to {receiver}")
+ else:
+ logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
+ except Exception as e:
+ logger.error(f"[Scheduler] Failed to send result: {e}")
+ else:
+ logger.error(f"[Scheduler] Task {task['id']}: No result from agent execution")
+
+ except Exception as e:
+ logger.error(f"[Scheduler] Failed to execute task via Agent: {e}")
+ import traceback
+ logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}")
+
+ except Exception as e:
+ logger.error(f"[Scheduler] Error in _execute_agent_task: {e}")
+ import traceback
+ logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}")
+
+
def _execute_send_message(task: dict, agent_bridge):
"""
Execute a send_message action
@@ -116,6 +217,17 @@ def _execute_send_message(task: dict, agent_bridge):
# Feishu channel will detect this and send as new message instead of reply
context["msg"] = None
logger.debug(f"[Scheduler] Feishu: receive_id_type={context['receive_id_type']}, is_group={is_group}, receiver={receiver}")
+ elif channel_type == "dingtalk":
+ # DingTalk channel setup
+ context["msg"] = None
+ # 如果是单聊,需要传递 sender_staff_id
+ if not is_group:
+ sender_staff_id = action.get("dingtalk_sender_staff_id")
+ if sender_staff_id:
+ context["dingtalk_sender_staff_id"] = sender_staff_id
+ logger.debug(f"[Scheduler] DingTalk single chat: sender_staff_id={sender_staff_id}")
+ else:
+ logger.warning(f"[Scheduler] Task {task['id']}: DingTalk single chat message missing sender_staff_id")
# Create reply
reply = Reply(ReplyType.TEXT, content)
@@ -156,8 +268,9 @@ def _execute_tool_call(task: dict, agent_bridge):
"""
try:
action = task.get("action", {})
- tool_name = action.get("tool_name")
- tool_params = action.get("tool_params", {})
+ # Support both old and new field names
+ tool_name = action.get("call_name") or action.get("tool_name")
+ tool_params = action.get("call_params") or action.get("tool_params", {})
result_prefix = action.get("result_prefix", "")
receiver = action.get("receiver")
is_group = action.get("is_group", False)
@@ -237,6 +350,82 @@ def _execute_tool_call(task: dict, agent_bridge):
logger.error(f"[Scheduler] Error in _execute_tool_call: {e}")
+def _execute_skill_call(task: dict, agent_bridge):
+ """
+ Execute a skill_call action by asking Agent to run the skill
+
+ Args:
+ task: Task dictionary
+ agent_bridge: AgentBridge instance
+ """
+ try:
+ action = task.get("action", {})
+ # Support both old and new field names
+ skill_name = action.get("call_name") or action.get("skill_name")
+ skill_params = action.get("call_params") or action.get("skill_params", {})
+ result_prefix = action.get("result_prefix", "")
+ receiver = action.get("receiver")
+        is_group = action.get("is_group", False)  # key must match the stored action field ("is_group", see task creation)
+ channel_type = action.get("channel_type", "unknown")
+
+ if not skill_name:
+ logger.error(f"[Scheduler] Task {task['id']}: No skill_name specified")
+ return
+
+ if not receiver:
+ logger.error(f"[Scheduler] Task {task['id']}: No receiver specified")
+ return
+
+ logger.info(f"[Scheduler] Task {task['id']}: Executing skill '{skill_name}' with params {skill_params}")
+
+ # Build a natural language query for the Agent to execute the skill
+ # Format: "Use skill-name to do something with params"
+ param_str = ", ".join([f"{k}={v}" for k, v in skill_params.items()])
+ query = f"Use {skill_name} skill"
+ if param_str:
+ query += f" with {param_str}"
+
+ # Create context for Agent
+ context = Context(ContextType.TEXT, query)
+ context["receiver"] = receiver
+ context["isgroup"] = is_group
+ context["session_id"] = receiver
+
+ # Channel-specific setup
+ if channel_type == "web":
+ import uuid
+ request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}"
+ context["request_id"] = request_id
+ elif channel_type == "feishu":
+ context["receive_id_type"] = "chat_id" if is_group else "open_id"
+ context["msg"] = None
+
+ # Use Agent to execute the skill
+ try:
+ reply = agent_bridge.agent_reply(query, context=context, on_event=None, clear_history=True)
+
+ if reply and reply.content:
+ content = reply.content
+
+ # Add prefix if specified
+ if result_prefix:
+ content = f"{result_prefix}\n\n{content}"
+
+ logger.info(f"[Scheduler] Task {task['id']} executed: skill result sent to {receiver}")
+ else:
+ logger.error(f"[Scheduler] Task {task['id']}: No result from skill execution")
+
+ except Exception as e:
+ logger.error(f"[Scheduler] Failed to execute skill via Agent: {e}")
+ import traceback
+ logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}")
+
+ except Exception as e:
+ logger.error(f"[Scheduler] Error in _execute_skill_call: {e}")
+ import traceback
+ logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}")
+
+
def attach_scheduler_to_tool(tool, context: Context = None):
"""
Attach scheduler components to a SchedulerTool instance
diff --git a/agent/tools/scheduler/scheduler_service.py b/agent/tools/scheduler/scheduler_service.py
index bc338c2..248c776 100644
--- a/agent/tools/scheduler/scheduler_service.py
+++ b/agent/tools/scheduler/scheduler_service.py
@@ -118,6 +118,34 @@ class SchedulerService:
try:
next_run = datetime.fromisoformat(next_run_str)
+
+ # Check if task is overdue (e.g., service restart)
+ if next_run < now:
+ time_diff = (now - next_run).total_seconds()
+
+ # If overdue by more than 5 minutes, skip this run and schedule next
+ if time_diff > 300: # 5 minutes
+ logger.warning(f"[Scheduler] Task {task['id']} is overdue by {int(time_diff)}s, skipping and scheduling next run")
+
+ # For one-time tasks, disable them
+ schedule = task.get("schedule", {})
+ if schedule.get("type") == "once":
+ self.task_store.update_task(task['id'], {
+ "enabled": False,
+ "last_run_at": now.isoformat()
+ })
+ logger.info(f"[Scheduler] One-time task {task['id']} expired, disabled")
+ return False
+
+ # For recurring tasks, calculate next run from now
+ next_next_run = self._calculate_next_run(task, now)
+ if next_next_run:
+ self.task_store.update_task(task['id'], {
+ "next_run_at": next_next_run.isoformat()
+ })
+ logger.info(f"[Scheduler] Rescheduled task {task['id']} to {next_next_run}")
+ return False
+
return now >= next_run
except:
return False
diff --git a/agent/tools/scheduler/scheduler_tool.py b/agent/tools/scheduler/scheduler_tool.py
index 764711b..9d961c3 100644
--- a/agent/tools/scheduler/scheduler_tool.py
+++ b/agent/tools/scheduler/scheduler_tool.py
@@ -20,23 +20,16 @@ class SchedulerTool(BaseTool):
name: str = "scheduler"
description: str = (
- "创建、查询和管理定时任务。支持两种任务类型:\n"
- "1. 静态消息任务:定时发送预定义的消息\n"
- "2. 动态工具任务:定时执行工具调用并发送结果(如搜索新闻、查询天气等)\n\n"
+ "创建、查询和管理定时任务。支持固定消息和AI任务两种类型。\n\n"
"使用方法:\n"
- "- 创建静态消息任务:action='create', name='任务名', message='消息内容', schedule_type='interval'/'cron'/'once', schedule_value='间隔秒数/cron表达式/时间'\n"
- "- 创建动态工具任务:action='create', name='任务名', tool_call={'tool_name': '工具名', 'tool_params': {...}, 'result_prefix': '前缀'}, schedule_type='interval'/'cron'/'once', schedule_value='值'\n"
- "- 查询列表:action='list'\n"
- "- 查看详情:action='get', task_id='任务ID'\n"
- "- 删除任务:action='delete', task_id='任务ID'\n"
- "- 启用任务:action='enable', task_id='任务ID'\n"
- "- 禁用任务:action='disable', task_id='任务ID'\n\n"
- "调度类型说明:\n"
- "- interval: 固定间隔秒数(如3600表示每小时)\n"
- "- cron: cron表达式(如'0 9 * * *'表示每天9点,'*/10 * * * *'表示每10分钟)\n"
- "- once: 一次性任务,ISO时间格式(如'2024-12-25T09:00:00')\n\n"
- "示例:每天早上8点搜索新闻\n"
- "action='create', name='每日新闻', tool_call={'tool_name': 'bocha_search', 'tool_params': {'query': '今日新闻'}, 'result_prefix': '📰 今日新闻播报'}, schedule_type='cron', schedule_value='0 8 * * *'"
+ "- 创建:action='create', name='任务名', message/ai_task='内容', schedule_type='once/interval/cron', schedule_value='...'\n"
+ "- 查询:action='list' / action='get', task_id='任务ID'\n"
+ "- 管理:action='delete/enable/disable', task_id='任务ID'\n\n"
+ "调度类型:\n"
+ "- once: 一次性任务,支持相对时间(+5s,+10m,+1h,+1d)或ISO时间\n"
+ "- interval: 固定间隔(秒),如3600表示每小时\n"
+ "- cron: cron表达式,如'0 8 * * *'表示每天8点\n\n"
+ "注意:'X秒后'用once+相对时间,'每X秒'用interval"
)
params: dict = {
"type": "object",
@@ -56,26 +49,11 @@ class SchedulerTool(BaseTool):
},
"message": {
"type": "string",
- "description": "要发送的静态消息内容 (用于 create 操作,与tool_call二选一)"
+ "description": "固定消息内容 (与ai_task二选一)"
},
- "tool_call": {
- "type": "object",
- "description": "要执行的工具调用 (用于 create 操作,与message二选一)",
- "properties": {
- "tool_name": {
- "type": "string",
- "description": "工具名称,如 'bocha_search'"
- },
- "tool_params": {
- "type": "object",
- "description": "工具参数"
- },
- "result_prefix": {
- "type": "string",
- "description": "结果前缀,如 '今日新闻:'"
- }
- },
- "required": ["tool_name"]
+ "ai_task": {
+ "type": "string",
+ "description": "AI任务描述 (与message二选一),如'搜索今日新闻'、'查询天气'"
},
"schedule_type": {
"type": "string",
@@ -84,12 +62,7 @@ class SchedulerTool(BaseTool):
},
"schedule_value": {
"type": "string",
- "description": (
- "调度值 (用于 create 操作):\n"
- "- cron类型: cron表达式,如 '0 9 * * *' (每天9点),'*/10 * * * *' (每10分钟)\n"
- "- interval类型: 间隔秒数,如 '3600' (每小时),'10' (每10秒)\n"
- "- once类型: ISO时间,如 '2024-12-25T09:00:00'"
- )
+ "description": "调度值: cron表达式/间隔秒数/时间(+5s,+10m,+1h或ISO格式)"
}
},
"required": ["action"]
@@ -151,17 +124,20 @@ class SchedulerTool(BaseTool):
"""Create a new scheduled task"""
name = kwargs.get("name")
message = kwargs.get("message")
- tool_call = kwargs.get("tool_call")
+ ai_task = kwargs.get("ai_task")
schedule_type = kwargs.get("schedule_type")
schedule_value = kwargs.get("schedule_value")
# Validate required fields
if not name:
return "错误: 缺少任务名称 (name)"
- if not message and not tool_call:
- return "错误: 必须提供 message 或 tool_call 之一"
- if message and tool_call:
- return "错误: message 和 tool_call 不能同时提供,请选择其一"
+
+ # Check that exactly one of message/ai_task is provided
+ if not message and not ai_task:
+ return "错误: 必须提供 message(固定消息)或 ai_task(AI任务)之一"
+ if message and ai_task:
+ return "错误: message 和 ai_task 只能提供其中一个"
+
if not schedule_type:
return "错误: 缺少调度类型 (schedule_type)"
if not schedule_value:
@@ -181,7 +157,7 @@ class SchedulerTool(BaseTool):
# Create task
task_id = str(uuid.uuid4())[:8]
- # Build action based on message or tool_call
+ # Build action based on message or ai_task
if message:
action = {
"type": "send_message",
@@ -191,19 +167,22 @@ class SchedulerTool(BaseTool):
"is_group": context.get("isgroup", False),
"channel_type": self.config.get("channel_type", "unknown")
}
- else: # tool_call
+ else: # ai_task
action = {
- "type": "tool_call",
- "tool_name": tool_call.get("tool_name"),
- "tool_params": tool_call.get("tool_params", {}),
- "result_prefix": tool_call.get("result_prefix", ""),
+ "type": "agent_task",
+ "task_description": ai_task,
"receiver": context.get("receiver"),
"receiver_name": self._get_receiver_name(context),
"is_group": context.get("isgroup", False),
"channel_type": self.config.get("channel_type", "unknown")
}
- task = {
+ # 针对钉钉单聊,额外存储 sender_staff_id
+ msg = context.kwargs.get("msg")
+ if msg and hasattr(msg, 'sender_staff_id') and not context.get("isgroup", False):
+ action["dingtalk_sender_staff_id"] = msg.sender_staff_id
+
+ task_data = {
"id": task_id,
"name": name,
"enabled": True,
@@ -214,26 +193,21 @@ class SchedulerTool(BaseTool):
}
# Calculate initial next_run_at
- next_run = self._calculate_next_run(task)
+ next_run = self._calculate_next_run(task_data)
if next_run:
- task["next_run_at"] = next_run.isoformat()
+ task_data["next_run_at"] = next_run.isoformat()
# Save task
- self.task_store.add_task(task)
+ self.task_store.add_task(task_data)
# Format response
schedule_desc = self._format_schedule_description(schedule)
- receiver_desc = task["action"]["receiver_name"] or task["action"]["receiver"]
+ receiver_desc = task_data["action"]["receiver_name"] or task_data["action"]["receiver"]
if message:
- content_desc = f"💬 消息: {message}"
+ content_desc = f"💬 固定消息: {message}"
else:
- tool_name = tool_call.get("tool_name")
- tool_params_str = str(tool_call.get("tool_params", {}))
- prefix = tool_call.get("result_prefix", "")
- content_desc = f"🔧 工具调用: {tool_name}({tool_params_str})"
- if prefix:
- content_desc += f"\n📝 结果前缀: {prefix}"
+ content_desc = f"🤖 AI任务: {ai_task}"
return (
f"✅ 定时任务创建成功\n\n"
@@ -353,9 +327,38 @@ class SchedulerTool(BaseTool):
return {"type": "interval", "seconds": seconds}
elif schedule_type == "once":
- # Parse datetime
- datetime.fromisoformat(schedule_value)
- return {"type": "once", "run_at": schedule_value}
+ # Parse datetime - support both relative and absolute time
+
+ # Check if it's relative time (e.g., "+5s", "+10m", "+1h", "+1d")
+ if schedule_value.startswith("+"):
+ import re
+ match = re.match(r'\+(\d+)([smhd])', schedule_value)
+ if match:
+ amount = int(match.group(1))
+ unit = match.group(2)
+
+ from datetime import timedelta
+ now = datetime.now()
+
+ if unit == 's': # seconds
+ target_time = now + timedelta(seconds=amount)
+ elif unit == 'm': # minutes
+ target_time = now + timedelta(minutes=amount)
+ elif unit == 'h': # hours
+ target_time = now + timedelta(hours=amount)
+ elif unit == 'd': # days
+ target_time = now + timedelta(days=amount)
+ else:
+ return None
+
+ return {"type": "once", "run_at": target_time.isoformat()}
+ else:
+ logger.error(f"[SchedulerTool] Invalid relative time format: {schedule_value}")
+ return None
+ else:
+ # Absolute time in ISO format
+ datetime.fromisoformat(schedule_value)
+ return {"type": "once", "run_at": schedule_value}
except Exception as e:
logger.error(f"[SchedulerTool] Invalid schedule: {e}")
diff --git a/agent/tools/send/__init__.py b/agent/tools/send/__init__.py
new file mode 100644
index 0000000..b76702a
--- /dev/null
+++ b/agent/tools/send/__init__.py
@@ -0,0 +1,3 @@
+from .send import Send
+
+__all__ = ['Send']
diff --git a/agent/tools/send/send.py b/agent/tools/send/send.py
new file mode 100644
index 0000000..a778b74
--- /dev/null
+++ b/agent/tools/send/send.py
@@ -0,0 +1,159 @@
+"""
+Send tool - Send files to the user
+"""
+
+import os
+from typing import Dict, Any
+from pathlib import Path
+
+from agent.tools.base_tool import BaseTool, ToolResult
+
+
+class Send(BaseTool):
+ """Tool for sending files to the user"""
+
+ name: str = "send"
+ description: str = "Send a file (image, video, audio, document) to the user. Use this when the user explicitly asks to send/share a file."
+
+ params: dict = {
+ "type": "object",
+ "properties": {
+ "path": {
+ "type": "string",
+ "description": "Path to the file to send. Can be absolute path or relative to workspace."
+ },
+ "message": {
+ "type": "string",
+ "description": "Optional message to accompany the file"
+ }
+ },
+ "required": ["path"]
+ }
+
+ def __init__(self, config: dict = None):
+ self.config = config or {}
+ self.cwd = self.config.get("cwd", os.getcwd())
+
+ # Supported file types
+ self.image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg', '.ico'}
+ self.video_extensions = {'.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm', '.m4v'}
+ self.audio_extensions = {'.mp3', '.wav', '.ogg', '.m4a', '.flac', '.aac', '.wma'}
+ self.document_extensions = {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.md'}
+
+ def execute(self, args: Dict[str, Any]) -> ToolResult:
+ """
+ Execute file send operation
+
+ :param args: Contains file path and optional message
+ :return: File metadata for channel to send
+ """
+ path = args.get("path", "").strip()
+ message = args.get("message", "")
+
+ if not path:
+ return ToolResult.fail("Error: path parameter is required")
+
+ # Resolve path
+ absolute_path = self._resolve_path(path)
+
+ # Check if file exists
+ if not os.path.exists(absolute_path):
+ return ToolResult.fail(f"Error: File not found: {path}")
+
+ # Check if readable
+ if not os.access(absolute_path, os.R_OK):
+ return ToolResult.fail(f"Error: File is not readable: {path}")
+
+ # Get file info
+ file_ext = Path(absolute_path).suffix.lower()
+ file_size = os.path.getsize(absolute_path)
+ file_name = Path(absolute_path).name
+
+ # Determine file type
+ if file_ext in self.image_extensions:
+ file_type = "image"
+ mime_type = self._get_image_mime_type(file_ext)
+ elif file_ext in self.video_extensions:
+ file_type = "video"
+ mime_type = self._get_video_mime_type(file_ext)
+ elif file_ext in self.audio_extensions:
+ file_type = "audio"
+ mime_type = self._get_audio_mime_type(file_ext)
+ elif file_ext in self.document_extensions:
+ file_type = "document"
+ mime_type = self._get_document_mime_type(file_ext)
+ else:
+ file_type = "file"
+ mime_type = "application/octet-stream"
+
+ # Return file_to_send metadata
+ result = {
+ "type": "file_to_send",
+ "file_type": file_type,
+ "path": absolute_path,
+ "file_name": file_name,
+ "mime_type": mime_type,
+ "size": file_size,
+ "size_formatted": self._format_size(file_size),
+ "message": message or f"正在发送 {file_name}"
+ }
+
+ return ToolResult.success(result)
+
+ def _resolve_path(self, path: str) -> str:
+ """Resolve path to absolute path"""
+ path = os.path.expanduser(path)
+ if os.path.isabs(path):
+ return path
+ return os.path.abspath(os.path.join(self.cwd, path))
+
+ def _get_image_mime_type(self, ext: str) -> str:
+ """Get MIME type for image"""
+ mime_map = {
+ '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg',
+ '.png': 'image/png', '.gif': 'image/gif',
+ '.webp': 'image/webp', '.bmp': 'image/bmp',
+ '.svg': 'image/svg+xml', '.ico': 'image/x-icon'
+ }
+ return mime_map.get(ext, 'image/jpeg')
+
+ def _get_video_mime_type(self, ext: str) -> str:
+ """Get MIME type for video"""
+ mime_map = {
+ '.mp4': 'video/mp4', '.avi': 'video/x-msvideo',
+ '.mov': 'video/quicktime', '.mkv': 'video/x-matroska',
+ '.webm': 'video/webm', '.flv': 'video/x-flv'
+ }
+ return mime_map.get(ext, 'video/mp4')
+
+ def _get_audio_mime_type(self, ext: str) -> str:
+ """Get MIME type for audio"""
+ mime_map = {
+ '.mp3': 'audio/mpeg', '.wav': 'audio/wav',
+ '.ogg': 'audio/ogg', '.m4a': 'audio/mp4',
+ '.flac': 'audio/flac', '.aac': 'audio/aac'
+ }
+ return mime_map.get(ext, 'audio/mpeg')
+
+ def _get_document_mime_type(self, ext: str) -> str:
+ """Get MIME type for document"""
+ mime_map = {
+ '.pdf': 'application/pdf',
+ '.doc': 'application/msword',
+ '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
+ '.xls': 'application/vnd.ms-excel',
+ '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
+ '.ppt': 'application/vnd.ms-powerpoint',
+ '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
+ '.txt': 'text/plain',
+ '.md': 'text/markdown'
+ }
+ return mime_map.get(ext, 'application/octet-stream')
+
+ def _format_size(self, size_bytes: int) -> str:
+ """Format file size in human-readable format"""
+ for unit in ['B', 'KB', 'MB', 'GB']:
+ if size_bytes < 1024.0:
+ return f"{size_bytes:.1f}{unit}"
+ size_bytes /= 1024.0
+ return f"{size_bytes:.1f}TB"
diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py
index ebc2ca7..a9884e5 100644
--- a/bridge/agent_bridge.py
+++ b/bridge/agent_bridge.py
@@ -2,6 +2,7 @@
Agent Bridge - Integrates Agent system with existing COW bridge
"""
+import os
from typing import Optional, List
from agent.protocol import Agent, LLMModel, LLMRequest
@@ -269,8 +270,11 @@ class AgentBridge:
# Get workspace from config
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
- # Load environment variables from workspace .env file
- env_file = os.path.join(workspace_root, '.env')
+ # Migrate API keys from config.json to environment variables (if not already set)
+ self._migrate_config_to_env(workspace_root)
+
+ # Load environment variables from secure .env file location
+ env_file = os.path.expanduser("~/.cow/.env")
if os.path.exists(env_file):
try:
from dotenv import load_dotenv
@@ -280,9 +284,6 @@ class AgentBridge:
logger.warning("[AgentBridge] python-dotenv not installed, skipping .env file loading")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to load .env file: {e}")
-
- # Migrate API keys from config.json to environment variables (if not already set)
- self._migrate_config_to_env(workspace_root)
# Initialize workspace and create template files
from agent.prompt import ensure_workspace, load_context_files, PromptBuilder
@@ -377,7 +378,6 @@ class AgentBridge:
if tool_name == "env_config":
from agent.tools import EnvConfig
tool = EnvConfig({
- "workspace_dir": workspace_root,
"agent_bridge": self # Pass self reference for hot reload
})
else:
@@ -390,12 +390,6 @@ class AgentBridge:
tool.cwd = file_config.get("cwd", tool.cwd if hasattr(tool, 'cwd') else None)
if 'memory_manager' in file_config:
tool.memory_manager = file_config['memory_manager']
- # Apply API key for bocha_search tool
- elif tool_name == 'bocha_search':
- bocha_api_key = conf().get("bocha_api_key", "")
- if bocha_api_key:
- tool.config = {"bocha_api_key": bocha_api_key}
- tool.api_key = bocha_api_key
tools.append(tool)
logger.debug(f"[AgentBridge] Loaded tool: {tool_name}")
except Exception as e:
@@ -504,8 +498,11 @@ class AgentBridge:
# Get workspace from config
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
- # Load environment variables from workspace .env file
- env_file = os.path.join(workspace_root, '.env')
+ # Migrate API keys from config.json to environment variables (if not already set)
+ self._migrate_config_to_env(workspace_root)
+
+ # Load environment variables from secure .env file location
+ env_file = os.path.expanduser("~/.cow/.env")
if os.path.exists(env_file):
try:
from dotenv import load_dotenv
@@ -609,11 +606,6 @@ class AgentBridge:
tool.cwd = file_config.get("cwd", tool.cwd if hasattr(tool, 'cwd') else None)
if 'memory_manager' in file_config:
tool.memory_manager = file_config['memory_manager']
- elif tool_name == 'bocha_search':
- bocha_api_key = conf().get("bocha_api_key", "")
- if bocha_api_key:
- tool.config = {"bocha_api_key": bocha_api_key}
- tool.api_key = bocha_api_key
tools.append(tool)
except Exception as e:
logger.warning(f"[AgentBridge] Failed to load tool {tool_name} for session {session_id}: {e}")
@@ -767,23 +759,52 @@ class AgentBridge:
if not agent:
return Reply(ReplyType.ERROR, "Failed to initialize super agent")
- # Attach context to scheduler tool if present
- if context and agent.tools:
- for tool in agent.tools:
- if tool.name == "scheduler":
- try:
- from agent.tools.scheduler.integration import attach_scheduler_to_tool
- attach_scheduler_to_tool(tool, context)
- except Exception as e:
- logger.warning(f"[AgentBridge] Failed to attach context to scheduler: {e}")
- break
+ # Filter tools based on context
+ original_tools = agent.tools
+ filtered_tools = original_tools
- # Use agent's run_stream method
- response = agent.run_stream(
- user_message=query,
- on_event=on_event,
- clear_history=clear_history
- )
+ # If this is a scheduled task execution, exclude scheduler tool to prevent recursion
+ if context and context.get("is_scheduled_task"):
+ filtered_tools = [tool for tool in agent.tools if tool.name != "scheduler"]
+ agent.tools = filtered_tools
+ logger.info(f"[AgentBridge] Scheduled task execution: excluded scheduler tool ({len(filtered_tools)}/{len(original_tools)} tools)")
+ else:
+ # Attach context to scheduler tool if present
+ if context and agent.tools:
+ for tool in agent.tools:
+ if tool.name == "scheduler":
+ try:
+ from agent.tools.scheduler.integration import attach_scheduler_to_tool
+ attach_scheduler_to_tool(tool, context)
+ except Exception as e:
+ logger.warning(f"[AgentBridge] Failed to attach context to scheduler: {e}")
+ break
+
+ try:
+ # Use agent's run_stream method
+ response = agent.run_stream(
+ user_message=query,
+ on_event=on_event,
+ clear_history=clear_history
+ )
+ finally:
+ # Restore original tools
+ if context and context.get("is_scheduled_task"):
+ agent.tools = original_tools
+
+ # Check if there are files to send (from read tool)
+ if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'):
+ files_to_send = agent.stream_executor.files_to_send
+ if files_to_send:
+ # Send the first file (for now, handle one file at a time)
+ file_info = files_to_send[0]
+ logger.info(f"[AgentBridge] Sending file: {file_info.get('path')}")
+
+ # Clear files_to_send for next request
+ agent.stream_executor.files_to_send = []
+
+ # Return file reply based on file type
+ return self._create_file_reply(file_info, response, context)
return Reply(ReplyType.TEXT, response)
@@ -791,12 +812,53 @@ class AgentBridge:
logger.error(f"Agent reply error: {e}")
return Reply(ReplyType.ERROR, f"Agent error: {str(e)}")
+ def _create_file_reply(self, file_info: dict, text_response: str, context: Context = None) -> Reply:
+ """
+ Create a reply for sending files
+
+ Args:
+ file_info: File metadata from read tool
+ text_response: Text response from agent
+ context: Context object
+
+ Returns:
+ Reply object for file sending
+ """
+ file_type = file_info.get("file_type", "file")
+ file_path = file_info.get("path")
+
+ # For images, use IMAGE_URL type (channel will handle upload)
+ if file_type == "image":
+ # Convert local path to file:// URL for channel processing
+ file_url = f"file://{file_path}"
+ logger.info(f"[AgentBridge] Sending image: {file_url}")
+ reply = Reply(ReplyType.IMAGE_URL, file_url)
+ # Attach text message if present (for channels that support text+image)
+ if text_response:
+ reply.text_content = text_response # Store accompanying text
+ return reply
+
+ # For documents (PDF, Excel, Word, PPT), use FILE type
+ if file_type == "document":
+ file_url = f"file://{file_path}"
+ logger.info(f"[AgentBridge] Sending document: {file_url}")
+ reply = Reply(ReplyType.FILE, file_url)
+ reply.file_name = file_info.get("file_name", os.path.basename(file_path))
+ return reply
+
+ # For other files (video, audio), we need channel-specific handling
+ # For now, return text with file info
+ # TODO: Implement video/audio sending when channel supports it
+ message = text_response or file_info.get("message", "文件已准备")
+ message += f"\n\n[文件: {file_info.get('file_name', file_path)}]"
+ return Reply(ReplyType.TEXT, message)
+
def _migrate_config_to_env(self, workspace_root: str):
"""
Migrate API keys from config.json to .env file if not already set
Args:
- workspace_root: Workspace directory path
+ workspace_root: Workspace directory path (not used, kept for compatibility)
"""
from config import conf
import os
@@ -810,7 +872,8 @@ class AgentBridge:
"linkai_api_key": "LINKAI_API_KEY",
}
- env_file = os.path.join(workspace_root, '.env')
+ # Use fixed secure location for .env file
+ env_file = os.path.expanduser("~/.cow/.env")
# Read existing env vars from .env file
existing_env_vars = {}
@@ -830,19 +893,25 @@ class AgentBridge:
for config_key, env_key in key_mapping.items():
# Skip if already in .env file
if env_key in existing_env_vars:
+ logger.debug(f"[AgentBridge] Skipping {env_key} - already in .env")
continue
# Get value from config.json
value = conf().get(config_key, "")
if value and value.strip(): # Only migrate non-empty values
keys_to_migrate[env_key] = value.strip()
+ logger.debug(f"[AgentBridge] Will migrate {env_key} from config.json")
+ else:
+ logger.debug(f"[AgentBridge] Skipping {env_key} - no value in config.json")
# Write new keys to .env file
if keys_to_migrate:
try:
- # Ensure .env file exists
+ # Ensure ~/.cow directory and .env file exist
+ env_dir = os.path.dirname(env_file)
+ if not os.path.exists(env_dir):
+ os.makedirs(env_dir, exist_ok=True)
if not os.path.exists(env_file):
- os.makedirs(os.path.dirname(env_file), exist_ok=True)
open(env_file, 'a').close()
# Append new keys
diff --git a/channel/chat_channel.py b/channel/chat_channel.py
index 1523f67..bceaeef 100644
--- a/channel/chat_channel.py
+++ b/channel/chat_channel.py
@@ -64,15 +64,22 @@ class ChatChannel(Channel):
check_contain(group_name, group_name_keyword_white_list),
]
):
- group_chat_in_one_session = conf().get("group_chat_in_one_session", [])
- session_id = cmsg.actual_user_id
- if any(
- [
- group_name in group_chat_in_one_session,
- "ALL_GROUP" in group_chat_in_one_session,
- ]
- ):
+ # Check global group_shared_session config first
+ group_shared_session = conf().get("group_shared_session", True)
+ if group_shared_session:
+ # All users in the group share the same session
session_id = group_id
+ else:
+ # Check group-specific whitelist (legacy behavior)
+ group_chat_in_one_session = conf().get("group_chat_in_one_session", [])
+ session_id = cmsg.actual_user_id
+ if any(
+ [
+ group_name in group_chat_in_one_session,
+ "ALL_GROUP" in group_chat_in_one_session,
+ ]
+ ):
+ session_id = group_id
else:
logger.debug(f"No need reply, groupName not in whitelist, group_name={group_name}")
return None
@@ -283,7 +290,98 @@ class ChatChannel(Channel):
reply = e_context["reply"]
if not e_context.is_pass() and reply and reply.type:
logger.debug("[chat_channel] ready to send reply: {}, context: {}".format(reply, context))
- self._send(reply, context)
+
+ # 如果是文本回复,尝试提取并发送图片
+ if reply.type == ReplyType.TEXT:
+ self._extract_and_send_images(reply, context)
+ # 如果是图片回复但带有文本内容,先发文本再发图片
+ elif reply.type == ReplyType.IMAGE_URL and hasattr(reply, 'text_content') and reply.text_content:
+ # 先发送文本
+ text_reply = Reply(ReplyType.TEXT, reply.text_content)
+ self._send(text_reply, context)
+ # 短暂延迟后发送图片
+ time.sleep(0.3)
+ self._send(reply, context)
+ else:
+ self._send(reply, context)
+
+ def _extract_and_send_images(self, reply: Reply, context: Context):
+ """
+ 从文本回复中提取图片/视频URL并单独发送
+ 支持格式:[图片: /path/to/image.png], [视频: /path/to/video.mp4], ,
+ 最多发送5个媒体文件
+ """
+ content = reply.content
+ media_items = [] # [(url, type), ...]
+
+ # 正则提取各种格式的媒体URL
+ patterns = [
+ (r'\[图片:\s*([^\]]+)\]', 'image'), # [图片: /path/to/image.png]
+ (r'\[视频:\s*([^\]]+)\]', 'video'), # [视频: /path/to/video.mp4]
+ (r'!\[.*?\]\(([^\)]+)\)', 'image'), #  - 默认图片
+ (r'
]+src=["\']([^"\']+)["\']', 'image'), #
+ (r'