diff --git a/agent/memory/embedding.py b/agent/memory/embedding.py
index 509370b..db182f1 100644
--- a/agent/memory/embedding.py
+++ b/agent/memory/embedding.py
@@ -45,8 +45,9 @@ class OpenAIEmbeddingProvider(EmbeddingProvider):
self.api_key = api_key
self.api_base = api_base or "https://api.openai.com/v1"
- if not self.api_key:
- raise ValueError("OpenAI API key is required")
+ # Validate API key
+ if not self.api_key or self.api_key in ["", "YOUR API KEY", "YOUR_API_KEY"]:
+ raise ValueError("OpenAI API key is not configured. Please set 'open_ai_api_key' in config.json")
# Set dimensions based on model
self._dimensions = 1536 if "small" in model else 3072
@@ -65,9 +66,21 @@ class OpenAIEmbeddingProvider(EmbeddingProvider):
"model": self.model
}
- response = requests.post(url, headers=headers, json=data, timeout=30)
- response.raise_for_status()
- return response.json()
+ try:
+ response = requests.post(url, headers=headers, json=data, timeout=5)
+ response.raise_for_status()
+ return response.json()
+ except requests.exceptions.ConnectionError as e:
+ raise ConnectionError(f"Failed to connect to OpenAI API at {url}. Please check your network connection and api_base configuration. Error: {str(e)}")
+ except requests.exceptions.Timeout as e:
+ raise TimeoutError(f"OpenAI API request timed out after 10s. Please check your network connection. Error: {str(e)}")
+            except requests.exceptions.HTTPError as e:
+ if e.response.status_code == 401:
+ raise ValueError(f"Invalid OpenAI API key. Please check your 'open_ai_api_key' in config.json")
+ elif e.response.status_code == 429:
+ raise ValueError(f"OpenAI API rate limit exceeded. Please try again later.")
+ else:
+ raise ValueError(f"OpenAI API request failed: {e.response.status_code} - {e.response.text}")
def embed(self, text: str) -> List[float]:
"""Generate embedding for text"""
diff --git a/agent/prompt/builder.py b/agent/prompt/builder.py
index c9ae35b..c48989a 100644
--- a/agent/prompt/builder.py
+++ b/agent/prompt/builder.py
@@ -279,11 +279,16 @@ def _build_skills_section(skill_manager: Any, tools: Optional[List[Any]], langua
# 添加技能列表(通过skill_manager获取)
try:
skills_prompt = skill_manager.build_skills_prompt()
+ logger.debug(f"[PromptBuilder] Skills prompt length: {len(skills_prompt) if skills_prompt else 0}")
if skills_prompt:
lines.append(skills_prompt.strip())
lines.append("")
+ else:
+ logger.warning("[PromptBuilder] No skills prompt generated - skills_prompt is empty")
except Exception as e:
logger.warning(f"Failed to build skills prompt: {e}")
+ import traceback
+ logger.debug(f"Skills prompt error traceback: {traceback.format_exc()}")
return lines
@@ -404,15 +409,17 @@ def _build_workspace_section(workspace_dir: str, language: str, is_first_convers
"这是你的第一次对话!进行以下流程:",
"",
"1. **表达初次启动的感觉** - 像是第一次睁开眼看到世界,带着好奇和期待",
- "2. **简短打招呼后,询问核心问题**:",
+ "2. **简短介绍能力**:一行说明你能帮助解答问题、管理计算机、创造技能,且拥有长期记忆能不断成长",
+ "3. **询问核心问题**:",
" - 你希望给我起个什么名字?",
" - 我该怎么称呼你?",
- " - 你希望我们是什么样的交流风格?(需要举例,如:专业严谨、轻松幽默、温暖友好等)",
- "3. **语言风格**:温暖但不过度诗意,带点科技感,保持清晰",
- "4. **问题格式**:用分点或换行,让问题清晰易读",
+ " - 你希望我们是什么样的交流风格?(一行列举选项:如专业严谨、轻松幽默、温暖友好、简洁高效等)",
+ "4. **风格要求**:温暖自然、简洁清晰,整体控制在 100 字以内",
"5. 收到回复后,用 `write` 工具保存到 USER.md 和 SOUL.md",
"",
- "**注意事项**:",
+ "**重要提醒**:",
+ "- SOUL.md 和 USER.md 已经在系统提示词中加载,无需再次读取",
+ "- 能力介绍和交流风格选项都只要一行,保持精简",
"- 不要问太多其他信息(职业、时区等可以后续自然了解)",
"",
])
diff --git a/agent/protocol/agent_stream.py b/agent/protocol/agent_stream.py
index 2b1c883..9b3f1a6 100644
--- a/agent/protocol/agent_stream.py
+++ b/agent/protocol/agent_stream.py
@@ -248,9 +248,14 @@ class AgentStreamExecutor:
# Log tool calls with arguments
tool_calls_str = []
for tc in tool_calls:
- args_str = ', '.join([f"{k}={v}" for k, v in tc['arguments'].items()])
- if args_str:
- tool_calls_str.append(f"{tc['name']}({args_str})")
+ # Safely handle None or missing arguments
+ args = tc.get('arguments') or {}
+ if isinstance(args, dict):
+ args_str = ', '.join([f"{k}={v}" for k, v in args.items()])
+ if args_str:
+ tool_calls_str.append(f"{tc['name']}({args_str})")
+ else:
+ tool_calls_str.append(tc['name'])
else:
tool_calls_str.append(tc['name'])
logger.info(f"🔧 {', '.join(tool_calls_str)}")
@@ -511,13 +516,13 @@ class AgentStreamExecutor:
stop_reason = finish_reason
# Handle text content
- if "content" in delta and delta["content"]:
- content_delta = delta["content"]
+ content_delta = delta.get("content") or ""
+ if content_delta:
full_content += content_delta
self._emit_event("message_update", {"delta": content_delta})
# Handle tool calls
- if "tool_calls" in delta:
+ if "tool_calls" in delta and delta["tool_calls"]:
for tc_delta in delta["tool_calls"]:
index = tc_delta.get("index", 0)
@@ -577,7 +582,10 @@ class AgentStreamExecutor:
"抱歉,之前的对话出现了问题。我已清空历史记录,请重新发送你的消息。"
)
- # Check if error is retryable (timeout, connection, rate limit, server busy, etc.)
+ # Check if error is rate limit (429)
+ is_rate_limit = '429' in error_str_lower or 'rate limit' in error_str_lower
+
+ # Check if error is retryable (timeout, connection, server busy, etc.)
is_retryable = any(keyword in error_str_lower for keyword in [
'timeout', 'timed out', 'connection', 'network',
'rate limit', 'overloaded', 'unavailable', 'busy', 'retry',
@@ -585,7 +593,12 @@ class AgentStreamExecutor:
])
if is_retryable and retry_count < max_retries:
- wait_time = (retry_count + 1) * 2 # Exponential backoff: 2s, 4s, 6s
+ # Rate limit needs longer wait time
+ if is_rate_limit:
+ wait_time = 30 + (retry_count * 15) # 30s, 45s, 60s for rate limit
+ else:
+ wait_time = (retry_count + 1) * 2 # 2s, 4s, 6s for other errors
+
logger.warning(f"⚠️ LLM API error (attempt {retry_count + 1}/{max_retries}): {e}")
logger.info(f"Retrying in {wait_time}s...")
time.sleep(wait_time)
@@ -606,11 +619,15 @@ class AgentStreamExecutor:
for idx in sorted(tool_calls_buffer.keys()):
tc = tool_calls_buffer[idx]
try:
- arguments = json.loads(tc["arguments"]) if tc["arguments"] else {}
+ # Safely get arguments, handle None case
+ args_str = tc.get("arguments") or ""
+ arguments = json.loads(args_str) if args_str else {}
except json.JSONDecodeError as e:
- args_preview = tc['arguments'][:200] if len(tc['arguments']) > 200 else tc['arguments']
+ # Handle None or invalid arguments safely
+ args_str = tc.get('arguments') or ""
+ args_preview = args_str[:200] if len(args_str) > 200 else args_str
logger.error(f"Failed to parse tool arguments for {tc['name']}")
- logger.error(f"Arguments length: {len(tc['arguments'])} chars")
+ logger.error(f"Arguments length: {len(args_str)} chars")
logger.error(f"Arguments preview: {args_preview}...")
logger.error(f"JSON decode error: {e}")
@@ -661,9 +678,9 @@ class AgentStreamExecutor:
for tc in tool_calls:
assistant_msg["content"].append({
"type": "tool_use",
- "id": tc["id"],
- "name": tc["name"],
- "input": tc["arguments"]
+ "id": tc.get("id", ""),
+ "name": tc.get("name", ""),
+ "input": tc.get("arguments", {})
})
# Only append if content is not empty
diff --git a/agent/skills/loader.py b/agent/skills/loader.py
index cc77d32..db4fd03 100644
--- a/agent/skills/loader.py
+++ b/agent/skills/loader.py
@@ -137,6 +137,18 @@ class SkillLoader:
name = frontmatter.get('name', parent_dir_name)
description = frontmatter.get('description', '')
+ # Normalize name (handle both string and list)
+ if isinstance(name, list):
+ name = name[0] if name else parent_dir_name
+ elif not isinstance(name, str):
+ name = str(name) if name else parent_dir_name
+
+ # Normalize description (handle both string and list)
+ if isinstance(description, list):
+ description = ' '.join(str(d) for d in description if d)
+ elif not isinstance(description, str):
+ description = str(description) if description else ''
+
# Special handling for linkai-agent: dynamically load apps from config.json
if name == 'linkai-agent':
description = self._load_linkai_agent_description(skill_dir, description)
diff --git a/agent/skills/manager.py b/agent/skills/manager.py
index bf9593f..adb4b01 100644
--- a/agent/skills/manager.py
+++ b/agent/skills/manager.py
@@ -103,7 +103,21 @@ class SkillManager:
# Apply skill filter
if skill_filter is not None:
- normalized = [name.strip() for name in skill_filter if name.strip()]
+ # Flatten and normalize skill names (handle both strings and nested lists)
+ normalized = []
+ for item in skill_filter:
+ if isinstance(item, str):
+ name = item.strip()
+ if name:
+ normalized.append(name)
+ elif isinstance(item, list):
+ # Handle nested lists
+ for subitem in item:
+ if isinstance(subitem, str):
+ name = subitem.strip()
+ if name:
+ normalized.append(name)
+
if normalized:
entries = [e for e in entries if e.skill.name in normalized]
@@ -123,8 +137,15 @@ class SkillManager:
:param skill_filter: Optional list of skill names to include
:return: Formatted skills prompt
"""
+ from common.log import logger
entries = self.filter_skills(skill_filter=skill_filter, include_disabled=False)
- return format_skill_entries_for_prompt(entries)
+ logger.debug(f"[SkillManager] Filtered {len(entries)} skills for prompt (total: {len(self.skills)})")
+ if entries:
+ skill_names = [e.skill.name for e in entries]
+ logger.debug(f"[SkillManager] Skills to include: {skill_names}")
+ result = format_skill_entries_for_prompt(entries)
+ logger.debug(f"[SkillManager] Generated prompt length: {len(result)}")
+ return result
def build_skill_snapshot(
self,
diff --git a/app.py b/app.py
index ad0cfca..022fdd7 100644
--- a/app.py
+++ b/app.py
@@ -59,23 +59,6 @@ def run():
os.environ["WECHATY_LOG"] = "warn"
start_channel(channel_name)
-
- # 打印系统运行成功信息
- logger.info("")
- logger.info("=" * 50)
- if conf().get("agent", False):
- logger.info("✅ System started successfully!")
- logger.info("🐮 Cow Agent is running")
- logger.info(f" Channel: {channel_name}")
- logger.info(f" Model: {conf().get('model', 'unknown')}")
- logger.info(f" Workspace: {conf().get('agent_workspace', '~/cow')}")
- else:
- logger.info("✅ System started successfully!")
- logger.info("🤖 ChatBot is running")
- logger.info(f" Channel: {channel_name}")
- logger.info(f" Model: {conf().get('model', 'unknown')}")
- logger.info("=" * 50)
- logger.info("")
while True:
time.sleep(1)
diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py
index 43aefc7..17b653b 100644
--- a/bridge/agent_bridge.py
+++ b/bridge/agent_bridge.py
@@ -10,6 +10,8 @@ from models.openai_compatible_bot import OpenAICompatibleBot
from bridge.bridge import Bridge
from bridge.context import Context
from bridge.reply import Reply, ReplyType
+from bridge.agent_event_handler import AgentEventHandler
+from bridge.agent_initializer import AgentInitializer
from common import const
from common.log import logger
@@ -127,9 +129,6 @@ class AgentLLMModel(LLMModel):
try:
if hasattr(self.bot, 'call_with_tools'):
# Use tool-enabled streaming call if available
- # Ensure max_tokens is an integer, use default if None
- max_tokens = request.max_tokens if request.max_tokens is not None else 4096
-
# Extract system prompt if present
system_prompt = getattr(request, 'system', None)
@@ -138,10 +137,13 @@ class AgentLLMModel(LLMModel):
'messages': request.messages,
'tools': getattr(request, 'tools', None),
'stream': True,
- 'max_tokens': max_tokens,
'model': self.model # Pass model parameter
}
+ # Only pass max_tokens if explicitly set, let the bot use its default
+ if request.max_tokens is not None:
+ kwargs['max_tokens'] = request.max_tokens
+
# Add system prompt if present
if system_prompt:
kwargs['system'] = system_prompt
@@ -182,6 +184,9 @@ class AgentBridge:
self.default_agent = None # For backward compatibility (no session_id)
self.agent: Optional[Agent] = None
self.scheduler_initialized = False
+
+ # Create helper instances
+ self.initializer = AgentInitializer(bridge, self)
def create_agent(self, system_prompt: str, tools: List = None, **kwargs) -> Agent:
"""
Create the super agent with COW integration
@@ -252,492 +257,19 @@ class AgentBridge:
# Check if agent exists for this session
if session_id not in self.agents:
- logger.info(f"[AgentBridge] Creating new agent for session: {session_id}")
self._init_agent_for_session(session_id)
return self.agents[session_id]
def _init_default_agent(self):
- """Initialize default super agent with new prompt system"""
- from config import conf
- import os
-
- # Get workspace from config
- workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
-
- # Migrate API keys from config.json to environment variables (if not already set)
- self._migrate_config_to_env(workspace_root)
-
- # Load environment variables from secure .env file location
- env_file = os.path.expanduser("~/.cow/.env")
- if os.path.exists(env_file):
- try:
- from dotenv import load_dotenv
- load_dotenv(env_file, override=True)
- logger.info(f"[AgentBridge] Loaded environment variables from {env_file}")
- except ImportError:
- logger.warning("[AgentBridge] python-dotenv not installed, skipping .env file loading")
- except Exception as e:
- logger.warning(f"[AgentBridge] Failed to load .env file: {e}")
-
- # Initialize workspace and create template files
- from agent.prompt import ensure_workspace, load_context_files, PromptBuilder
-
- workspace_files = ensure_workspace(workspace_root, create_templates=True)
- logger.info(f"[AgentBridge] Workspace initialized at: {workspace_root}")
-
- # Setup memory system
- memory_manager = None
- memory_tools = []
-
- try:
- # Try to initialize memory system
- from agent.memory import MemoryManager, MemoryConfig
- from agent.tools import MemorySearchTool, MemoryGetTool
-
- # 从 config.json 读取 OpenAI 配置
- openai_api_key = conf().get("open_ai_api_key", "")
- openai_api_base = conf().get("open_ai_api_base", "")
-
- # 尝试初始化 OpenAI embedding provider
- embedding_provider = None
- if openai_api_key:
- try:
- from agent.memory import create_embedding_provider
- embedding_provider = create_embedding_provider(
- provider="openai",
- model="text-embedding-3-small",
- api_key=openai_api_key,
- api_base=openai_api_base or "https://api.openai.com/v1"
- )
- logger.info(f"[AgentBridge] OpenAI embedding initialized")
- except Exception as embed_error:
- logger.warning(f"[AgentBridge] OpenAI embedding failed: {embed_error}")
- logger.info(f"[AgentBridge] Using keyword-only search")
- else:
- logger.info(f"[AgentBridge] No OpenAI API key, using keyword-only search")
-
- # 创建 memory config
- memory_config = MemoryConfig(workspace_root=workspace_root)
-
- # 创建 memory manager
- memory_manager = MemoryManager(memory_config, embedding_provider=embedding_provider)
-
- # 初始化时执行一次 sync,确保数据库有数据
- import asyncio
- try:
- # 尝试在当前事件循环中执行
- loop = asyncio.get_event_loop()
- if loop.is_running():
- # 如果事件循环正在运行,创建任务
- asyncio.create_task(memory_manager.sync())
- logger.info("[AgentBridge] Memory sync scheduled")
- else:
- # 如果没有运行的循环,直接执行
- loop.run_until_complete(memory_manager.sync())
- logger.info("[AgentBridge] Memory synced successfully")
- except RuntimeError:
- # 没有事件循环,创建新的
- asyncio.run(memory_manager.sync())
- logger.info("[AgentBridge] Memory synced successfully")
- except Exception as e:
- logger.warning(f"[AgentBridge] Memory sync failed: {e}")
-
- # Create memory tools
- memory_tools = [
- MemorySearchTool(memory_manager),
- MemoryGetTool(memory_manager)
- ]
-
- logger.info(f"[AgentBridge] Memory system initialized")
-
- except Exception as e:
- logger.warning(f"[AgentBridge] Memory system not available: {e}")
- logger.info("[AgentBridge] Continuing without memory features")
-
- # Use ToolManager to dynamically load all available tools
- from agent.tools import ToolManager
- tool_manager = ToolManager()
- tool_manager.load_tools()
-
- # Create tool instances for all available tools
- tools = []
- file_config = {
- "cwd": workspace_root,
- "memory_manager": memory_manager
- } if memory_manager else {"cwd": workspace_root}
-
- for tool_name in tool_manager.tool_classes.keys():
- try:
- # Special handling for EnvConfig tool - pass agent_bridge reference
- if tool_name == "env_config":
- from agent.tools import EnvConfig
- tool = EnvConfig({
- "agent_bridge": self # Pass self reference for hot reload
- })
- else:
- tool = tool_manager.create_tool(tool_name)
-
- if tool:
- # Apply workspace config to file operation tools
- if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']:
- tool.config = file_config
- tool.cwd = file_config.get("cwd", tool.cwd if hasattr(tool, 'cwd') else None)
- if 'memory_manager' in file_config:
- tool.memory_manager = file_config['memory_manager']
- tools.append(tool)
- logger.debug(f"[AgentBridge] Loaded tool: {tool_name}")
- except Exception as e:
- logger.warning(f"[AgentBridge] Failed to load tool {tool_name}: {e}")
-
- # Add memory tools
- if memory_tools:
- tools.extend(memory_tools)
- logger.info(f"[AgentBridge] Added {len(memory_tools)} memory tools")
-
- # Initialize scheduler service (once)
- if not self.scheduler_initialized:
- try:
- from agent.tools.scheduler.integration import init_scheduler
- if init_scheduler(self):
- self.scheduler_initialized = True
- logger.info("[AgentBridge] Scheduler service initialized")
- except Exception as e:
- logger.warning(f"[AgentBridge] Failed to initialize scheduler: {e}")
-
- # Inject scheduler dependencies into SchedulerTool instances
- if self.scheduler_initialized:
- try:
- from agent.tools.scheduler.integration import get_task_store, get_scheduler_service
- from agent.tools import SchedulerTool
-
- task_store = get_task_store()
- scheduler_service = get_scheduler_service()
-
- for tool in tools:
- if isinstance(tool, SchedulerTool):
- tool.task_store = task_store
- tool.scheduler_service = scheduler_service
- if not tool.config:
- tool.config = {}
- tool.config["channel_type"] = conf().get("channel_type", "unknown")
- logger.debug("[AgentBridge] Injected scheduler dependencies into SchedulerTool")
- except Exception as e:
- logger.warning(f"[AgentBridge] Failed to inject scheduler dependencies: {e}")
-
- logger.info(f"[AgentBridge] Loaded {len(tools)} tools: {[t.name for t in tools]}")
-
- # Load context files (SOUL.md, USER.md, etc.)
- context_files = load_context_files(workspace_root)
- logger.info(f"[AgentBridge] Loaded {len(context_files)} context files: {[f.path for f in context_files]}")
-
- # Check if this is the first conversation
- from agent.prompt.workspace import is_first_conversation, mark_conversation_started
- is_first = is_first_conversation(workspace_root)
- if is_first:
- logger.info("[AgentBridge] First conversation detected")
-
- # Build system prompt using new prompt builder
- prompt_builder = PromptBuilder(
- workspace_dir=workspace_root,
- language="zh"
- )
-
- # Get runtime info
- runtime_info = {
- "model": conf().get("model", "unknown"),
- "workspace": workspace_root,
- "channel": conf().get("channel_type", "unknown") # Get from config
- }
-
- system_prompt = prompt_builder.build(
- tools=tools,
- context_files=context_files,
- memory_manager=memory_manager,
- runtime_info=runtime_info,
- is_first_conversation=is_first
- )
-
- # Mark conversation as started (will be saved after first user message)
- if is_first:
- mark_conversation_started(workspace_root)
-
- logger.info("[AgentBridge] System prompt built successfully")
-
- # Get cost control parameters from config
- max_steps = conf().get("agent_max_steps", 20)
- max_context_tokens = conf().get("agent_max_context_tokens", 50000)
-
- # Create agent with configured tools and workspace
- agent = self.create_agent(
- system_prompt=system_prompt,
- tools=tools,
- max_steps=max_steps,
- output_mode="logger",
- workspace_dir=workspace_root, # Pass workspace to agent for skills loading
- enable_skills=True, # Enable skills auto-loading
- max_context_tokens=max_context_tokens
- )
-
- # Attach memory manager to agent if available
- if memory_manager:
- agent.memory_manager = memory_manager
- logger.info(f"[AgentBridge] Memory manager attached to agent")
-
- # Store as default agent
+ """Initialize default super agent"""
+ agent = self.initializer.initialize_agent(session_id=None)
self.default_agent = agent
def _init_agent_for_session(self, session_id: str):
- """
- Initialize agent for a specific session
- Reuses the same configuration as default agent
- """
- from config import conf
- import os
-
- # Get workspace from config
- workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
-
- # Migrate API keys from config.json to environment variables (if not already set)
- self._migrate_config_to_env(workspace_root)
-
- # Load environment variables from secure .env file location
- env_file = os.path.expanduser("~/.cow/.env")
- if os.path.exists(env_file):
- try:
- from dotenv import load_dotenv
- load_dotenv(env_file, override=True)
- logger.debug(f"[AgentBridge] Loaded environment variables from {env_file} for session {session_id}")
- except ImportError:
- logger.warning(f"[AgentBridge] python-dotenv not installed, skipping .env file loading for session {session_id}")
- except Exception as e:
- logger.warning(f"[AgentBridge] Failed to load .env file for session {session_id}: {e}")
-
- # Migrate API keys from config.json to environment variables (if not already set)
- self._migrate_config_to_env(workspace_root)
-
- # Initialize workspace
- from agent.prompt import ensure_workspace, load_context_files, PromptBuilder
-
- workspace_files = ensure_workspace(workspace_root, create_templates=True)
-
- # Setup memory system
- memory_manager = None
- memory_tools = []
-
- try:
- from agent.memory import MemoryManager, MemoryConfig, create_embedding_provider
- from agent.tools import MemorySearchTool, MemoryGetTool
-
- # 从 config.json 读取 OpenAI 配置
- openai_api_key = conf().get("open_ai_api_key", "")
- openai_api_base = conf().get("open_ai_api_base", "")
-
- # 尝试初始化 OpenAI embedding provider
- embedding_provider = None
- if openai_api_key:
- try:
- embedding_provider = create_embedding_provider(
- provider="openai",
- model="text-embedding-3-small",
- api_key=openai_api_key,
- api_base=openai_api_base or "https://api.openai.com/v1"
- )
- logger.debug(f"[AgentBridge] OpenAI embedding initialized for session {session_id}")
- except Exception as embed_error:
- logger.warning(f"[AgentBridge] OpenAI embedding failed for session {session_id}: {embed_error}")
- logger.info(f"[AgentBridge] Using keyword-only search for session {session_id}")
- else:
- logger.debug(f"[AgentBridge] No OpenAI API key, using keyword-only search for session {session_id}")
-
- # 创建 memory config
- memory_config = MemoryConfig(workspace_root=workspace_root)
-
- # 创建 memory manager
- memory_manager = MemoryManager(memory_config, embedding_provider=embedding_provider)
-
- # 初始化时执行一次 sync,确保数据库有数据
- import asyncio
- try:
- # 尝试在当前事件循环中执行
- loop = asyncio.get_event_loop()
- if loop.is_running():
- # 如果事件循环正在运行,创建任务
- asyncio.create_task(memory_manager.sync())
- logger.debug(f"[AgentBridge] Memory sync scheduled for session {session_id}")
- else:
- # 如果没有运行的循环,直接执行
- loop.run_until_complete(memory_manager.sync())
- logger.debug(f"[AgentBridge] Memory synced successfully for session {session_id}")
- except RuntimeError:
- # 没有事件循环,创建新的
- asyncio.run(memory_manager.sync())
- logger.debug(f"[AgentBridge] Memory synced successfully for session {session_id}")
- except Exception as sync_error:
- logger.warning(f"[AgentBridge] Memory sync failed for session {session_id}: {sync_error}")
-
- memory_tools = [
- MemorySearchTool(memory_manager),
- MemoryGetTool(memory_manager)
- ]
-
- except Exception as e:
- logger.warning(f"[AgentBridge] Memory system not available for session {session_id}: {e}")
- import traceback
- logger.warning(f"[AgentBridge] Memory init traceback: {traceback.format_exc()}")
-
- # Load tools
- from agent.tools import ToolManager
- tool_manager = ToolManager()
- tool_manager.load_tools()
-
- tools = []
- file_config = {
- "cwd": workspace_root,
- "memory_manager": memory_manager
- } if memory_manager else {"cwd": workspace_root}
-
- for tool_name in tool_manager.tool_classes.keys():
- try:
- tool = tool_manager.create_tool(tool_name)
- if tool:
- if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']:
- tool.config = file_config
- tool.cwd = file_config.get("cwd", tool.cwd if hasattr(tool, 'cwd') else None)
- if 'memory_manager' in file_config:
- tool.memory_manager = file_config['memory_manager']
- tools.append(tool)
- except Exception as e:
- logger.warning(f"[AgentBridge] Failed to load tool {tool_name} for session {session_id}: {e}")
-
- if memory_tools:
- tools.extend(memory_tools)
-
- # Initialize scheduler service (once, if not already initialized)
- if not self.scheduler_initialized:
- try:
- from agent.tools.scheduler.integration import init_scheduler
- if init_scheduler(self):
- self.scheduler_initialized = True
- logger.debug(f"[AgentBridge] Scheduler service initialized for session {session_id}")
- except Exception as e:
- logger.warning(f"[AgentBridge] Failed to initialize scheduler for session {session_id}: {e}")
-
- # Inject scheduler dependencies into SchedulerTool instances
- if self.scheduler_initialized:
- try:
- from agent.tools.scheduler.integration import get_task_store, get_scheduler_service
- from agent.tools import SchedulerTool
-
- task_store = get_task_store()
- scheduler_service = get_scheduler_service()
-
- for tool in tools:
- if isinstance(tool, SchedulerTool):
- tool.task_store = task_store
- tool.scheduler_service = scheduler_service
- if not tool.config:
- tool.config = {}
- tool.config["channel_type"] = conf().get("channel_type", "unknown")
- logger.debug(f"[AgentBridge] Injected scheduler dependencies for session {session_id}")
- except Exception as e:
- logger.warning(f"[AgentBridge] Failed to inject scheduler dependencies for session {session_id}: {e}")
-
- # Load context files
- context_files = load_context_files(workspace_root)
-
- # Initialize skill manager
- skill_manager = None
- try:
- from agent.skills import SkillManager
- skill_manager = SkillManager(workspace_dir=workspace_root)
- logger.debug(f"[AgentBridge] Initialized SkillManager with {len(skill_manager.skills)} skills for session {session_id}")
- except Exception as e:
- logger.warning(f"[AgentBridge] Failed to initialize SkillManager for session {session_id}: {e}")
-
- # Check if this is the first conversation
- from agent.prompt.workspace import is_first_conversation, mark_conversation_started
- is_first = is_first_conversation(workspace_root)
-
- # Build system prompt
- prompt_builder = PromptBuilder(
- workspace_dir=workspace_root,
- language="zh"
- )
-
- # Get current time and timezone info
- import datetime
- import time
-
- now = datetime.datetime.now()
-
- # Get timezone info
- try:
- offset = -time.timezone if not time.daylight else -time.altzone
- hours = offset // 3600
- minutes = (offset % 3600) // 60
- if minutes:
- timezone_name = f"UTC{hours:+03d}:{minutes:02d}"
- else:
- timezone_name = f"UTC{hours:+03d}"
- except Exception:
- timezone_name = "UTC"
-
- # Chinese weekday mapping
- weekday_map = {
- 'Monday': '星期一',
- 'Tuesday': '星期二',
- 'Wednesday': '星期三',
- 'Thursday': '星期四',
- 'Friday': '星期五',
- 'Saturday': '星期六',
- 'Sunday': '星期日'
- }
- weekday_zh = weekday_map.get(now.strftime("%A"), now.strftime("%A"))
-
- runtime_info = {
- "model": conf().get("model", "unknown"),
- "workspace": workspace_root,
- "channel": conf().get("channel_type", "unknown"),
- "current_time": now.strftime("%Y-%m-%d %H:%M:%S"),
- "weekday": weekday_zh,
- "timezone": timezone_name
- }
-
- system_prompt = prompt_builder.build(
- tools=tools,
- context_files=context_files,
- skill_manager=skill_manager,
- memory_manager=memory_manager,
- runtime_info=runtime_info,
- is_first_conversation=is_first
- )
-
- if is_first:
- mark_conversation_started(workspace_root)
-
- # Get cost control parameters from config
- max_steps = conf().get("agent_max_steps", 20)
- max_context_tokens = conf().get("agent_max_context_tokens", 50000)
-
- # Create agent for this session
- agent = self.create_agent(
- system_prompt=system_prompt,
- tools=tools,
- max_steps=max_steps,
- output_mode="logger",
- workspace_dir=workspace_root,
- skill_manager=skill_manager,
- enable_skills=True,
- max_context_tokens=max_context_tokens
- )
-
- if memory_manager:
- agent.memory_manager = memory_manager
-
- # Store agent for this session
+ """Initialize agent for a specific session"""
+ agent = self.initializer.initialize_agent(session_id=session_id)
self.agents[session_id] = agent
- logger.info(f"[AgentBridge] Agent created for session: {session_id}")
def agent_reply(self, query: str, context: Context = None,
on_event=None, clear_history: bool = False) -> Reply:
@@ -764,6 +296,9 @@ class AgentBridge:
if not agent:
return Reply(ReplyType.ERROR, "Failed to initialize super agent")
+ # Create event handler for logging and channel communication
+ event_handler = AgentEventHandler(context=context, original_callback=on_event)
+
# Filter tools based on context
original_tools = agent.tools
filtered_tools = original_tools
@@ -786,16 +321,19 @@ class AgentBridge:
break
try:
- # Use agent's run_stream method
+ # Use agent's run_stream method with event handler
response = agent.run_stream(
user_message=query,
- on_event=on_event,
+ on_event=event_handler.handle_event,
clear_history=clear_history
)
finally:
# Restore original tools
if context and context.get("is_scheduled_task"):
agent.tools = original_tools
+
+ # Log execution summary
+ event_handler.log_summary()
# Check if there are files to send (from read tool)
if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'):
@@ -843,17 +381,18 @@ class AgentBridge:
reply.text_content = text_response # Store accompanying text
return reply
- # For documents (PDF, Excel, Word, PPT), use FILE type
- if file_type == "document":
+ # For all file types (document, video, audio), use FILE type
+ if file_type in ["document", "video", "audio"]:
file_url = f"file://{file_path}"
- logger.info(f"[AgentBridge] Sending document: {file_url}")
+ logger.info(f"[AgentBridge] Sending {file_type}: {file_url}")
reply = Reply(ReplyType.FILE, file_url)
reply.file_name = file_info.get("file_name", os.path.basename(file_path))
+ # Attach text message if present
+ if text_response:
+ reply.text_content = text_response
return reply
- # For other files (video, audio), we need channel-specific handling
- # For now, return text with file info
- # TODO: Implement video/audio sending when channel supports it
+ # For other unknown file types, return text with file info
message = text_response or file_info.get("message", "文件已准备")
message += f"\n\n[文件: {file_info.get('file_name', file_path)}]"
return Reply(ReplyType.TEXT, message)
diff --git a/bridge/agent_event_handler.py b/bridge/agent_event_handler.py
new file mode 100644
index 0000000..17a0920
--- /dev/null
+++ b/bridge/agent_event_handler.py
@@ -0,0 +1,115 @@
+"""
+Agent Event Handler - Handles agent events and thinking process output
+"""
+
+from common.log import logger
+
+
class AgentEventHandler:
    """
    Handles agent events and optionally sends intermediate messages to channel.

    Events are dispatched by their "type" field ("turn_start",
    "message_update", "message_end", "tool_execution_start",
    "tool_execution_end"); after our own handling, any original callback
    supplied by the caller is invoked so existing behavior is preserved.
    """

    def __init__(self, context=None, original_callback=None):
        """
        Initialize event handler.

        Args:
            context: COW context (used to locate the channel for sending
                intermediate "thinking" messages)
            original_callback: Original event callback to chain after our
                own handling
        """
        self.context = context
        self.original_callback = original_callback

        # Channel for sending intermediate messages (may be absent if the
        # context carries no channel).
        self.channel = None
        if context:
            self.channel = context.kwargs.get("channel") if hasattr(context, "kwargs") else None

        # Accumulated streaming text of the current assistant message.
        self.current_thinking = ""
        # Current turn index, updated on each "turn_start" event.
        self.turn_number = 0
        # Fix: initialize here so the attribute exists even before the
        # first "turn_start" event arrives (it was previously created
        # only inside _handle_turn_start).
        self.has_tool_calls_in_turn = False

    def handle_event(self, event):
        """
        Main event handler: dispatch by event type, then chain to the
        original callback (if any).

        Args:
            event: Event dict with "type" and "data" keys
        """
        event_type = event.get("type")
        data = event.get("data", {})

        # Dispatch to specific handlers
        if event_type == "turn_start":
            self._handle_turn_start(data)
        elif event_type == "message_update":
            self._handle_message_update(data)
        elif event_type == "message_end":
            self._handle_message_end(data)
        elif event_type == "tool_execution_start":
            self._handle_tool_execution_start(data)
        elif event_type == "tool_execution_end":
            self._handle_tool_execution_end(data)

        # Call original callback if provided
        if self.original_callback:
            self.original_callback(event)

    def _handle_turn_start(self, data):
        """Reset per-turn state at the start of a new turn."""
        self.turn_number = data.get("turn", 0)
        self.has_tool_calls_in_turn = False
        self.current_thinking = ""

    def _handle_message_update(self, data):
        """Accumulate streaming text deltas into current_thinking."""
        self.current_thinking += data.get("delta", "")

    def _handle_message_end(self, data):
        """
        Handle message end event.

        A message followed by tool calls is an intermediate "thinking"
        step and is forwarded to the channel; a message without tool calls
        is the final response (already logged at agent_stream level).
        """
        tool_calls = data.get("tool_calls", [])
        text = self.current_thinking.strip()

        if tool_calls:
            if text:
                # Fix: the truncation check now uses the stripped text, so
                # the ellipsis is only appended when the preview is
                # actually cut (the original compared against the
                # unstripped length).
                logger.debug(f"💭 {text[:200]}{'...' if len(text) > 200 else ''}")
                # Send thinking process to channel
                self._send_to_channel(text)
        elif text:
            # No tool calls = final response (logged at agent_stream level)
            logger.debug(f"💬 {text[:200]}{'...' if len(text) > 200 else ''}")

        self.current_thinking = ""

    def _handle_tool_execution_start(self, data):
        """Tool execution start - already logged by agent_stream.py."""
        pass

    def _handle_tool_execution_end(self, data):
        """Tool execution end - already logged by agent_stream.py."""
        pass

    def _send_to_channel(self, message):
        """
        Best-effort send of an intermediate message to the channel.

        Args:
            message: Text message to send
        """
        if not self.channel:
            return
        try:
            from bridge.reply import Reply, ReplyType
            # Create a Reply object for the message
            reply = Reply(ReplyType.TEXT, message)
            self.channel._send(reply, self.context)
        except Exception as e:
            # Never let channel delivery errors break the agent run.
            logger.debug(f"[AgentEventHandler] Failed to send to channel: {e}")

    def log_summary(self):
        """Log execution summary.

        Intentionally a no-op: real-time logging during execution is
        sufficient, so no end-of-run summary is emitted.
        """
        pass
