Skip to main content
Glama
server.py14.9 kB
"""Main MCP server implementation. Lightweight discovery-only service that provides: - Routing guidance via capability-based task classification - On-demand agent discovery and registration - Agent availability listing """ from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.server.models import InitializationOptions from mcp.server.lowlevel import NotificationOptions from mcp.types import Tool, TextContent import asyncio import logging from pathlib import Path from typing import Any from .config import DelegationConfig, OrchestratorConfig from .orchestrator import OrchestratorRegistry from .delegation import DelegationEngine from .agent_discovery import AgentDiscovery logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class DelegationMCPServer: """MCP server for multi-orchestrator delegation.""" def __init__( self, config_path: Path | None = None, enable_auto_discovery: bool = True, ): """Initialize lightweight MCP server. Args: config_path: Path to delegation rules config enable_auto_discovery: Enable automatic agent discovery on startup """ self.config_path = config_path or self._resolve_config_path() self.config = self._load_config() self.registry = OrchestratorRegistry() self.engine = DelegationEngine(self.config, self.registry) self.server = Server("delegation-mcp") # Agent discovery system for auto-detecting installed agents self.agent_discovery = AgentDiscovery() self.enable_auto_discovery = enable_auto_discovery self._setup_handlers() self._register_orchestrators() def _resolve_config_path(self) -> Path: """Resolve config path with priority: project → user → defaults. Priority matches CLAUDE.md behavior: 1. Project-level config (can override user-level) 2. User-level config (fallback for global installs) 3. Project path (triggers default config creation) """ # Check project-level config first (can override user-level) project_config = Path("config/delegation_rules.yaml") if project_config.exists(): return project_config # Fall back to user-level config user_config = Path.home() / ".delegation-mcp" / "config" / "delegation_rules.yaml" if user_config.exists(): return user_config # Return project path (will trigger default config creation) return project_config def _load_config(self) -> DelegationConfig: """Load configuration from file.""" if self.config_path.exists(): return DelegationConfig.from_yaml(self.config_path) return self._create_default_config() def _create_default_config(self) -> DelegationConfig: """Create default configuration.""" return DelegationConfig( orchestrator="claude", orchestrators={ "claude": OrchestratorConfig( name="claude", command="claude", args=["-p"], # Non-interactive mode enabled=True ), "gemini": OrchestratorConfig( name="gemini", command="gemini", args=[], # Gemini uses positional args by default enabled=True, ), "copilot": OrchestratorConfig( name="copilot", command="copilot", enabled=False ), "aider": OrchestratorConfig( name="aider", command="aider", args=["--yes", "--no-auto-commits"], # Auto-approve, no commits enabled=False ), }, ) async def _discover_and_register_agents(self) -> None: """Discover available agents and register them with the registry.""" if not self.enable_auto_discovery: logger.info("Agent auto-discovery disabled") return logger.info("Starting agent auto-discovery...") discovered = await self.agent_discovery.discover_agents() # Register discovered agents that aren't already in config for name, metadata in discovered.items(): if metadata.available and name not in self.config.orchestrators: # Create config from discovered metadata agent_config = OrchestratorConfig( name=name, command=metadata.command, enabled=True, timeout=300, ) self.config.orchestrators[name] = agent_config self.registry.register(agent_config) logger.info(f"Auto-registered discovered agent: {name} ({metadata.version})") # Report discovery summary summary = self.agent_discovery.get_discovery_summary() logger.info( f"Agent discovery complete: {summary['available']}/{summary['total_agents']} agents available" ) # Log unavailable agents with install instructions for agent in self.agent_discovery.get_unavailable_agents(): logger.info(f" {agent.name}: {agent.error_message}") def _register_orchestrators(self) -> None: """Register all orchestrators from config.""" for name, config in self.config.orchestrators.items(): self.registry.register(config) logger.info(f"Registered orchestrator: {name} (enabled={config.enabled})") # Validate availability availability = self.registry.validate_all() for name, available in availability.items(): if not available: logger.warning(f"Orchestrator '{name}' not available in PATH") def _setup_handlers(self) -> None: """Setup MCP server handlers with on-demand tool loading.""" @self.server.list_tools() async def list_tools() -> list[Tool]: """List available lightweight tools for routing guidance and discovery. Tools: - get_routing_guidance: Returns which agent should handle a task (no execution) - discover_agents: Discover and register available CLI agents - list_agents: List registered agents and their availability """ tools = [ Tool( name="get_routing_guidance", description="Get routing guidance for a task - returns which agent should handle it and the exact CLI command to run (guidance only, no execution)", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "The task query to get routing guidance for", }, }, "required": ["query"], }, ), Tool( name="discover_agents", description="Discover and register available CLI agents on the system", inputSchema={ "type": "object", "properties": { "force_refresh": { "type": "boolean", "description": "Force re-discovery even if cache exists", "default": False, }, }, }, ), Tool( name="list_agents", description="List all registered agents and their availability status", inputSchema={ "type": "object", "properties": {}, }, ), ] logger.info(f"Listed {len(tools)} lightweight tools") return tools @self.server.call_tool() async def call_tool(name: str, arguments: Any) -> list[TextContent]: """Handle lightweight tool calls for routing guidance and discovery.""" try: if name == "get_routing_guidance": # Get routing guidance without executing the task query = arguments["query"] # Classify the task to determine routing task_info = self.engine._classify_task(query) task_type = task_info[0] if isinstance(task_info, tuple) else task_info timeout = task_info[1] if isinstance(task_info, tuple) and len(task_info) > 1 else 300 # Determine which agent should handle it agent, _ = self.engine._determine_delegation(query, None) # KISS: Just return the agent name # - "gemini" / "aider" / "copilot" → delegate to that agent # - "claude" → orchestrator handles directly (since Claude is the orchestrator) response = agent if agent else "claude" return [TextContent(type="text", text=response)] elif name == "discover_agents": # Discover available agents force_refresh = arguments.get("force_refresh", False) discovered = await self.agent_discovery.discover_agents(force_refresh=force_refresh) # Register newly discovered agents registered_count = 0 for agent_name, metadata in discovered.items(): if metadata.available and agent_name not in self.config.orchestrators: agent_config = OrchestratorConfig( name=agent_name, command=metadata.command, enabled=True, timeout=300, ) self.config.orchestrators[agent_name] = agent_config self.registry.register(agent_config) registered_count += 1 logger.info(f"Registered new agent: {agent_name}") # Build response summary = self.agent_discovery.get_discovery_summary() text = f"Agent Discovery Results:\n\n" text += f"Total agents scanned: {summary['total_agents']}\n" text += f"Available: {summary['available']}\n" text += f"Unavailable: {summary['unavailable']}\n" text += f"Newly registered: {registered_count}\n\n" if summary['available_agents']: text += "Available Agents:\n" for agent in summary['available_agents']: text += f" ✓ {agent['name']}: {agent['version']}\n" text += f" Path: {agent['path']}\n" if summary['unavailable_agents']: text += "\nUnavailable Agents:\n" for agent in summary['unavailable_agents']: text += f" ✗ {agent['name']}\n" text += f" {agent['error']}\n" return [TextContent(type="text", text=text)] elif name == "list_agents": # List registered agents and their availability enabled = self.registry.list_enabled() all_agents = list(self.registry.orchestrators.keys()) availability = self.registry.validate_all() text = "Registered Agents:\n\n" for agent_name in all_agents: config = self.registry.get(agent_name) status = "✓ Enabled" if agent_name in enabled else "✗ Disabled" avail = "Available" if availability.get(agent_name) else "Not found in PATH" text += f"{agent_name}:\n" text += f" Status: {status}\n" text += f" Availability: {avail}\n" text += f" Command: {config.command}\n" if config.args: text += f" Args: {' '.join(config.args)}\n" text += "\n" return [TextContent(type="text", text=text)] else: logger.error(f"Unknown tool: {name}") return [TextContent(type="text", text=f"Error: Unknown tool '{name}'")] except Exception as e: logger.error(f"Tool call failed: {name} - {e}", exc_info=True) return [TextContent(type="text", text=f"Error: {str(e)}")] async def run(self) -> None: """Run the lightweight MCP server.""" logger.info("Starting delegation MCP server (lightweight mode)") logger.info("- Mode: Routing guidance only (no execution)") logger.info(f"- Agent auto-discovery: {'ON' if self.enable_auto_discovery else 'OFF'}") logger.info("- Tools: get_routing_guidance, discover_agents, list_agents") # Discover and register available agents if self.enable_auto_discovery: await self._discover_and_register_agents() try: async with stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, InitializationOptions( server_name="delegation-mcp", server_version="0.4.0", # Updated version for lightweight architecture capabilities=self.server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={ "agent_discovery": {}, "routing_guidance": {}, }, ), ), ) finally: logger.info("Server stopped") def main(): """Main entry point.""" import sys config_path = Path(sys.argv[1]) if len(sys.argv) > 1 else None server = DelegationMCPServer(config_path) try: asyncio.run(server.run()) except KeyboardInterrupt: logger.info("Server stopped") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/carlosduplar/multi-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server