discover_agents
Find and register available CLI agents on your system to enable intelligent task delegation through specialized tools.
Instructions
Discover and register available CLI agents on the system
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| force_refresh | No | Force re-discovery even if cache exists |
Implementation Reference
- src/delegation_mcp/server.py:234-275 (handler)MCP tool handler for 'discover_agents': invokes AgentDiscovery, auto-registers new agents, formats and returns discovery summary as text.
elif name == "discover_agents": # Discover available agents force_refresh = arguments.get("force_refresh", False) discovered = await self.agent_discovery.discover_agents(force_refresh=force_refresh) # Register newly discovered agents registered_count = 0 for agent_name, metadata in discovered.items(): if metadata.available and agent_name not in self.config.orchestrators: agent_config = OrchestratorConfig( name=agent_name, command=metadata.command, enabled=True, timeout=300, ) self.config.orchestrators[agent_name] = agent_config self.registry.register(agent_config) registered_count += 1 logger.info(f"Registered new agent: {agent_name}") # Build response summary = self.agent_discovery.get_discovery_summary() text = f"Agent Discovery Results:\n\n" text += f"Total agents scanned: {summary['total_agents']}\n" text += f"Available: {summary['available']}\n" text += f"Unavailable: {summary['unavailable']}\n" text += f"Newly registered: {registered_count}\n\n" if summary['available_agents']: text += "Available Agents:\n" for agent in summary['available_agents']: text += f" ✓ {agent['name']}: {agent['version']}\n" text += f" Path: {agent['path']}\n" if summary['unavailable_agents']: text += "\nUnavailable Agents:\n" for agent in summary['unavailable_agents']: text += f" ✗ {agent['name']}\n" text += f" {agent['error']}\n" return [TextContent(type="text", text=text)] - src/delegation_mcp/server.py:184-197 (registration)Tool registration in list_tools() handler, defining name, description, and input schema for 'discover_agents'.
Tool( name="discover_agents", description="Discover and register available CLI agents on the system", inputSchema={ "type": "object", "properties": { "force_refresh": { "type": "boolean", "description": "Force re-discovery even if cache exists", "default": False, }, }, }, ), - src/delegation_mcp/server.py:184-197 (schema)Input schema definition for 'discover_agents' tool: optional boolean force_refresh.
Tool( name="discover_agents", description="Discover and register available CLI agents on the system", inputSchema={ "type": "object", "properties": { "force_refresh": { "type": "boolean", "description": "Force re-discovery even if cache exists", "default": False, }, }, }, ), - Core implementation of agent discovery logic (AgentDiscovery.discover_agents method), performing parallel discovery, verification, caching; invoked by MCP tool handler.
async def discover_agents( self, force_refresh: bool = False, agents_to_check: list[str] | None = None, ) -> dict[str, AgentMetadata]: """Discover available agents on the system. Args: force_refresh: Force re-discovery even if cache exists agents_to_check: Specific agents to check (default: all known agents) Returns: Dictionary of agent name to metadata """ if not force_refresh and self._discovered_agents: logger.info("Using cached agent discovery results") return self._discovered_agents logger.info("Starting agent discovery...") # Determine which agents to check agents = agents_to_check or list(self.KNOWN_AGENTS.keys()) # Use a semaphore to limit concurrency on all platforms to avoid resource spikes # On Windows this is critical, on others it's just good practice concurrency_limit = 5 if platform.system() == "Windows" else 10 semaphore = asyncio.Semaphore(concurrency_limit) async def _bounded_discover(name: str, config: dict[str, Any]) -> AgentMetadata | Exception: async with semaphore: try: return await self._discover_single_agent(name, config) except Exception as e: logger.error(f"Discovery task failed for {name}: {e}") return e logger.debug(f"Running agent discovery in parallel (limit={concurrency_limit})") tasks = [] for name in agents: if name not in self.KNOWN_AGENTS: logger.warning(f"Unknown agent: {name}") continue config = self.KNOWN_AGENTS[name] tasks.append(_bounded_discover(name, config)) results = await asyncio.gather(*tasks, return_exceptions=True) # Process results for result in results: if isinstance(result, AgentMetadata): self._discovered_agents[result.name] = result elif isinstance(result, Exception): # Already logged in _bounded_discover pass # Save to cache self._save_cache() logger.info( f"Discovery complete: {sum(1 for a in self._discovered_agents.values() if a.available)}/{len(self._discovered_agents)} agents available" ) return self._discovered_agents - Dataclass defining AgentMetadata structure used in discovery results.
@dataclass class AgentMetadata: """Metadata for a discovered agent.""" name: str command: str | list[str] version: str | None = None available: bool = False path: str | None = None error_message: str | None = None capabilities: list[str] | None = None verified_at: str | None = None