feat: key management and scheduled task tools

This commit is contained in:
zhayujie
2026-02-01 19:21:12 +08:00
parent d337140577
commit 4c8712d683
21 changed files with 2170 additions and 68 deletions

View File

@@ -242,6 +242,8 @@ def _build_tooling_section(tools: List[Any], language: str) -> List[str]:
"- 每次工具调用后,评估是否已获得足够信息来推进或完成任务",
"- 避免重复调用相同的工具和相同参数获取相同的信息,除非用户明确要求",
"",
"**安全提醒**: 回复中涉及密钥、令牌、密码等敏感信息时,必须脱敏处理,禁止直接显示完整内容。",
"",
])
return lines

View File

@@ -70,33 +70,27 @@ def should_include_skill(
entry: SkillEntry,
config: Optional[Dict] = None,
current_platform: Optional[str] = None,
lenient: bool = True,
) -> bool:
"""
Determine if a skill should be included based on requirements.
Similar to clawdbot's shouldIncludeSkill logic, but with lenient mode:
- In lenient mode (default): Only check explicit disable and platform, ignore missing requirements
- In strict mode: Check all requirements (binary, env vars, config)
Simple rule: Skills are auto-enabled if their requirements are met.
- Has required API keys → enabled
- Missing API keys → disabled
- Wrong keys → enabled but will fail at runtime (LLM will handle error)
:param entry: SkillEntry to check
:param config: Configuration dictionary
:param config: Configuration dictionary (currently unused, reserved for future)
:param current_platform: Current platform (default: auto-detect)
:param lenient: If True, ignore missing requirements and load all skills (default: True)
:return: True if skill should be included
"""
metadata = entry.metadata
skill_name = entry.skill.name
skill_config = get_skill_config(config, skill_name)
# Always check if skill is explicitly disabled in config
if skill_config and skill_config.get('enabled') is False:
return False
# No metadata = always include (no requirements)
if not metadata:
return True
# Always check platform requirements (can't work on wrong platform)
# Check platform requirements (can't work on wrong platform)
if metadata.os:
platform_name = current_platform or resolve_runtime_platform()
# Map common platform names
@@ -114,12 +108,7 @@ def should_include_skill(
if metadata.always:
return True
# In lenient mode, skip requirement checks and load all skills
# Skills will fail gracefully at runtime if requirements are missing
if lenient:
return True
# Strict mode: Check all requirements
# Check requirements
if metadata.requires:
# Check required binaries (all must be present)
required_bins = metadata.requires.get('bins', [])
@@ -133,29 +122,13 @@ def should_include_skill(
if not has_any_binary(any_bins):
return False
# Check environment variables (with config fallback)
# Check environment variables (API keys)
# Simple rule: All required env vars must be set
required_env = metadata.requires.get('env', [])
if required_env:
for env_name in required_env:
# Check in order: 1) env var, 2) skill config env, 3) skill config apiKey (if primaryEnv)
if has_env_var(env_name):
continue
if skill_config:
# Check skill config env dict
skill_env = skill_config.get('env', {})
if isinstance(skill_env, dict) and env_name in skill_env:
continue
# Check skill config apiKey (if this is the primaryEnv)
if metadata.primary_env == env_name and skill_config.get('apiKey'):
continue
# Requirement not satisfied
return False
# Check config paths
required_config = metadata.requires.get('config', [])
if required_config and config:
for config_path in required_config:
if not is_config_path_truthy(config, config_path):
if not has_env_var(env_name):
# Missing required API key → disable skill
return False
return True

View File

@@ -34,6 +34,7 @@ def format_skills_for_prompt(skills: List[Skill]) -> str:
lines.append(f" <name>{_escape_xml(skill.name)}</name>")
lines.append(f" <description>{_escape_xml(skill.description)}</description>")
lines.append(f" <location>{_escape_xml(skill.file_path)}</location>")
lines.append(f" <base_dir>{_escape_xml(skill.base_dir)}</base_dir>")
lines.append(" </skill>")
lines.append("</available_skills>")

View File

@@ -23,7 +23,22 @@ def parse_frontmatter(content: str) -> Dict[str, Any]:
frontmatter_text = match.group(1)
# Simple YAML-like parsing (supports key: value format)
# Try to use PyYAML for proper YAML parsing
try:
import yaml
frontmatter = yaml.safe_load(frontmatter_text)
if not isinstance(frontmatter, dict):
frontmatter = {}
return frontmatter
except ImportError:
# Fallback to simple parsing if PyYAML not available
pass
except Exception:
# If YAML parsing fails, fall back to simple parsing
pass
# Simple YAML-like parsing (supports key: value format only)
# This is a fallback for when PyYAML is not available
for line in frontmatter_text.split('\n'):
line = line.strip()
if not line or line.startswith('#'):
@@ -72,10 +87,8 @@ def parse_metadata(frontmatter: Dict[str, Any]) -> Optional[SkillMetadata]:
if not isinstance(metadata_raw, dict):
return None
# Support both 'moltbot' and 'cow' keys for compatibility
meta_obj = metadata_raw.get('moltbot') or metadata_raw.get('cow')
if not meta_obj or not isinstance(meta_obj, dict):
return None
# Use metadata_raw directly (COW format)
meta_obj = metadata_raw
# Parse install specs
install_specs = []

View File

@@ -82,32 +82,24 @@ class SkillManager:
self,
skill_filter: Optional[List[str]] = None,
include_disabled: bool = False,
check_requirements: bool = False, # Changed default to False for lenient loading
lenient: bool = True, # New parameter for lenient mode
) -> List[SkillEntry]:
"""
Filter skills based on criteria.
By default (lenient=True), all skills are loaded regardless of missing requirements.
Skills will fail gracefully at runtime if requirements are not met.
Simple rule: Skills are auto-enabled if requirements are met.
- Has required API keys → included
- Missing API keys → excluded
:param skill_filter: List of skill names to include (None = all)
:param include_disabled: Whether to include skills with disable_model_invocation=True
:param check_requirements: Whether to check skill requirements (default: False)
:param lenient: If True, ignore missing requirements (default: True)
:return: Filtered list of skill entries
"""
from agent.skills.config import should_include_skill
entries = list(self.skills.values())
# Check requirements (platform, explicit disable, etc.)
# In lenient mode, only checks platform and explicit disable
if check_requirements or not lenient:
entries = [e for e in entries if should_include_skill(e, self.config, lenient=lenient)]
else:
# Lenient mode: only check explicit disable and platform
entries = [e for e in entries if should_include_skill(e, self.config, lenient=True)]
# Check requirements (platform, binaries, env vars)
entries = [e for e in entries if should_include_skill(e, self.config)]
# Apply skill filter
if skill_filter is not None:

View File

@@ -13,11 +13,21 @@ from agent.tools.ls.ls import Ls
from agent.tools.memory.memory_search import MemorySearchTool
from agent.tools.memory.memory_get import MemoryGetTool
# Import env config tool
from agent.tools.env_config.env_config import EnvConfig
# Import tools with optional dependencies
def _import_optional_tools():
"""Import tools that have optional dependencies"""
tools = {}
# Scheduler Tool (requires croniter)
try:
from agent.tools.scheduler.scheduler_tool import SchedulerTool
tools['SchedulerTool'] = SchedulerTool
except ImportError:
pass
# Google Search (requires requests)
try:
from agent.tools.google_search.google_search import GoogleSearch
@@ -43,6 +53,7 @@ def _import_optional_tools():
# Load optional tools
_optional_tools = _import_optional_tools()
SchedulerTool = _optional_tools.get('SchedulerTool')
GoogleSearch = _optional_tools.get('GoogleSearch')
FileSave = _optional_tools.get('FileSave')
Terminal = _optional_tools.get('Terminal')
@@ -79,6 +90,8 @@ __all__ = [
'Ls',
'MemorySearchTool',
'MemoryGetTool',
'EnvConfig',
'SchedulerTool',
# Optional tools (may be None if dependencies not available)
'GoogleSearch',
'FileSave',

View File

@@ -0,0 +1,3 @@
"""Package entry for the env_config tool: re-export EnvConfig at package level."""
from agent.tools.env_config.env_config import EnvConfig

__all__ = ['EnvConfig']

View File

@@ -0,0 +1,283 @@
"""
Environment Configuration Tool - Manage API keys and environment variables
"""
import os
import re
from typing import Dict, Any
from pathlib import Path
from agent.tools.base_tool import BaseTool, ToolResult
from common.log import logger
# API key knowledge base: well-known environment variable names mapped to a
# human-readable (Chinese) description shown by the 'get'/'list' actions.
API_KEY_REGISTRY = {
    # AI model providers
    "OPENAI_API_KEY": "OpenAI API 密钥 (用于GPT模型、Embedding模型)",
    "GEMINI_API_KEY": "Google Gemini API 密钥",
    "CLAUDE_API_KEY": "Claude API 密钥 (用于Claude模型)",
    "LINKAI_API_KEY": "LinkAI智能体平台 API 密钥,支持多种模型切换",
    # Search services
    "BOCHA_API_KEY": "博查 AI 搜索 API 密钥 ",
}
class EnvConfig(BaseTool):
    """Tool for managing environment variables (API keys, etc.).

    Keys are persisted in the workspace ``.env`` file and mirrored into
    ``os.environ`` so they take effect in the current process. Values
    returned to the caller are always masked. After ``set``/``delete`` the
    tool attempts a best-effort skill refresh through ``agent_bridge``
    (if one was supplied in the config).
    """

    name: str = "env_config"
    description: str = (
        "Manage API keys and skill configurations stored in the workspace .env file. "
        "Use this tool when user wants to configure API keys (like BOCHA_API_KEY, OPENAI_API_KEY), "
        "view configured keys, or manage skill settings. "
        "Actions: 'set' (add/update key), 'get' (view specific key), 'list' (show all configured keys), 'delete' (remove key). "
        "Values are automatically masked for security. Changes take effect immediately via hot reload."
    )
    params: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "description": "Action to perform: 'set', 'get', 'list', 'delete'",
                "enum": ["set", "get", "list", "delete"]
            },
            "key": {
                "type": "string",
                "description": (
                    "Environment variable key name. Common keys:\n"
                    "- OPENAI_API_KEY: OpenAI API (GPT models)\n"
                    "- OPENAI_API_BASE: OpenAI API base URL\n"
                    "- CLAUDE_API_KEY: Anthropic Claude API\n"
                    "- GEMINI_API_KEY: Google Gemini API\n"
                    "- LINKAI_API_KEY: LinkAI platform\n"
                    "- BOCHA_API_KEY: Bocha AI search (博查搜索)\n"
                    "Use exact key names (case-sensitive, all uppercase with underscores)"
                )
            },
            "value": {
                "type": "string",
                "description": "Value to set for the environment variable (for 'set' action)"
            }
        },
        "required": ["action"]
    }

    def __init__(self, config: dict = None):
        """
        :param config: optional dict; recognized keys are 'workspace_dir'
            (default ``~/cow``) and 'agent_bridge' (used for hot reload).
        """
        self.config = config or {}
        self.workspace_dir = self.config.get("workspace_dir", os.path.expanduser("~/cow"))
        self.env_path = os.path.join(self.workspace_dir, '.env')
        self.agent_bridge = self.config.get("agent_bridge")  # Reference to AgentBridge for hot reload
        # Don't create .env file in __init__ to avoid issues during tool discovery
        # It will be created on first use in execute()

    def _ensure_env_file(self):
        """Ensure the workspace directory and the .env file exist."""
        os.makedirs(self.workspace_dir, exist_ok=True)
        if not os.path.exists(self.env_path):
            Path(self.env_path).touch()
            logger.info(f"[EnvConfig] Created .env file at {self.env_path}")

    def _mask_value(self, value: str) -> str:
        """Mask sensitive parts of a value for display/logging.

        Values of 14 characters or fewer are hidden entirely: revealing a
        6-char prefix and 4-char suffix of a short secret would expose
        almost all of it (e.g. 10 of 11 characters).
        """
        if not value or len(value) <= 14:
            return "***"
        return f"{value[:6]}***{value[-4:]}"

    def _read_env_file(self) -> Dict[str, str]:
        """Read all KEY=VALUE pairs from the .env file.

        Blank lines and '#' comments are skipped. Returns an empty dict if
        the file does not exist yet.
        """
        env_vars = {}
        if os.path.exists(self.env_path):
            with open(self.env_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    # Skip empty lines and comments
                    if not line or line.startswith('#'):
                        continue
                    # Parse KEY=VALUE (value may itself contain '=')
                    match = re.match(r'^([^=]+)=(.*)$', line)
                    if match:
                        key, value = match.groups()
                        env_vars[key.strip()] = value.strip()
        return env_vars

    def _write_env_file(self, env_vars: Dict[str, str]):
        """Rewrite the .env file from scratch with a header and sorted keys.

        NOTE: any hand-written comments in the file are discarded on write.
        """
        with open(self.env_path, 'w', encoding='utf-8') as f:
            f.write("# Environment variables for agent skills\n")
            f.write("# Auto-managed by env_config tool\n\n")
            for key, value in sorted(env_vars.items()):
                f.write(f"{key}={value}\n")

    def _reload_env(self):
        """Load every entry of the .env file into the current process env."""
        env_vars = self._read_env_file()
        for key, value in env_vars.items():
            os.environ[key] = value
        logger.debug(f"[EnvConfig] Reloaded {len(env_vars)} environment variables")

    def _refresh_skills(self):
        """Reload the .env file and refresh skills in all agent instances.

        :return: True if the refresh succeeded, False otherwise (including
            when no agent_bridge is configured). Failures are logged, never
            raised, so a refresh problem cannot break the main action.
        """
        if self.agent_bridge:
            try:
                # Reload .env file
                self._reload_env()
                # Refresh skills in all agent instances
                refreshed = self.agent_bridge.refresh_all_skills()
                logger.info(f"[EnvConfig] Refreshed skills in {refreshed} agent instance(s)")
                return True
            except Exception as e:
                logger.warning(f"[EnvConfig] Failed to refresh skills: {e}")
                return False
        return False

    def execute(self, args: Dict[str, Any]) -> ToolResult:
        """
        Execute environment configuration operation
        :param args: Contains action, key, and value parameters
        :return: Result of the operation
        """
        # Ensure .env file exists on first use
        self._ensure_env_file()
        action = args.get("action")
        key = args.get("key")
        value = args.get("value")
        try:
            if action == "set":
                if not key or not value:
                    return ToolResult.fail("Error: 'key' and 'value' are required for 'set' action.")
                # Read current env vars, update the key, and write back
                env_vars = self._read_env_file()
                env_vars[key] = value
                self._write_env_file(env_vars)
                # Update current process env so the change is live immediately
                os.environ[key] = value
                logger.info(f"[EnvConfig] Set {key}={self._mask_value(value)}")
                # Try to refresh skills immediately
                refreshed = self._refresh_skills()
                result = {
                    "message": f"Successfully set {key}",
                    "key": key,
                    "value": self._mask_value(value),
                }
                if refreshed:
                    result["note"] = "✅ Skills refreshed automatically - changes are now active"
                else:
                    result["note"] = "⚠️ Skills not refreshed - restart agent to load new skills"
                return ToolResult.success(result)
            elif action == "get":
                if not key:
                    return ToolResult.fail("Error: 'key' is required for 'get' action.")
                # Check in file first, then in current env
                env_vars = self._read_env_file()
                value = env_vars.get(key) or os.getenv(key)
                # Get description from registry
                description = API_KEY_REGISTRY.get(key, "未知用途的环境变量")
                if value is not None:
                    logger.info(f"[EnvConfig] Got {key}={self._mask_value(value)}")
                    return ToolResult.success({
                        "key": key,
                        "value": self._mask_value(value),
                        "description": description,
                        "exists": True
                    })
                else:
                    return ToolResult.success({
                        "key": key,
                        "description": description,
                        "exists": False,
                        "message": f"Environment variable '{key}' is not set"
                    })
            elif action == "list":
                env_vars = self._read_env_file()
                # Build detailed variable list with descriptions.
                # Distinct loop names so the 'key'/'value' arguments are not shadowed.
                variables_with_info = {}
                for env_key, env_value in env_vars.items():
                    variables_with_info[env_key] = {
                        "value": self._mask_value(env_value),
                        "description": API_KEY_REGISTRY.get(env_key, "未知用途的环境变量")
                    }
                logger.info(f"[EnvConfig] Listed {len(env_vars)} environment variables")
                if not env_vars:
                    return ToolResult.success({
                        "message": "No environment variables configured",
                        "variables": {},
                        "note": "常用的 API 密钥可以通过 env_config(action='set', key='KEY_NAME', value='your-key') 来配置"
                    })
                return ToolResult.success({
                    "message": f"Found {len(env_vars)} environment variable(s)",
                    "variables": variables_with_info
                })
            elif action == "delete":
                if not key:
                    return ToolResult.fail("Error: 'key' is required for 'delete' action.")
                # Read current env vars
                env_vars = self._read_env_file()
                if key not in env_vars:
                    return ToolResult.success({
                        "message": f"Environment variable '{key}' was not set",
                        "key": key
                    })
                # Remove the key and persist
                del env_vars[key]
                self._write_env_file(env_vars)
                # Remove from current process env
                if key in os.environ:
                    del os.environ[key]
                logger.info(f"[EnvConfig] Deleted {key}")
                # Try to refresh skills immediately
                refreshed = self._refresh_skills()
                result = {
                    "message": f"Successfully deleted {key}",
                    "key": key,
                }
                if refreshed:
                    result["note"] = "✅ Skills refreshed automatically - changes are now active"
                else:
                    result["note"] = "⚠️ Skills not refreshed - restart agent to apply changes"
                return ToolResult.success(result)
            else:
                return ToolResult.fail(f"Error: Unknown action '{action}'. Use 'set', 'get', 'list', or 'delete'.")
        except Exception as e:
            logger.error(f"[EnvConfig] Error: {e}", exc_info=True)
            return ToolResult.fail(f"EnvConfig tool error: {str(e)}")

View File

@@ -0,0 +1,286 @@
# 定时任务工具 (Scheduler Tool)
## 功能简介
定时任务工具允许 Agent 创建、管理和执行定时任务,支持:
- ⏰ **定时提醒**: 在指定时间发送消息
- 🔄 **周期性任务**: 按固定间隔或 cron 表达式重复执行
- 🔧 **动态工具调用**: 定时执行其他工具并发送结果(如搜索新闻、查询天气等)
- 📋 **任务管理**: 查询、启用、禁用、删除任务
## 安装依赖
```bash
pip install croniter>=2.0.0
```
## 使用方法
### 1. 创建定时任务
Agent 可以通过自然语言创建定时任务,支持两种类型:
#### 1.1 静态消息任务
发送预定义的消息:
**示例对话:**
```
用户: 每天早上9点提醒我开会
Agent: [调用 scheduler 工具]
action: create
name: 每日开会提醒
message: 该开会了!
schedule_type: cron
schedule_value: 0 9 * * *
```
#### 1.2 动态工具调用任务
定时执行工具并发送结果:
**示例对话:**
```
用户: 每天早上8点帮我搜索一下当前新闻
Agent: [调用 scheduler 工具]
action: create
name: 每日新闻
tool_call:
tool_name: bocha_search
tool_params:
query: 今日新闻
result_prefix: 📰 今日新闻播报
schedule_type: cron
schedule_value: 0 8 * * *
```
**工具调用参数说明:**
- `tool_name`: 要调用的工具名称(如 `bocha_search`、`web_fetch` 等)
- `tool_params`: 工具的参数(字典格式)
- `result_prefix`: 可选,在结果前添加的前缀文本
### 2. 支持的调度类型
#### Cron 表达式 (`cron`)
使用标准 cron 表达式:
```
0 9 * * * # 每天 9:00
0 */2 * * * # 每 2 小时
30 8 * * 1-5 # 工作日 8:30
0 0 1 * * # 每月 1 号
```
#### 固定间隔 (`interval`)
以秒为单位的间隔:
```
3600 # 每小时
86400 # 每天
1800 # 每 30 分钟
```
#### 一次性任务 (`once`)
指定具体时间(ISO 格式):
```
2024-12-25T09:00:00
2024-12-31T23:59:59
```
### 3. 查询任务列表
```
用户: 查看我的定时任务
Agent: [调用 scheduler 工具]
action: list
```
### 4. 查看任务详情
```
用户: 查看任务 abc123 的详情
Agent: [调用 scheduler 工具]
action: get
task_id: abc123
```
### 5. 删除任务
```
用户: 删除任务 abc123
Agent: [调用 scheduler 工具]
action: delete
task_id: abc123
```
### 6. 启用/禁用任务
```
用户: 暂停任务 abc123
Agent: [调用 scheduler 工具]
action: disable
task_id: abc123
用户: 恢复任务 abc123
Agent: [调用 scheduler 工具]
action: enable
task_id: abc123
```
## 任务存储
任务保存在 JSON 文件中:
```
~/cow/scheduler/tasks.json
```
任务数据结构:
**静态消息任务:**
```json
{
"id": "abc123",
"name": "每日提醒",
"enabled": true,
"created_at": "2024-01-01T10:00:00",
"updated_at": "2024-01-01T10:00:00",
"schedule": {
"type": "cron",
"expression": "0 9 * * *"
},
"action": {
"type": "send_message",
"content": "该开会了!",
"receiver": "wxid_xxx",
"receiver_name": "张三",
"is_group": false,
"channel_type": "wechat"
},
"next_run_at": "2024-01-02T09:00:00",
"last_run_at": "2024-01-01T09:00:00"
}
```
**动态工具调用任务:**
```json
{
"id": "def456",
"name": "每日新闻",
"enabled": true,
"created_at": "2024-01-01T10:00:00",
"updated_at": "2024-01-01T10:00:00",
"schedule": {
"type": "cron",
"expression": "0 8 * * *"
},
"action": {
"type": "tool_call",
"tool_name": "bocha_search",
"tool_params": {
"query": "今日新闻"
},
"result_prefix": "📰 今日新闻播报",
"receiver": "wxid_xxx",
"receiver_name": "张三",
"is_group": false,
"channel_type": "wechat"
},
"next_run_at": "2024-01-02T08:00:00"
}
```
## 后台服务
定时任务由后台服务 `SchedulerService` 管理:
- 每 30 秒检查一次到期任务
- 自动执行到期任务
- 计算下次执行时间
- 记录执行历史和错误
服务在 Agent 初始化时自动启动,无需手动配置。
## 接收者确定
定时任务会发送给**创建任务时的对话对象**
- 如果在私聊中创建,发送给该用户
- 如果在群聊中创建,发送到该群
- 接收者信息在创建时自动保存
## 常见用例
### 1. 每日提醒(静态消息)
```
用户: 每天早上8点提醒我吃药
Agent: ✅ 定时任务创建成功
任务ID: a1b2c3d4
调度: 每天 8:00
消息: 该吃药了!
```
### 2. 工作日提醒(静态消息)
```
用户: 工作日下午6点提醒我下班
Agent: [创建 cron: 0 18 * * 1-5]
消息: 该下班了!
```
### 3. 倒计时提醒(静态消息)
```
用户: 1小时后提醒我
Agent: [创建 interval: 3600]
```
### 4. 每日新闻推送(动态工具调用)
```
用户: 每天早上8点帮我搜索一下当前新闻
Agent: ✅ 定时任务创建成功
任务ID: news001
调度: 每天 8:00
工具: bocha_search(query='今日新闻')
前缀: 📰 今日新闻播报
```
### 5. 定时天气查询(动态工具调用)
```
用户: 每天早上7点查询今天的天气
Agent: [创建 cron: 0 7 * * *]
工具: bocha_search(query='今日天气')
前缀: 🌤️ 今日天气预报
```
### 6. 周报提醒(动态工具调用)
```
用户: 每周五下午5点搜索本周热点
Agent: [创建 cron: 0 17 * * 5]
工具: bocha_search(query='本周热点新闻')
前缀: 📊 本周热点回顾
```
### 7. 特定日期提醒
```
用户: 12月25日早上9点提醒我圣诞快乐
Agent: [创建 once: 2024-12-25T09:00:00]
```
## 注意事项
1. **时区**: 使用系统本地时区
2. **精度**: 检查间隔为 30 秒,实际执行可能有 ±30 秒误差
3. **持久化**: 任务保存在文件中,重启后自动恢复
4. **一次性任务**: 执行后自动禁用,不会删除(可手动删除)
5. **错误处理**: 执行失败会记录错误,不影响其他任务
## 技术实现
- **TaskStore**: 任务持久化存储
- **SchedulerService**: 后台调度服务
- **SchedulerTool**: Agent 工具接口
- **Integration**: 与 AgentBridge 集成
## 依赖
- `croniter`: Cron 表达式解析(轻量级,仅 ~50KB)

View File

@@ -0,0 +1,7 @@
"""
Scheduler tool for managing scheduled tasks
"""
from .scheduler_tool import SchedulerTool
__all__ = ["SchedulerTool"]

View File

@@ -0,0 +1,239 @@
"""
Integration module for scheduler with AgentBridge
"""
import os
from typing import Optional
from config import conf
from common.log import logger
from bridge.context import Context, ContextType
from bridge.reply import Reply, ReplyType
# Global scheduler service instance
_scheduler_service = None
_task_store = None
def init_scheduler(agent_bridge) -> bool:
    """
    Initialize the module-global TaskStore and SchedulerService and start
    the background scheduler thread. Safe to call at agent startup; any
    failure is logged and reported via the return value instead of raised.

    Args:
        agent_bridge: AgentBridge instance, forwarded to the task-execution
            callbacks so tasks can reach the running agent.
    Returns:
        True if initialized successfully, False on any error.
    """
    global _scheduler_service, _task_store
    try:
        from agent.tools.scheduler.task_store import TaskStore
        from agent.tools.scheduler.scheduler_service import SchedulerService
        # Get workspace from config; tasks persist under <workspace>/scheduler/tasks.json
        workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
        store_path = os.path.join(workspace_root, "scheduler", "tasks.json")
        # Create task store
        _task_store = TaskStore(store_path)
        logger.info(f"[Scheduler] Task store initialized: {store_path}")

        # Create execute callback: dispatches on the task's action type.
        def execute_task_callback(task: dict):
            """Callback to execute a scheduled task"""
            try:
                action = task.get("action", {})
                action_type = action.get("type")
                if action_type == "send_message":
                    _execute_send_message(task, agent_bridge)
                elif action_type == "tool_call":
                    _execute_tool_call(task, agent_bridge)
                else:
                    logger.warning(f"[Scheduler] Unknown action type: {action_type}")
            except Exception as e:
                # Never let one task's failure crash the scheduler loop.
                logger.error(f"[Scheduler] Error executing task {task.get('id')}: {e}")

        # Create scheduler service and start its background thread.
        _scheduler_service = SchedulerService(_task_store, execute_task_callback)
        _scheduler_service.start()
        logger.info("[Scheduler] Scheduler service initialized and started")
        return True
    except Exception as e:
        logger.error(f"[Scheduler] Failed to initialize scheduler: {e}")
        return False
def get_task_store():
    """Return the module-global TaskStore (None until init_scheduler succeeds)."""
    return _task_store
def get_scheduler_service():
    """Return the module-global SchedulerService (None until init_scheduler succeeds)."""
    return _scheduler_service
def _execute_send_message(task: dict, agent_bridge):
    """
    Execute a 'send_message' action: deliver the task's static content to the
    receiver recorded when the task was created. All errors are logged, never
    raised, so a broken task cannot break the scheduler loop.

    Args:
        task: Task dictionary (uses action.content/receiver/is_group/channel_type)
        agent_bridge: AgentBridge instance (currently unused here; kept for
            signature parity with _execute_tool_call)
    """
    try:
        action = task.get("action", {})
        content = action.get("content", "")
        receiver = action.get("receiver")
        is_group = action.get("is_group", False)
        channel_type = action.get("channel_type", "unknown")
        if not receiver:
            logger.error(f"[Scheduler] Task {task['id']}: No receiver specified")
            return
        # Create context for sending message
        context = Context(ContextType.TEXT, content)
        context["receiver"] = receiver
        context["isgroup"] = is_group
        context["session_id"] = receiver
        # For web channel, generate a virtual request_id (there is no real
        # incoming HTTP request to tie the reply to)
        if channel_type == "web":
            import uuid
            request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}"
            context["request_id"] = request_id
            logger.debug(f"[Scheduler] Generated request_id for web channel: {request_id}")
        # Create reply
        reply = Reply(ReplyType.TEXT, content)
        # Get channel and send
        from channel.channel_factory import create_channel
        try:
            channel = create_channel(channel_type)
            if channel:
                # For web channel, register the request_id to session mapping
                # so the channel can route the push to the right session
                if channel_type == "web" and hasattr(channel, 'request_to_session'):
                    channel.request_to_session[request_id] = receiver
                    logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}")
                channel.send(reply, context)
                logger.info(f"[Scheduler] Task {task['id']} executed: sent message to {receiver}")
            else:
                logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
        except Exception as e:
            logger.error(f"[Scheduler] Failed to send message: {e}")
    except Exception as e:
        logger.error(f"[Scheduler] Error in _execute_send_message: {e}")
