Skip to main content
Glama
mcp_handlers.py36.2 kB
#
# MCP Foxxy Bridge - MCP Server Management Handlers
#
# Copyright (C) 2024 Billy Bryant
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""MCP server management command handlers."""

import argparse
import json
import logging
import os
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

import aiohttp
import yaml
from rich.console import Console
from rich.prompt import Confirm
from rich.table import Table

from mcp_foxxy_bridge.cli.formatters import ConfigFormatter
from mcp_foxxy_bridge.config.config_loader import load_bridge_config_from_file
from mcp_foxxy_bridge.oauth.utils import _validate_server_name

from .config import _load_config_safe, _save_config

# Transports that are addressed by URL; every other transport is treated as stdio.
_REMOTE_TRANSPORTS = ("sse", "http", "streamablehttp")

# Classifications accepted for per-tool overrides in `mcp add`.
_VALID_TOOL_CLASSIFICATIONS = ("read", "write", "unknown")

# (argparse attribute, config key) pairs copied verbatim into a new server
# entry when the attribute is present and not None. Order is the order in
# which the keys are written to the configuration file.
_OPTIONAL_SERVER_FIELDS = (
    ("enabled", "enabled"),
    ("timeout", "timeout"),
    ("retry_attempts", "retryAttempts"),
    ("retry_delay", "retryDelay"),
    ("health_check", "healthCheck"),
    ("tool_namespace", "toolNamespace"),
    ("resource_namespace", "resourceNamespace"),
    ("priority", "priority"),
    ("log_level", "log_level"),
)


def _bridge_base_url(bridge: Any) -> str:
    """Return the base URL of the bridge HTTP API.

    Args:
        bridge: The ``bridge`` section of a loaded bridge configuration; only
            its ``host`` and ``port`` attributes are read.

    Returns:
        ``http://<host>:<port>``. A server bound to ``0.0.0.0`` (all
        interfaces) is contacted through ``127.0.0.1``; a missing host also
        falls back to ``127.0.0.1``.
    """
    configured_host = bridge.host or "127.0.0.1"
    host = "127.0.0.1" if configured_host == "0.0.0.0" else configured_host  # noqa: S104
    return f"http://{host}:{bridge.port}"


def _resolve_server_key(name: str, servers: dict[str, Any]) -> str | None:
    """Find the key under which ``name`` is stored in ``servers``.

    ``handle_mcp_add`` stores servers under the name returned by
    ``_validate_server_name``, so a lookup by the raw name alone can miss an
    entry the user added themselves. The exact name is tried first so that
    hand-edited configurations keep working.

    Returns:
        The matching key, or None when neither form is present.
    """
    if name in servers:
        return name
    try:
        normalized = _validate_server_name(name)
    except ValueError:
        # NOTE(review): presumably invalid names raise ValueError - confirm
        # against mcp_foxxy_bridge.oauth.utils.
        return None
    return normalized if normalized in servers else None


async def handle_mcp_command(
    args: argparse.Namespace,
    config_path: Path,
    config_dir: Path,
    console: Console,
    logger: logging.Logger,
) -> None:
    """Dispatch ``foxxy-bridge mcp <command>`` to the matching handler.

    Args:
        args: Parsed command line arguments; ``args.mcp_command`` selects the handler
        config_path: Path to the configuration file
        config_dir: Configuration directory, forwarded to the handler
        console: Rich console for output
        logger: Logger for error reporting

    Prints a usage hint when no subcommand was given and an error for an
    unrecognized one.
    """
    command = getattr(args, "mcp_command", None)
    if command is None:
        # No subcommand on the command line: show usage instead of failing.
        console.print("[yellow]Usage: foxxy-bridge mcp <command>[/yellow]")
        console.print("Available commands: add, remove, list, show, status, restart")
        return

    handlers = {
        "add": handle_mcp_add,
        "remove": handle_mcp_remove,
        "list": handle_mcp_list,
        "show": handle_mcp_show,
        "status": handle_mcp_status,
        "restart": handle_mcp_restart,
    }
    handler = handlers.get(command)
    if handler is None:
        console.print(f"[red]Unknown mcp command: {command}[/red]")
        return
    await handler(args, config_path, config_dir, console, logger)


async def _try_get_servers_from_api(config_path: Path, logger: logging.Logger) -> dict[str, Any] | None:
    """Try to get server configurations from the running bridge API.

    The live server list (name, transport, status, tags) comes from the
    bridge's ``/sse/servers`` endpoint and is merged with command/URL and
    OAuth details from the configuration file.

    Args:
        config_path: Path to the configuration file
        logger: Logger for debug output

    Returns:
        Server configurations dict keyed by server name if the API is
        available, None otherwise (including on any error).
    """
    try:
        config = load_bridge_config_from_file(str(config_path), dict(os.environ))
        if config is None or config.bridge is None:
            return None

        url = f"{_bridge_base_url(config.bridge)}/sse/servers"
        timeout = aiohttp.ClientTimeout(total=3)  # Quick timeout for API check
        async with aiohttp.ClientSession(timeout=timeout) as session, session.get(url) as response:
            if response.status != 200:
                logger.debug("Bridge API returned status %s", response.status)
                return None
            data = await response.json()

        config_servers = config.servers or {}

        # Convert server list to config format, merging API status with config details
        server_configs: dict[str, Any] = {}
        for server in data.get("servers", []):
            name = server.get("name", "unknown")
            transport_type = server.get("transport", "stdio")
            server_config = config_servers.get(name)

            entry: dict[str, Any] = {
                "transport": transport_type,
                "enabled": server.get("status") != "disabled",
                "tags": server.get("tags", []),
            }

            # Actual command/URL come from the config file; "Unknown" when the
            # API reports a server the config file does not contain.
            if transport_type in _REMOTE_TRANSPORTS:
                entry["url"] = getattr(server_config, "url", "Unknown") if server_config else "Unknown"
            else:
                entry["command"] = getattr(server_config, "command", "Unknown") if server_config else "Unknown"
                entry["args"] = getattr(server_config, "args", []) if server_config else []

            if server_config:
                oauth_config = getattr(server_config, "oauth_config", None)
                if oauth_config and getattr(oauth_config, "enabled", False):
                    entry["oauth_config"] = {"enabled": True}

            server_configs[name] = entry

        transports = [(name, cfg.get("transport")) for name, cfg in server_configs.items()]
        logger.debug("Retrieved %d servers from bridge API, transports: %s", len(server_configs), transports)
        return server_configs

    except (aiohttp.ClientError, OSError) as e:
        logger.debug("Bridge API not available: %s: %s", type(e).__name__, e)
        return None
    except Exception as e:
        # Best-effort probe: callers fall back to the config file on None.
        logger.debug("Failed to get servers from bridge API: %s", e)
        return None


def _build_security_config(args: Any, console: Console) -> dict[str, Any]:
    """Build the ``security`` section of a server entry from CLI arguments.

    Reads the optional ``read_only``, ``allow_patterns``, ``block_patterns``,
    ``allow_tools``, ``block_tools`` and ``classify_tools`` attributes.
    Classification entries with an empty tool name or a type outside
    read/write/unknown are skipped with a warning.

    Returns:
        The security configuration; empty when no security option was given.
    """
    security_config: dict[str, Any] = {}

    read_only = getattr(args, "read_only", None)
    if read_only is not None:
        security_config["read_only_mode"] = read_only

    tool_security: dict[str, Any] = {}
    for attr in ("allow_patterns", "block_patterns", "allow_tools", "block_tools"):
        value = getattr(args, attr, None)
        if value:
            tool_security[attr] = value

    overrides: dict[str, str] = {}
    for tool_name, tool_type in getattr(args, "classify_tools", None) or []:
        clean_tool_name = str(tool_name).strip()
        clean_tool_type = str(tool_type).strip().lower()
        if clean_tool_name and clean_tool_type in _VALID_TOOL_CLASSIFICATIONS:
            overrides[clean_tool_name] = clean_tool_type
        else:
            # markup=False: the values are user input and may contain brackets.
            console.print(
                f"Warning: ignoring invalid tool classification '{tool_name}:{tool_type}' "
                "(expected read, write, or unknown)",
                style="yellow",
                markup=False,
            )
    if overrides:
        tool_security["classification_overrides"] = overrides

    if tool_security:
        security_config["tool_security"] = tool_security
    return security_config


async def handle_mcp_add(
    args: Any,
    config_path: Path,
    config_dir: Path,
    console: Console,
    logger: logging.Logger,
) -> None:
    """Add a new MCP server to the configuration.

    Args:
        args: Command line arguments containing server configuration
        config_path: Path to the configuration file
        config_dir: Configuration directory (unused)
        console: Rich console for output
        logger: Logger for error reporting

    The server is stored under the name returned by ``_validate_server_name``.
    Prompts for confirmation if the server already exists, unless the force
    flag is used.
    """
    try:
        config = _load_config_safe(config_path, logger)

        normalized_name = _validate_server_name(args.name)
        if normalized_name != args.name:
            logger.debug("Normalized server name '%s' -> '%s'", args.name, normalized_name)

        servers = config.get("mcpServers") or {}

        # --force skips the overwrite prompt, as the non-interactive error
        # message below tells the user.
        if normalized_name in servers and not getattr(args, "force", False):
            try:
                if not Confirm.ask(f"Server '{args.name}' already exists. Overwrite?"):
                    console.print("[yellow]Operation cancelled[/yellow]")
                    return
            except EOFError:
                console.print(
                    f"[red]Server '{args.name}' already exists. Use --force to overwrite in non-interactive mode.[/red]"
                )
                return

        # Build server configuration based on transport type
        server_config: dict[str, Any]
        if args.transport in _REMOTE_TRANSPORTS:
            if not args.url:
                console.print(f"[red]--url is required for {args.transport} transport[/red]")
                return
            server_config = {"transport": args.transport, "url": args.url}
        else:
            # Anything that is not a remote transport is stored as stdio.
            if not args.server_command:
                console.print("[red]A server command is required for stdio transport[/red]")
                return
            server_config = {"transport": "stdio", "command": args.server_command}
            if args.server_args:
                server_config["args"] = args.server_args

        # Common configuration
        if args.env:
            server_config["env"] = dict(args.env)
        if args.cwd:
            server_config["cwd"] = args.cwd
        if args.tags:
            server_config["tags"] = args.tags

        # OAuth configuration
        if args.oauth:
            oauth_config: dict[str, Any] = {"enabled": True}
            if args.oauth_issuer:
                oauth_config["issuer"] = args.oauth_issuer
            server_config["oauth_config"] = oauth_config

        # Optional per-server settings, copied only when explicitly given
        for attr, key in _OPTIONAL_SERVER_FIELDS:
            value = getattr(args, attr, None)
            if value is None:
                continue
            # healthCheck is a nested section; the flag only toggles it.
            server_config[key] = {"enabled": value} if attr == "health_check" else value

        # Headers only make sense for HTTP/SSE transports
        headers = getattr(args, "headers", None)
        if headers and args.transport in _REMOTE_TRANSPORTS:
            server_config["headers"] = dict(headers)

        security_config = _build_security_config(args, console)
        if security_config:
            server_config["security"] = security_config

        servers[normalized_name] = server_config
        config["mcpServers"] = servers
        _save_config(config, config_path, console, logger)

        console.print(f"[green]✓[/green] Added MCP server '[cyan]{normalized_name}[/cyan]'")
        if normalized_name != args.name:
            console.print(f"[dim]Server name normalized from '{args.name}' to '{normalized_name}'[/dim]")
        logger.info("Added MCP server '%s' with transport '%s'", normalized_name, server_config["transport"])

    except Exception as e:
        console.print(f"[red]Error adding MCP server: {e}[/red]")
        logger.exception("Failed to add MCP server configuration")


async def handle_mcp_remove(
    args: Any,
    config_path: Path,
    config_dir: Path,
    console: Console,
    logger: logging.Logger,
) -> None:
    """Remove an MCP server from the configuration.

    Args:
        args: Command line arguments containing server name and options
        config_path: Path to the configuration file
        config_dir: Configuration directory (unused)
        console: Rich console for output
        logger: Logger for error reporting

    The name is matched exactly first and then in its normalized form.
    Prompts for confirmation unless the force flag is used.
    """
    try:
        config = _load_config_safe(config_path, logger)
        servers = config.get("mcpServers") or {}

        server_key = _resolve_server_key(args.name, servers)
        if server_key is None:
            console.print(f"[red]MCP server '{args.name}' not found[/red]")
            return

        if not args.force:
            try:
                if not Confirm.ask(f"Remove MCP server '[cyan]{server_key}[/cyan]'?"):
                    console.print("[yellow]Operation cancelled[/yellow]")
                    return
            except EOFError:
                console.print("[red]Use --force to remove in non-interactive mode[/red]")
                return

        del servers[server_key]
        config["mcpServers"] = servers
        _save_config(config, config_path, console, logger)

        console.print(f"[green]✓[/green] Removed MCP server '[cyan]{server_key}[/cyan]'")
        logger.info("Removed MCP server '%s' from configuration", server_key)

    except Exception as e:
        console.print(f"[red]Error removing MCP server: {e}[/red]")
        logger.exception("Failed to remove MCP server configuration")


async def handle_mcp_list(
    args: Any,
    config_path: Path,
    config_dir: Path,
    console: Console,
    logger: logging.Logger,
) -> None:
    """List all configured MCP servers.

    Args:
        args: Command line arguments containing format option
        config_path: Path to the configuration file
        config_dir: Configuration directory (unused)
        console: Rich console for output
        logger: Logger for error reporting

    The running bridge API is asked first; when it is unavailable the list is
    read from the configuration file. Supports table, JSON, and YAML output
    formats.
    """
    servers = await _try_get_servers_from_api(config_path, logger)

    if servers is None:
        logger.debug("Bridge API unavailable, reading from config file")
        try:
            config = _load_config_safe(config_path, logger)
            servers = config.get("mcpServers") or {}
        except Exception as e:
            console.print(f"[red]Error loading config file: {e}[/red]")
            logger.exception("Failed to load server configurations from config file")
            return

    # markup=False: serialized data must be printed verbatim, not parsed as
    # Rich console markup.
    if args.format == "json":
        console.print(json.dumps(servers, indent=2), markup=False)
    elif args.format == "yaml":
        console.print(yaml.dump(servers, default_flow_style=False), markup=False)  # type: ignore[no-untyped-call]
    else:
        ConfigFormatter.format_servers_table(servers, console)


async def handle_mcp_show(
    args: Any,
    config_path: Path,
    config_dir: Path,
    console: Console,
    logger: logging.Logger,
) -> None:
    """Show detailed configuration for one MCP server, or for all of them.

    Args:
        args: Command line arguments containing server name and format
        config_path: Path to the configuration file
        config_dir: Configuration directory (unused)
        console: Rich console for output
        logger: Logger for error reporting

    With a name, that server's entry is shown (matched exactly first, then in
    normalized form); without one, the whole ``mcpServers`` section is shown.
    Output is JSON when ``args.format`` is "json" and YAML otherwise.
    """
    try:
        config = _load_config_safe(config_path, logger)
        servers = config.get("mcpServers") or {}

        if args.name:
            server_key = _resolve_server_key(args.name, servers)
            if server_key is None:
                console.print(f"[red]MCP server '{args.name}' not found[/red]")
                return
            server_config = {server_key: servers[server_key]}
        else:
            server_config = {"mcpServers": servers}

        if args.format == "json":
            ConfigFormatter.format_config_json(server_config, console)
        else:
            ConfigFormatter.format_config_yaml(server_config, console)

    except Exception as e:
        console.print(f"[red]Error showing MCP server configuration: {e}[/red]")
        logger.exception("Failed to show MCP server configuration")


async def handle_config_show(
    args: Any,
    config_path: Path,
    config_dir: Path,
    console: Console,
    logger: logging.Logger,
) -> None:
    """Show the bridge configuration, excluding the MCP server entries.

    Args:
        args: Command line arguments containing format option
        config_path: Path to the configuration file
        config_dir: Configuration directory (unused)
        console: Rich console for output
        logger: Logger for error reporting

    Every top-level key except ``mcpServers`` is shown. Output is JSON when
    ``args.format`` is "json" and YAML otherwise.
    """
    try:
        config = _load_config_safe(config_path, logger)
        bridge_config = {key: value for key, value in config.items() if key != "mcpServers"}

        if args.format == "json":
            ConfigFormatter.format_config_json(bridge_config, console)
        else:
            ConfigFormatter.format_config_yaml(bridge_config, console)

    except Exception as e:
        console.print(f"[red]Error showing bridge configuration: {e}[/red]")
        logger.exception("Failed to show bridge configuration")


async def handle_config_validate(
    args: Any,
    config_path: Path,
    config_dir: Path,
    console: Console,
    logger: logging.Logger,
) -> None:
    """Validate the bridge configuration file.

    Args:
        args: Command line arguments; the optional ``fix`` flag is read on failure
        config_path: Path to the configuration file
        config_dir: Configuration directory (unused)
        console: Rich console for output
        logger: Logger for error reporting

    Attempts to load and parse the configuration file, reporting any errors.
    Shows a summary of configured servers if validation succeeds.
    """
    try:
        # NOTE(review): this passes an empty environment while the other
        # handlers pass os.environ - confirm that is intended.
        bridge_config = load_bridge_config_from_file(str(config_path), {})
        if bridge_config is None:
            msg = "configuration could not be loaded"
            raise ValueError(msg)  # noqa: TRY301

        console.print("[green]✓[/green] Configuration is valid")

        servers = bridge_config.servers or {}
        console.print(f"Found {len(servers)} MCP server(s) configured")

        for name, server_config in servers.items():
            oauth_config = getattr(server_config, "oauth_config", None)
            oauth_enabled = bool(oauth_config) and getattr(oauth_config, "enabled", False)
            status_icon = "🔐" if oauth_enabled else "🔓"
            transport_type = getattr(server_config, "transport_type", "stdio")
            console.print(f"  {status_icon} {name} ({transport_type})")

    except Exception as e:
        console.print(f"[red]✗[/red] Configuration validation failed: {e}")
        logger.debug("Configuration validation failed", exc_info=True)
        # The flag may be absent from the namespace; never fail inside the
        # error handler because of it.
        if getattr(args, "fix", False):
            console.print("[yellow]Attempting to fix configuration...[/yellow]")
            console.print("[yellow]Auto-fix not yet implemented[/yellow]")


async def handle_config_init(
    args: Any,
    config_path: Path,
    config_dir: Path,
    console: Console,
    logger: logging.Logger,
) -> None:
    """Initialize a new bridge configuration file with default settings.

    Args:
        args: Command line arguments containing force flag
        config_path: Path where the configuration file will be created
        config_dir: Configuration directory (unused)
        console: Rich console for output
        logger: Logger for error reporting

    Creates a default configuration with a filesystem server example.
    Prompts for confirmation if the file already exists unless the force flag
    is used.
    """
    try:
        if config_path.exists() and not args.force:
            try:
                if not Confirm.ask("Configuration already exists. Overwrite?"):
                    console.print("[yellow]Operation cancelled[/yellow]")
                    return
            except EOFError:
                console.print(
                    "[red]Configuration already exists. Use --force to overwrite in non-interactive mode[/red]"
                )
                return

        default_config = {
            "mcpServers": {
                "filesystem": {
                    "transport": "stdio",
                    "command": "npx",
                    "args": ["-y", "@modelcontextprotocol/server-filesystem", "./"],
                    "tags": ["local", "development"],
                }
            },
            "bridge": {
                "conflictResolution": "namespace",
                "defaultNamespace": True,
                "aggregation": {"tools": True, "resources": True, "prompts": True},
                "host": "127.0.0.1",
                "port": 9000,
            },
        }

        config_path.parent.mkdir(parents=True, exist_ok=True)
        _save_config(default_config, config_path, console, logger)

        console.print(f"[green]✓[/green] Initialized configuration at [cyan]{config_path}[/cyan]")
        console.print("Edit the configuration file to add your MCP servers.")

    except Exception as e:
        console.print(f"[red]Error initializing configuration: {e}[/red]")
        logger.exception("Failed to initialize configuration")


async def _report_restart_not_found(
    session: aiohttp.ClientSession,
    base_url: str,
    server_name: str,
    console: Console,
) -> None:
    """Explain a 404 from the reconnect endpoint, listing running servers if possible."""
    not_found = f"[red]Error: Server '{server_name}' not found or not running[/red]"
    try:
        async with session.get(f"{base_url}/sse/servers") as servers_response:
            if servers_response.status != 200:
                console.print(not_found)
                return
            servers_data = await servers_response.json()

        available_servers = [s.get("name", "unknown") for s in servers_data.get("servers", [])]
        if available_servers:
            console.print(not_found)
            console.print(f"Available running servers: {', '.join(available_servers)}")
        else:
            console.print(f"[red]Error: Server '{server_name}' not found[/red]")
            console.print("[yellow]No servers are currently running[/yellow]")
    except Exception:
        # The server list only improves the message; any failure fetching it
        # falls back to the plain error.
        console.print(not_found)


async def handle_mcp_restart(
    args: Any,
    config_path: Path,
    config_dir: Path,
    console: Console,
    logger: logging.Logger,
) -> None:
    """Restart a specific MCP server connection through the bridge API.

    Args:
        args: Command line arguments containing ``server_name``
        config_path: Path to the configuration file
        config_dir: Configuration directory (unused)
        console: Rich console for output
        logger: Logger for error reporting

    Posts to the running bridge's ``/sse/mcp/<name>/reconnect`` endpoint. On a
    404 the list of running servers is fetched to give a more helpful error.
    """
    try:
        config = load_bridge_config_from_file(str(config_path), dict(os.environ))
        if config is None or config.bridge is None:
            console.print("[red]Error: Invalid or missing bridge configuration[/red]")
            return

        bridge_port = config.bridge.port
        base_url = _bridge_base_url(config.bridge)
        server_name = _validate_server_name(args.server_name)
        url = f"{base_url}/sse/mcp/{server_name}/reconnect"

        console.print(f"[blue]Restarting MCP server '[cyan]{server_name}[/cyan]'...[/blue]")

        try:
            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout) as session, session.post(url) as response:
                if response.status == 200:
                    result = await response.json()
                    console.print(f"[green]✓[/green] {result.get('message', 'Server restart initiated')}")
                    console.print(f"Server status: [cyan]{result.get('status', 'unknown')}[/cyan]")
                elif response.status == 404:
                    await _report_restart_not_found(session, base_url, server_name, console)
                else:
                    error_text = await response.text()
                    console.print(f"[red]Error restarting server: HTTP {response.status}[/red]")
                    console.print(f"[red]{error_text}[/red]")

        except (aiohttp.ClientError, OSError) as e:
            # OSError also covers TimeoutError, so a hung bridge is reported
            # as a connection problem rather than an unexpected error.
            console.print(f"[red]Error connecting to bridge server on port {bridge_port}: {e}[/red]")
            console.print("[yellow]Make sure the bridge server is running[/yellow]")
        except Exception as e:
            console.print(f"[red]Unexpected error during server restart: {e}[/red]")
            logger.exception("Failed to restart MCP server")

    except Exception as e:
        console.print(f"[red]Error restarting MCP server: {e}[/red]")
        logger.exception("Failed to restart MCP server")


async def _fetch_server_details(
    session: aiohttp.ClientSession,
    base_url: str,
    server: dict[str, Any],
    logger: logging.Logger,
) -> dict[str, Any]:
    """Collect tool list and health status for one server from the bridge API.

    Any failure while querying yields an entry with ``connection_state``
    "error" instead of raising, so one bad server does not hide the others.
    """
    name = server.get("name", "unknown")
    transport = server.get("transport", "stdio")
    tags = server.get("tags", [])

    try:
        tool_count = 0
        tool_names: list[str] = []
        async with session.get(f"{base_url}/sse/mcp/{name}/list_tools") as tools_response:
            if tools_response.status == 200:
                tools_data = await tools_response.json()
                tools = tools_data.get("tools", [])
                tool_count = len(tools)
                tool_names = [tool.get("name", "") for tool in tools]

        connection_state = "error"
        last_connected = "unknown"
        error_count = 0
        async with session.get(f"{base_url}/sse/mcp/{name}/status") as status_response:
            if status_response.status == 200:
                status_data = await status_response.json()
                connection_state = status_data.get("status", "unknown")
                last_seen = status_data.get("last_seen")
                if last_seen:
                    last_connected = datetime.fromtimestamp(last_seen, tz=UTC).strftime("%Y-%m-%d %H:%M:%S")
                else:
                    last_connected = "never"
                # A null failure_count must not break the "> 0" check in the table.
                error_count = status_data.get("failure_count") or 0
                # The status endpoint's count wins over the length of the tool list.
                tool_count = status_data.get("tools_count", tool_count)

        return {
            "name": name,
            "connection_state": connection_state,
            "last_connected": last_connected,
            "error_count": error_count,
            "tool_count": tool_count,
            "tools": tool_names,
            "tags": tags,
            "transport": transport,
        }
    except Exception as e:
        logger.debug("Error getting details for server %s: %s", name, e)
        return {
            "name": name,
            "connection_state": "error",
            "last_connected": "unknown",
            "error_count": 0,
            "tool_count": 0,
            "tools": [],
            "tags": tags,
            "transport": transport,
        }


def _print_status_table(detailed_servers: list[dict[str, Any]], console: Console) -> None:
    """Render server status entries as a table, with tool details for a single server."""
    table = Table(title="MCP Server Status")
    table.add_column("Server", style="cyan")
    table.add_column("State", style="green")
    table.add_column("Transport", style="blue")
    table.add_column("Tools", justify="right", style="yellow")
    table.add_column("Errors", justify="right", style="red")
    table.add_column("Last Connected", style="dim")

    for server in detailed_servers:
        state_style = "green" if server["connection_state"] == "connected" else "red"
        state = f"[{state_style}]{server['connection_state']}[/{state_style}]"
        error_style = "red" if server["error_count"] > 0 else "dim"
        errors = f"[{error_style}]{server['error_count']}[/{error_style}]"
        table.add_row(
            server["name"],
            state,
            server["transport"],
            str(server["tool_count"]),
            errors,
            server["last_connected"],
        )

    console.print(table)

    # Tool and tag details are only shown when a single server is listed.
    if len(detailed_servers) == 1:
        server = detailed_servers[0]
        if server["tools"]:
            console.print(f"\n[cyan]Available tools for {server['name']}:[/cyan]")
            for tool in server["tools"]:
                console.print(f"  • {tool}")
        if server["tags"]:
            console.print(f"\n[blue]Tags:[/blue] {', '.join(map(str, server['tags']))}")


async def handle_mcp_status(
    args: Any,
    config_path: Path,
    config_dir: Path,
    console: Console,
    logger: logging.Logger,
) -> None:
    """Show MCP server status, connection state, and discovered tools.

    Args:
        args: Command line arguments containing optional server name and format
        config_path: Path to the configuration file
        config_dir: Configuration directory (unused)
        console: Rich console for output
        logger: Logger for error reporting

    Queries the running bridge API for the server list and, per server, its
    tool list and health status. Supports table, JSON, and YAML output.
    """
    try:
        config = load_bridge_config_from_file(str(config_path), dict(os.environ))
        if config is None or config.bridge is None:
            console.print("[red]Error: Invalid or missing bridge configuration[/red]")
            return

        bridge_port = config.bridge.port
        base_url = _bridge_base_url(config.bridge)
        server_name = _validate_server_name(args.name) if args.name else None

        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            try:
                async with session.get(f"{base_url}/sse/servers") as response:
                    if response.status != 200:
                        console.print(f"[red]Error getting server status: HTTP {response.status}[/red]")
                        return
                    servers_data = await response.json()
            except (aiohttp.ClientError, OSError) as e:
                # OSError also covers TimeoutError (bridge not answering).
                console.print(f"[red]Error connecting to bridge server on port {bridge_port}: {e}[/red]")
                console.print("[yellow]Make sure the bridge server is running[/yellow]")
                return

            all_servers = servers_data.get("servers", [])
            servers = all_servers

            if server_name:
                servers = [s for s in all_servers if s.get("name") == server_name]
                if not servers:
                    console.print(f"[red]Server '{server_name}' not found or not running[/red]")
                    available_servers = [s.get("name", "unknown") for s in all_servers]
                    if available_servers:
                        console.print(f"Available servers: {', '.join(available_servers)}")
                    return

            if not servers:
                console.print("[yellow]No servers are currently running[/yellow]")
                return

            detailed_servers = [await _fetch_server_details(session, base_url, server, logger) for server in servers]

        # markup=False: serialized data must be printed verbatim, not parsed
        # as Rich console markup.
        if args.format == "json":
            console.print(json.dumps(detailed_servers, indent=2), markup=False)
        elif args.format == "yaml":
            console.print(yaml.dump(detailed_servers, default_flow_style=False), markup=False)  # type: ignore[no-untyped-call]
        else:
            _print_status_table(detailed_servers, console)

    except Exception as e:
        console.print(f"[red]Error getting MCP server status: {e}[/red]")
        logger.exception("Failed to get MCP server status")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/billyjbryant/mcp-foxxy-bridge'

If you have feedback or need assistance with the MCP directory API, please join our Discord server