diff --git a/bridge/agent_initializer.py b/bridge/agent_initializer.py
new file mode 100644
index 0000000..27739b5
--- /dev/null
+++ b/bridge/agent_initializer.py
@@ -0,0 +1,375 @@
+"""
+Agent Initializer - Handles agent initialization logic
+"""
+
+import os
+import asyncio
+import datetime
+import time
+from typing import Optional, List
+
+from agent.protocol import Agent
+from agent.tools import ToolManager
+from common.log import logger
+
+
class AgentInitializer:
    """
    Handles agent initialization including:
    - Workspace setup
    - Memory system initialization
    - Tool loading
    - System prompt building

    Info-level log lines are only emitted when session_id is None (the
    default agent) to avoid duplicating them for every session.
    """

    def __init__(self, bridge, agent_bridge):
        """
        Initialize agent initializer.

        Args:
            bridge: COW bridge instance
            agent_bridge: AgentBridge instance (for create_agent method)
        """
        self.bridge = bridge
        self.agent_bridge = agent_bridge

    def initialize_agent(self, session_id: Optional[str] = None) -> Agent:
        """
        Initialize agent for a session.

        Args:
            session_id: Session ID (None for default agent)

        Returns:
            Initialized agent instance
        """
        from config import conf

        # Get workspace from config
        workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))

        # One-time migration of API keys from config.json to ~/.cow/.env
        self._migrate_config_to_env(workspace_root)

        # Load environment variables
        self._load_env_file()

        # Initialize workspace (creates template files on first run)
        from agent.prompt import ensure_workspace, load_context_files, PromptBuilder
        ensure_workspace(workspace_root, create_templates=True)

        if session_id is None:
            logger.info(f"[AgentInitializer] Workspace initialized at: {workspace_root}")

        # Setup memory system
        memory_manager, memory_tools = self._setup_memory_system(workspace_root, session_id)

        # Load tools
        tools = self._load_tools(workspace_root, memory_manager, memory_tools, session_id)

        # Initialize scheduler if needed
        self._initialize_scheduler(tools, session_id)

        # Load context files
        context_files = load_context_files(workspace_root)

        # Initialize skill manager
        skill_manager = self._initialize_skill_manager(workspace_root, session_id)

        # Check if first conversation
        from agent.prompt.workspace import is_first_conversation, mark_conversation_started
        is_first = is_first_conversation(workspace_root)

        # Build system prompt
        prompt_builder = PromptBuilder(workspace_dir=workspace_root, language="zh")
        runtime_info = self._get_runtime_info(workspace_root)

        system_prompt = prompt_builder.build(
            tools=tools,
            context_files=context_files,
            skill_manager=skill_manager,
            memory_manager=memory_manager,
            runtime_info=runtime_info,
            is_first_conversation=is_first
        )

        if is_first:
            mark_conversation_started(workspace_root)

        # Get cost control parameters (conf is already imported above;
        # the original re-imported it here redundantly)
        max_steps = conf().get("agent_max_steps", 20)
        max_context_tokens = conf().get("agent_max_context_tokens", 50000)

        # Create agent
        agent = self.agent_bridge.create_agent(
            system_prompt=system_prompt,
            tools=tools,
            max_steps=max_steps,
            output_mode="logger",
            workspace_dir=workspace_root,
            skill_manager=skill_manager,
            enable_skills=True,
            max_context_tokens=max_context_tokens
        )

        # Attach memory manager so the agent can persist memories
        if memory_manager:
            agent.memory_manager = memory_manager

        return agent

    def _load_env_file(self):
        """Load environment variables from ~/.cow/.env (best-effort)."""
        env_file = os.path.expanduser("~/.cow/.env")
        if os.path.exists(env_file):
            try:
                from dotenv import load_dotenv
                # override=True: .env values win over pre-existing env vars
                load_dotenv(env_file, override=True)
            except ImportError:
                logger.warning("[AgentInitializer] python-dotenv not installed")
            except Exception as e:
                logger.warning(f"[AgentInitializer] Failed to load .env file: {e}")

    def _setup_memory_system(self, workspace_root: str, session_id: Optional[str] = None):
        """
        Setup memory system.

        The embedding provider is optional: when no usable OpenAI key is
        configured, the memory manager is created without embeddings.

        Returns:
            (memory_manager, memory_tools) tuple; (None, []) when the
            memory subsystem is unavailable.
        """
        memory_manager = None
        memory_tools = []

        try:
            from agent.memory import MemoryManager, MemoryConfig, create_embedding_provider
            from agent.tools import MemorySearchTool, MemoryGetTool
            from config import conf

            # Get OpenAI config
            openai_api_key = conf().get("open_ai_api_key", "")
            openai_api_base = conf().get("open_ai_api_base", "")

            # Initialize embedding provider only with a real key
            # (placeholder values from the config template are rejected)
            embedding_provider = None
            if openai_api_key and openai_api_key not in ["", "YOUR API KEY", "YOUR_API_KEY"]:
                try:
                    embedding_provider = create_embedding_provider(
                        provider="openai",
                        model="text-embedding-3-small",
                        api_key=openai_api_key,
                        api_base=openai_api_base or "https://api.openai.com/v1"
                    )
                    if session_id is None:
                        logger.info("[AgentInitializer] OpenAI embedding initialized")
                except Exception as e:
                    logger.warning(f"[AgentInitializer] OpenAI embedding failed: {e}")

            # Create memory manager
            memory_config = MemoryConfig(workspace_root=workspace_root)
            memory_manager = MemoryManager(memory_config, embedding_provider=embedding_provider)

            # Sync memory database with the workspace
            self._sync_memory(memory_manager, session_id)

            # Create memory tools
            memory_tools = [
                MemorySearchTool(memory_manager),
                MemoryGetTool(memory_manager)
            ]

            if session_id is None:
                logger.info("[AgentInitializer] Memory system initialized")

        except Exception as e:
            logger.warning(f"[AgentInitializer] Memory system not available: {e}")

        return memory_manager, memory_tools

    def _sync_memory(self, memory_manager, session_id: Optional[str] = None):
        """Sync memory database, creating an event loop if needed."""
        # Obtain a usable event loop; get_event_loop() may raise or return
        # a closed loop after previous runs, so fall back to a fresh one.
        try:
            loop = asyncio.get_event_loop()
            if loop.is_closed():
                raise RuntimeError("Event loop is closed")
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        try:
            if loop.is_running():
                # Already inside a running loop: schedule sync in the
                # background. NOTE(review): the Task reference is not
                # retained; presumably sync() logs its own failures -
                # confirm.
                asyncio.create_task(memory_manager.sync())
            else:
                loop.run_until_complete(memory_manager.sync())
        except Exception as e:
            logger.warning(f"[AgentInitializer] Memory sync failed: {e}")

    def _load_tools(self, workspace_root: str, memory_manager, memory_tools: List, session_id: Optional[str] = None):
        """Load all tools, wiring workspace/memory config into file tools."""
        tool_manager = ToolManager()
        tool_manager.load_tools()

        tools = []
        # Shared config for file-operation tools: workspace cwd plus the
        # memory manager (when available) so writes can update memory.
        file_config = {
            "cwd": workspace_root,
            "memory_manager": memory_manager
        } if memory_manager else {"cwd": workspace_root}

        for tool_name in tool_manager.tool_classes.keys():
            try:
                # Special handling for EnvConfig tool: it needs a back
                # reference to the agent bridge.
                if tool_name == "env_config":
                    from agent.tools import EnvConfig
                    tool = EnvConfig({"agent_bridge": self.agent_bridge})
                else:
                    tool = tool_manager.create_tool(tool_name)

                if tool:
                    # Apply workspace config to file operation tools
                    if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']:
                        tool.config = file_config
                        tool.cwd = file_config.get("cwd", getattr(tool, 'cwd', None))
                        if 'memory_manager' in file_config:
                            tool.memory_manager = file_config['memory_manager']
                    tools.append(tool)
            except Exception as e:
                logger.warning(f"[AgentInitializer] Failed to load tool {tool_name}: {e}")

        # Add memory tools
        if memory_tools:
            tools.extend(memory_tools)
            if session_id is None:
                logger.info(f"[AgentInitializer] Added {len(memory_tools)} memory tools")

        if session_id is None:
            logger.info(f"[AgentInitializer] Loaded {len(tools)} tools: {[t.name for t in tools]}")

        return tools

    def _initialize_scheduler(self, tools: List, session_id: Optional[str] = None):
        """Initialize the scheduler service (once) and inject its
        dependencies into any SchedulerTool instances."""
        if not self.agent_bridge.scheduler_initialized:
            try:
                from agent.tools.scheduler.integration import init_scheduler
                if init_scheduler(self.agent_bridge):
                    self.agent_bridge.scheduler_initialized = True
                    if session_id is None:
                        logger.info("[AgentInitializer] Scheduler service initialized")
            except Exception as e:
                logger.warning(f"[AgentInitializer] Failed to initialize scheduler: {e}")

        # Inject scheduler dependencies
        if self.agent_bridge.scheduler_initialized:
            try:
                from agent.tools.scheduler.integration import get_task_store, get_scheduler_service
                from agent.tools import SchedulerTool
                from config import conf

                task_store = get_task_store()
                scheduler_service = get_scheduler_service()

                for tool in tools:
                    if isinstance(tool, SchedulerTool):
                        tool.task_store = task_store
                        tool.scheduler_service = scheduler_service
                        if not tool.config:
                            tool.config = {}
                        tool.config["channel_type"] = conf().get("channel_type", "unknown")
            except Exception as e:
                logger.warning(f"[AgentInitializer] Failed to inject scheduler dependencies: {e}")

    def _initialize_skill_manager(self, workspace_root: str, session_id: Optional[str] = None):
        """Initialize skill manager; returns None when unavailable."""
        try:
            from agent.skills import SkillManager
            skill_manager = SkillManager(workspace_dir=workspace_root)
            return skill_manager
        except Exception as e:
            logger.warning(f"[AgentInitializer] Failed to initialize SkillManager: {e}")
            return None

    def _get_runtime_info(self, workspace_root: str):
        """
        Collect runtime information for the system prompt.

        Returns:
            Dict with model, workspace, channel, current local time,
            weekday (Chinese) and local timezone as a UTC offset string.
        """
        from config import conf

        now = datetime.datetime.now()

        # Compute local UTC offset. Fix: time.daylight only indicates that
        # a DST rule *exists* for this zone; whether DST is active *now*
        # must come from tm_isdst. The original used altzone whenever a
        # DST rule existed, giving a wrong offset outside DST.
        try:
            is_dst = time.daylight and time.localtime().tm_isdst > 0
            offset = -time.altzone if is_dst else -time.timezone
            hours = offset // 3600
            minutes = (offset % 3600) // 60
            timezone_name = f"UTC{hours:+03d}:{minutes:02d}" if minutes else f"UTC{hours:+03d}"
        except Exception:
            timezone_name = "UTC"

        # Chinese weekday mapping
        weekday_map = {
            'Monday': '星期一', 'Tuesday': '星期二', 'Wednesday': '星期三',
            'Thursday': '星期四', 'Friday': '星期五', 'Saturday': '星期六', 'Sunday': '星期日'
        }
        weekday_zh = weekday_map.get(now.strftime("%A"), now.strftime("%A"))

        return {
            "model": conf().get("model", "unknown"),
            "workspace": workspace_root,
            "channel": conf().get("channel_type", "unknown"),
            "current_time": now.strftime("%Y-%m-%d %H:%M:%S"),
            "weekday": weekday_zh,
            "timezone": timezone_name
        }

    def _migrate_config_to_env(self, workspace_root: str):
        """
        One-time migration of API keys from config.json to ~/.cow/.env.

        Keys already present in .env are left untouched; migrated keys are
        also exported into os.environ for the current process.
        """
        from config import conf

        # config.json key -> environment variable name
        key_mapping = {
            "open_ai_api_key": "OPENAI_API_KEY",
            "open_ai_api_base": "OPENAI_API_BASE",
            "gemini_api_key": "GEMINI_API_KEY",
            "claude_api_key": "CLAUDE_API_KEY",
            "linkai_api_key": "LINKAI_API_KEY",
        }

        env_file = os.path.expanduser("~/.cow/.env")

        # Collect names of variables already defined in .env
        existing_env_vars = {}
        if os.path.exists(env_file):
            try:
                with open(env_file, 'r', encoding='utf-8') as f:
                    for line in f:
                        line = line.strip()
                        if line and not line.startswith('#') and '=' in line:
                            key, _ = line.split('=', 1)
                            existing_env_vars[key.strip()] = True
            except Exception as e:
                logger.warning(f"[AgentInitializer] Failed to read .env file: {e}")

        # Determine which keys still need migration
        keys_to_migrate = {}
        for config_key, env_key in key_mapping.items():
            if env_key in existing_env_vars:
                continue
            value = conf().get(config_key, "")
            if value and value.strip():
                keys_to_migrate[env_key] = value.strip()

        # Append the new keys to .env and export them to this process
        if keys_to_migrate:
            try:
                env_dir = os.path.dirname(env_file)
                os.makedirs(env_dir, exist_ok=True)

                # Append mode creates the file if missing, so the
                # original's separate "touch" open was redundant.
                with open(env_file, 'a', encoding='utf-8') as f:
                    f.write('\n# Auto-migrated from config.json\n')
                    for key, value in keys_to_migrate.items():
                        f.write(f'{key}={value}\n')
                        os.environ[key] = value

                logger.info(f"[AgentInitializer] Migrated {len(keys_to_migrate)} API keys to .env: {list(keys_to_migrate.keys())}")
            except Exception as e:
                logger.warning(f"[AgentInitializer] Failed to migrate API keys: {e}")