def _execute_tool_call(task: dict, agent_bridge):
    """
    Execute a 'tool_call' action: run the configured tool with its stored
    parameters and send the (optionally prefixed) result to the task's
    receiver. All errors are logged, never raised.

    Args:
        task: Task dictionary (uses action.tool_name/tool_params/result_prefix/
            receiver/is_group/channel_type)
        agent_bridge: AgentBridge instance (currently unused here; kept for
            signature parity with _execute_send_message)
    """
    try:
        action = task.get("action", {})
        tool_name = action.get("tool_name")
        tool_params = action.get("tool_params", {})
        result_prefix = action.get("result_prefix", "")
        receiver = action.get("receiver")
        is_group = action.get("is_group", False)
        channel_type = action.get("channel_type", "unknown")
        if not tool_name:
            logger.error(f"[Scheduler] Task {task['id']}: No tool_name specified")
            return
        if not receiver:
            logger.error(f"[Scheduler] Task {task['id']}: No receiver specified")
            return
        # Get tool manager and create tool instance
        from agent.tools.tool_manager import ToolManager
        tool_manager = ToolManager()
        tool = tool_manager.create_tool(tool_name)
        if not tool:
            logger.error(f"[Scheduler] Task {task['id']}: Tool '{tool_name}' not found")
            return
        # Execute tool
        logger.info(f"[Scheduler] Task {task['id']}: Executing tool '{tool_name}' with params {tool_params}")
        result = tool.execute(tool_params)
        # Get result content (ToolResult exposes .result; fall back to str())
        if hasattr(result, 'result'):
            content = result.result
        else:
            content = str(result)
        # Add prefix if specified
        if result_prefix:
            content = f"{result_prefix}\n\n{content}"
        # Send result as message
        context = Context(ContextType.TEXT, content)
        context["receiver"] = receiver
        context["isgroup"] = is_group
        context["session_id"] = receiver
        # For web channel, generate a virtual request_id (no real HTTP request exists)
        if channel_type == "web":
            import uuid
            request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}"
            context["request_id"] = request_id
            logger.debug(f"[Scheduler] Generated request_id for web channel: {request_id}")
        reply = Reply(ReplyType.TEXT, content)
        # Get channel and send
        from channel.channel_factory import create_channel
        try:
            channel = create_channel(channel_type)
            if channel:
                # For web channel, register the request_id to session mapping
                if channel_type == "web" and hasattr(channel, 'request_to_session'):
                    channel.request_to_session[request_id] = receiver
                    logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}")
                channel.send(reply, context)
                logger.info(f"[Scheduler] Task {task['id']} executed: sent tool result to {receiver}")
            else:
                logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
        except Exception as e:
            logger.error(f"[Scheduler] Failed to send tool result: {e}")
    except Exception as e:
        logger.error(f"[Scheduler] Error in _execute_tool_call: {e}")
