Skip to main content
Glama

discover_agents

Find and register available CLI agents on your system to enable intelligent task delegation through specialized tools.

Instructions

Discover and register available CLI agents on the system

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
force_refreshNoForce re-discovery even if cache exists

Implementation Reference

  • MCP tool handler for 'discover_agents': invokes AgentDiscovery, auto-registers new agents, formats and returns discovery summary as text.
    elif name == "discover_agents": # Discover available agents force_refresh = arguments.get("force_refresh", False) discovered = await self.agent_discovery.discover_agents(force_refresh=force_refresh) # Register newly discovered agents registered_count = 0 for agent_name, metadata in discovered.items(): if metadata.available and agent_name not in self.config.orchestrators: agent_config = OrchestratorConfig( name=agent_name, command=metadata.command, enabled=True, timeout=300, ) self.config.orchestrators[agent_name] = agent_config self.registry.register(agent_config) registered_count += 1 logger.info(f"Registered new agent: {agent_name}") # Build response summary = self.agent_discovery.get_discovery_summary() text = f"Agent Discovery Results:\n\n" text += f"Total agents scanned: {summary['total_agents']}\n" text += f"Available: {summary['available']}\n" text += f"Unavailable: {summary['unavailable']}\n" text += f"Newly registered: {registered_count}\n\n" if summary['available_agents']: text += "Available Agents:\n" for agent in summary['available_agents']: text += f" ✓ {agent['name']}: {agent['version']}\n" text += f" Path: {agent['path']}\n" if summary['unavailable_agents']: text += "\nUnavailable Agents:\n" for agent in summary['unavailable_agents']: text += f" ✗ {agent['name']}\n" text += f" {agent['error']}\n" return [TextContent(type="text", text=text)]
  • Tool registration in list_tools() handler, defining name, description, and input schema for 'discover_agents'.
    Tool( name="discover_agents", description="Discover and register available CLI agents on the system", inputSchema={ "type": "object", "properties": { "force_refresh": { "type": "boolean", "description": "Force re-discovery even if cache exists", "default": False, }, }, }, ),
  • Input schema definition for 'discover_agents' tool: optional boolean force_refresh.
    Tool( name="discover_agents", description="Discover and register available CLI agents on the system", inputSchema={ "type": "object", "properties": { "force_refresh": { "type": "boolean", "description": "Force re-discovery even if cache exists", "default": False, }, }, }, ),
  • Core implementation of agent discovery logic (AgentDiscovery.discover_agents method), performing parallel discovery, verification, caching; invoked by MCP tool handler.
    async def discover_agents( self, force_refresh: bool = False, agents_to_check: list[str] | None = None, ) -> dict[str, AgentMetadata]: """Discover available agents on the system. Args: force_refresh: Force re-discovery even if cache exists agents_to_check: Specific agents to check (default: all known agents) Returns: Dictionary of agent name to metadata """ if not force_refresh and self._discovered_agents: logger.info("Using cached agent discovery results") return self._discovered_agents logger.info("Starting agent discovery...") # Determine which agents to check agents = agents_to_check or list(self.KNOWN_AGENTS.keys()) # Use a semaphore to limit concurrency on all platforms to avoid resource spikes # On Windows this is critical, on others it's just good practice concurrency_limit = 5 if platform.system() == "Windows" else 10 semaphore = asyncio.Semaphore(concurrency_limit) async def _bounded_discover(name: str, config: dict[str, Any]) -> AgentMetadata | Exception: async with semaphore: try: return await self._discover_single_agent(name, config) except Exception as e: logger.error(f"Discovery task failed for {name}: {e}") return e logger.debug(f"Running agent discovery in parallel (limit={concurrency_limit})") tasks = [] for name in agents: if name not in self.KNOWN_AGENTS: logger.warning(f"Unknown agent: {name}") continue config = self.KNOWN_AGENTS[name] tasks.append(_bounded_discover(name, config)) results = await asyncio.gather(*tasks, return_exceptions=True) # Process results for result in results: if isinstance(result, AgentMetadata): self._discovered_agents[result.name] = result elif isinstance(result, Exception): # Already logged in _bounded_discover pass # Save to cache self._save_cache() logger.info( f"Discovery complete: {sum(1 for a in self._discovered_agents.values() if a.available)}/{len(self._discovered_agents)} agents available" ) return self._discovered_agents
  • Dataclass defining AgentMetadata structure used in discovery results.
    @dataclass class AgentMetadata: """Metadata for a discovered agent.""" name: str command: str | list[str] version: str | None = None available: bool = False path: str | None = None error_message: str | None = None capabilities: list[str] | None = None verified_at: str | None = None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/carlosduplar/multi-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server