diff --git a/channel/feishu/feishu_channel.py b/channel/feishu/feishu_channel.py
index 0197c93..6bf04ae 100644
--- a/channel/feishu/feishu_channel.py
+++ b/channel/feishu/feishu_channel.py
@@ -251,6 +251,14 @@ class FeiShuChanel(ChatChannel):
msg_type = "image"
content_key = "image_key"
elif reply.type == ReplyType.FILE:
+ # 如果有附加的文本内容,先发送文本
+ if hasattr(reply, 'text_content') and reply.text_content:
+ logger.info(f"[FeiShu] Sending text before file: {reply.text_content[:50]}...")
+ text_reply = Reply(ReplyType.TEXT, reply.text_content)
+ self._send(text_reply, context)
+ import time
+ time.sleep(0.3) # 短暂延迟,确保文本先到达
+
# 判断是否为视频文件
file_path = reply.content
if file_path.startswith("file://"):
@@ -259,20 +267,18 @@ class FeiShuChanel(ChatChannel):
is_video = file_path.lower().endswith(('.mp4', '.avi', '.mov', '.wmv', '.flv'))
if is_video:
- # 视频使用 media 类型,需要上传并获取 file_key 和 duration
- video_info = self._upload_video_url(reply.content, access_token)
- if not video_info or not video_info.get('file_key'):
+ # 视频上传(包含duration信息)
+ upload_data = self._upload_video_url(reply.content, access_token)
+ if not upload_data or not upload_data.get('file_key'):
logger.warning("[FeiShu] upload video failed")
return
- # media 类型需要特殊的 content 格式
+ # 视频使用 media 类型(根据官方文档)
+ # 错误码 230055 说明:上传 mp4 时必须使用 msg_type="media"
msg_type = "media"
- # 注意:media 类型的 content 不使用 content_key,而是完整的 JSON 对象
- reply_content = {
- "file_key": video_info['file_key'],
- "duration": video_info.get('duration', 0) # 视频时长(毫秒)
- }
- content_key = None # media 类型不使用单一的 key
+ reply_content = upload_data # 完整的上传响应数据(包含file_key和duration)
+ logger.info(f"[FeiShu] Sending video: file_key={upload_data.get('file_key')}, duration={upload_data.get('duration')}ms")
+ content_key = None # 直接序列化整个对象
else:
# 其他文件使用 file 类型
file_key = self._upload_file_url(reply.content, access_token)
@@ -286,12 +292,16 @@ class FeiShuChanel(ChatChannel):
# Check if we can reply to an existing message (need msg_id)
can_reply = is_group and msg and hasattr(msg, 'msg_id') and msg.msg_id
+ # Build content JSON
+ content_json = json.dumps(reply_content) if content_key is None else json.dumps({content_key: reply_content})
+ logger.debug(f"[FeiShu] Sending message: msg_type={msg_type}, content={content_json[:200]}")
+
if can_reply:
# 群聊中回复已有消息
url = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg.msg_id}/reply"
data = {
"msg_type": msg_type,
- "content": json.dumps(reply_content) if content_key is None else json.dumps({content_key: reply_content})
+ "content": content_json
}
res = requests.post(url=url, headers=headers, json=data, timeout=(5, 10))
else:
@@ -301,7 +311,7 @@ class FeiShuChanel(ChatChannel):
data = {
"receive_id": context.get("receiver"),
"msg_type": msg_type,
- "content": json.dumps(reply_content) if content_key is None else json.dumps({content_key: reply_content})
+ "content": content_json
}
res = requests.post(url=url, headers=headers, params=params, json=data, timeout=(5, 10))
res = res.json()
@@ -471,9 +481,18 @@ class FeiShuChanel(ChatChannel):
file_type = file_type_map.get(file_ext, 'mp4')
upload_url = "https://open.feishu.cn/open-apis/im/v1/files"
- data = {'file_type': file_type, 'file_name': file_name}
+ data = {
+ 'file_type': file_type,
+ 'file_name': file_name
+ }
+ # Add duration only if available (required for video/audio)
+ if duration:
+ data['duration'] = duration # Must be int, not string
+
headers = {'Authorization': f'Bearer {access_token}'}
+ logger.info(f"[FeiShu] Uploading video: file_name={file_name}, duration={duration}ms")
+
with open(local_path, "rb") as file:
upload_response = requests.post(
upload_url,
@@ -486,11 +505,11 @@ class FeiShuChanel(ChatChannel):
response_data = upload_response.json()
if response_data.get("code") == 0:
- file_key = response_data.get("data").get("file_key")
- return {
- 'file_key': file_key,
- 'duration': duration
- }
+ # Add duration to the response data (API doesn't return it)
+ upload_data = response_data.get("data")
+ upload_data['duration'] = duration # Add our calculated duration
+ logger.info(f"[FeiShu] Upload complete: file_key={upload_data.get('file_key')}, duration={duration}ms")
+ return upload_data
else:
logger.error(f"[FeiShu] upload video failed: {response_data}")
return None
diff --git a/channel/web/chat.html b/channel/web/chat.html
index 50e81fa..3134f3d 100644
--- a/channel/web/chat.html
+++ b/channel/web/chat.html
@@ -762,7 +762,7 @@
-
@@ -771,21 +771,21 @@
我可以回答问题、提供信息或者帮助您完成各种任务
+我可以回答问题、提供信息或者帮助您完成各种任务