def attach_scheduler_to_tool(tool, context: Context = None):
    """
    Wire the shared scheduler components into a SchedulerTool instance.

    Attaches the module-global task store (when initialized) and, when a
    context is supplied, records it on the tool and stamps the configured
    channel_type into the tool's config.

    Args:
        tool: SchedulerTool instance
        context: Current conversation context (optional)
    """
    store = _task_store
    if store:
        tool.task_store = store
    if not context:
        return
    tool.current_context = context
    # Also propagate the channel type from the global config
    if not tool.config:
        tool.config = {}
    tool.config["channel_type"] = conf().get("channel_type", "unknown")

View File

@@ -0,0 +1,192 @@
"""
Background scheduler service for executing scheduled tasks
"""
import time
import threading
from datetime import datetime, timedelta
from typing import Callable, Optional
from croniter import croniter
from common.log import logger
class SchedulerService:
    """
    Background service that executes scheduled tasks.

    A daemon thread polls the task store every ``CHECK_INTERVAL`` seconds,
    runs due tasks through the supplied callback, then either reschedules
    them (recurring) or disables them (one-time).
    """

    # Seconds between due-task checks; execution may drift by up to this much.
    CHECK_INTERVAL = 30

    def __init__(self, task_store, execute_callback: Callable):
        """
        Initialize scheduler service

        Args:
            task_store: TaskStore instance (must provide list_tasks/update_task)
            execute_callback: Function invoked with the task dict when due
        """
        self.task_store = task_store
        self.execute_callback = execute_callback
        self.running = False
        self.thread = None
        self._lock = threading.Lock()
        # Event instead of time.sleep() so stop() can wake the loop promptly.
        self._stop_event = threading.Event()

    def start(self):
        """Start the scheduler service (no-op if already running)."""
        with self._lock:
            if self.running:
                logger.warning("[Scheduler] Service already running")
                return
            self.running = True
            self._stop_event.clear()
            self.thread = threading.Thread(target=self._run_loop, daemon=True)
            self.thread.start()
            logger.info("[Scheduler] Service started")

    def stop(self):
        """Stop the scheduler service and wait briefly for the loop to exit."""
        with self._lock:
            if not self.running:
                return
            self.running = False
            # Wake the loop immediately instead of waiting out the poll interval.
            self._stop_event.set()
            if self.thread:
                self.thread.join(timeout=5)
            logger.info("[Scheduler] Service stopped")

    def _run_loop(self):
        """Main scheduler loop: check due tasks, then wait CHECK_INTERVAL."""
        logger.info("[Scheduler] Scheduler loop started")
        while self.running:
            try:
                self._check_and_execute_tasks()
            except Exception as e:
                logger.error(f"[Scheduler] Error in scheduler loop: {e}")
            # wait() returns early when stop() sets the event.
            self._stop_event.wait(self.CHECK_INTERVAL)

    def _check_and_execute_tasks(self):
        """Check for due tasks and execute them.

        A failure in one task is logged and does not affect the others.
        """
        # Naive local time, consistent with the stored ISO timestamps.
        now = datetime.now()
        tasks = self.task_store.list_tasks(enabled_only=True)
        for task in tasks:
            try:
                if self._is_task_due(task, now):
                    logger.info(f"[Scheduler] Executing task: {task['id']} - {task['name']}")
                    self._execute_task(task)
                    # Update next run time
                    next_run = self._calculate_next_run(task, now)
                    if next_run:
                        self.task_store.update_task(task['id'], {
                            "next_run_at": next_run.isoformat(),
                            "last_run_at": now.isoformat()
                        })
                    else:
                        # One-time task: disable rather than delete, so the
                        # user can inspect or re-enable it later.
                        self.task_store.update_task(task['id'], {
                            "enabled": False,
                            "last_run_at": now.isoformat()
                        })
                        logger.info(f"[Scheduler] One-time task completed and disabled: {task['id']}")
            except Exception as e:
                logger.error(f"[Scheduler] Error processing task {task.get('id')}: {e}")

    def _is_task_due(self, task: dict, now: datetime) -> bool:
        """
        Check if a task is due to run

        Args:
            task: Task dictionary
            now: Current datetime
        Returns:
            True if task should run now
        """
        next_run_str = task.get("next_run_at")
        if not next_run_str:
            # First sighting: persist an initial next_run_at; the task will
            # be considered on a later cycle.
            next_run = self._calculate_next_run(task, now)
            if next_run:
                self.task_store.update_task(task['id'], {
                    "next_run_at": next_run.isoformat()
                })
            return False
        try:
            next_run = datetime.fromisoformat(next_run_str)
        except (TypeError, ValueError):
            # Narrow except (was a bare except): only parse failures mean "not due".
            return False
        return now >= next_run

    def _calculate_next_run(self, task: dict, from_time: datetime) -> Optional[datetime]:
        """
        Calculate next run time for a task

        Args:
            task: Task dictionary
            from_time: Calculate from this time
        Returns:
            Next run datetime or None for one-time tasks
        """
        schedule = task.get("schedule", {})
        schedule_type = schedule.get("type")
        if schedule_type == "cron":
            # Cron expression
            expression = schedule.get("expression")
            if not expression:
                return None
            try:
                cron = croniter(expression, from_time)
                return cron.get_next(datetime)
            except Exception as e:
                logger.error(f"[Scheduler] Invalid cron expression '{expression}': {e}")
                return None
        elif schedule_type == "interval":
            # Interval in seconds; non-positive intervals are invalid
            seconds = schedule.get("seconds", 0)
            if seconds <= 0:
                return None
            return from_time + timedelta(seconds=seconds)
        elif schedule_type == "once":
            # One-time task at specific time
            run_at_str = schedule.get("run_at")
            if not run_at_str:
                return None
            try:
                run_at = datetime.fromisoformat(run_at_str)
            except (TypeError, ValueError):
                # Narrow except (was a bare except): unparsable time means no run.
                return None
            # Only return if in the future
            if run_at > from_time:
                return run_at
            return None
        return None

    def _execute_task(self, task: dict):
        """
        Execute a task via the callback; record any error on the task.

        Args:
            task: Task dictionary
        """
        try:
            # Call the execute callback
            self.execute_callback(task)
        except Exception as e:
            logger.error(f"[Scheduler] Error executing task {task['id']}: {e}")
            # Update task with error so it is visible in task listings
            self.task_store.update_task(task['id'], {
                "last_error": str(e),
                "last_error_at": datetime.now().isoformat()
            })

