""" Agent Initializer - Handles agent initialization logic """ import os import asyncio import datetime import time from typing import Optional, List from agent.protocol import Agent from agent.tools import ToolManager from common.log import logger class AgentInitializer: """ Handles agent initialization including: - Workspace setup - Memory system initialization - Tool loading - System prompt building """ def __init__(self, bridge, agent_bridge): """ Initialize agent initializer Args: bridge: COW bridge instance agent_bridge: AgentBridge instance (for create_agent method) """ self.bridge = bridge self.agent_bridge = agent_bridge def initialize_agent(self, session_id: Optional[str] = None) -> Agent: """ Initialize agent for a session Args: session_id: Session ID (None for default agent) Returns: Initialized agent instance """ from config import conf # Get workspace from config workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow")) # Migrate API keys self._migrate_config_to_env(workspace_root) # Load environment variables self._load_env_file() # Initialize workspace from agent.prompt import ensure_workspace, load_context_files, PromptBuilder workspace_files = ensure_workspace(workspace_root, create_templates=True) if session_id is None: logger.info(f"[AgentInitializer] Workspace initialized at: {workspace_root}") # Setup memory system memory_manager, memory_tools = self._setup_memory_system(workspace_root, session_id) # Load tools tools = self._load_tools(workspace_root, memory_manager, memory_tools, session_id) # Initialize scheduler if needed self._initialize_scheduler(tools, session_id) # Load context files context_files = load_context_files(workspace_root) # Initialize skill manager skill_manager = self._initialize_skill_manager(workspace_root, session_id) # Check if first conversation from agent.prompt.workspace import is_first_conversation, mark_conversation_started is_first = is_first_conversation(workspace_root) # Build system prompt prompt_builder = PromptBuilder(workspace_dir=workspace_root, language="zh") runtime_info = self._get_runtime_info(workspace_root) system_prompt = prompt_builder.build( tools=tools, context_files=context_files, skill_manager=skill_manager, memory_manager=memory_manager, runtime_info=runtime_info, is_first_conversation=is_first ) if is_first: mark_conversation_started(workspace_root) # Get cost control parameters from config import conf max_steps = conf().get("agent_max_steps", 20) max_context_tokens = conf().get("agent_max_context_tokens", 50000) # Create agent agent = self.agent_bridge.create_agent( system_prompt=system_prompt, tools=tools, max_steps=max_steps, output_mode="logger", workspace_dir=workspace_root, skill_manager=skill_manager, enable_skills=True, max_context_tokens=max_context_tokens ) # Attach memory manager if memory_manager: agent.memory_manager = memory_manager return agent def _load_env_file(self): """Load environment variables from .env file""" env_file = os.path.expanduser("~/.cow/.env") if os.path.exists(env_file): try: from dotenv import load_dotenv load_dotenv(env_file, override=True) except ImportError: logger.warning("[AgentInitializer] python-dotenv not installed") except Exception as e: logger.warning(f"[AgentInitializer] Failed to load .env file: {e}") def _setup_memory_system(self, workspace_root: str, session_id: Optional[str] = None): """ Setup memory system Returns: (memory_manager, memory_tools) tuple """ memory_manager = None memory_tools = [] try: from agent.memory import MemoryManager, MemoryConfig, create_embedding_provider from agent.tools import MemorySearchTool, MemoryGetTool from config import conf # Get OpenAI config openai_api_key = conf().get("open_ai_api_key", "") openai_api_base = conf().get("open_ai_api_base", "") # Initialize embedding provider embedding_provider = None if openai_api_key and openai_api_key not in ["", "YOUR API KEY", "YOUR_API_KEY"]: try: embedding_provider = create_embedding_provider( provider="openai", model="text-embedding-3-small", api_key=openai_api_key, api_base=openai_api_base or "https://api.openai.com/v1" ) if session_id is None: logger.info("[AgentInitializer] OpenAI embedding initialized") except Exception as e: logger.warning(f"[AgentInitializer] OpenAI embedding failed: {e}") # Create memory manager memory_config = MemoryConfig(workspace_root=workspace_root) memory_manager = MemoryManager(memory_config, embedding_provider=embedding_provider) # Sync memory self._sync_memory(memory_manager, session_id) # Create memory tools memory_tools = [ MemorySearchTool(memory_manager), MemoryGetTool(memory_manager) ] if session_id is None: logger.info("[AgentInitializer] Memory system initialized") except Exception as e: logger.warning(f"[AgentInitializer] Memory system not available: {e}") return memory_manager, memory_tools def _sync_memory(self, memory_manager, session_id: Optional[str] = None): """Sync memory database""" try: loop = asyncio.get_event_loop() if loop.is_closed(): raise RuntimeError("Event loop is closed") except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: if loop.is_running(): asyncio.create_task(memory_manager.sync()) else: loop.run_until_complete(memory_manager.sync()) except Exception as e: logger.warning(f"[AgentInitializer] Memory sync failed: {e}") def _load_tools(self, workspace_root: str, memory_manager, memory_tools: List, session_id: Optional[str] = None): """Load all tools""" tool_manager = ToolManager() tool_manager.load_tools() tools = [] file_config = { "cwd": workspace_root, "memory_manager": memory_manager } if memory_manager else {"cwd": workspace_root} for tool_name in tool_manager.tool_classes.keys(): try: # Special handling for EnvConfig tool if tool_name == "env_config": from agent.tools import EnvConfig tool = EnvConfig({"agent_bridge": self.agent_bridge}) else: tool = tool_manager.create_tool(tool_name) if tool: # Apply workspace config to file operation tools if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']: tool.config = file_config tool.cwd = file_config.get("cwd", getattr(tool, 'cwd', None)) if 'memory_manager' in file_config: tool.memory_manager = file_config['memory_manager'] tools.append(tool) except Exception as e: logger.warning(f"[AgentInitializer] Failed to load tool {tool_name}: {e}") # Add memory tools if memory_tools: tools.extend(memory_tools) if session_id is None: logger.info(f"[AgentInitializer] Added {len(memory_tools)} memory tools") if session_id is None: logger.info(f"[AgentInitializer] Loaded {len(tools)} tools: {[t.name for t in tools]}") return tools def _initialize_scheduler(self, tools: List, session_id: Optional[str] = None): """Initialize scheduler service if needed""" if not self.agent_bridge.scheduler_initialized: try: from agent.tools.scheduler.integration import init_scheduler if init_scheduler(self.agent_bridge): self.agent_bridge.scheduler_initialized = True if session_id is None: logger.info("[AgentInitializer] Scheduler service initialized") except Exception as e: logger.warning(f"[AgentInitializer] Failed to initialize scheduler: {e}") # Inject scheduler dependencies if self.agent_bridge.scheduler_initialized: try: from agent.tools.scheduler.integration import get_task_store, get_scheduler_service from agent.tools import SchedulerTool from config import conf task_store = get_task_store() scheduler_service = get_scheduler_service() for tool in tools: if isinstance(tool, SchedulerTool): tool.task_store = task_store tool.scheduler_service = scheduler_service if not tool.config: tool.config = {} tool.config["channel_type"] = conf().get("channel_type", "unknown") except Exception as e: logger.warning(f"[AgentInitializer] Failed to inject scheduler dependencies: {e}") def _initialize_skill_manager(self, workspace_root: str, session_id: Optional[str] = None): """Initialize skill manager""" try: from agent.skills import SkillManager skill_manager = SkillManager(workspace_dir=workspace_root) return skill_manager except Exception as e: logger.warning(f"[AgentInitializer] Failed to initialize SkillManager: {e}") return None def _get_runtime_info(self, workspace_root: str): """Get runtime information""" from config import conf now = datetime.datetime.now() # Get timezone info try: offset = -time.timezone if not time.daylight else -time.altzone hours = offset // 3600 minutes = (offset % 3600) // 60 timezone_name = f"UTC{hours:+03d}:{minutes:02d}" if minutes else f"UTC{hours:+03d}" except Exception: timezone_name = "UTC" # Chinese weekday mapping weekday_map = { 'Monday': '星期一', 'Tuesday': '星期二', 'Wednesday': '星期三', 'Thursday': '星期四', 'Friday': '星期五', 'Saturday': '星期六', 'Sunday': '星期日' } weekday_zh = weekday_map.get(now.strftime("%A"), now.strftime("%A")) return { "model": conf().get("model", "unknown"), "workspace": workspace_root, "channel": conf().get("channel_type", "unknown"), "current_time": now.strftime("%Y-%m-%d %H:%M:%S"), "weekday": weekday_zh, "timezone": timezone_name } def _migrate_config_to_env(self, workspace_root: str): """Migrate API keys from config.json to .env file""" from config import conf key_mapping = { "open_ai_api_key": "OPENAI_API_KEY", "open_ai_api_base": "OPENAI_API_BASE", "gemini_api_key": "GEMINI_API_KEY", "claude_api_key": "CLAUDE_API_KEY", "linkai_api_key": "LINKAI_API_KEY", } env_file = os.path.expanduser("~/.cow/.env") # Read existing env vars existing_env_vars = {} if os.path.exists(env_file): try: with open(env_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, _ = line.split('=', 1) existing_env_vars[key.strip()] = True except Exception as e: logger.warning(f"[AgentInitializer] Failed to read .env file: {e}") # Check which keys need migration keys_to_migrate = {} for config_key, env_key in key_mapping.items(): if env_key in existing_env_vars: continue value = conf().get(config_key, "") if value and value.strip(): keys_to_migrate[env_key] = value.strip() # Write new keys if keys_to_migrate: try: env_dir = os.path.dirname(env_file) if not os.path.exists(env_dir): os.makedirs(env_dir, exist_ok=True) if not os.path.exists(env_file): open(env_file, 'a').close() with open(env_file, 'a', encoding='utf-8') as f: f.write('\n# Auto-migrated from config.json\n') for key, value in keys_to_migrate.items(): f.write(f'{key}={value}\n') os.environ[key] = value logger.info(f"[AgentInitializer] Migrated {len(keys_to_migrate)} API keys to .env: {list(keys_to_migrate.keys())}") except Exception as e: logger.warning(f"[AgentInitializer] Failed to migrate API keys: {e}")