diff --git a/agent/prompt/builder.py b/agent/prompt/builder.py
index d80af06..25179ba 100644
--- a/agent/prompt/builder.py
+++ b/agent/prompt/builder.py
@@ -242,6 +242,8 @@ def _build_tooling_section(tools: List[Any], language: str) -> List[str]:
"- 每次工具调用后,评估是否已获得足够信息来推进或完成任务",
"- 避免重复调用相同的工具和相同参数获取相同的信息,除非用户明确要求",
"",
+ "**安全提醒**: 回复中涉及密钥、令牌、密码等敏感信息时,必须脱敏处理,禁止直接显示完整内容。",
+ "",
])
return lines
diff --git a/agent/skills/config.py b/agent/skills/config.py
index bbcd9f6..21e1737 100644
--- a/agent/skills/config.py
+++ b/agent/skills/config.py
@@ -70,33 +70,27 @@ def should_include_skill(
entry: SkillEntry,
config: Optional[Dict] = None,
current_platform: Optional[str] = None,
- lenient: bool = True,
) -> bool:
"""
Determine if a skill should be included based on requirements.
- Similar to clawdbot's shouldIncludeSkill logic, but with lenient mode:
- - In lenient mode (default): Only check explicit disable and platform, ignore missing requirements
- - In strict mode: Check all requirements (binary, env vars, config)
+ Simple rule: Skills are auto-enabled if their requirements are met.
+ - Has required API keys → enabled
+ - Missing API keys → disabled
+ - Wrong keys → enabled but will fail at runtime (LLM will handle error)
:param entry: SkillEntry to check
- :param config: Configuration dictionary
+ :param config: Configuration dictionary (currently unused, reserved for future)
:param current_platform: Current platform (default: auto-detect)
- :param lenient: If True, ignore missing requirements and load all skills (default: True)
:return: True if skill should be included
"""
metadata = entry.metadata
- skill_name = entry.skill.name
- skill_config = get_skill_config(config, skill_name)
-
- # Always check if skill is explicitly disabled in config
- if skill_config and skill_config.get('enabled') is False:
- return False
+ # No metadata = always include (no requirements)
if not metadata:
return True
- # Always check platform requirements (can't work on wrong platform)
+ # Check platform requirements (can't work on wrong platform)
if metadata.os:
platform_name = current_platform or resolve_runtime_platform()
# Map common platform names
@@ -114,12 +108,7 @@ def should_include_skill(
if metadata.always:
return True
- # In lenient mode, skip requirement checks and load all skills
- # Skills will fail gracefully at runtime if requirements are missing
- if lenient:
- return True
-
- # Strict mode: Check all requirements
+ # Check requirements
if metadata.requires:
# Check required binaries (all must be present)
required_bins = metadata.requires.get('bins', [])
@@ -133,29 +122,13 @@ def should_include_skill(
if not has_any_binary(any_bins):
return False
- # Check environment variables (with config fallback)
+ # Check environment variables (API keys)
+ # Simple rule: All required env vars must be set
required_env = metadata.requires.get('env', [])
if required_env:
for env_name in required_env:
- # Check in order: 1) env var, 2) skill config env, 3) skill config apiKey (if primaryEnv)
- if has_env_var(env_name):
- continue
- if skill_config:
- # Check skill config env dict
- skill_env = skill_config.get('env', {})
- if isinstance(skill_env, dict) and env_name in skill_env:
- continue
- # Check skill config apiKey (if this is the primaryEnv)
- if metadata.primary_env == env_name and skill_config.get('apiKey'):
- continue
- # Requirement not satisfied
- return False
-
- # Check config paths
- required_config = metadata.requires.get('config', [])
- if required_config and config:
- for config_path in required_config:
- if not is_config_path_truthy(config, config_path):
+ if not has_env_var(env_name):
+ # Missing required API key → disable skill
return False
return True
diff --git a/agent/skills/formatter.py b/agent/skills/formatter.py
index e77d7d6..7868e09 100644
--- a/agent/skills/formatter.py
+++ b/agent/skills/formatter.py
@@ -34,6 +34,7 @@ def format_skills_for_prompt(skills: List[Skill]) -> str:
lines.append(f" {_escape_xml(skill.name)}")
lines.append(f" {_escape_xml(skill.description)}")
lines.append(f" {_escape_xml(skill.file_path)}")
+ lines.append(f" {_escape_xml(skill.base_dir)}")
lines.append(" ")
lines.append("")
diff --git a/agent/skills/frontmatter.py b/agent/skills/frontmatter.py
index 565c1f7..9905e29 100644
--- a/agent/skills/frontmatter.py
+++ b/agent/skills/frontmatter.py
@@ -23,7 +23,22 @@ def parse_frontmatter(content: str) -> Dict[str, Any]:
frontmatter_text = match.group(1)
- # Simple YAML-like parsing (supports key: value format)
+ # Try to use PyYAML for proper YAML parsing
+ try:
+ import yaml
+ frontmatter = yaml.safe_load(frontmatter_text)
+ if not isinstance(frontmatter, dict):
+ frontmatter = {}
+ return frontmatter
+ except ImportError:
+ # Fallback to simple parsing if PyYAML not available
+ pass
+ except Exception:
+ # If YAML parsing fails, fall back to simple parsing
+ pass
+
+ # Simple YAML-like parsing (supports key: value format only)
+ # This is a fallback for when PyYAML is not available
for line in frontmatter_text.split('\n'):
line = line.strip()
if not line or line.startswith('#'):
@@ -72,10 +87,8 @@ def parse_metadata(frontmatter: Dict[str, Any]) -> Optional[SkillMetadata]:
if not isinstance(metadata_raw, dict):
return None
- # Support both 'moltbot' and 'cow' keys for compatibility
- meta_obj = metadata_raw.get('moltbot') or metadata_raw.get('cow')
- if not meta_obj or not isinstance(meta_obj, dict):
- return None
+ # Use metadata_raw directly (COW format)
+ meta_obj = metadata_raw
# Parse install specs
install_specs = []
diff --git a/agent/skills/manager.py b/agent/skills/manager.py
index 580dc0f..acc3b17 100644
--- a/agent/skills/manager.py
+++ b/agent/skills/manager.py
@@ -82,32 +82,24 @@ class SkillManager:
self,
skill_filter: Optional[List[str]] = None,
include_disabled: bool = False,
- check_requirements: bool = False, # Changed default to False for lenient loading
- lenient: bool = True, # New parameter for lenient mode
) -> List[SkillEntry]:
"""
Filter skills based on criteria.
- By default (lenient=True), all skills are loaded regardless of missing requirements.
- Skills will fail gracefully at runtime if requirements are not met.
+ Simple rule: Skills are auto-enabled if requirements are met.
+ - Has required API keys → included
+ - Missing API keys → excluded
:param skill_filter: List of skill names to include (None = all)
:param include_disabled: Whether to include skills with disable_model_invocation=True
- :param check_requirements: Whether to check skill requirements (default: False)
- :param lenient: If True, ignore missing requirements (default: True)
:return: Filtered list of skill entries
"""
from agent.skills.config import should_include_skill
entries = list(self.skills.values())
- # Check requirements (platform, explicit disable, etc.)
- # In lenient mode, only checks platform and explicit disable
- if check_requirements or not lenient:
- entries = [e for e in entries if should_include_skill(e, self.config, lenient=lenient)]
- else:
- # Lenient mode: only check explicit disable and platform
- entries = [e for e in entries if should_include_skill(e, self.config, lenient=True)]
+ # Check requirements (platform, binaries, env vars)
+ entries = [e for e in entries if should_include_skill(e, self.config)]
# Apply skill filter
if skill_filter is not None:
diff --git a/agent/tools/__init__.py b/agent/tools/__init__.py
index fd3e3b0..fb9d828 100644
--- a/agent/tools/__init__.py
+++ b/agent/tools/__init__.py
@@ -13,11 +13,21 @@ from agent.tools.ls.ls import Ls
from agent.tools.memory.memory_search import MemorySearchTool
from agent.tools.memory.memory_get import MemoryGetTool
+# Import env config tool
+from agent.tools.env_config.env_config import EnvConfig
+
# Import tools with optional dependencies
def _import_optional_tools():
"""Import tools that have optional dependencies"""
tools = {}
+ # Scheduler Tool (requires croniter)
+ try:
+ from agent.tools.scheduler.scheduler_tool import SchedulerTool
+ tools['SchedulerTool'] = SchedulerTool
+ except ImportError:
+ pass
+
# Google Search (requires requests)
try:
from agent.tools.google_search.google_search import GoogleSearch
@@ -43,6 +53,7 @@ def _import_optional_tools():
# Load optional tools
_optional_tools = _import_optional_tools()
+SchedulerTool = _optional_tools.get('SchedulerTool')
GoogleSearch = _optional_tools.get('GoogleSearch')
FileSave = _optional_tools.get('FileSave')
Terminal = _optional_tools.get('Terminal')
@@ -79,6 +90,8 @@ __all__ = [
'Ls',
'MemorySearchTool',
'MemoryGetTool',
+ 'EnvConfig',
+ 'SchedulerTool',
# Optional tools (may be None if dependencies not available)
'GoogleSearch',
'FileSave',
diff --git a/agent/tools/env_config/__init__.py b/agent/tools/env_config/__init__.py
new file mode 100644
index 0000000..2e5822f
--- /dev/null
+++ b/agent/tools/env_config/__init__.py
@@ -0,0 +1,3 @@
+from agent.tools.env_config.env_config import EnvConfig
+
+__all__ = ['EnvConfig']
diff --git a/agent/tools/env_config/env_config.py b/agent/tools/env_config/env_config.py
new file mode 100644
index 0000000..a988f42
--- /dev/null
+++ b/agent/tools/env_config/env_config.py
@@ -0,0 +1,283 @@
+"""
+Environment Configuration Tool - Manage API keys and environment variables
+"""
+
+import os
+import re
+from typing import Dict, Any
+from pathlib import Path
+
+from agent.tools.base_tool import BaseTool, ToolResult
+from common.log import logger
+
+
+# API Key 知识库:常见的环境变量及其描述
+API_KEY_REGISTRY = {
+ # AI 模型服务
+ "OPENAI_API_KEY": "OpenAI API 密钥 (用于GPT模型、Embedding模型)",
+ "GEMINI_API_KEY": "Google Gemini API 密钥",
+ "CLAUDE_API_KEY": "Claude API 密钥 (用于Claude模型)",
+ "LINKAI_API_KEY": "LinkAI智能体平台 API 密钥,支持多种模型切换",
+ # 搜索服务
+ "BOCHA_API_KEY": "博查 AI 搜索 API 密钥 ",
+}
+
+class EnvConfig(BaseTool):
+ """Tool for managing environment variables (API keys, etc.)"""
+
+ name: str = "env_config"
+ description: str = (
+ "Manage API keys and skill configurations stored in the workspace .env file. "
+ "Use this tool when user wants to configure API keys (like BOCHA_API_KEY, OPENAI_API_KEY), "
+ "view configured keys, or manage skill settings. "
+ "Actions: 'set' (add/update key), 'get' (view specific key), 'list' (show all configured keys), 'delete' (remove key). "
+ "Values are automatically masked for security. Changes take effect immediately via hot reload."
+ )
+
+ params: dict = {
+ "type": "object",
+ "properties": {
+ "action": {
+ "type": "string",
+ "description": "Action to perform: 'set', 'get', 'list', 'delete'",
+ "enum": ["set", "get", "list", "delete"]
+ },
+ "key": {
+ "type": "string",
+ "description": (
+ "Environment variable key name. Common keys:\n"
+ "- OPENAI_API_KEY: OpenAI API (GPT models)\n"
+ "- OPENAI_API_BASE: OpenAI API base URL\n"
+ "- CLAUDE_API_KEY: Anthropic Claude API\n"
+ "- GEMINI_API_KEY: Google Gemini API\n"
+ "- LINKAI_API_KEY: LinkAI platform\n"
+ "- BOCHA_API_KEY: Bocha AI search (博查搜索)\n"
+ "Use exact key names (case-sensitive, all uppercase with underscores)"
+ )
+ },
+ "value": {
+ "type": "string",
+ "description": "Value to set for the environment variable (for 'set' action)"
+ }
+ },
+ "required": ["action"]
+ }
+
+ def __init__(self, config: dict = None):
+ self.config = config or {}
+ self.workspace_dir = self.config.get("workspace_dir", os.path.expanduser("~/cow"))
+ self.env_path = os.path.join(self.workspace_dir, '.env')
+ self.agent_bridge = self.config.get("agent_bridge") # Reference to AgentBridge for hot reload
+ # Don't create .env file in __init__ to avoid issues during tool discovery
+ # It will be created on first use in execute()
+
+ def _ensure_env_file(self):
+ """Ensure the .env file exists"""
+ # Create workspace directory if it doesn't exist
+ os.makedirs(self.workspace_dir, exist_ok=True)
+
+ if not os.path.exists(self.env_path):
+ Path(self.env_path).touch()
+ logger.info(f"[EnvConfig] Created .env file at {self.env_path}")
+
+ def _mask_value(self, value: str) -> str:
+ """Mask sensitive parts of a value for logging"""
+ if not value or len(value) <= 10:
+ return "***"
+ return f"{value[:6]}***{value[-4:]}"
+
+ def _read_env_file(self) -> Dict[str, str]:
+ """Read all key-value pairs from .env file"""
+ env_vars = {}
+ if os.path.exists(self.env_path):
+ with open(self.env_path, 'r', encoding='utf-8') as f:
+ for line in f:
+ line = line.strip()
+ # Skip empty lines and comments
+ if not line or line.startswith('#'):
+ continue
+ # Parse KEY=VALUE
+ match = re.match(r'^([^=]+)=(.*)$', line)
+ if match:
+ key, value = match.groups()
+ env_vars[key.strip()] = value.strip()
+ return env_vars
+
+ def _write_env_file(self, env_vars: Dict[str, str]):
+ """Write all key-value pairs to .env file"""
+ with open(self.env_path, 'w', encoding='utf-8') as f:
+ f.write("# Environment variables for agent skills\n")
+ f.write("# Auto-managed by env_config tool\n\n")
+ for key, value in sorted(env_vars.items()):
+ f.write(f"{key}={value}\n")
+
+ def _reload_env(self):
+ """Reload environment variables from .env file"""
+ env_vars = self._read_env_file()
+ for key, value in env_vars.items():
+ os.environ[key] = value
+ logger.debug(f"[EnvConfig] Reloaded {len(env_vars)} environment variables")
+
+ def _refresh_skills(self):
+ """Refresh skills after environment variable changes"""
+ if self.agent_bridge:
+ try:
+ # Reload .env file
+ self._reload_env()
+
+ # Refresh skills in all agent instances
+ refreshed = self.agent_bridge.refresh_all_skills()
+ logger.info(f"[EnvConfig] Refreshed skills in {refreshed} agent instance(s)")
+ return True
+ except Exception as e:
+ logger.warning(f"[EnvConfig] Failed to refresh skills: {e}")
+ return False
+ return False
+
+ def execute(self, args: Dict[str, Any]) -> ToolResult:
+ """
+ Execute environment configuration operation
+
+ :param args: Contains action, key, and value parameters
+ :return: Result of the operation
+ """
+ # Ensure .env file exists on first use
+ self._ensure_env_file()
+
+ action = args.get("action")
+ key = args.get("key")
+ value = args.get("value")
+
+ try:
+ if action == "set":
+ if not key or not value:
+ return ToolResult.fail("Error: 'key' and 'value' are required for 'set' action.")
+
+ # Read current env vars
+ env_vars = self._read_env_file()
+
+ # Update the key
+ env_vars[key] = value
+
+ # Write back to file
+ self._write_env_file(env_vars)
+
+ # Update current process env
+ os.environ[key] = value
+
+ logger.info(f"[EnvConfig] Set {key}={self._mask_value(value)}")
+
+ # Try to refresh skills immediately
+ refreshed = self._refresh_skills()
+
+ result = {
+ "message": f"Successfully set {key}",
+ "key": key,
+ "value": self._mask_value(value),
+ }
+
+ if refreshed:
+ result["note"] = "✅ Skills refreshed automatically - changes are now active"
+ else:
+ result["note"] = "⚠️ Skills not refreshed - restart agent to load new skills"
+
+ return ToolResult.success(result)
+
+ elif action == "get":
+ if not key:
+ return ToolResult.fail("Error: 'key' is required for 'get' action.")
+
+ # Check in file first, then in current env
+ env_vars = self._read_env_file()
+ value = env_vars.get(key) or os.getenv(key)
+
+ # Get description from registry
+ description = API_KEY_REGISTRY.get(key, "未知用途的环境变量")
+
+ if value is not None:
+ logger.info(f"[EnvConfig] Got {key}={self._mask_value(value)}")
+ return ToolResult.success({
+ "key": key,
+ "value": self._mask_value(value),
+ "description": description,
+ "exists": True
+ })
+ else:
+ return ToolResult.success({
+ "key": key,
+ "description": description,
+ "exists": False,
+ "message": f"Environment variable '{key}' is not set"
+ })
+
+ elif action == "list":
+ env_vars = self._read_env_file()
+
+ # Build detailed variable list with descriptions
+ variables_with_info = {}
+ for key, value in env_vars.items():
+ variables_with_info[key] = {
+ "value": self._mask_value(value),
+ "description": API_KEY_REGISTRY.get(key, "未知用途的环境变量")
+ }
+
+ logger.info(f"[EnvConfig] Listed {len(env_vars)} environment variables")
+
+ if not env_vars:
+ return ToolResult.success({
+ "message": "No environment variables configured",
+ "variables": {},
+ "note": "常用的 API 密钥可以通过 env_config(action='set', key='KEY_NAME', value='your-key') 来配置"
+ })
+
+ return ToolResult.success({
+ "message": f"Found {len(env_vars)} environment variable(s)",
+ "variables": variables_with_info
+ })
+
+ elif action == "delete":
+ if not key:
+ return ToolResult.fail("Error: 'key' is required for 'delete' action.")
+
+ # Read current env vars
+ env_vars = self._read_env_file()
+
+ if key not in env_vars:
+ return ToolResult.success({
+ "message": f"Environment variable '{key}' was not set",
+ "key": key
+ })
+
+ # Remove the key
+ del env_vars[key]
+
+ # Write back to file
+ self._write_env_file(env_vars)
+
+ # Remove from current process env
+ if key in os.environ:
+ del os.environ[key]
+
+ logger.info(f"[EnvConfig] Deleted {key}")
+
+ # Try to refresh skills immediately
+ refreshed = self._refresh_skills()
+
+ result = {
+ "message": f"Successfully deleted {key}",
+ "key": key,
+ }
+
+ if refreshed:
+ result["note"] = "✅ Skills refreshed automatically - changes are now active"
+ else:
+ result["note"] = "⚠️ Skills not refreshed - restart agent to apply changes"
+
+ return ToolResult.success(result)
+
+ else:
+ return ToolResult.fail(f"Error: Unknown action '{action}'. Use 'set', 'get', 'list', or 'delete'.")
+
+ except Exception as e:
+ logger.error(f"[EnvConfig] Error: {e}", exc_info=True)
+ return ToolResult.fail(f"EnvConfig tool error: {str(e)}")
diff --git a/agent/tools/scheduler/README.md b/agent/tools/scheduler/README.md
new file mode 100644
index 0000000..ea3432c
--- /dev/null
+++ b/agent/tools/scheduler/README.md
@@ -0,0 +1,286 @@
+# 定时任务工具 (Scheduler Tool)
+
+## 功能简介
+
+定时任务工具允许 Agent 创建、管理和执行定时任务,支持:
+
+- ⏰ **定时提醒**: 在指定时间发送消息
+- 🔄 **周期性任务**: 按固定间隔或 cron 表达式重复执行
+- 🔧 **动态工具调用**: 定时执行其他工具并发送结果(如搜索新闻、查询天气等)
+- 📋 **任务管理**: 查询、启用、禁用、删除任务
+
+## 安装依赖
+
+```bash
+pip install croniter>=2.0.0
+```
+
+## 使用方法
+
+### 1. 创建定时任务
+
+Agent 可以通过自然语言创建定时任务,支持两种类型:
+
+#### 1.1 静态消息任务
+
+发送预定义的消息:
+
+**示例对话:**
+```
+用户: 每天早上9点提醒我开会
+Agent: [调用 scheduler 工具]
+ action: create
+ name: 每日开会提醒
+ message: 该开会了!
+ schedule_type: cron
+ schedule_value: 0 9 * * *
+```
+
+#### 1.2 动态工具调用任务
+
+定时执行工具并发送结果:
+
+**示例对话:**
+```
+用户: 每天早上8点帮我搜索一下当前新闻
+Agent: [调用 scheduler 工具]
+ action: create
+ name: 每日新闻
+ tool_call:
+ tool_name: bocha_search
+ tool_params:
+ query: 今日新闻
+ result_prefix: 📰 今日新闻播报
+ schedule_type: cron
+ schedule_value: 0 8 * * *
+```
+
+**工具调用参数说明:**
+- `tool_name`: 要调用的工具名称(如 `bocha_search`、`web_fetch` 等)
+- `tool_params`: 工具的参数(字典格式)
+- `result_prefix`: 可选,在结果前添加的前缀文本
+
+### 2. 支持的调度类型
+
+#### Cron 表达式 (`cron`)
+使用标准 cron 表达式:
+
+```
+0 9 * * * # 每天 9:00
+0 */2 * * * # 每 2 小时
+30 8 * * 1-5 # 工作日 8:30
+0 0 1 * * # 每月 1 号
+```
+
+#### 固定间隔 (`interval`)
+以秒为单位的间隔:
+
+```
+3600 # 每小时
+86400 # 每天
+1800 # 每 30 分钟
+```
+
+#### 一次性任务 (`once`)
+指定具体时间(ISO 格式):
+
+```
+2024-12-25T09:00:00
+2024-12-31T23:59:59
+```
+
+### 3. 查询任务列表
+
+```
+用户: 查看我的定时任务
+Agent: [调用 scheduler 工具]
+ action: list
+```
+
+### 4. 查看任务详情
+
+```
+用户: 查看任务 abc123 的详情
+Agent: [调用 scheduler 工具]
+ action: get
+ task_id: abc123
+```
+
+### 5. 删除任务
+
+```
+用户: 删除任务 abc123
+Agent: [调用 scheduler 工具]
+ action: delete
+ task_id: abc123
+```
+
+### 6. 启用/禁用任务
+
+```
+用户: 暂停任务 abc123
+Agent: [调用 scheduler 工具]
+ action: disable
+ task_id: abc123
+
+用户: 恢复任务 abc123
+Agent: [调用 scheduler 工具]
+ action: enable
+ task_id: abc123
+```
+
+## 任务存储
+
+任务保存在 JSON 文件中:
+```
+~/cow/scheduler/tasks.json
+```
+
+任务数据结构:
+
+**静态消息任务:**
+```json
+{
+ "id": "abc123",
+ "name": "每日提醒",
+ "enabled": true,
+ "created_at": "2024-01-01T10:00:00",
+ "updated_at": "2024-01-01T10:00:00",
+ "schedule": {
+ "type": "cron",
+ "expression": "0 9 * * *"
+ },
+ "action": {
+ "type": "send_message",
+ "content": "该开会了!",
+ "receiver": "wxid_xxx",
+ "receiver_name": "张三",
+ "is_group": false,
+ "channel_type": "wechat"
+ },
+ "next_run_at": "2024-01-02T09:00:00",
+ "last_run_at": "2024-01-01T09:00:00"
+}
+```
+
+**动态工具调用任务:**
+```json
+{
+ "id": "def456",
+ "name": "每日新闻",
+ "enabled": true,
+ "created_at": "2024-01-01T10:00:00",
+ "updated_at": "2024-01-01T10:00:00",
+ "schedule": {
+ "type": "cron",
+ "expression": "0 8 * * *"
+ },
+ "action": {
+ "type": "tool_call",
+ "tool_name": "bocha_search",
+ "tool_params": {
+ "query": "今日新闻"
+ },
+ "result_prefix": "📰 今日新闻播报",
+ "receiver": "wxid_xxx",
+ "receiver_name": "张三",
+ "is_group": false,
+ "channel_type": "wechat"
+ },
+ "next_run_at": "2024-01-02T08:00:00"
+}
+```
+
+## 后台服务
+
+定时任务由后台服务 `SchedulerService` 管理:
+
+- 每 30 秒检查一次到期任务
+- 自动执行到期任务
+- 计算下次执行时间
+- 记录执行历史和错误
+
+服务在 Agent 初始化时自动启动,无需手动配置。
+
+## 接收者确定
+
+定时任务会发送给**创建任务时的对话对象**:
+
+- 如果在私聊中创建,发送给该用户
+- 如果在群聊中创建,发送到该群
+- 接收者信息在创建时自动保存
+
+## 常见用例
+
+### 1. 每日提醒(静态消息)
+```
+用户: 每天早上8点提醒我吃药
+Agent: ✅ 定时任务创建成功
+ 任务ID: a1b2c3d4
+ 调度: 每天 8:00
+ 消息: 该吃药了!
+```
+
+### 2. 工作日提醒(静态消息)
+```
+用户: 工作日下午6点提醒我下班
+Agent: [创建 cron: 0 18 * * 1-5]
+ 消息: 该下班了!
+```
+
+### 3. 倒计时提醒(静态消息)
+```
+用户: 1小时后提醒我
+Agent: [创建 interval: 3600]
+```
+
+### 4. 每日新闻推送(动态工具调用)
+```
+用户: 每天早上8点帮我搜索一下当前新闻
+Agent: ✅ 定时任务创建成功
+ 任务ID: news001
+ 调度: 每天 8:00
+ 工具: bocha_search(query='今日新闻')
+ 前缀: 📰 今日新闻播报
+```
+
+### 5. 定时天气查询(动态工具调用)
+```
+用户: 每天早上7点查询今天的天气
+Agent: [创建 cron: 0 7 * * *]
+ 工具: bocha_search(query='今日天气')
+ 前缀: 🌤️ 今日天气预报
+```
+
+### 6. 周报提醒(动态工具调用)
+```
+用户: 每周五下午5点搜索本周热点
+Agent: [创建 cron: 0 17 * * 5]
+ 工具: bocha_search(query='本周热点新闻')
+ 前缀: 📊 本周热点回顾
+```
+
+### 4. 特定日期提醒
+```
+用户: 12月25日早上9点提醒我圣诞快乐
+Agent: [创建 once: 2024-12-25T09:00:00]
+```
+
+## 注意事项
+
+1. **时区**: 使用系统本地时区
+2. **精度**: 检查间隔为 30 秒,实际执行可能有 ±30 秒误差
+3. **持久化**: 任务保存在文件中,重启后自动恢复
+4. **一次性任务**: 执行后自动禁用,不会删除(可手动删除)
+5. **错误处理**: 执行失败会记录错误,不影响其他任务
+
+## 技术实现
+
+- **TaskStore**: 任务持久化存储
+- **SchedulerService**: 后台调度服务
+- **SchedulerTool**: Agent 工具接口
+- **Integration**: 与 AgentBridge 集成
+
+## 依赖
+
+- `croniter`: Cron 表达式解析(轻量级,仅 ~50KB)
diff --git a/agent/tools/scheduler/__init__.py b/agent/tools/scheduler/__init__.py
new file mode 100644
index 0000000..dafc8b6
--- /dev/null
+++ b/agent/tools/scheduler/__init__.py
@@ -0,0 +1,7 @@
+"""
+Scheduler tool for managing scheduled tasks
+"""
+
+from .scheduler_tool import SchedulerTool
+
+__all__ = ["SchedulerTool"]
diff --git a/agent/tools/scheduler/integration.py b/agent/tools/scheduler/integration.py
new file mode 100644
index 0000000..0f8ac35
--- /dev/null
+++ b/agent/tools/scheduler/integration.py
@@ -0,0 +1,239 @@
+"""
+Integration module for scheduler with AgentBridge
+"""
+
+import os
+from typing import Optional
+from config import conf
+from common.log import logger
+from bridge.context import Context, ContextType
+from bridge.reply import Reply, ReplyType
+
+# Global scheduler service instance
+_scheduler_service = None
+_task_store = None
+
+
+def init_scheduler(agent_bridge) -> bool:
+ """
+ Initialize scheduler service
+
+ Args:
+ agent_bridge: AgentBridge instance
+
+ Returns:
+ True if initialized successfully
+ """
+ global _scheduler_service, _task_store
+
+ try:
+ from agent.tools.scheduler.task_store import TaskStore
+ from agent.tools.scheduler.scheduler_service import SchedulerService
+
+ # Get workspace from config
+ workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
+ store_path = os.path.join(workspace_root, "scheduler", "tasks.json")
+
+ # Create task store
+ _task_store = TaskStore(store_path)
+ logger.info(f"[Scheduler] Task store initialized: {store_path}")
+
+ # Create execute callback
+ def execute_task_callback(task: dict):
+ """Callback to execute a scheduled task"""
+ try:
+ action = task.get("action", {})
+ action_type = action.get("type")
+
+ if action_type == "send_message":
+ _execute_send_message(task, agent_bridge)
+ elif action_type == "tool_call":
+ _execute_tool_call(task, agent_bridge)
+ else:
+ logger.warning(f"[Scheduler] Unknown action type: {action_type}")
+ except Exception as e:
+ logger.error(f"[Scheduler] Error executing task {task.get('id')}: {e}")
+
+ # Create scheduler service
+ _scheduler_service = SchedulerService(_task_store, execute_task_callback)
+ _scheduler_service.start()
+
+ logger.info("[Scheduler] Scheduler service initialized and started")
+ return True
+
+ except Exception as e:
+ logger.error(f"[Scheduler] Failed to initialize scheduler: {e}")
+ return False
+
+
+def get_task_store():
+ """Get the global task store instance"""
+ return _task_store
+
+
+def get_scheduler_service():
+ """Get the global scheduler service instance"""
+ return _scheduler_service
+
+
+def _execute_send_message(task: dict, agent_bridge):
+ """
+ Execute a send_message action
+
+ Args:
+ task: Task dictionary
+ agent_bridge: AgentBridge instance
+ """
+ try:
+ action = task.get("action", {})
+ content = action.get("content", "")
+ receiver = action.get("receiver")
+ is_group = action.get("is_group", False)
+ channel_type = action.get("channel_type", "unknown")
+
+ if not receiver:
+ logger.error(f"[Scheduler] Task {task['id']}: No receiver specified")
+ return
+
+ # Create context for sending message
+ context = Context(ContextType.TEXT, content)
+ context["receiver"] = receiver
+ context["isgroup"] = is_group
+ context["session_id"] = receiver
+
+ # For web channel, generate a virtual request_id
+ if channel_type == "web":
+ import uuid
+ request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}"
+ context["request_id"] = request_id
+ logger.debug(f"[Scheduler] Generated request_id for web channel: {request_id}")
+
+ # Create reply
+ reply = Reply(ReplyType.TEXT, content)
+
+ # Get channel and send
+ from channel.channel_factory import create_channel
+
+ try:
+ channel = create_channel(channel_type)
+ if channel:
+ # For web channel, register the request_id to session mapping
+ if channel_type == "web" and hasattr(channel, 'request_to_session'):
+ channel.request_to_session[request_id] = receiver
+ logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}")
+
+ channel.send(reply, context)
+ logger.info(f"[Scheduler] Task {task['id']} executed: sent message to {receiver}")
+ else:
+ logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
+ except Exception as e:
+ logger.error(f"[Scheduler] Failed to send message: {e}")
+
+ except Exception as e:
+ logger.error(f"[Scheduler] Error in _execute_send_message: {e}")
+
+
+def _execute_tool_call(task: dict, agent_bridge):
+ """
+ Execute a tool_call action
+
+ Args:
+ task: Task dictionary
+ agent_bridge: AgentBridge instance
+ """
+ try:
+ action = task.get("action", {})
+ tool_name = action.get("tool_name")
+ tool_params = action.get("tool_params", {})
+ result_prefix = action.get("result_prefix", "")
+ receiver = action.get("receiver")
+ is_group = action.get("is_group", False)
+ channel_type = action.get("channel_type", "unknown")
+
+ if not tool_name:
+ logger.error(f"[Scheduler] Task {task['id']}: No tool_name specified")
+ return
+
+ if not receiver:
+ logger.error(f"[Scheduler] Task {task['id']}: No receiver specified")
+ return
+
+ # Get tool manager and create tool instance
+ from agent.tools.tool_manager import ToolManager
+ tool_manager = ToolManager()
+ tool = tool_manager.create_tool(tool_name)
+
+ if not tool:
+ logger.error(f"[Scheduler] Task {task['id']}: Tool '{tool_name}' not found")
+ return
+
+ # Execute tool
+ logger.info(f"[Scheduler] Task {task['id']}: Executing tool '{tool_name}' with params {tool_params}")
+ result = tool.execute(tool_params)
+
+ # Get result content
+ if hasattr(result, 'result'):
+ content = result.result
+ else:
+ content = str(result)
+
+ # Add prefix if specified
+ if result_prefix:
+ content = f"{result_prefix}\n\n{content}"
+
+ # Send result as message
+ context = Context(ContextType.TEXT, content)
+ context["receiver"] = receiver
+ context["isgroup"] = is_group
+ context["session_id"] = receiver
+
+ # For web channel, generate a virtual request_id
+ if channel_type == "web":
+ import uuid
+ request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}"
+ context["request_id"] = request_id
+ logger.debug(f"[Scheduler] Generated request_id for web channel: {request_id}")
+
+ reply = Reply(ReplyType.TEXT, content)
+
+ # Get channel and send
+ from channel.channel_factory import create_channel
+
+ try:
+ channel = create_channel(channel_type)
+ if channel:
+ # For web channel, register the request_id to session mapping
+ if channel_type == "web" and hasattr(channel, 'request_to_session'):
+ channel.request_to_session[request_id] = receiver
+ logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}")
+
+ channel.send(reply, context)
+ logger.info(f"[Scheduler] Task {task['id']} executed: sent tool result to {receiver}")
+ else:
+ logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
+ except Exception as e:
+ logger.error(f"[Scheduler] Failed to send tool result: {e}")
+
+ except Exception as e:
+ logger.error(f"[Scheduler] Error in _execute_tool_call: {e}")
+
+
+def attach_scheduler_to_tool(tool, context: Context = None):
+ """
+ Attach scheduler components to a SchedulerTool instance
+
+ Args:
+ tool: SchedulerTool instance
+ context: Current context (optional)
+ """
+ if _task_store:
+ tool.task_store = _task_store
+
+ if context:
+ tool.current_context = context
+
+ # Also set channel_type from config
+ channel_type = conf().get("channel_type", "unknown")
+ if not tool.config:
+ tool.config = {}
+ tool.config["channel_type"] = channel_type
diff --git a/agent/tools/scheduler/scheduler_service.py b/agent/tools/scheduler/scheduler_service.py
new file mode 100644
index 0000000..bc338c2
--- /dev/null
+++ b/agent/tools/scheduler/scheduler_service.py
@@ -0,0 +1,192 @@
+"""
+Background scheduler service for executing scheduled tasks
+"""
+
+import time
+import threading
+from datetime import datetime, timedelta
+from typing import Callable, Optional
+from croniter import croniter
+from common.log import logger
+
+
+class SchedulerService:
+ """
+ Background service that executes scheduled tasks
+ """
+
+ def __init__(self, task_store, execute_callback: Callable):
+ """
+ Initialize scheduler service
+
+ Args:
+ task_store: TaskStore instance
+ execute_callback: Function to call when executing a task
+ """
+ self.task_store = task_store
+ self.execute_callback = execute_callback
+ self.running = False
+ self.thread = None
+ self._lock = threading.Lock()
+
+ def start(self):
+ """Start the scheduler service"""
+ with self._lock:
+ if self.running:
+ logger.warning("[Scheduler] Service already running")
+ return
+
+ self.running = True
+ self.thread = threading.Thread(target=self._run_loop, daemon=True)
+ self.thread.start()
+ logger.info("[Scheduler] Service started")
+
+ def stop(self):
+ """Stop the scheduler service"""
+ with self._lock:
+ if not self.running:
+ return
+
+ self.running = False
+ if self.thread:
+ self.thread.join(timeout=5)
+ logger.info("[Scheduler] Service stopped")
+
+ def _run_loop(self):
+ """Main scheduler loop"""
+ logger.info("[Scheduler] Scheduler loop started")
+
+ while self.running:
+ try:
+ self._check_and_execute_tasks()
+ except Exception as e:
+ logger.error(f"[Scheduler] Error in scheduler loop: {e}")
+
+ # Sleep for 30 seconds between checks
+ time.sleep(30)
+
+ def _check_and_execute_tasks(self):
+ """Check for due tasks and execute them"""
+ now = datetime.now()
+ tasks = self.task_store.list_tasks(enabled_only=True)
+
+ for task in tasks:
+ try:
+ # Check if task is due
+ if self._is_task_due(task, now):
+ logger.info(f"[Scheduler] Executing task: {task['id']} - {task['name']}")
+ self._execute_task(task)
+
+ # Update next run time
+ next_run = self._calculate_next_run(task, now)
+ if next_run:
+ self.task_store.update_task(task['id'], {
+ "next_run_at": next_run.isoformat(),
+ "last_run_at": now.isoformat()
+ })
+ else:
+ # One-time task, disable it
+ self.task_store.update_task(task['id'], {
+ "enabled": False,
+ "last_run_at": now.isoformat()
+ })
+ logger.info(f"[Scheduler] One-time task completed and disabled: {task['id']}")
+ except Exception as e:
+ logger.error(f"[Scheduler] Error processing task {task.get('id')}: {e}")
+
+ def _is_task_due(self, task: dict, now: datetime) -> bool:
+ """
+ Check if a task is due to run
+
+ Args:
+ task: Task dictionary
+ now: Current datetime
+
+ Returns:
+ True if task should run now
+ """
+ next_run_str = task.get("next_run_at")
+ if not next_run_str:
+ # Calculate initial next_run_at
+ next_run = self._calculate_next_run(task, now)
+ if next_run:
+ self.task_store.update_task(task['id'], {
+ "next_run_at": next_run.isoformat()
+ })
+ return False
+ return False
+
+ try:
+ next_run = datetime.fromisoformat(next_run_str)
+ return now >= next_run
+ except:
+ return False
+
+ def _calculate_next_run(self, task: dict, from_time: datetime) -> Optional[datetime]:
+ """
+ Calculate next run time for a task
+
+ Args:
+ task: Task dictionary
+ from_time: Calculate from this time
+
+ Returns:
+ Next run datetime or None for one-time tasks
+ """
+ schedule = task.get("schedule", {})
+ schedule_type = schedule.get("type")
+
+ if schedule_type == "cron":
+ # Cron expression
+ expression = schedule.get("expression")
+ if not expression:
+ return None
+
+ try:
+ cron = croniter(expression, from_time)
+ return cron.get_next(datetime)
+ except Exception as e:
+ logger.error(f"[Scheduler] Invalid cron expression '{expression}': {e}")
+ return None
+
+ elif schedule_type == "interval":
+ # Interval in seconds
+ seconds = schedule.get("seconds", 0)
+ if seconds <= 0:
+ return None
+ return from_time + timedelta(seconds=seconds)
+
+ elif schedule_type == "once":
+ # One-time task at specific time
+ run_at_str = schedule.get("run_at")
+ if not run_at_str:
+ return None
+
+ try:
+ run_at = datetime.fromisoformat(run_at_str)
+ # Only return if in the future
+ if run_at > from_time:
+ return run_at
+ except:
+ pass
+ return None
+
+ return None
+
+ def _execute_task(self, task: dict):
+ """
+ Execute a task
+
+ Args:
+ task: Task dictionary
+ """
+ try:
+ # Call the execute callback
+ self.execute_callback(task)
+ except Exception as e:
+ logger.error(f"[Scheduler] Error executing task {task['id']}: {e}")
+ # Update task with error
+ self.task_store.update_task(task['id'], {
+ "last_error": str(e),
+ "last_error_at": datetime.now().isoformat()
+ })
diff --git a/agent/tools/scheduler/scheduler_tool.py b/agent/tools/scheduler/scheduler_tool.py
new file mode 100644
index 0000000..764711b
--- /dev/null
+++ b/agent/tools/scheduler/scheduler_tool.py
@@ -0,0 +1,439 @@
+"""
+Scheduler tool for creating and managing scheduled tasks
+"""
+
+import uuid
+from datetime import datetime
+from typing import Any, Dict, Optional
+from croniter import croniter
+
+from agent.tools.base_tool import BaseTool, ToolResult
+from bridge.context import Context, ContextType
+from bridge.reply import Reply, ReplyType
+from common.log import logger
+
+
+class SchedulerTool(BaseTool):
+ """
+ Tool for managing scheduled tasks (reminders, notifications, etc.)
+ """
+
+ name: str = "scheduler"
+ description: str = (
+ "创建、查询和管理定时任务。支持两种任务类型:\n"
+ "1. 静态消息任务:定时发送预定义的消息\n"
+ "2. 动态工具任务:定时执行工具调用并发送结果(如搜索新闻、查询天气等)\n\n"
+ "使用方法:\n"
+ "- 创建静态消息任务:action='create', name='任务名', message='消息内容', schedule_type='interval'/'cron'/'once', schedule_value='间隔秒数/cron表达式/时间'\n"
+ "- 创建动态工具任务:action='create', name='任务名', tool_call={'tool_name': '工具名', 'tool_params': {...}, 'result_prefix': '前缀'}, schedule_type='interval'/'cron'/'once', schedule_value='值'\n"
+ "- 查询列表:action='list'\n"
+ "- 查看详情:action='get', task_id='任务ID'\n"
+ "- 删除任务:action='delete', task_id='任务ID'\n"
+ "- 启用任务:action='enable', task_id='任务ID'\n"
+ "- 禁用任务:action='disable', task_id='任务ID'\n\n"
+ "调度类型说明:\n"
+ "- interval: 固定间隔秒数(如3600表示每小时)\n"
+ "- cron: cron表达式(如'0 9 * * *'表示每天9点,'*/10 * * * *'表示每10分钟)\n"
+ "- once: 一次性任务,ISO时间格式(如'2024-12-25T09:00:00')\n\n"
+ "示例:每天早上8点搜索新闻\n"
+ "action='create', name='每日新闻', tool_call={'tool_name': 'bocha_search', 'tool_params': {'query': '今日新闻'}, 'result_prefix': '📰 今日新闻播报'}, schedule_type='cron', schedule_value='0 8 * * *'"
+ )
+ params: dict = {
+ "type": "object",
+ "properties": {
+ "action": {
+ "type": "string",
+ "enum": ["create", "list", "get", "delete", "enable", "disable"],
+ "description": "操作类型: create(创建), list(列表), get(查询), delete(删除), enable(启用), disable(禁用)"
+ },
+ "task_id": {
+ "type": "string",
+ "description": "任务ID (用于 get/delete/enable/disable 操作)"
+ },
+ "name": {
+ "type": "string",
+ "description": "任务名称 (用于 create 操作)"
+ },
+ "message": {
+ "type": "string",
+ "description": "要发送的静态消息内容 (用于 create 操作,与tool_call二选一)"
+ },
+ "tool_call": {
+ "type": "object",
+ "description": "要执行的工具调用 (用于 create 操作,与message二选一)",
+ "properties": {
+ "tool_name": {
+ "type": "string",
+ "description": "工具名称,如 'bocha_search'"
+ },
+ "tool_params": {
+ "type": "object",
+ "description": "工具参数"
+ },
+ "result_prefix": {
+ "type": "string",
+ "description": "结果前缀,如 '今日新闻:'"
+ }
+ },
+ "required": ["tool_name"]
+ },
+ "schedule_type": {
+ "type": "string",
+ "enum": ["cron", "interval", "once"],
+ "description": "调度类型 (用于 create 操作): cron(cron表达式), interval(固定间隔秒数), once(一次性)"
+ },
+ "schedule_value": {
+ "type": "string",
+ "description": (
+ "调度值 (用于 create 操作):\n"
+ "- cron类型: cron表达式,如 '0 9 * * *' (每天9点),'*/10 * * * *' (每10分钟)\n"
+ "- interval类型: 间隔秒数,如 '3600' (每小时),'10' (每10秒)\n"
+ "- once类型: ISO时间,如 '2024-12-25T09:00:00'"
+ )
+ }
+ },
+ "required": ["action"]
+ }
+
+ def __init__(self, config: dict = None):
+ super().__init__()
+ self.config = config or {}
+
+ # Will be set by agent bridge
+ self.task_store = None
+ self.current_context = None
+
+ def execute(self, params: dict) -> ToolResult:
+ """
+ Execute scheduler operations
+
+ Args:
+ params: Dictionary containing:
+ - action: Operation type (create/list/get/delete/enable/disable)
+ - Other parameters depending on action
+
+ Returns:
+ ToolResult object
+ """
+ # Extract parameters
+ action = params.get("action")
+ kwargs = params
+
+ if not self.task_store:
+ return ToolResult.fail("错误: 定时任务系统未初始化")
+
+ try:
+ if action == "create":
+ result = self._create_task(**kwargs)
+ return ToolResult.success(result)
+ elif action == "list":
+ result = self._list_tasks(**kwargs)
+ return ToolResult.success(result)
+ elif action == "get":
+ result = self._get_task(**kwargs)
+ return ToolResult.success(result)
+ elif action == "delete":
+ result = self._delete_task(**kwargs)
+ return ToolResult.success(result)
+ elif action == "enable":
+ result = self._enable_task(**kwargs)
+ return ToolResult.success(result)
+ elif action == "disable":
+ result = self._disable_task(**kwargs)
+ return ToolResult.success(result)
+ else:
+ return ToolResult.fail(f"未知操作: {action}")
+ except Exception as e:
+ logger.error(f"[SchedulerTool] Error: {e}")
+ return ToolResult.fail(f"操作失败: {str(e)}")
+
+ def _create_task(self, **kwargs) -> str:
+ """Create a new scheduled task"""
+ name = kwargs.get("name")
+ message = kwargs.get("message")
+ tool_call = kwargs.get("tool_call")
+ schedule_type = kwargs.get("schedule_type")
+ schedule_value = kwargs.get("schedule_value")
+
+ # Validate required fields
+ if not name:
+ return "错误: 缺少任务名称 (name)"
+ if not message and not tool_call:
+ return "错误: 必须提供 message 或 tool_call 之一"
+ if message and tool_call:
+ return "错误: message 和 tool_call 不能同时提供,请选择其一"
+ if not schedule_type:
+ return "错误: 缺少调度类型 (schedule_type)"
+ if not schedule_value:
+ return "错误: 缺少调度值 (schedule_value)"
+
+ # Validate schedule
+ schedule = self._parse_schedule(schedule_type, schedule_value)
+ if not schedule:
+ return f"错误: 无效的调度配置 - type: {schedule_type}, value: {schedule_value}"
+
+ # Get context info for receiver
+ if not self.current_context:
+ return "错误: 无法获取当前对话上下文"
+
+ context = self.current_context
+
+ # Create task
+ task_id = str(uuid.uuid4())[:8]
+
+ # Build action based on message or tool_call
+ if message:
+ action = {
+ "type": "send_message",
+ "content": message,
+ "receiver": context.get("receiver"),
+ "receiver_name": self._get_receiver_name(context),
+ "is_group": context.get("isgroup", False),
+ "channel_type": self.config.get("channel_type", "unknown")
+ }
+ else: # tool_call
+ action = {
+ "type": "tool_call",
+ "tool_name": tool_call.get("tool_name"),
+ "tool_params": tool_call.get("tool_params", {}),
+ "result_prefix": tool_call.get("result_prefix", ""),
+ "receiver": context.get("receiver"),
+ "receiver_name": self._get_receiver_name(context),
+ "is_group": context.get("isgroup", False),
+ "channel_type": self.config.get("channel_type", "unknown")
+ }
+
+ task = {
+ "id": task_id,
+ "name": name,
+ "enabled": True,
+ "created_at": datetime.now().isoformat(),
+ "updated_at": datetime.now().isoformat(),
+ "schedule": schedule,
+ "action": action
+ }
+
+ # Calculate initial next_run_at
+ next_run = self._calculate_next_run(task)
+ if next_run:
+ task["next_run_at"] = next_run.isoformat()
+
+ # Save task
+ self.task_store.add_task(task)
+
+ # Format response
+ schedule_desc = self._format_schedule_description(schedule)
+ receiver_desc = task["action"]["receiver_name"] or task["action"]["receiver"]
+
+ if message:
+ content_desc = f"💬 消息: {message}"
+ else:
+ tool_name = tool_call.get("tool_name")
+ tool_params_str = str(tool_call.get("tool_params", {}))
+ prefix = tool_call.get("result_prefix", "")
+ content_desc = f"🔧 工具调用: {tool_name}({tool_params_str})"
+ if prefix:
+ content_desc += f"\n📝 结果前缀: {prefix}"
+
+ return (
+ f"✅ 定时任务创建成功\n\n"
+ f"📋 任务ID: {task_id}\n"
+ f"📝 名称: {name}\n"
+ f"⏰ 调度: {schedule_desc}\n"
+ f"👤 接收者: {receiver_desc}\n"
+ f"{content_desc}\n"
+ f"🕐 下次执行: {next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else '未知'}"
+ )
+
+ def _list_tasks(self, **kwargs) -> str:
+ """List all tasks"""
+ tasks = self.task_store.list_tasks()
+
+ if not tasks:
+ return "📋 暂无定时任务"
+
+ lines = [f"📋 定时任务列表 (共 {len(tasks)} 个)\n"]
+
+ for task in tasks:
+ status = "✅" if task.get("enabled", True) else "❌"
+ schedule_desc = self._format_schedule_description(task.get("schedule", {}))
+ next_run = task.get("next_run_at")
+ next_run_str = datetime.fromisoformat(next_run).strftime('%m-%d %H:%M') if next_run else "未知"
+
+ lines.append(
+ f"{status} [{task['id']}] {task['name']}\n"
+ f" ⏰ {schedule_desc} | 下次: {next_run_str}"
+ )
+
+ return "\n".join(lines)
+
+ def _get_task(self, **kwargs) -> str:
+ """Get task details"""
+ task_id = kwargs.get("task_id")
+ if not task_id:
+ return "错误: 缺少任务ID (task_id)"
+
+ task = self.task_store.get_task(task_id)
+ if not task:
+ return f"错误: 任务 '{task_id}' 不存在"
+
+ status = "启用" if task.get("enabled", True) else "禁用"
+ schedule_desc = self._format_schedule_description(task.get("schedule", {}))
+ action = task.get("action", {})
+ next_run = task.get("next_run_at")
+ next_run_str = datetime.fromisoformat(next_run).strftime('%Y-%m-%d %H:%M:%S') if next_run else "未知"
+ last_run = task.get("last_run_at")
+ last_run_str = datetime.fromisoformat(last_run).strftime('%Y-%m-%d %H:%M:%S') if last_run else "从未执行"
+
+ return (
+ f"📋 任务详情\n\n"
+ f"ID: {task['id']}\n"
+ f"名称: {task['name']}\n"
+ f"状态: {status}\n"
+ f"调度: {schedule_desc}\n"
+ f"接收者: {action.get('receiver_name', action.get('receiver'))}\n"
+ f"消息: {action.get('content')}\n"
+ f"下次执行: {next_run_str}\n"
+ f"上次执行: {last_run_str}\n"
+ f"创建时间: {datetime.fromisoformat(task['created_at']).strftime('%Y-%m-%d %H:%M:%S')}"
+ )
+
+ def _delete_task(self, **kwargs) -> str:
+ """Delete a task"""
+ task_id = kwargs.get("task_id")
+ if not task_id:
+ return "错误: 缺少任务ID (task_id)"
+
+ task = self.task_store.get_task(task_id)
+ if not task:
+ return f"错误: 任务 '{task_id}' 不存在"
+
+ self.task_store.delete_task(task_id)
+ return f"✅ 任务 '{task['name']}' ({task_id}) 已删除"
+
+ def _enable_task(self, **kwargs) -> str:
+ """Enable a task"""
+ task_id = kwargs.get("task_id")
+ if not task_id:
+ return "错误: 缺少任务ID (task_id)"
+
+ task = self.task_store.get_task(task_id)
+ if not task:
+ return f"错误: 任务 '{task_id}' 不存在"
+
+ self.task_store.enable_task(task_id, True)
+ return f"✅ 任务 '{task['name']}' ({task_id}) 已启用"
+
+ def _disable_task(self, **kwargs) -> str:
+ """Disable a task"""
+ task_id = kwargs.get("task_id")
+ if not task_id:
+ return "错误: 缺少任务ID (task_id)"
+
+ task = self.task_store.get_task(task_id)
+ if not task:
+ return f"错误: 任务 '{task_id}' 不存在"
+
+ self.task_store.enable_task(task_id, False)
+ return f"✅ 任务 '{task['name']}' ({task_id}) 已禁用"
+
+ def _parse_schedule(self, schedule_type: str, schedule_value: str) -> Optional[dict]:
+ """Parse and validate schedule configuration"""
+ try:
+ if schedule_type == "cron":
+ # Validate cron expression
+ croniter(schedule_value)
+ return {"type": "cron", "expression": schedule_value}
+
+ elif schedule_type == "interval":
+ # Parse interval in seconds
+ seconds = int(schedule_value)
+ if seconds <= 0:
+ return None
+ return {"type": "interval", "seconds": seconds}
+
+ elif schedule_type == "once":
+ # Parse datetime
+ datetime.fromisoformat(schedule_value)
+ return {"type": "once", "run_at": schedule_value}
+
+ except Exception as e:
+ logger.error(f"[SchedulerTool] Invalid schedule: {e}")
+ return None
+
+ return None
+
+ def _calculate_next_run(self, task: dict) -> Optional[datetime]:
+ """Calculate next run time for a task"""
+ schedule = task.get("schedule", {})
+ schedule_type = schedule.get("type")
+ now = datetime.now()
+
+ if schedule_type == "cron":
+ expression = schedule.get("expression")
+ cron = croniter(expression, now)
+ return cron.get_next(datetime)
+
+ elif schedule_type == "interval":
+ seconds = schedule.get("seconds", 0)
+ from datetime import timedelta
+ return now + timedelta(seconds=seconds)
+
+ elif schedule_type == "once":
+ run_at_str = schedule.get("run_at")
+ return datetime.fromisoformat(run_at_str)
+
+ return None
+
+ def _format_schedule_description(self, schedule: dict) -> str:
+ """Format schedule as human-readable description"""
+ schedule_type = schedule.get("type")
+
+ if schedule_type == "cron":
+ expr = schedule.get("expression", "")
+ # Try to provide friendly description
+ if expr == "0 9 * * *":
+ return "每天 9:00"
+ elif expr == "0 */1 * * *":
+ return "每小时"
+ elif expr == "*/30 * * * *":
+ return "每30分钟"
+ else:
+ return f"Cron: {expr}"
+
+ elif schedule_type == "interval":
+ seconds = schedule.get("seconds", 0)
+ if seconds >= 86400:
+ days = seconds // 86400
+ return f"每 {days} 天"
+ elif seconds >= 3600:
+ hours = seconds // 3600
+ return f"每 {hours} 小时"
+ elif seconds >= 60:
+ minutes = seconds // 60
+ return f"每 {minutes} 分钟"
+ else:
+ return f"每 {seconds} 秒"
+
+ elif schedule_type == "once":
+ run_at = schedule.get("run_at", "")
+ try:
+ dt = datetime.fromisoformat(run_at)
+ return f"一次性 ({dt.strftime('%Y-%m-%d %H:%M')})"
+ except:
+ return "一次性"
+
+ return "未知"
+
+ def _get_receiver_name(self, context: Context) -> str:
+ """Get receiver name from context"""
+ try:
+ msg = context.get("msg")
+ if msg:
+ if context.get("isgroup"):
+ return msg.other_user_nickname or "群聊"
+ else:
+ return msg.from_user_nickname or "用户"
+ except:
+ pass
+ return "未知"
diff --git a/agent/tools/scheduler/task_store.py b/agent/tools/scheduler/task_store.py
new file mode 100644
index 0000000..55e84a1
--- /dev/null
+++ b/agent/tools/scheduler/task_store.py
@@ -0,0 +1,200 @@
+"""
+Task storage management for scheduler
+"""
+
+import json
+import os
+import threading
+from datetime import datetime
+from typing import Dict, List, Optional
+from pathlib import Path
+
+
+class TaskStore:
+ """
+ Manages persistent storage of scheduled tasks
+ """
+
+ def __init__(self, store_path: str = None):
+ """
+ Initialize task store
+
+ Args:
+ store_path: Path to tasks.json file. Defaults to ~/cow/scheduler/tasks.json
+ """
+ if store_path is None:
+ # Default to ~/cow/scheduler/tasks.json
+ home = os.path.expanduser("~")
+ store_path = os.path.join(home, "cow", "scheduler", "tasks.json")
+
+ self.store_path = store_path
+ self.lock = threading.Lock()
+ self._ensure_store_dir()
+
+ def _ensure_store_dir(self):
+ """Ensure the storage directory exists"""
+ store_dir = os.path.dirname(self.store_path)
+ os.makedirs(store_dir, exist_ok=True)
+
+ def load_tasks(self) -> Dict[str, dict]:
+ """
+ Load all tasks from storage
+
+ Returns:
+ Dictionary of task_id -> task_data
+ """
+ with self.lock:
+ if not os.path.exists(self.store_path):
+ return {}
+
+ try:
+ with open(self.store_path, 'r', encoding='utf-8') as f:
+ data = json.load(f)
+ return data.get("tasks", {})
+ except Exception as e:
+ print(f"Error loading tasks: {e}")
+ return {}
+
+ def save_tasks(self, tasks: Dict[str, dict]):
+ """
+ Save all tasks to storage
+
+ Args:
+ tasks: Dictionary of task_id -> task_data
+ """
+ with self.lock:
+ try:
+ # Create backup
+ if os.path.exists(self.store_path):
+ backup_path = f"{self.store_path}.bak"
+ try:
+ with open(self.store_path, 'r') as src:
+ with open(backup_path, 'w') as dst:
+ dst.write(src.read())
+ except:
+ pass
+
+ # Save tasks
+ data = {
+ "version": 1,
+ "updated_at": datetime.now().isoformat(),
+ "tasks": tasks
+ }
+
+ with open(self.store_path, 'w', encoding='utf-8') as f:
+ json.dump(data, f, ensure_ascii=False, indent=2)
+ except Exception as e:
+ print(f"Error saving tasks: {e}")
+ raise
+
+ def add_task(self, task: dict) -> bool:
+ """
+ Add a new task
+
+ Args:
+ task: Task data dictionary
+
+ Returns:
+ True if successful
+ """
+ tasks = self.load_tasks()
+ task_id = task.get("id")
+
+ if not task_id:
+ raise ValueError("Task must have an 'id' field")
+
+ if task_id in tasks:
+ raise ValueError(f"Task with id '{task_id}' already exists")
+
+ tasks[task_id] = task
+ self.save_tasks(tasks)
+ return True
+
+ def update_task(self, task_id: str, updates: dict) -> bool:
+ """
+ Update an existing task
+
+ Args:
+ task_id: Task ID
+ updates: Dictionary of fields to update
+
+ Returns:
+ True if successful
+ """
+ tasks = self.load_tasks()
+
+ if task_id not in tasks:
+ raise ValueError(f"Task '{task_id}' not found")
+
+ # Update fields
+ tasks[task_id].update(updates)
+ tasks[task_id]["updated_at"] = datetime.now().isoformat()
+
+ self.save_tasks(tasks)
+ return True
+
+ def delete_task(self, task_id: str) -> bool:
+ """
+ Delete a task
+
+ Args:
+ task_id: Task ID
+
+ Returns:
+ True if successful
+ """
+ tasks = self.load_tasks()
+
+ if task_id not in tasks:
+ raise ValueError(f"Task '{task_id}' not found")
+
+ del tasks[task_id]
+ self.save_tasks(tasks)
+ return True
+
+ def get_task(self, task_id: str) -> Optional[dict]:
+ """
+ Get a specific task
+
+ Args:
+ task_id: Task ID
+
+ Returns:
+ Task data or None if not found
+ """
+ tasks = self.load_tasks()
+ return tasks.get(task_id)
+
+ def list_tasks(self, enabled_only: bool = False) -> List[dict]:
+ """
+ List all tasks
+
+ Args:
+ enabled_only: If True, only return enabled tasks
+
+ Returns:
+ List of task dictionaries
+ """
+ tasks = self.load_tasks()
+ task_list = list(tasks.values())
+
+ if enabled_only:
+ task_list = [t for t in task_list if t.get("enabled", True)]
+
+ # Sort by next_run_at
+ task_list.sort(key=lambda t: t.get("next_run_at", float('inf')))
+
+ return task_list
+
+ def enable_task(self, task_id: str, enabled: bool = True) -> bool:
+ """
+ Enable or disable a task
+
+ Args:
+ task_id: Task ID
+ enabled: True to enable, False to disable
+
+ Returns:
+ True if successful
+ """
+ return self.update_task(task_id, {"enabled": enabled})
diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py
index 56f9a75..ebc2ca7 100644
--- a/bridge/agent_bridge.py
+++ b/bridge/agent_bridge.py
@@ -180,6 +180,7 @@ class AgentBridge:
self.agents = {} # session_id -> Agent instance mapping
self.default_agent = None # For backward compatibility (no session_id)
self.agent: Optional[Agent] = None
+ self.scheduler_initialized = False
def create_agent(self, system_prompt: str, tools: List = None, **kwargs) -> Agent:
"""
Create the super agent with COW integration
@@ -268,6 +269,21 @@ class AgentBridge:
# Get workspace from config
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
+ # Load environment variables from workspace .env file
+ env_file = os.path.join(workspace_root, '.env')
+ if os.path.exists(env_file):
+ try:
+ from dotenv import load_dotenv
+ load_dotenv(env_file, override=True)
+ logger.info(f"[AgentBridge] Loaded environment variables from {env_file}")
+ except ImportError:
+ logger.warning("[AgentBridge] python-dotenv not installed, skipping .env file loading")
+ except Exception as e:
+ logger.warning(f"[AgentBridge] Failed to load .env file: {e}")
+
+ # Migrate API keys from config.json to environment variables (if not already set)
+ self._migrate_config_to_env(workspace_root)
+
# Initialize workspace and create template files
from agent.prompt import ensure_workspace, load_context_files, PromptBuilder
@@ -357,7 +373,16 @@ class AgentBridge:
for tool_name in tool_manager.tool_classes.keys():
try:
- tool = tool_manager.create_tool(tool_name)
+ # Special handling for EnvConfig tool - pass agent_bridge reference
+ if tool_name == "env_config":
+ from agent.tools import EnvConfig
+ tool = EnvConfig({
+ "workspace_dir": workspace_root,
+ "agent_bridge": self # Pass self reference for hot reload
+ })
+ else:
+ tool = tool_manager.create_tool(tool_name)
+
if tool:
# Apply workspace config to file operation tools
if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']:
@@ -381,6 +406,36 @@ class AgentBridge:
tools.extend(memory_tools)
logger.info(f"[AgentBridge] Added {len(memory_tools)} memory tools")
+ # Initialize scheduler service (once)
+ if not self.scheduler_initialized:
+ try:
+ from agent.tools.scheduler.integration import init_scheduler
+ if init_scheduler(self):
+ self.scheduler_initialized = True
+ logger.info("[AgentBridge] Scheduler service initialized")
+ except Exception as e:
+ logger.warning(f"[AgentBridge] Failed to initialize scheduler: {e}")
+
+ # Inject scheduler dependencies into SchedulerTool instances
+ if self.scheduler_initialized:
+ try:
+ from agent.tools.scheduler.integration import get_task_store, get_scheduler_service
+ from agent.tools import SchedulerTool
+
+ task_store = get_task_store()
+ scheduler_service = get_scheduler_service()
+
+ for tool in tools:
+ if isinstance(tool, SchedulerTool):
+ tool.task_store = task_store
+ tool.scheduler_service = scheduler_service
+ if not tool.config:
+ tool.config = {}
+ tool.config["channel_type"] = conf().get("channel_type", "unknown")
+ logger.debug("[AgentBridge] Injected scheduler dependencies into SchedulerTool")
+ except Exception as e:
+ logger.warning(f"[AgentBridge] Failed to inject scheduler dependencies: {e}")
+
logger.info(f"[AgentBridge] Loaded {len(tools)} tools: {[t.name for t in tools]}")
# Load context files (SOUL.md, USER.md, etc.)
@@ -449,6 +504,21 @@ class AgentBridge:
# Get workspace from config
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
+ # Load environment variables from workspace .env file
+ env_file = os.path.join(workspace_root, '.env')
+ if os.path.exists(env_file):
+ try:
+ from dotenv import load_dotenv
+ load_dotenv(env_file, override=True)
+ logger.info(f"[AgentBridge] Loaded environment variables from {env_file} for session {session_id}")
+ except ImportError:
+ logger.warning(f"[AgentBridge] python-dotenv not installed, skipping .env file loading for session {session_id}")
+ except Exception as e:
+ logger.warning(f"[AgentBridge] Failed to load .env file for session {session_id}: {e}")
+
+ # Migrate API keys from config.json to environment variables (if not already set)
+ self._migrate_config_to_env(workspace_root)
+
# Initialize workspace
from agent.prompt import ensure_workspace, load_context_files, PromptBuilder
@@ -550,6 +620,36 @@ class AgentBridge:
if memory_tools:
tools.extend(memory_tools)
+
+ # Initialize scheduler service (once, if not already initialized)
+ if not self.scheduler_initialized:
+ try:
+ from agent.tools.scheduler.integration import init_scheduler
+ if init_scheduler(self):
+ self.scheduler_initialized = True
+ logger.info(f"[AgentBridge] Scheduler service initialized for session {session_id}")
+ except Exception as e:
+ logger.warning(f"[AgentBridge] Failed to initialize scheduler for session {session_id}: {e}")
+
+ # Inject scheduler dependencies into SchedulerTool instances
+ if self.scheduler_initialized:
+ try:
+ from agent.tools.scheduler.integration import get_task_store, get_scheduler_service
+ from agent.tools import SchedulerTool
+
+ task_store = get_task_store()
+ scheduler_service = get_scheduler_service()
+
+ for tool in tools:
+ if isinstance(tool, SchedulerTool):
+ tool.task_store = task_store
+ tool.scheduler_service = scheduler_service
+ if not tool.config:
+ tool.config = {}
+ tool.config["channel_type"] = conf().get("channel_type", "unknown")
+ logger.debug(f"[AgentBridge] Injected scheduler dependencies for session {session_id}")
+ except Exception as e:
+ logger.warning(f"[AgentBridge] Failed to inject scheduler dependencies for session {session_id}: {e}")
# Load context files
context_files = load_context_files(workspace_root)
@@ -667,6 +767,17 @@ class AgentBridge:
if not agent:
return Reply(ReplyType.ERROR, "Failed to initialize super agent")
+ # Attach context to scheduler tool if present
+ if context and agent.tools:
+ for tool in agent.tools:
+ if tool.name == "scheduler":
+ try:
+ from agent.tools.scheduler.integration import attach_scheduler_to_tool
+ attach_scheduler_to_tool(tool, context)
+ except Exception as e:
+ logger.warning(f"[AgentBridge] Failed to attach context to scheduler: {e}")
+ break
+
# Use agent's run_stream method
response = agent.run_stream(
user_message=query,
@@ -680,6 +791,72 @@ class AgentBridge:
logger.error(f"Agent reply error: {e}")
return Reply(ReplyType.ERROR, f"Agent error: {str(e)}")
+ def _migrate_config_to_env(self, workspace_root: str):
+ """
+ Migrate API keys from config.json to .env file if not already set
+
+ Args:
+ workspace_root: Workspace directory path
+ """
+ from config import conf
+ import os
+
+ # Mapping from config.json keys to environment variable names
+ key_mapping = {
+ "open_ai_api_key": "OPENAI_API_KEY",
+ "open_ai_api_base": "OPENAI_API_BASE",
+ "gemini_api_key": "GEMINI_API_KEY",
+ "claude_api_key": "CLAUDE_API_KEY",
+ "linkai_api_key": "LINKAI_API_KEY",
+ }
+
+ env_file = os.path.join(workspace_root, '.env')
+
+ # Read existing env vars from .env file
+ existing_env_vars = {}
+ if os.path.exists(env_file):
+ try:
+ with open(env_file, 'r', encoding='utf-8') as f:
+ for line in f:
+ line = line.strip()
+ if line and not line.startswith('#') and '=' in line:
+ key, _ = line.split('=', 1)
+ existing_env_vars[key.strip()] = True
+ except Exception as e:
+ logger.warning(f"[AgentBridge] Failed to read .env file: {e}")
+
+ # Check which keys need to be migrated
+ keys_to_migrate = {}
+ for config_key, env_key in key_mapping.items():
+ # Skip if already in .env file
+ if env_key in existing_env_vars:
+ continue
+
+ # Get value from config.json
+ value = conf().get(config_key, "")
+ if value and value.strip(): # Only migrate non-empty values
+ keys_to_migrate[env_key] = value.strip()
+
+ # Write new keys to .env file
+ if keys_to_migrate:
+ try:
+ # Ensure .env file exists
+ if not os.path.exists(env_file):
+ os.makedirs(os.path.dirname(env_file), exist_ok=True)
+ open(env_file, 'a').close()
+
+ # Append new keys
+ with open(env_file, 'a', encoding='utf-8') as f:
+ f.write('\n# Auto-migrated from config.json\n')
+ for key, value in keys_to_migrate.items():
+ f.write(f'{key}={value}\n')
+ # Also set in current process
+ os.environ[key] = value
+
+ logger.info(f"[AgentBridge] Migrated {len(keys_to_migrate)} API keys from config.json to .env: {list(keys_to_migrate.keys())}")
+ except Exception as e:
+ logger.warning(f"[AgentBridge] Failed to migrate API keys: {e}")
+
def clear_session(self, session_id: str):
"""
Clear a specific session's agent and conversation history
@@ -695,4 +872,43 @@ class AgentBridge:
"""Clear all agent sessions"""
logger.info(f"[AgentBridge] Clearing all sessions ({len(self.agents)} total)")
self.agents.clear()
- self.default_agent = None
\ No newline at end of file
+ self.default_agent = None
+
+ def refresh_all_skills(self) -> int:
+ """
+ Refresh skills in all agent instances after environment variable changes.
+ This allows hot-reload of skills without restarting the agent.
+
+ Returns:
+ Number of agent instances refreshed
+ """
+ import os
+ from dotenv import load_dotenv
+ from config import conf
+
+ # Reload environment variables from .env file
+ workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
+ env_file = os.path.join(workspace_root, '.env')
+
+ if os.path.exists(env_file):
+ load_dotenv(env_file, override=True)
+ logger.info(f"[AgentBridge] Reloaded environment variables from {env_file}")
+
+ refreshed_count = 0
+
+ # Refresh default agent
+ if self.default_agent and hasattr(self.default_agent, 'skill_manager'):
+ self.default_agent.skill_manager.refresh_skills()
+ refreshed_count += 1
+ logger.info("[AgentBridge] Refreshed skills in default agent")
+
+ # Refresh all session agents
+ for session_id, agent in self.agents.items():
+ if hasattr(agent, 'skill_manager'):
+ agent.skill_manager.refresh_skills()
+ refreshed_count += 1
+
+ if refreshed_count > 0:
+ logger.info(f"[AgentBridge] Refreshed skills in {refreshed_count} agent instance(s)")
+
+ return refreshed_count
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index aae96c6..848acd2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,6 +9,9 @@ pre-commit
web.py
linkai>=0.0.6.0
agentmesh-sdk>=0.1.3
+python-dotenv>=1.0.0
+PyYAML>=6.0
+croniter>=2.0.0
# feishu websocket mode
lark-oapi
\ No newline at end of file
diff --git a/skills/bocha-search/SKILL.md b/skills/bocha-search/SKILL.md
new file mode 100644
index 0000000..ca52d6c
--- /dev/null
+++ b/skills/bocha-search/SKILL.md
@@ -0,0 +1,91 @@
+---
+name: bocha-search
+description: High-quality web search with AI-optimized results. Use when user needs to search the internet for current information, news, or research topics.
+homepage: https://open.bocha.cn/
+metadata:
+ emoji: 🔍
+ requires:
+ bins: ["curl"]
+ env: ["BOCHA_API_KEY"]
+ primaryEnv: "BOCHA_API_KEY"
+---
+
+# Bocha Search
+
+High-quality web search powered by Bocha AI, optimized for AI consumption. Returns web pages, images, and detailed metadata.
+
+## Setup
+
+This skill requires a Bocha API key. If not configured:
+
+1. Visit https://open.bocha.cn to get an API key
+2. Set the key using: `env_config(action="set", key="BOCHA_API_KEY", value="your-key")`
+3. Or manually add to `~/cow/.env`: `BOCHA_API_KEY=your-key`
+
+## Usage
+
+**Important**: Scripts are located relative to this skill's base directory.
+
+When you see this skill in ``, note the `` path.
+
+```bash
+# General pattern:
+bash "/scripts/search.sh" "" [count] [freshness] [summary]
+
+# Parameters:
+# - query: Search query (required)
+# - count: Number of results (1-50, default: 10)
+# - freshness: Time range filter (default: noLimit)
+# Options: noLimit, oneDay, oneWeek, oneMonth, oneYear, YYYY-MM-DD..YYYY-MM-DD
+# - summary: Include text summary (true/false, default: false)
+```
+
+## Examples
+
+### Basic search
+```bash
+bash "/scripts/search.sh" "latest AI news"
+```
+
+### Search with more results
+```bash
+bash "/scripts/search.sh" "Python tutorials" 20
+```
+
+### Search recent content with summary
+```bash
+bash "/scripts/search.sh" "阿里巴巴ESG报告" 10 oneWeek true
+```
+
+### Search specific date range
+```bash
+bash "/scripts/search.sh" "tech news" 15 "2025-01-01..2025-02-01"
+```
+
+## Response Format
+
+The API returns structured data compatible with Bing Search API:
+
+**Web Pages** (in `data.webPages.value`):
+- `name`: Page title
+- `url`: Page URL
+- `snippet`: Short description
+- `summary`: Full text summary (if requested)
+- `siteName`: Website name
+- `siteIcon`: Website icon URL
+- `datePublished`: Publication date (UTC+8)
+- `language`: Page language
+
+**Images** (in `data.images.value`):
+- `contentUrl`: Image URL
+- `hostPageUrl`: Source page URL
+- `width`, `height`: Image dimensions
+- `thumbnailUrl`: Thumbnail URL
+
+## Notes
+
+- **Optimized for AI**: Results include summaries and structured metadata
+- **Time range**: Use `noLimit` for best results (algorithm auto-optimizes time range)
+- **Timeout**: 30 seconds
+- **Rate limits**: Check your API plan at https://open.bocha.cn
+- **Response format**: Compatible with Bing Search API for easy integration
diff --git a/skills/bocha-search/scripts/search.sh b/skills/bocha-search/scripts/search.sh
new file mode 100755
index 0000000..e803a19
--- /dev/null
+++ b/skills/bocha-search/scripts/search.sh
@@ -0,0 +1,75 @@
+#!/usr/bin/env bash
+# Bocha Web Search API wrapper
+# API Docs: https://open.bocha.cn/
+
+set -euo pipefail
+
+query="${1:-}"
+count="${2:-10}"
+freshness="${3:-noLimit}"
+summary="${4:-false}"
+
+if [ -z "$query" ]; then
+ echo '{"error": "Query is required", "usage": "bash search.sh [count] [freshness] [summary]"}'
+ exit 1
+fi
+
+if [ -z "${BOCHA_API_KEY:-}" ]; then
+ echo '{"error": "BOCHA_API_KEY environment variable is not set", "help": "Visit https://open.bocha.cn to get an API key"}'
+ exit 1
+fi
+
+# Validate count (1-50)
+if ! [[ "$count" =~ ^[0-9]+$ ]] || [ "$count" -lt 1 ] || [ "$count" -gt 50 ]; then
+ count=10
+fi
+
+# Build JSON request body
+request_body=$(cat <&1)
+
+curl_exit_code=$?
+
+if [ $curl_exit_code -ne 0 ]; then
+ echo "{\"error\": \"Failed to call Bocha API\", \"details\": \"$response\"}"
+ exit 1
+fi
+
+# Simple JSON validation - check if response starts with { or [
+if [[ ! "$response" =~ ^[[:space:]]*[\{\[] ]]; then
+ echo "{\"error\": \"Invalid JSON response from API\", \"response\": \"$response\"}"
+ exit 1
+fi
+
+# Extract API code using grep and sed (basic JSON parsing)
+api_code=$(echo "$response" | grep -o '"code"[[:space:]]*:[[:space:]]*[0-9]*' | grep -o '[0-9]*' | head -1)
+
+# If code extraction failed or code is not 200, check for error
+if [ -n "$api_code" ] && [ "$api_code" != "200" ]; then
+ # Try to extract error message
+ api_msg=$(echo "$response" | grep -o '"msg"[[:space:]]*:[[:space:]]*"[^"]*"' | sed 's/"msg"[[:space:]]*:[[:space:]]*"\(.*\)"/\1/' | head -1)
+ if [ -z "$api_msg" ]; then
+ api_msg="Unknown error"
+ fi
+ echo "{\"error\": \"API returned error\", \"code\": $api_code, \"message\": \"$api_msg\"}"
+ exit 1
+fi
+
+# Return the full response
+echo "$response"
diff --git a/skills/skill-creator/SKILL.md b/skills/skill-creator/SKILL.md
index 4a09bc4..9092cb9 100644
--- a/skills/skill-creator/SKILL.md
+++ b/skills/skill-creator/SKILL.md
@@ -243,20 +243,87 @@ If you used `--examples`, delete any placeholder files that are not needed for t
##### Frontmatter
-Write the YAML frontmatter with `name` and `description`:
+Write the YAML frontmatter with `name`, `description`, and optional `metadata`:
- `name`: The skill name
- `description`: This is the primary triggering mechanism for your skill, and helps the agent understand when to use the skill.
- Include both what the Skill does and specific triggers/contexts for when to use it.
- Include all "when to use" information here - Not in the body. The body is only loaded after triggering, so "When to Use This Skill" sections in the body are not helpful to the agent.
- Example description for a `docx` skill: "Comprehensive document creation, editing, and analysis with support for tracked changes, comments, formatting preservation, and text extraction. Use when the agent needs to work with professional documents (.docx files) for: (1) Creating new documents, (2) Modifying or editing content, (3) Working with tracked changes, (4) Adding comments, or any other document tasks"
+- `metadata`: (Optional) Specify requirements and configuration
+ - `requires.bins`: Required binaries (e.g., `["curl", "jq"]`)
+ - `requires.env`: Required environment variables (e.g., `["OPENAI_API_KEY"]`)
+ - `primaryEnv`: Primary environment variable name (for API keys)
+ - `always`: Set to `true` to always load regardless of requirements
+ - `emoji`: Skill icon (optional)
-Do not include any other fields in YAML frontmatter.
+**API Key Requirements**:
+
+If your skill needs an API key, declare it in metadata:
+
+```yaml
+---
+name: my-search
+description: Search using MyAPI
+metadata:
+ requires:
+ bins: ["curl"]
+ env: ["MYAPI_KEY"]
+ primaryEnv: "MYAPI_KEY"
+---
+```
+
+**Auto-enable rule**: Skills are automatically enabled when required environment variables are set, and automatically disabled when missing. No manual configuration needed.
##### Body
Write instructions for using the skill and its bundled resources.
+**If your skill requires an API key**, include setup instructions in the body:
+
+```markdown
+## Setup
+
+This skill requires an API key from [Service Name].
+
+1. Visit https://service.com to get an API key
+2. Configure it using: `env_config(action="set", key="SERVICE_API_KEY", value="your-key")`
+3. Or manually add to `~/cow/.env`: `SERVICE_API_KEY=your-key`
+4. Restart the agent for changes to take effect
+
+## Usage
+...
+```
+
+The bash script should check for the key and provide helpful error messages:
+
+```bash
+#!/usr/bin/env bash
+if [ -z "${SERVICE_API_KEY:-}" ]; then
+ echo "Error: SERVICE_API_KEY not set"
+ echo "Please configure your API key first (see SKILL.md)"
+ exit 1
+fi
+
+curl -H "Authorization: Bearer $SERVICE_API_KEY" ...
+```
+
+**Script Path Convention**:
+
+When writing SKILL.md instructions, remember that:
+- Skills are listed in `` with a `` path
+- Scripts should be referenced as: `/scripts/script_name.sh`
+- The AI will see the base_dir and can construct the full path
+
+Example instruction in SKILL.md:
+```markdown
+## Usage
+
+Scripts are in this skill's base directory (shown in skill listing).
+
+bash "/scripts/my_script.sh"
+```
+
### Step 5: Validate (Optional)
Validate skill format:
diff --git a/skills/web-fetch/SKILL.md b/skills/web-fetch/SKILL.md
index 05d9650..39315fb 100644
--- a/skills/web-fetch/SKILL.md
+++ b/skills/web-fetch/SKILL.md
@@ -1,11 +1,12 @@
---
name: web-fetch
-description: Fetch and extract readable content from web pages
+description: Fetch and extract readable content from web pages. Use for lightweight page access without browser automation.
homepage: https://github.com/zhayujie/chatgpt-on-wechat
metadata:
emoji: 🌐
requires:
bins: ["curl"]
+ always: true
---
# Web Fetch
@@ -14,10 +15,16 @@ Fetch and extract readable content from web pages using curl and basic text proc
## Usage
-Use the provided script to fetch a URL and extract its content:
+**Important**: Scripts are located relative to this skill's base directory.
+
+When you see this skill in ``, note the `` path.
```bash
-bash scripts/fetch.sh [output_file]
+# General pattern:
+bash "/scripts/fetch.sh" [output_file]
+
+# Example (replace with actual path from skill listing):
+bash "~/chatgpt-on-wechat/skills/web-fetch/scripts/fetch.sh" "https://example.com"
```
**Parameters:**
@@ -31,18 +38,18 @@ bash scripts/fetch.sh [output_file]
### Fetch a web page
```bash
-bash scripts/fetch.sh "https://example.com"
+bash "/scripts/fetch.sh" "https://example.com"
```
### Save to file
```bash
-bash scripts/fetch.sh "https://example.com" output.txt
+bash "/scripts/fetch.sh" "https://example.com" output.txt
cat output.txt
```
## Notes
-- Uses curl for HTTP requests (timeout: 20s)
+- Uses curl for HTTP requests (timeout: 10s)
- Extracts title and basic text content
- Removes HTML tags and scripts
- Works with any standard web page
diff --git a/skills/web-fetch/scripts/fetch.sh b/skills/web-fetch/scripts/fetch.sh
index b13a41d..1713b26 100755
--- a/skills/web-fetch/scripts/fetch.sh
+++ b/skills/web-fetch/scripts/fetch.sh
@@ -19,7 +19,7 @@ if [[ ! "$url" =~ ^https?:// ]]; then
fi
# Fetch the page with curl
-html=$(curl -sS -L --max-time 20 \
+html=$(curl -sS -L --max-time 10 \
-H "User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" \
-H "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" \
"$url" 2>&1) || {