View File

@@ -0,0 +1,439 @@
"""
Scheduler tool for creating and managing scheduled tasks
"""
import uuid
from datetime import datetime
from typing import Any, Dict, Optional
from croniter import croniter
from agent.tools.base_tool import BaseTool, ToolResult
from bridge.context import Context, ContextType
from bridge.reply import Reply, ReplyType
from common.log import logger
class SchedulerTool(BaseTool):
    """
    Tool for managing scheduled tasks (reminders, notifications, etc.).

    The tool itself only manipulates the task store; `task_store`,
    `scheduler_service` and `current_context` are injected by the agent
    bridge after construction.
    """
    name: str = "scheduler"
    description: str = (
        "创建、查询和管理定时任务。支持两种任务类型:\n"
        "1. 静态消息任务:定时发送预定义的消息\n"
        "2. 动态工具任务:定时执行工具调用并发送结果(如搜索新闻、查询天气等)\n\n"
        "使用方法:\n"
        "- 创建静态消息任务action='create', name='任务名', message='消息内容', schedule_type='interval'/'cron'/'once', schedule_value='间隔秒数/cron表达式/时间'\n"
        "- 创建动态工具任务action='create', name='任务名', tool_call={'tool_name': '工具名', 'tool_params': {...}, 'result_prefix': '前缀'}, schedule_type='interval'/'cron'/'once', schedule_value=''\n"
        "- 查询列表action='list'\n"
        "- 查看详情action='get', task_id='任务ID'\n"
        "- 删除任务action='delete', task_id='任务ID'\n"
        "- 启用任务action='enable', task_id='任务ID'\n"
        "- 禁用任务action='disable', task_id='任务ID'\n\n"
        "调度类型说明:\n"
        "- interval: 固定间隔秒数如3600表示每小时\n"
        "- cron: cron表达式'0 9 * * *'表示每天9点'*/10 * * * *'表示每10分钟\n"
        "- once: 一次性任务ISO时间格式'2024-12-25T09:00:00'\n\n"
        "示例每天早上8点搜索新闻\n"
        "action='create', name='每日新闻', tool_call={'tool_name': 'bocha_search', 'tool_params': {'query': '今日新闻'}, 'result_prefix': '📰 今日新闻播报'}, schedule_type='cron', schedule_value='0 8 * * *'"
    )
    params: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["create", "list", "get", "delete", "enable", "disable"],
                "description": "操作类型: create(创建), list(列表), get(查询), delete(删除), enable(启用), disable(禁用)"
            },
            "task_id": {
                "type": "string",
                "description": "任务ID (用于 get/delete/enable/disable 操作)"
            },
            "name": {
                "type": "string",
                "description": "任务名称 (用于 create 操作)"
            },
            "message": {
                "type": "string",
                "description": "要发送的静态消息内容 (用于 create 操作与tool_call二选一)"
            },
            "tool_call": {
                "type": "object",
                "description": "要执行的工具调用 (用于 create 操作与message二选一)",
                "properties": {
                    "tool_name": {
                        "type": "string",
                        "description": "工具名称,如 'bocha_search'"
                    },
                    "tool_params": {
                        "type": "object",
                        "description": "工具参数"
                    },
                    "result_prefix": {
                        "type": "string",
                        "description": "结果前缀,如 '今日新闻:'"
                    }
                },
                "required": ["tool_name"]
            },
            "schedule_type": {
                "type": "string",
                "enum": ["cron", "interval", "once"],
                "description": "调度类型 (用于 create 操作): cron(cron表达式), interval(固定间隔秒数), once(一次性)"
            },
            "schedule_value": {
                "type": "string",
                "description": (
                    "调度值 (用于 create 操作):\n"
                    "- cron类型: cron表达式'0 9 * * *' (每天9点)'*/10 * * * *' (每10分钟)\n"
                    "- interval类型: 间隔秒数,如 '3600' (每小时)'10' (每10秒)\n"
                    "- once类型: ISO时间'2024-12-25T09:00:00'"
                )
            }
        },
        "required": ["action"]
    }

    def __init__(self, config: dict = None):
        super().__init__()
        self.config = config or {}
        # All three are injected by the agent bridge after construction
        self.task_store = None
        self.scheduler_service = None
        self.current_context = None

    def execute(self, params: dict) -> ToolResult:
        """
        Execute scheduler operations.

        Args:
            params: Dictionary containing:
                - action: Operation type (create/list/get/delete/enable/disable)
                - Other parameters depending on action

        Returns:
            ToolResult object. Note: per-operation validation errors are
            returned as a *successful* ToolResult whose payload is an error
            string, matching the original behavior.
        """
        action = params.get("action")
        kwargs = params
        if not self.task_store:
            return ToolResult.fail("错误: 定时任务系统未初始化")
        # Dispatch table instead of an if/elif ladder
        handlers = {
            "create": self._create_task,
            "list": self._list_tasks,
            "get": self._get_task,
            "delete": self._delete_task,
            "enable": self._enable_task,
            "disable": self._disable_task,
        }
        handler = handlers.get(action)
        if handler is None:
            return ToolResult.fail(f"未知操作: {action}")
        try:
            return ToolResult.success(handler(**kwargs))
        except Exception as e:
            logger.error(f"[SchedulerTool] Error: {e}")
            return ToolResult.fail(f"操作失败: {str(e)}")

    def _create_task(self, **kwargs) -> str:
        """Create a new scheduled task (static message or tool call)."""
        name = kwargs.get("name")
        message = kwargs.get("message")
        tool_call = kwargs.get("tool_call")
        schedule_type = kwargs.get("schedule_type")
        schedule_value = kwargs.get("schedule_value")
        # Validate required fields
        if not name:
            return "错误: 缺少任务名称 (name)"
        if not message and not tool_call:
            return "错误: 必须提供 message 或 tool_call 之一"
        if message and tool_call:
            return "错误: message 和 tool_call 不能同时提供,请选择其一"
        if not schedule_type:
            return "错误: 缺少调度类型 (schedule_type)"
        if not schedule_value:
            return "错误: 缺少调度值 (schedule_value)"
        # Validate schedule
        schedule = self._parse_schedule(schedule_type, schedule_value)
        if not schedule:
            return f"错误: 无效的调度配置 - type: {schedule_type}, value: {schedule_value}"
        # Receiver information comes from the current conversation context
        if not self.current_context:
            return "错误: 无法获取当前对话上下文"
        context = self.current_context
        # Short random id is enough here; collisions are rejected by the store
        task_id = str(uuid.uuid4())[:8]
        # Build action based on message or tool_call
        if message:
            action = {
                "type": "send_message",
                "content": message,
                "receiver": context.get("receiver"),
                "receiver_name": self._get_receiver_name(context),
                "is_group": context.get("isgroup", False),
                "channel_type": self.config.get("channel_type", "unknown")
            }
        else:  # tool_call
            action = {
                "type": "tool_call",
                "tool_name": tool_call.get("tool_name"),
                "tool_params": tool_call.get("tool_params", {}),
                "result_prefix": tool_call.get("result_prefix", ""),
                "receiver": context.get("receiver"),
                "receiver_name": self._get_receiver_name(context),
                "is_group": context.get("isgroup", False),
                "channel_type": self.config.get("channel_type", "unknown")
            }
        task = {
            "id": task_id,
            "name": name,
            "enabled": True,
            "created_at": datetime.now().isoformat(),
            "updated_at": datetime.now().isoformat(),
            "schedule": schedule,
            "action": action
        }
        # Calculate initial next_run_at (may be None for past one-time tasks)
        next_run = self._calculate_next_run(task)
        if next_run:
            task["next_run_at"] = next_run.isoformat()
        # Save task
        self.task_store.add_task(task)
        # Format response
        schedule_desc = self._format_schedule_description(schedule)
        receiver_desc = task["action"]["receiver_name"] or task["action"]["receiver"]
        if message:
            content_desc = f"💬 消息: {message}"
        else:
            tool_name = tool_call.get("tool_name")
            tool_params_str = str(tool_call.get("tool_params", {}))
            prefix = tool_call.get("result_prefix", "")
            content_desc = f"🔧 工具调用: {tool_name}({tool_params_str})"
            if prefix:
                content_desc += f"\n📝 结果前缀: {prefix}"
        return (
            f"✅ 定时任务创建成功\n\n"
            f"📋 任务ID: {task_id}\n"
            f"📝 名称: {name}\n"
            f"⏰ 调度: {schedule_desc}\n"
            f"👤 接收者: {receiver_desc}\n"
            f"{content_desc}\n"
            f"🕐 下次执行: {next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else '未知'}"
        )

    def _list_tasks(self, **kwargs) -> str:
        """List all tasks with a one-line summary each."""
        tasks = self.task_store.list_tasks()
        if not tasks:
            return "📋 暂无定时任务"
        lines = [f"📋 定时任务列表 (共 {len(tasks)} 个)\n"]
        for task in tasks:
            # NOTE(review): both branches render an empty marker; enabled/disabled
            # icons appear to have been lost — confirm against the original file.
            status = "" if task.get("enabled", True) else ""
            schedule_desc = self._format_schedule_description(task.get("schedule", {}))
            next_run = task.get("next_run_at")
            next_run_str = datetime.fromisoformat(next_run).strftime('%m-%d %H:%M') if next_run else "未知"
            lines.append(
                f"{status} [{task['id']}] {task['name']}\n"
                f"{schedule_desc} | 下次: {next_run_str}"
            )
        return "\n".join(lines)

    def _get_task(self, **kwargs) -> str:
        """Get full details for a single task."""
        task_id = kwargs.get("task_id")
        if not task_id:
            return "错误: 缺少任务ID (task_id)"
        task = self.task_store.get_task(task_id)
        if not task:
            return f"错误: 任务 '{task_id}' 不存在"
        status = "启用" if task.get("enabled", True) else "禁用"
        schedule_desc = self._format_schedule_description(task.get("schedule", {}))
        action = task.get("action", {})
        next_run = task.get("next_run_at")
        next_run_str = datetime.fromisoformat(next_run).strftime('%Y-%m-%d %H:%M:%S') if next_run else "未知"
        last_run = task.get("last_run_at")
        last_run_str = datetime.fromisoformat(last_run).strftime('%Y-%m-%d %H:%M:%S') if last_run else "从未执行"
        # Show the tool invocation for dynamic tasks; static tasks have "content".
        # Previously this always read "content", printing None for tool_call tasks.
        if action.get("type") == "tool_call":
            content_line = f"工具调用: {action.get('tool_name')}({action.get('tool_params', {})})"
        else:
            content_line = f"消息: {action.get('content')}"
        return (
            f"📋 任务详情\n\n"
            f"ID: {task['id']}\n"
            f"名称: {task['name']}\n"
            f"状态: {status}\n"
            f"调度: {schedule_desc}\n"
            f"接收者: {action.get('receiver_name', action.get('receiver'))}\n"
            f"{content_line}\n"
            f"下次执行: {next_run_str}\n"
            f"上次执行: {last_run_str}\n"
            f"创建时间: {datetime.fromisoformat(task['created_at']).strftime('%Y-%m-%d %H:%M:%S')}"
        )

    def _delete_task(self, **kwargs) -> str:
        """Delete a task by id."""
        task_id = kwargs.get("task_id")
        if not task_id:
            return "错误: 缺少任务ID (task_id)"
        task = self.task_store.get_task(task_id)
        if not task:
            return f"错误: 任务 '{task_id}' 不存在"
        self.task_store.delete_task(task_id)
        return f"✅ 任务 '{task['name']}' ({task_id}) 已删除"

    def _enable_task(self, **kwargs) -> str:
        """Enable a task by id."""
        task_id = kwargs.get("task_id")
        if not task_id:
            return "错误: 缺少任务ID (task_id)"
        task = self.task_store.get_task(task_id)
        if not task:
            return f"错误: 任务 '{task_id}' 不存在"
        self.task_store.enable_task(task_id, True)
        return f"✅ 任务 '{task['name']}' ({task_id}) 已启用"

    def _disable_task(self, **kwargs) -> str:
        """Disable a task by id."""
        task_id = kwargs.get("task_id")
        if not task_id:
            return "错误: 缺少任务ID (task_id)"
        task = self.task_store.get_task(task_id)
        if not task:
            return f"错误: 任务 '{task_id}' 不存在"
        self.task_store.enable_task(task_id, False)
        return f"✅ 任务 '{task['name']}' ({task_id}) 已禁用"

    def _parse_schedule(self, schedule_type: str, schedule_value: str) -> Optional[dict]:
        """Parse and validate schedule configuration; None when invalid."""
        try:
            if schedule_type == "cron":
                # Validate cron expression (raises on malformed input)
                croniter(schedule_value)
                return {"type": "cron", "expression": schedule_value}
            elif schedule_type == "interval":
                # Parse interval in seconds; must be positive
                seconds = int(schedule_value)
                if seconds <= 0:
                    return None
                return {"type": "interval", "seconds": seconds}
            elif schedule_type == "once":
                # Validate the ISO timestamp (raises on malformed input)
                datetime.fromisoformat(schedule_value)
                return {"type": "once", "run_at": schedule_value}
        except Exception as e:
            logger.error(f"[SchedulerTool] Invalid schedule: {e}")
            return None
        return None

    def _calculate_next_run(self, task: dict) -> Optional[datetime]:
        """Calculate the next run time for a task (None when it will never run)."""
        schedule = task.get("schedule", {})
        schedule_type = schedule.get("type")
        now = datetime.now()
        if schedule_type == "cron":
            expression = schedule.get("expression")
            cron = croniter(expression, now)
            return cron.get_next(datetime)
        elif schedule_type == "interval":
            seconds = schedule.get("seconds", 0)
            from datetime import timedelta
            return now + timedelta(seconds=seconds)
        elif schedule_type == "once":
            run_at_str = schedule.get("run_at")
            run_at = datetime.fromisoformat(run_at_str)
            # Consistent with the scheduler service: past one-time tasks
            # never run (previously a past time was returned as-is, making
            # the task fire immediately).
            return run_at if run_at > now else None
        return None

    def _format_schedule_description(self, schedule: dict) -> str:
        """Format schedule as a human-readable description."""
        schedule_type = schedule.get("type")
        if schedule_type == "cron":
            expr = schedule.get("expression", "")
            # Friendly descriptions for a few common expressions
            if expr == "0 9 * * *":
                return "每天 9:00"
            elif expr == "0 */1 * * *":
                return "每小时"
            elif expr == "*/30 * * * *":
                return "每30分钟"
            else:
                return f"Cron: {expr}"
        elif schedule_type == "interval":
            # NOTE(review): some unit suffixes below look truncated in the
            # source (days/seconds branches) — confirm against the original.
            seconds = schedule.get("seconds", 0)
            if seconds >= 86400:
                days = seconds // 86400
                return f"{days}"
            elif seconds >= 3600:
                hours = seconds // 3600
                return f"{hours} 小时"
            elif seconds >= 60:
                minutes = seconds // 60
                return f"{minutes} 分钟"
            else:
                return f"{seconds}"
        elif schedule_type == "once":
            run_at = schedule.get("run_at", "")
            try:
                dt = datetime.fromisoformat(run_at)
                return f"一次性 ({dt.strftime('%Y-%m-%d %H:%M')})"
            except (ValueError, TypeError):
                return "一次性"
        return "未知"

    def _get_receiver_name(self, context: Context) -> str:
        """Best-effort receiver display name from the conversation context."""
        try:
            msg = context.get("msg")
            if msg:
                if context.get("isgroup"):
                    return msg.other_user_nickname or "群聊"
                else:
                    return msg.from_user_nickname or "用户"
        except Exception:
            pass
        return "未知"

View File

@@ -0,0 +1,200 @@
"""
Task storage management for scheduler
"""
import json
import os
import threading
from datetime import datetime
from typing import Dict, List, Optional
from pathlib import Path
class TaskStore:
    """
    Manages persistent storage of scheduled tasks in a JSON file.

    All public methods read/write the backing file under an internal lock,
    so a single TaskStore instance is safe to share across threads.
    """
    def __init__(self, store_path: str = None):
        """
        Initialize task store.

        Args:
            store_path: Path to tasks.json file. Defaults to ~/cow/scheduler/tasks.json
        """
        if store_path is None:
            # Default to ~/cow/scheduler/tasks.json
            home = os.path.expanduser("~")
            store_path = os.path.join(home, "cow", "scheduler", "tasks.json")
        self.store_path = store_path
        self.lock = threading.Lock()
        self._ensure_store_dir()

    def _ensure_store_dir(self):
        """Ensure the storage directory exists."""
        store_dir = os.path.dirname(self.store_path)
        # A bare filename has an empty dirname; makedirs("") would raise
        if store_dir:
            os.makedirs(store_dir, exist_ok=True)

    def load_tasks(self) -> Dict[str, dict]:
        """
        Load all tasks from storage.

        Returns:
            Dictionary of task_id -> task_data (empty on missing/corrupt file).
        """
        with self.lock:
            if not os.path.exists(self.store_path):
                return {}
            try:
                with open(self.store_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return data.get("tasks", {})
            except Exception as e:
                print(f"Error loading tasks: {e}")
                return {}

    def save_tasks(self, tasks: Dict[str, dict]):
        """
        Save all tasks to storage (best-effort .bak backup first).

        Args:
            tasks: Dictionary of task_id -> task_data

        Raises:
            Exception: re-raised if the write itself fails.
        """
        with self.lock:
            try:
                # Create backup (best-effort; failure to back up is ignored)
                if os.path.exists(self.store_path):
                    backup_path = f"{self.store_path}.bak"
                    try:
                        with open(self.store_path, 'r') as src:
                            with open(backup_path, 'w') as dst:
                                dst.write(src.read())
                    except Exception:
                        pass
                # Save tasks
                data = {
                    "version": 1,
                    "updated_at": datetime.now().isoformat(),
                    "tasks": tasks
                }
                with open(self.store_path, 'w', encoding='utf-8') as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
            except Exception as e:
                print(f"Error saving tasks: {e}")
                raise

    def add_task(self, task: dict) -> bool:
        """
        Add a new task.

        Args:
            task: Task data dictionary (must contain a unique 'id').

        Returns:
            True if successful.

        Raises:
            ValueError: if 'id' is missing or already exists.
        """
        tasks = self.load_tasks()
        task_id = task.get("id")
        if not task_id:
            raise ValueError("Task must have an 'id' field")
        if task_id in tasks:
            raise ValueError(f"Task with id '{task_id}' already exists")
        tasks[task_id] = task
        self.save_tasks(tasks)
        return True

    def update_task(self, task_id: str, updates: dict) -> bool:
        """
        Update an existing task.

        Args:
            task_id: Task ID
            updates: Dictionary of fields to update

        Returns:
            True if successful.

        Raises:
            ValueError: if the task does not exist.
        """
        tasks = self.load_tasks()
        if task_id not in tasks:
            raise ValueError(f"Task '{task_id}' not found")
        tasks[task_id].update(updates)
        # Always refresh the modification timestamp
        tasks[task_id]["updated_at"] = datetime.now().isoformat()
        self.save_tasks(tasks)
        return True

    def delete_task(self, task_id: str) -> bool:
        """
        Delete a task.

        Args:
            task_id: Task ID

        Returns:
            True if successful.

        Raises:
            ValueError: if the task does not exist.
        """
        tasks = self.load_tasks()
        if task_id not in tasks:
            raise ValueError(f"Task '{task_id}' not found")
        del tasks[task_id]
        self.save_tasks(tasks)
        return True

    def get_task(self, task_id: str) -> Optional[dict]:
        """
        Get a specific task.

        Args:
            task_id: Task ID

        Returns:
            Task data or None if not found.
        """
        tasks = self.load_tasks()
        return tasks.get(task_id)

    def list_tasks(self, enabled_only: bool = False) -> List[dict]:
        """
        List all tasks sorted by next run time.

        Args:
            enabled_only: If True, only return enabled tasks.

        Returns:
            List of task dictionaries; tasks without a next_run_at sort last.
        """
        tasks = self.load_tasks()
        task_list = list(tasks.values())
        if enabled_only:
            task_list = [t for t in task_list if t.get("enabled", True)]
        # Sort by next_run_at (ISO-8601 strings sort chronologically).
        # The previous key mixed str values with float('inf'), which raises
        # TypeError under Python 3 when any task lacks next_run_at.
        task_list.sort(key=lambda t: (t.get("next_run_at") is None, t.get("next_run_at") or ""))
        return task_list

    def enable_task(self, task_id: str, enabled: bool = True) -> bool:
        """
        Enable or disable a task.

        Args:
            task_id: Task ID
            enabled: True to enable, False to disable

        Returns:
            True if successful.
        """
        return self.update_task(task_id, {"enabled": enabled})

View File

@@ -180,6 +180,7 @@ class AgentBridge:
self.agents = {} # session_id -> Agent instance mapping
self.default_agent = None # For backward compatibility (no session_id)
self.agent: Optional[Agent] = None
self.scheduler_initialized = False
def create_agent(self, system_prompt: str, tools: List = None, **kwargs) -> Agent:
"""
Create the super agent with COW integration
@@ -268,6 +269,21 @@ class AgentBridge:
# Get workspace from config
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
# Load environment variables from workspace .env file
env_file = os.path.join(workspace_root, '.env')
if os.path.exists(env_file):
try:
from dotenv import load_dotenv
load_dotenv(env_file, override=True)
logger.info(f"[AgentBridge] Loaded environment variables from {env_file}")
except ImportError:
logger.warning("[AgentBridge] python-dotenv not installed, skipping .env file loading")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to load .env file: {e}")
# Migrate API keys from config.json to environment variables (if not already set)
self._migrate_config_to_env(workspace_root)
# Initialize workspace and create template files
from agent.prompt import ensure_workspace, load_context_files, PromptBuilder
@@ -357,7 +373,16 @@ class AgentBridge:
for tool_name in tool_manager.tool_classes.keys():
try:
tool = tool_manager.create_tool(tool_name)
# Special handling for EnvConfig tool - pass agent_bridge reference
if tool_name == "env_config":
from agent.tools import EnvConfig
tool = EnvConfig({
"workspace_dir": workspace_root,
"agent_bridge": self # Pass self reference for hot reload
})
else:
tool = tool_manager.create_tool(tool_name)
if tool:
# Apply workspace config to file operation tools
if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']:
@@ -381,6 +406,36 @@ class AgentBridge:
tools.extend(memory_tools)
logger.info(f"[AgentBridge] Added {len(memory_tools)} memory tools")
# Initialize scheduler service (once)
if not self.scheduler_initialized:
try:
from agent.tools.scheduler.integration import init_scheduler
if init_scheduler(self):
self.scheduler_initialized = True
logger.info("[AgentBridge] Scheduler service initialized")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to initialize scheduler: {e}")
# Inject scheduler dependencies into SchedulerTool instances
if self.scheduler_initialized:
try:
from agent.tools.scheduler.integration import get_task_store, get_scheduler_service
from agent.tools import SchedulerTool
task_store = get_task_store()
scheduler_service = get_scheduler_service()
for tool in tools:
if isinstance(tool, SchedulerTool):
tool.task_store = task_store
tool.scheduler_service = scheduler_service
if not tool.config:
tool.config = {}
tool.config["channel_type"] = conf().get("channel_type", "unknown")
logger.debug("[AgentBridge] Injected scheduler dependencies into SchedulerTool")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to inject scheduler dependencies: {e}")
logger.info(f"[AgentBridge] Loaded {len(tools)} tools: {[t.name for t in tools]}")
# Load context files (SOUL.md, USER.md, etc.)
@@ -449,6 +504,21 @@ class AgentBridge:
# Get workspace from config
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
# Load environment variables from workspace .env file
env_file = os.path.join(workspace_root, '.env')
if os.path.exists(env_file):
try:
from dotenv import load_dotenv
load_dotenv(env_file, override=True)
logger.info(f"[AgentBridge] Loaded environment variables from {env_file} for session {session_id}")
except ImportError:
logger.warning(f"[AgentBridge] python-dotenv not installed, skipping .env file loading for session {session_id}")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to load .env file for session {session_id}: {e}")
# Migrate API keys from config.json to environment variables (if not already set)
self._migrate_config_to_env(workspace_root)
# Initialize workspace
from agent.prompt import ensure_workspace, load_context_files, PromptBuilder
@@ -550,6 +620,36 @@ class AgentBridge:
if memory_tools:
tools.extend(memory_tools)
# Initialize scheduler service (once, if not already initialized)
if not self.scheduler_initialized:
try:
from agent.tools.scheduler.integration import init_scheduler
if init_scheduler(self):
self.scheduler_initialized = True
logger.info(f"[AgentBridge] Scheduler service initialized for session {session_id}")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to initialize scheduler for session {session_id}: {e}")
# Inject scheduler dependencies into SchedulerTool instances
if self.scheduler_initialized:
try:
from agent.tools.scheduler.integration import get_task_store, get_scheduler_service
from agent.tools import SchedulerTool
task_store = get_task_store()
scheduler_service = get_scheduler_service()
for tool in tools:
if isinstance(tool, SchedulerTool):
tool.task_store = task_store
tool.scheduler_service = scheduler_service
if not tool.config:
tool.config = {}
tool.config["channel_type"] = conf().get("channel_type", "unknown")
logger.debug(f"[AgentBridge] Injected scheduler dependencies for session {session_id}")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to inject scheduler dependencies for session {session_id}: {e}")
# Load context files
context_files = load_context_files(workspace_root)
@@ -667,6 +767,17 @@ class AgentBridge:
if not agent:
return Reply(ReplyType.ERROR, "Failed to initialize super agent")
# Attach context to scheduler tool if present
if context and agent.tools:
for tool in agent.tools:
if tool.name == "scheduler":
try:
from agent.tools.scheduler.integration import attach_scheduler_to_tool
attach_scheduler_to_tool(tool, context)
except Exception as e:
logger.warning(f"[AgentBridge] Failed to attach context to scheduler: {e}")
break
# Use agent's run_stream method
response = agent.run_stream(
user_message=query,
@@ -680,6 +791,72 @@ class AgentBridge:
logger.error(f"Agent reply error: {e}")
return Reply(ReplyType.ERROR, f"Agent error: {str(e)}")
def _migrate_config_to_env(self, workspace_root: str):
    """
    Migrate API keys from config.json to .env file if not already set.

    Only keys listed in the mapping below are migrated, and only when the
    .env file does not already define them and config.json holds a
    non-empty value. Migrated keys are appended to the .env file and also
    exported into the current process environment.

    Args:
        workspace_root: Workspace directory path (the .env lives here)
    """
    from config import conf
    import os
    # Mapping from config.json keys to environment variable names
    key_mapping = {
        "open_ai_api_key": "OPENAI_API_KEY",
        "open_ai_api_base": "OPENAI_API_BASE",
        "gemini_api_key": "GEMINI_API_KEY",
        "claude_api_key": "CLAUDE_API_KEY",
        "linkai_api_key": "LINKAI_API_KEY",
    }
    env_file = os.path.join(workspace_root, '.env')
    # Read existing env vars from .env file (only key names matter here,
    # so values are discarded; comments and blank lines are skipped)
    existing_env_vars = {}
    if os.path.exists(env_file):
        try:
            with open(env_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith('#') and '=' in line:
                        key, _ = line.split('=', 1)
                        existing_env_vars[key.strip()] = True
        except Exception as e:
            logger.warning(f"[AgentBridge] Failed to read .env file: {e}")
    # Check which keys need to be migrated
    keys_to_migrate = {}
    for config_key, env_key in key_mapping.items():
        # Skip if already in .env file (existing values always win)
        if env_key in existing_env_vars:
            continue
        # Get value from config.json
        value = conf().get(config_key, "")
        if value and value.strip():  # Only migrate non-empty values
            keys_to_migrate[env_key] = value.strip()
    # Write new keys to .env file
    if keys_to_migrate:
        try:
            # Ensure .env file exists before appending
            if not os.path.exists(env_file):
                os.makedirs(os.path.dirname(env_file), exist_ok=True)
                open(env_file, 'a').close()
            # Append new keys under a marker comment
            with open(env_file, 'a', encoding='utf-8') as f:
                f.write('\n# Auto-migrated from config.json\n')
                for key, value in keys_to_migrate.items():
                    f.write(f'{key}={value}\n')
                    # Also set in current process so the change takes
                    # effect without a restart
                    os.environ[key] = value
            logger.info(f"[AgentBridge] Migrated {len(keys_to_migrate)} API keys from config.json to .env: {list(keys_to_migrate.keys())}")
        except Exception as e:
            logger.warning(f"[AgentBridge] Failed to migrate API keys: {e}")
def clear_session(self, session_id: str):
"""
Clear a specific session's agent and conversation history
@@ -695,4 +872,43 @@ class AgentBridge:
"""Clear all agent sessions"""
logger.info(f"[AgentBridge] Clearing all sessions ({len(self.agents)} total)")
self.agents.clear()
self.default_agent = None
self.default_agent = None
def refresh_all_skills(self) -> int:
    """
    Refresh skills in all agent instances after environment variable changes.
    This allows hot-reload of skills without restarting the agent.

    Returns:
        Number of agent instances refreshed
    """
    import os
    from config import conf
    # Reload environment variables from the workspace .env file.
    # dotenv is optional elsewhere in this module (loaders guard the
    # import), so guard it here too instead of crashing the refresh.
    workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
    env_file = os.path.join(workspace_root, '.env')
    if os.path.exists(env_file):
        try:
            from dotenv import load_dotenv
            load_dotenv(env_file, override=True)
            logger.info(f"[AgentBridge] Reloaded environment variables from {env_file}")
        except ImportError:
            logger.warning("[AgentBridge] python-dotenv not installed, skipping .env file loading")
    refreshed_count = 0
    # Refresh default agent
    if self.default_agent and hasattr(self.default_agent, 'skill_manager'):
        self.default_agent.skill_manager.refresh_skills()
        refreshed_count += 1
        logger.info("[AgentBridge] Refreshed skills in default agent")
    # Refresh all session agents
    for session_id, agent in self.agents.items():
        if hasattr(agent, 'skill_manager'):
            agent.skill_manager.refresh_skills()
            refreshed_count += 1
    if refreshed_count > 0:
        logger.info(f"[AgentBridge] Refreshed skills in {refreshed_count} agent instance(s)")
    return refreshed_count

View File

@@ -9,6 +9,9 @@ pre-commit
web.py
linkai>=0.0.6.0
agentmesh-sdk>=0.1.3
python-dotenv>=1.0.0
PyYAML>=6.0
croniter>=2.0.0
# feishu websocket mode
lark-oapi

View File

@@ -0,0 +1,91 @@
---
name: bocha-search
description: High-quality web search with AI-optimized results. Use when user needs to search the internet for current information, news, or research topics.
homepage: https://open.bocha.cn/
metadata:
emoji: 🔍
requires:
bins: ["curl"]
env: ["BOCHA_API_KEY"]
primaryEnv: "BOCHA_API_KEY"
---
# Bocha Search
High-quality web search powered by Bocha AI, optimized for AI consumption. Returns web pages, images, and detailed metadata.
## Setup
This skill requires a Bocha API key. If not configured:
1. Visit https://open.bocha.cn to get an API key
2. Set the key using: `env_config(action="set", key="BOCHA_API_KEY", value="your-key")`
3. Or manually add to `~/cow/.env`: `BOCHA_API_KEY=your-key`
## Usage
**Important**: Scripts are located relative to this skill's base directory.
When you see this skill in `<available_skills>`, note the `<base_dir>` path.
```bash
# General pattern:
bash "<base_dir>/scripts/search.sh" "<query>" [count] [freshness] [summary]
# Parameters:
# - query: Search query (required)
# - count: Number of results (1-50, default: 10)
# - freshness: Time range filter (default: noLimit)
# Options: noLimit, oneDay, oneWeek, oneMonth, oneYear, YYYY-MM-DD..YYYY-MM-DD
# - summary: Include text summary (true/false, default: false)
```
## Examples
### Basic search
```bash
bash "<base_dir>/scripts/search.sh" "latest AI news"
```
### Search with more results
```bash
bash "<base_dir>/scripts/search.sh" "Python tutorials" 20
```
### Search recent content with summary
```bash
bash "<base_dir>/scripts/search.sh" "阿里巴巴ESG报告" 10 oneWeek true
```
### Search specific date range
```bash
bash "<base_dir>/scripts/search.sh" "tech news" 15 "2025-01-01..2025-02-01"
```
## Response Format
The API returns structured data compatible with Bing Search API:
**Web Pages** (in `data.webPages.value`):
- `name`: Page title
- `url`: Page URL
- `snippet`: Short description
- `summary`: Full text summary (if requested)
- `siteName`: Website name
- `siteIcon`: Website icon URL
- `datePublished`: Publication date (UTC+8)
- `language`: Page language
**Images** (in `data.images.value`):
- `contentUrl`: Image URL
- `hostPageUrl`: Source page URL
- `width`, `height`: Image dimensions
- `thumbnailUrl`: Thumbnail URL
## Notes
- **Optimized for AI**: Results include summaries and structured metadata
- **Time range**: Use `noLimit` for best results (algorithm auto-optimizes time range)
- **Timeout**: 30 seconds
- **Rate limits**: Check your API plan at https://open.bocha.cn
- **Response format**: Compatible with Bing Search API for easy integration

View File

@@ -0,0 +1,75 @@
#!/usr/bin/env bash
# Bocha Web Search API wrapper
# API Docs: https://open.bocha.cn/
set -euo pipefail

query="${1:-}"
count="${2:-10}"
freshness="${3:-noLimit}"
summary="${4:-false}"

if [ -z "$query" ]; then
  echo '{"error": "Query is required", "usage": "bash search.sh <query> [count] [freshness] [summary]"}'
  exit 1
fi

if [ -z "${BOCHA_API_KEY:-}" ]; then
  echo '{"error": "BOCHA_API_KEY environment variable is not set", "help": "Visit https://open.bocha.cn to get an API key"}'
  exit 1
fi

# Validate count (1-50); fall back to the default on bad input
if ! [[ "$count" =~ ^[0-9]+$ ]] || [ "$count" -lt 1 ] || [ "$count" -gt 50 ]; then
  count=10
fi

# summary is embedded as a bare JSON boolean; restrict it to true/false
if [ "$summary" != "true" ] && [ "$summary" != "false" ]; then
  summary=false
fi

# Escape backslashes and double quotes so user input cannot break the JSON body
json_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}
query_esc=$(json_escape "$query")
freshness_esc=$(json_escape "$freshness")

# Build JSON request body
request_body=$(cat <<EOF
{
  "query": "$query_esc",
  "count": $count,
  "freshness": "$freshness_esc",
  "summary": $summary
}
EOF
)

# Call Bocha API. Under `set -e` the exit status must be tested in the `if`
# itself: a plain assignment followed by `$?` aborts the script on curl
# failure before the error can be reported.
if ! response=$(curl -sS --max-time 30 \
  -X POST \
  -H "Authorization: Bearer $BOCHA_API_KEY" \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -d "$request_body" \
  "https://api.bocha.cn/v1/web-search" 2>&1); then
  echo "{\"error\": \"Failed to call Bocha API\", \"details\": \"$(json_escape "$response")\"}"
  exit 1
fi

# Simple JSON validation - check if response starts with { or [
if [[ ! "$response" =~ ^[[:space:]]*[\{\[] ]]; then
  echo "{\"error\": \"Invalid JSON response from API\", \"response\": \"$(json_escape "$response")\"}"
  exit 1
fi

# Extract API code using grep and sed (basic JSON parsing).
# `|| true` keeps `set -e -o pipefail` from aborting when grep finds no match.
api_code=$(echo "$response" | grep -o '"code"[[:space:]]*:[[:space:]]*[0-9]*' | grep -o '[0-9]*' | head -1 || true)

# If code extraction succeeded and code is not 200, report the API error
if [ -n "$api_code" ] && [ "$api_code" != "200" ]; then
  # Try to extract error message
  api_msg=$(echo "$response" | grep -o '"msg"[[:space:]]*:[[:space:]]*"[^"]*"' | sed 's/"msg"[[:space:]]*:[[:space:]]*"\(.*\)"/\1/' | head -1 || true)
  if [ -z "$api_msg" ]; then
    api_msg="Unknown error"
  fi
  echo "{\"error\": \"API returned error\", \"code\": $api_code, \"message\": \"$api_msg\"}"
  exit 1
fi

# Return the full response
echo "$response"

View File

@@ -243,20 +243,87 @@ If you used `--examples`, delete any placeholder files that are not needed for t
##### Frontmatter
Write the YAML frontmatter with `name` and `description`:
Write the YAML frontmatter with `name`, `description`, and optional `metadata`:
- `name`: The skill name
- `description`: This is the primary triggering mechanism for your skill, and helps the agent understand when to use the skill.
- Include both what the Skill does and specific triggers/contexts for when to use it.
- Include all "when to use" information here - Not in the body. The body is only loaded after triggering, so "When to Use This Skill" sections in the body are not helpful to the agent.
- Example description for a `docx` skill: "Comprehensive document creation, editing, and analysis with support for tracked changes, comments, formatting preservation, and text extraction. Use when the agent needs to work with professional documents (.docx files) for: (1) Creating new documents, (2) Modifying or editing content, (3) Working with tracked changes, (4) Adding comments, or any other document tasks"
- `metadata`: (Optional) Specify requirements and configuration
- `requires.bins`: Required binaries (e.g., `["curl", "jq"]`)
- `requires.env`: Required environment variables (e.g., `["OPENAI_API_KEY"]`)
- `primaryEnv`: Primary environment variable name (for API keys)
- `always`: Set to `true` to always load regardless of requirements
- `emoji`: Skill icon (optional)
Do not include any other fields in YAML frontmatter.
**API Key Requirements**:
If your skill needs an API key, declare it in metadata:
```yaml
---
name: my-search
description: Search using MyAPI
metadata:
requires:
bins: ["curl"]
env: ["MYAPI_KEY"]
primaryEnv: "MYAPI_KEY"
---
```
**Auto-enable rule**: Skills are automatically enabled when required environment variables are set, and automatically disabled when missing. No manual configuration needed.
##### Body
Write instructions for using the skill and its bundled resources.
**If your skill requires an API key**, include setup instructions in the body:
```markdown
## Setup
This skill requires an API key from [Service Name].
1. Visit https://service.com to get an API key
2. Configure it using: `env_config(action="set", key="SERVICE_API_KEY", value="your-key")`
3. Or manually add to `~/cow/.env`: `SERVICE_API_KEY=your-key`
4. Restart the agent for changes to take effect
## Usage
...
```
The bash script should check for the key and provide helpful error messages:
```bash
#!/usr/bin/env bash
if [ -z "${SERVICE_API_KEY:-}" ]; then
echo "Error: SERVICE_API_KEY not set"
echo "Please configure your API key first (see SKILL.md)"
exit 1
fi
curl -H "Authorization: Bearer $SERVICE_API_KEY" ...
```
**Script Path Convention**:
When writing SKILL.md instructions, remember that:
- Skills are listed in `<available_skills>` with a `<base_dir>` path
- Scripts should be referenced as: `<base_dir>/scripts/script_name.sh`
- The AI will see the base_dir and can construct the full path
Example instruction in SKILL.md:
```markdown
## Usage
Scripts are in this skill's base directory (shown in skill listing).
bash "<base_dir>/scripts/my_script.sh" <args>
```
### Step 5: Validate (Optional)
Validate skill format:

View File

@@ -1,11 +1,12 @@
---
name: web-fetch
description: Fetch and extract readable content from web pages
description: Fetch and extract readable content from web pages. Use for lightweight page access without browser automation.
homepage: https://github.com/zhayujie/chatgpt-on-wechat
metadata:
emoji: 🌐
requires:
bins: ["curl"]
always: true
---
# Web Fetch
@@ -14,10 +15,16 @@ Fetch and extract readable content from web pages using curl and basic text proc
## Usage
Use the provided script to fetch a URL and extract its content:
**Important**: Scripts are located relative to this skill's base directory.
When you see this skill in `<available_skills>`, note the `<base_dir>` path.
```bash
bash scripts/fetch.sh <url> [output_file]
# General pattern:
bash "<base_dir>/scripts/fetch.sh" <url> [output_file]
# Example (replace <base_dir> with actual path from skill listing):
bash "$HOME/chatgpt-on-wechat/skills/web-fetch/scripts/fetch.sh" "https://example.com"
```
**Parameters:**
@@ -31,18 +38,18 @@ bash scripts/fetch.sh <url> [output_file]
### Fetch a web page
```bash
bash scripts/fetch.sh "https://example.com"
bash "<base_dir>/scripts/fetch.sh" "https://example.com"
```
### Save to file
```bash
bash scripts/fetch.sh "https://example.com" output.txt
bash "<base_dir>/scripts/fetch.sh" "https://example.com" output.txt
cat output.txt
```
## Notes
- Uses curl for HTTP requests (timeout: 20s)
- Uses curl for HTTP requests (timeout: 10s)
- Extracts title and basic text content
- Removes HTML tags and scripts
- Works with any standard web page

View File

@@ -19,7 +19,7 @@ if [[ ! "$url" =~ ^https?:// ]]; then
fi
# Fetch the page with curl
html=$(curl -sS -L --max-time 20 \
html=$(curl -sS -L --max-time 10 \
-H "User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" \
-H "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" \
"$url" 2>&1) || {