server.py (68.5 kB)
""" FastMCP Docker SSH Manager Server A production-ready FastMCP server for managing Docker containers and stacks across multiple remote hosts via SSH connections. """ import argparse import importlib import os import sys import tempfile from pathlib import Path from typing import TYPE_CHECKING, Annotated, Any, Literal if TYPE_CHECKING: from docker_mcp.core.docker_context import DockerContextManager from docker_mcp.services.container import ContainerService from docker_mcp.services.host import HostService from docker_mcp.services.stack_service import StackService from fastmcp import FastMCP from fastmcp.resources.resource import FunctionResource from fastmcp.resources.template import FunctionResourceTemplate from fastmcp.tools.tool import ToolResult from pydantic import Field try: from .core.config_loader import DockerMCPConfig, load_config from .core.docker_context import DockerContextManager from .core.file_watcher import HotReloadManager from .core.logging_config import get_server_logger from .middleware import ( ErrorHandlingMiddleware, LoggingMiddleware, RateLimitingMiddleware, TimingMiddleware, ) from .models.params import DockerComposeParams, DockerContainerParams, DockerHostsParams # All tool_params removed - they were unused from .resources import ( ContainerDetailsResource, ContainerListResource, DockerInfoResource, PortMappingResource, StackDetailsResource, StackListResource, ) from .services import ConfigService, ContainerService, HostService, StackService from .services.cleanup import CleanupService except ImportError: from docker_mcp.core.config_loader import DockerMCPConfig, load_config from docker_mcp.core.docker_context import DockerContextManager from docker_mcp.core.file_watcher import HotReloadManager from docker_mcp.core.logging_config import get_server_logger from docker_mcp.middleware import ( ErrorHandlingMiddleware, LoggingMiddleware, RateLimitingMiddleware, TimingMiddleware, ) from docker_mcp.resources import ( ContainerDetailsResource, ContainerListResource, DockerInfoResource, PortMappingResource, StackDetailsResource, StackListResource, ) from docker_mcp.services import ConfigService, ContainerService, HostService, StackService from docker_mcp.services.cleanup import CleanupService # Import enum definitions try: from .models.enums import ( ComposeAction, ContainerAction, HostAction, ) except ImportError: from docker_mcp.models.enums import ( ComposeAction, ContainerAction, HostAction, ) def get_data_dir() -> Path: """Get data directory based on environment with comprehensive validation. Priority order: 1. FASTMCP_DATA_DIR (explicit override) 2. DOCKER_MCP_DATA_DIR (application-specific) 3. XDG_DATA_HOME (Linux/Unix standard) 4. Container detection (/app/data) 5. User home fallback (~/.docker-mcp/data) 6. 
System temp fallback (/tmp/docker-mcp) """ # Environment variable candidates in priority order (normalized to Path) env_candidates: list[Path | None] = [ (Path(p) if (p := os.getenv("FASTMCP_DATA_DIR")) else None), (Path(p) if (p := os.getenv("DOCKER_MCP_DATA_DIR")) else None), (Path(xdg_path) / "docker-mcp") if (xdg_path := os.getenv("XDG_DATA_HOME")) else None, ] # Check explicit environment overrides for candidate in env_candidates: if candidate: candidate_path = candidate # Validate the path can be created and is writable try: candidate_path.mkdir(parents=True, exist_ok=True) # Test write permissions test_file = candidate_path / ".write_test" test_file.touch() test_file.unlink() return candidate_path except (OSError, PermissionError, FileNotFoundError): # If we can't create/write, continue to next candidate continue # Check if running in container with comprehensive detection container_indicators = [ os.getenv("DOCKER_CONTAINER", "").lower() in ("1", "true", "yes", "on"), os.path.exists("/.dockerenv"), os.path.exists("/app"), os.getenv("container") is not None, # systemd container detection ] if any(container_indicators): container_path = Path("/app/data") try: container_path.mkdir(parents=True, exist_ok=True) return container_path except (OSError, PermissionError): # Container path failed, fall through to other options pass # Standard user data directory fallbacks fallback_candidates = [ Path.home() / ".docker-mcp" / "data", # Primary user directory Path.home() / ".local" / "share" / "docker-mcp", # XDG-style fallback Path(tempfile.gettempdir()) / "docker-mcp" / str(os.getuid() if hasattr(os, "getuid") else "user"), # Temp with user isolation Path(tempfile.gettempdir()) / "docker-mcp", # Final fallback ] for fallback_path in fallback_candidates: try: fallback_path.mkdir(parents=True, exist_ok=True) # Test write permissions test_file = fallback_path / ".write_test" test_file.touch() test_file.unlink() return fallback_path except (OSError, PermissionError, FileNotFoundError): continue # If all else fails, return the primary fallback even if not writable # Let the calling code handle the permission error return Path.home() / ".docker-mcp" / "data" def get_config_dir() -> Path: """Get config directory based on environment with comprehensive validation. Priority order: 1. FASTMCP_CONFIG_DIR (explicit override) 2. DOCKER_MCP_CONFIG_DIR (application-specific) 3. XDG_CONFIG_HOME (Linux/Unix standard) 4. Container detection (/app/config) 5. Local project config (./config) 6. User config fallback (~/.config/docker-mcp) 7. 
System config fallback (/etc/docker-mcp) """ # Try environment variables first if env_path := _try_environment_config_dirs(): return env_path if env_path.is_absolute() else (Path.cwd() / env_path) # Check for container environment if container_path := _try_container_config_dir(): return container_path if container_path.is_absolute() else (Path.cwd() / container_path) # Try fallback directories if fallback_path := _try_fallback_config_dirs(): return fallback_path if fallback_path.is_absolute() else (Path.cwd() / fallback_path) # Final fallback — prefer absolute path for consistency return Path.cwd() / "config" def _try_environment_config_dirs() -> Path | None: """Try environment variable config directories in priority order.""" env_candidates = [ os.getenv("FASTMCP_CONFIG_DIR"), os.getenv("DOCKER_MCP_CONFIG_DIR"), _get_xdg_config_dir(), ] for candidate in env_candidates: if candidate: if path := _try_create_config_dir(Path(candidate)): return path return None def _get_xdg_config_dir() -> str | None: """Get XDG config directory path if available.""" xdg_home = os.getenv("XDG_CONFIG_HOME") return str(Path(xdg_home) / "docker-mcp") if xdg_home else None def _try_container_config_dir() -> Path | None: """Try container config directory if running in container.""" container_indicators = [ os.getenv("DOCKER_CONTAINER", "").lower() in ("1", "true", "yes", "on"), os.path.exists("/.dockerenv"), os.path.exists("/app"), os.getenv("container") is not None, # systemd container detection ] if any(container_indicators): container_path = Path("/app/config") return _try_create_config_dir(container_path) return None def _try_fallback_config_dirs() -> Path | None: """Try fallback config directories, preferring existing ones.""" fallback_candidates = [ Path("config"), # Local project config (for development) Path.cwd() / "config", # Current working directory config Path.home() / ".config" / "docker-mcp", # User config directory Path.home() / ".docker-mcp" / "config", # Alternative user config Path("/etc/docker-mcp"), # System-wide config (read-only usually) ] # First pass: look for existing readable directories for candidate_path in fallback_candidates: if _is_readable_config_dir(candidate_path): return candidate_path # Second pass: try to create directories (skip system dir) for candidate_path in fallback_candidates[:-1]: if path := _try_create_config_dir(candidate_path): return path return None def _try_create_config_dir(candidate_path: Path) -> Path | None: """Try to create or validate a config directory.""" try: if candidate_path.exists() and candidate_path.is_dir(): return candidate_path # Try to create if it doesn't exist candidate_path.mkdir(parents=True, exist_ok=True) return candidate_path except (OSError, PermissionError): return None def _is_readable_config_dir(candidate_path: Path) -> bool: """Check if a path is an existing readable config directory.""" if not (candidate_path.exists() and candidate_path.is_dir()): return False try: # Test if we can read from the directory list(candidate_path.iterdir()) return True except (OSError, PermissionError): return False class DockerMCPServer: """FastMCP server for Docker management via Docker contexts.""" def _parse_env_float(self, var_name: str, default: float) -> float: """Safely parse environment variable as float with default fallback.""" try: value = os.getenv(var_name) if value is None: return default return float(value) except ValueError: self.logger.warning( f"Invalid {var_name}; using default", value=os.getenv(var_name), default=default ) return 
default def _parse_env_int(self, var_name: str, default: int) -> int: """Safely parse environment variable as int with default fallback.""" try: value = os.getenv(var_name) if value is None: return default return int(value) except ValueError: self.logger.warning( f"Invalid {var_name}; using default", value=os.getenv(var_name), default=default ) return default def _parse_env_bool(self, var_name: str, default: bool) -> bool: """Safely parse environment variable as bool with default fallback.""" try: value = os.getenv(var_name) if value is None: return default return value.strip().lower() in ("1", "true", "yes", "on") except (ValueError, AttributeError): self.logger.warning( f"Invalid {var_name}; using default", value=os.getenv(var_name), default=default ) return default def __init__(self, config: DockerMCPConfig, config_path: str | None = None): self.config = config self._config_path: str = ( config_path or os.getenv("DOCKER_HOSTS_CONFIG") or str(get_config_dir() / "hosts.yml") ) # Use server logger (writes to mcp_server.log) self.logger = get_server_logger() # Initialize core managers self.context_manager = DockerContextManager(config) # Initialize service layer from .services.logs import LogsService self.logs_service: LogsService = LogsService(config, self.context_manager) self.host_service: HostService = HostService(config, self.context_manager) self.container_service: ContainerService = ContainerService( config, self.context_manager, self.logs_service ) self.stack_service: StackService = StackService( config, self.context_manager, self.logs_service ) self.config_service = ConfigService(config, self.context_manager) self.cleanup_service = CleanupService(config) # No legacy log tools; logs handled via LogsService # Initialize hot reload manager with race condition fix self.hot_reload_manager = HotReloadManager() self.hot_reload_manager.setup_hot_reload(self._config_path, self) # FastMCP app will be created later to prevent auto-start self.app: FastMCP | None = None self.logger.info( "Docker MCP Server initialized", hosts=list(config.hosts.keys()), server_config=config.server.model_dump(), hot_reload_enabled=True, config_path=self._config_path, ) def _initialize_app(self) -> None: """Initialize FastMCP app, middleware, and register tools.""" # Create FastMCP server (attach Google OAuth if configured) auth_provider = self._build_auth_provider() if auth_provider is not None: provider_name = auth_provider.__class__.__name__ self.app = FastMCP("Docker Context Manager", auth=auth_provider) self.logger.info("Authentication provider enabled", provider=provider_name) self._register_auth_diagnostic_tools() else: self.app = FastMCP("Docker Context Manager") self.logger.info("Authentication provider disabled") # Set up test compatibility wrapper self._setup_test_compatibility() # Configure middleware stack self._configure_middleware() # Parse environment values safely at runtime rate_limit_val = self._parse_env_float("RATE_LIMIT_PER_SECOND", 50.0) threshold_val = self._parse_env_float("SLOW_REQUEST_THRESHOLD_MS", 5000.0) rate_limit_display = f"{rate_limit_val} req/sec" threshold_display = f"{threshold_val}ms threshold" self.logger.info( "FastMCP middleware initialized", error_handling=True, rate_limiting=rate_limit_display, timing_monitoring=threshold_display, logging="dual output (console + files)", ) if auth_provider is None: self.logger.info("OAuth diagnostic tools unavailable (authentication disabled)") # Register consolidated tools (3 tools replace 13 individual tools) self.app.tool( 
self.docker_hosts, annotations={ "title": "Docker Host Management", "readOnlyHint": False, # Some actions (list, ports) read-only, others modify "destructiveHint": False, # Most actions are safe, cleanup can be destructive "idempotentHint": False, # Varies by action (add is not, list is) "openWorldHint": True, # Connects to external Docker hosts via SSH }, ) self.app.tool( self.docker_container, annotations={ "title": "Docker Container Management", "readOnlyHint": False, # Some actions (list, info, logs) read-only, others modify "destructiveHint": False, # Containers are ephemeral, operations are non-destructive "idempotentHint": False, # Varies by action (start/stop not idempotent, list is) "openWorldHint": True, # Connects to external Docker hosts }, ) self.app.tool( self.docker_compose, annotations={ "title": "Docker Compose Stack Management", "readOnlyHint": False, # Some actions (list, discover, logs) read-only, others modify "destructiveHint": True, # down action destroys containers, migrate can remove source "idempotentHint": False, # Varies by action (deploy can be, up/down are not) "openWorldHint": True, # Connects to external Docker hosts and file systems }, ) # Register MCP resources for data access (complement tools with clean URI-based data retrieval) self._register_resources() def _setup_test_compatibility(self) -> None: """Set up test compatibility wrapper for list_tools.""" if self.app is None: return try: app_ref = self.app def _list_tools_sync(): """Synchronous wrapper for list_tools.""" return self._get_tools_from_app(app_ref) # Attach wrapper only if list_tools is absent if not hasattr(self.app, "list_tools"): self.app.list_tools = _list_tools_sync except Exception as e: # Log the exception but continue self.logger.debug("Failed to set up test compatibility wrapper", error=str(e)) def _get_tools_from_app(self, app_ref) -> list: """Extract tools from FastMCP app with proper async handling.""" getter = self._get_tool_getter(app_ref) if getter is None: return [] result = getter() result = self._handle_async_result(result) if result is None: return [] tools_iterable = self._extract_tools_iterable(result) return self._build_compatibility_tools(tools_iterable) def _get_tool_getter(self, app_ref): """Get the appropriate tool getter method from the app.""" getter = getattr(app_ref, "get_tools", None) if getter is None: # Some FastMCP versions might already have list_tools getter = getattr(app_ref, "list_tools", None) return getter def _handle_async_result(self, result): """Handle async coroutine results properly.""" import inspect if not inspect.iscoroutine(result): return result import asyncio from concurrent.futures import ThreadPoolExecutor try: # If no loop is currently running in this thread, run directly asyncio.get_running_loop() except RuntimeError: result = asyncio.run(result) else: # When a loop is already active (pytest-asyncio, etc.), execute the # coroutine in a helper thread to avoid re-entry issues. 
            with ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(asyncio.run, result)
                result = future.result()
        return result

    def _extract_tools_iterable(self, result):
        """Extract tools from result object."""
        if isinstance(result, dict):
            return result.values()
        return result or []

    def _build_compatibility_tools(self, tools_iterable) -> list:
        """Build compatibility tool objects."""
        from copy import deepcopy
        from types import SimpleNamespace

        tools_list = list(tools_iterable)
        compat_tools = []
        for tool in tools_list:
            input_schema = getattr(tool, "input_schema", None)
            if input_schema is None:
                input_schema = getattr(tool, "parameters", None)
            if isinstance(input_schema, dict | list):
                schema_copy = deepcopy(input_schema)
            else:
                schema_copy = input_schema
            compat_tools.append(
                SimpleNamespace(
                    name=getattr(tool, "name", ""),
                    description=getattr(tool, "description", ""),
                    inputSchema=schema_copy,
                    raw_tool=tool,
                )
            )
        return compat_tools

    def _configure_middleware(self) -> None:
        """Configure FastMCP middleware stack."""
        if self.app is None:
            return

        # Add middleware in logical order (first added = first executed)

        # Error handling first to catch all errors
        self.app.add_middleware(
            ErrorHandlingMiddleware(
                include_traceback=os.getenv("LOG_LEVEL", "INFO").upper() == "DEBUG",
                track_error_stats=True,
            )
        )

        # Rate limiting to protect against abuse (robust parsing)
        rate_limit = self._parse_env_float("RATE_LIMIT_PER_SECOND", 50.0)
        burst_capacity = int(rate_limit * 2)
        self.app.add_middleware(
            RateLimitingMiddleware(
                max_requests_per_second=rate_limit,
                burst_capacity=burst_capacity,
                enable_global_limit=True,
            )
        )

        # Timing middleware to monitor performance (robust parsing)
        slow_threshold = self._parse_env_float("SLOW_REQUEST_THRESHOLD_MS", 5000.0)
        self.app.add_middleware(
            TimingMiddleware(slow_request_threshold_ms=slow_threshold, track_statistics=True)
        )

        # Logging middleware last to log everything (including middleware processing)
        include_payloads = self._parse_env_bool("LOG_INCLUDE_PAYLOADS", True)
        max_payload_length = self._parse_env_int("LOG_MAX_PAYLOAD_LENGTH", 1000)
        self.app.add_middleware(
            LoggingMiddleware(
                include_payloads=include_payloads,
                max_payload_length=max_payload_length,
            )
        )

    def _register_auth_diagnostic_tools(self) -> None:
        """Register small diagnostic tools when OAuth authentication is active."""
        if self.app is None:
            return
        try:
            from fastmcp.server.dependencies import get_access_token
        except Exception:
            self.logger.debug(
                "Auth dependencies unavailable; skipping whoami/get_user_info tools",
                exc_info=True,
            )
            return

        @self.app.tool
        async def whoami() -> dict[str, Any]:
            """Return identity claims for the authenticated user."""
            token = get_access_token()
            if token is None:
                return {}
            return {
                "iss": token.claims.get("iss"),
                "sub": token.claims.get("sub"),
                "email": token.claims.get("email"),
                "name": token.claims.get("name"),
                "picture": token.claims.get("picture"),
            }

        self.logger.info("OAuth diagnostic tools enabled", tools=["whoami"])

    def _build_auth_provider(self) -> Any | None:
        """Build OAuth provider from environment if configured."""
        enable_flag = os.getenv("FASTMCP_ENABLE_OAUTH", "").strip().lower()
        provider_path = os.getenv("FASTMCP_SERVER_AUTH", "").strip()
        flag_is_enabled = enable_flag in {"1", "true", "yes", "on"}

        if not flag_is_enabled:
            if provider_path:
                self.logger.info(
                    "Skipping OAuth provider because FASTMCP_ENABLE_OAUTH is not enabled",
                    requested_provider=provider_path,
                )
            else:
                self.logger.info("OAuth authentication disabled")
            return None

        if not provider_path:
            self.logger.info(
                "FASTMCP_ENABLE_OAUTH set but no provider configured; OAuth disabled"
            )
            return None

        try:
            module_path, class_name = provider_path.rsplit(".", 1)
        except ValueError:
            self.logger.error("Invalid FASTMCP_SERVER_AUTH value", provider=provider_path)
            return None

        try:
            module = importlib.import_module(module_path)
        except Exception as exc:
            self.logger.error(
                "Failed to import auth provider module",
                provider=provider_path,
                error=str(exc),
            )
            return None

        provider_cls = getattr(module, class_name, None)
        if provider_cls is None:
            self.logger.error("Auth provider class not found", provider=provider_path)
            return None

        try:
            if provider_path.endswith("GoogleProvider"):
                base_url = self._get_auth_base_url()
                redirect_path = os.getenv(
                    "FASTMCP_SERVER_AUTH_GOOGLE_REDIRECT_PATH", "/auth/callback"
                )
                required_scopes = self._parse_auth_scopes()
                timeout = self._parse_auth_timeout()
                kwargs = self._build_provider_kwargs(
                    base_url, redirect_path, required_scopes, timeout
                )
                provider = provider_cls(**kwargs)
                self._configure_allowed_redirects(provider)
            else:
                provider = provider_cls()
        except Exception as exc:
            self.logger.error(
                "Failed to initialize auth provider",
                provider=provider_path,
                error=str(exc),
            )
            return None

        return provider

    def _get_auth_base_url(self) -> str:
        """Get the base URL for auth callbacks."""
        base_url = os.getenv("FASTMCP_SERVER_AUTH_GOOGLE_BASE_URL")
        if not base_url:
            # Check for TLS environment variable
            scheme = (
                "https"
                if os.getenv("FASTMCP_ENABLE_TLS", "").lower() in ("1", "true", "yes")
                else "http"
            )
            host = self.config.server.host
            port = self.config.server.port
            base_url = f"{scheme}://{host}:{port}"
        return base_url

    def _parse_auth_scopes(self) -> list[str]:
        """Parse required auth scopes from environment with enhanced validation."""
        scopes_raw = os.getenv("FASTMCP_SERVER_AUTH_GOOGLE_REQUIRED_SCOPES", "")
        if scopes_raw.strip().startswith("["):
            required_scopes = self._parse_json_scopes(scopes_raw)
        else:
            required_scopes = self._parse_comma_scopes(scopes_raw)

        # Provide sensible defaults
        if not required_scopes:
            required_scopes = [
                "openid",
                "https://www.googleapis.com/auth/userinfo.email",
            ]
        return required_scopes

    def _parse_json_scopes(self, scopes_raw: str) -> list[str]:
        """Parse JSON array format scopes."""
        try:
            import json

            parsed_scopes = json.loads(scopes_raw)
            return self._validate_parsed_scopes(parsed_scopes)
        except (json.JSONDecodeError, TypeError) as e:
            self.logger.warning(
                "Failed to parse FASTMCP_SERVER_AUTH_GOOGLE_REQUIRED_SCOPES as JSON",
                error=str(e),
                raw_value=scopes_raw[:100] + "..." if len(scopes_raw) > 100 else scopes_raw,
            )
            return []

    def _parse_comma_scopes(self, scopes_raw: str) -> list[str]:
        """Parse comma/space-separated format scopes."""
        parts = [p.strip() for p in scopes_raw.replace(" ", ",").split(",") if p.strip()]
        validated_scopes = []
        for scope in parts:
            if self._validate_oauth_scope(scope):
                validated_scopes.append(scope)
            else:
                self.logger.warning("Invalid OAuth scope format: %s - skipping", scope)
        return validated_scopes

    def _validate_parsed_scopes(self, parsed_scopes: Any) -> list[str]:
        """Validate and filter parsed scopes."""
        if not isinstance(parsed_scopes, list):
            self.logger.warning(
                "Invalid scope format - expected JSON array, got %s", type(parsed_scopes).__name__
            )
            return []
        if len(parsed_scopes) > 50:  # Reasonable limit
            self.logger.warning("Too many scopes - maximum 50 allowed, got %d", len(parsed_scopes))
            return []
        if not all(
            isinstance(s, str) and len(s.strip()) > 0 and len(s) < 500 for s in parsed_scopes
        ):
            self.logger.warning(
                "Invalid scope entries - all entries must be non-empty strings under 500 characters"
            )
            return []

        # Validate each scope
        validated_scopes = []
        for raw_scope in parsed_scopes:
            scope = raw_scope.strip()
            if self._validate_oauth_scope(scope):
                validated_scopes.append(scope)
            else:
                self.logger.warning("Invalid OAuth scope format: %s - skipping", scope)
        return validated_scopes

    def _validate_oauth_scope(self, scope: str) -> bool:
        """Validate OAuth scope format."""
        import re

        # OAuth scope should be either:
        # 1. Simple identifier (letters, numbers, underscores, dots)
        # 2. Full URL (https://...)
        # 3. Known standard scopes (openid, profile, email, etc.)
        if not scope or len(scope) > 500:  # Reasonable length limit
            return False

        # Known standard OpenID Connect scopes
        standard_scopes = {"openid", "profile", "email", "address", "phone", "offline_access"}
        if scope in standard_scopes:
            return True

        # Valid URL pattern for Google API scopes
        url_pattern = r"^https://www\.googleapis\.com/auth/[a-zA-Z0-9._-]+$"
        if re.match(url_pattern, scope):
            return True

        # Simple identifier pattern (letters, numbers, dots, underscores, hyphens)
        simple_pattern = r"^[a-zA-Z][a-zA-Z0-9._-]*$"
        if re.match(simple_pattern, scope):
            return True

        return False

    def _parse_auth_timeout(self) -> int | None:
        """Parse auth timeout from environment."""
        timeout = None
        try:
            timeout_val = os.getenv("FASTMCP_SERVER_AUTH_GOOGLE_TIMEOUT_SECONDS")
            if timeout_val:
                timeout = int(timeout_val)
        except ValueError:
            timeout = None
        return timeout

    def _build_provider_kwargs(
        self, base_url: str, redirect_path: str, required_scopes: list[str], timeout: int | None
    ) -> dict[str, Any]:
        """Build kwargs for GoogleProvider initialization."""
        kwargs: dict[str, Any] = {
            "base_url": base_url,
            "required_scopes": required_scopes,
            "redirect_path": redirect_path,
        }
        client_id = os.getenv("FASTMCP_SERVER_AUTH_GOOGLE_CLIENT_ID")
        client_secret = os.getenv("FASTMCP_SERVER_AUTH_GOOGLE_CLIENT_SECRET")
        if client_id:
            kwargs["client_id"] = client_id
        if client_secret:
            kwargs["client_secret"] = client_secret
        if timeout is not None:
            kwargs["timeout_seconds"] = timeout
        return kwargs

    def _configure_allowed_redirects(self, provider) -> None:
        """Configure allowed client redirect URIs if specified."""
        allowed_redirects = os.getenv(
            "FASTMCP_SERVER_AUTH_GOOGLE_ALLOWED_CLIENT_REDIRECT_URIS", ""
        ).strip()
        if allowed_redirects:
            try:
                if allowed_redirects.startswith("["):
                    import json

                    parsed_patterns = json.loads(allowed_redirects)
                    if not isinstance(parsed_patterns, list):
                        raise ValueError("Expected list for redirect patterns")
                    patterns = [str(p).strip() for p in parsed_patterns if str(p).strip()]
                else:
                    patterns = [p.strip() for p in allowed_redirects.split(",") if p.strip()]
                # property exists in FastMCP 2.12.x
                provider.allowed_client_redirect_uris = patterns
            except Exception:
                # Older FastMCP versions may not support this; log and continue
                self.logger.debug(
                    "Skipping allowed_client_redirect_uris; provider does not support or failed to set",
                    exc_info=True,
                )

    def _resource_to_template(self, resource: FunctionResource) -> FunctionResourceTemplate:
        """Convert a function-backed resource into a template for URI parameters."""
        # Extract the raw URI template string before URL encoding transforms it
        # This preserves template parameters like {host_id} and {stack_name}
        uri_template = resource.uri
        if hasattr(resource.uri, "__str__"):
            # If the URI was originally a template string, get the raw template
            # For AnyUrl objects, we need to preserve the original template format
            uri_str = str(resource.uri)
            # Decode URL-encoded template parameters back to their original form
            import urllib.parse

            uri_str = urllib.parse.unquote(uri_str)
            uri_template = uri_str
        else:
            uri_template = str(resource.uri)

        return FunctionResourceTemplate.from_function(
            fn=resource.fn,
            uri_template=uri_template,
            name=resource.name,
            title=resource.title,
            description=resource.description,
            mime_type=resource.mime_type,
            tags=resource.tags,
            enabled=resource.enabled,
            annotations=resource.annotations,
            meta=resource.meta,
        )

    def _register_resources(self) -> None:
        """Register MCP resources for data access.

        Resources provide clean, URI-based access to data without side effects.
        They complement tools by offering cacheable, parametrized data retrieval.
        """
        if self.app is None:
            return

        try:
            # Port mapping resource - ports://{host_id}
            port_template = self._resource_to_template(
                PortMappingResource(self.container_service, self)
            )
            self.app.add_template(port_template)

            # Docker host info resource - docker://{host_id}/info
            info_template = self._resource_to_template(
                DockerInfoResource(self.context_manager, self.host_service)
            )
            self.app.add_template(info_template)

            # Compose stack listing resource - stacks://{host_id}
            stack_list_template = self._resource_to_template(
                StackListResource(self.stack_service)
            )
            self.app.add_template(stack_list_template)

            # Compose stack detail resource - stacks://{host_id}/{stack_name}
            stack_detail_template = self._resource_to_template(
                StackDetailsResource(self.stack_service)
            )
            self.app.add_template(stack_detail_template)

            # Container listing resource - containers://{host_id}
            container_list_template = self._resource_to_template(
                ContainerListResource(self.container_service)
            )
            self.app.add_template(container_list_template)

            # Container detail resource - containers://{host_id}/{container_id}
            container_detail_template = self._resource_to_template(
                ContainerDetailsResource(self.container_service)
            )
            self.app.add_template(container_detail_template)

            self.logger.info(
                "MCP resource templates registered successfully",
                templates_count=6,
                uri_schemes=["ports://", "docker://", "stacks://", "containers://"],
            )
        except Exception as e:
            self.logger.error("Failed to register MCP resources", error=str(e))
            # Don't fail the server startup, just log the error
            # Resources are optional enhancements to the tool-based API

    # Consolidated Tools Implementation

    async def docker_hosts(
        self,
        action: Annotated[
            str | HostAction | None,
            Field(default=None, description="Action to perform (defaults to list if not provided)"),
        ] = None,
        ssh_host: Annotated[str, Field(default="", description="SSH hostname or IP address")] = "",
        ssh_user: Annotated[str, Field(default="", description="SSH username")] = "",
        ssh_port: Annotated[
            int, Field(default=22, ge=1, le=65535, description="SSH port number")
        ] = 22,
        ssh_key_path: Annotated[
            str, Field(default="", description="Path to SSH private key file")
        ] = "",
        description: Annotated[str, Field(default="", description="Host description")] = "",
        tags: Annotated[list[str] | None, Field(default=None, description="Host tags")] = None,
        compose_path: Annotated[
            str, Field(default="", description="Docker Compose file path")
        ] = "",
        appdata_path: Annotated[
            str, Field(default="", description="Application data storage path")
        ] = "",
        enabled: Annotated[bool, Field(default=True, description="Whether host is enabled")] = True,
        ssh_config_path: Annotated[
            str, Field(default="", description="Path to SSH config file")
        ] = "",
        selected_hosts: Annotated[
            str, Field(default="", description="Comma-separated list of hosts to select")
        ] = "",
        cleanup_type: Annotated[
            Literal["check", "safe", "moderate", "aggressive"] | None,
            Field(default=None, description="Type of cleanup to perform"),
        ] = None,
        port: Annotated[
            int, Field(default=0, ge=0, le=65535, description="Port number to check availability")
        ] = 0,
        host_id: Annotated[str, Field(default="", description="Host identifier")] = "",
    ) -> ToolResult | dict[str, Any]:
        """Simplified Docker hosts management tool.

        Actions:
        • list: List all configured Docker hosts
          - Required: none
        • add: Add a new Docker host (auto-runs test_connection and discover)
          - Required: ssh_host, ssh_user, host_id
          - Optional: ssh_port (default: 22), ssh_key_path, description, tags, enabled (default: true)
        • ports: List or check port usage on a host
          - Required: host_id
          - Optional: port (for availability check)
        • import_ssh: Import hosts from SSH config (auto-runs test_connection and discover for each)
          - Required: none
          - Optional: ssh_config_path, selected_hosts
        • cleanup: Docker system cleanup
          - Required: cleanup_type, host_id
          - Valid cleanup_type: "check" | "safe" | "moderate" | "aggressive"
        • test_connection: Test host connectivity (also runs discover)
          - Required: host_id
        • discover: Discover paths and capabilities on hosts
          - Required: host_id (use 'all' to discover all hosts sequentially)
          - Discovers: compose_path, appdata_path
          - Single host: Fast discovery (5-15 seconds)
          - All hosts: Sequential discovery (30-60 seconds total)
          - Auto-tags: Adds discovery status tags
        • edit: Modify host configuration
          - Required: host_id
          - Optional: ssh_host, ssh_user, ssh_port, ssh_key_path, description, tags, compose_path, appdata_path, enabled
        • remove: Remove host from configuration
          - Required: host_id
        """
        # Parse and validate parameters using the parameter model
        try:
            # Convert string action to enum
            if isinstance(action, str):
                action_enum = HostAction(action)
            elif action is None:
                action_enum = HostAction.LIST
            else:
                action_enum = action

            params = DockerHostsParams(
                action=action_enum,
                ssh_host=ssh_host,
                ssh_user=ssh_user,
                ssh_port=ssh_port,
                ssh_key_path=ssh_key_path if ssh_key_path else None,
                description=description,
                tags=tags or [],
                compose_path=compose_path if compose_path else None,
                appdata_path=appdata_path if appdata_path else None,
                enabled=enabled,
                port=port,
                cleanup_type=cleanup_type,
                ssh_config_path=ssh_config_path if ssh_config_path else None,
                selected_hosts=selected_hosts if selected_hosts else None,
                host_id=host_id,
            )
            # Use validated enum from parameter model
            action = params.action
        except Exception as e:
            return {
                "success": False,
                "error": f"Parameter validation failed: {str(e)}",
                "action": str(action) if action else "unknown",
            }

        # Delegate to service layer for business logic
        service_result = await self.host_service.handle_action(
            action, **params.model_dump(exclude={"action"})
        )

        # Check if service returned formatted output and convert to ToolResult
        # This preserves the token-efficient formatting created by ContainerService
        if isinstance(service_result, dict) and "formatted_output" in service_result:
            from fastmcp.tools.tool import ToolResult
            from mcp.types import TextContent

            formatted_text = service_result.get("formatted_output", "")
            if formatted_text:
                return ToolResult(
                    content=[TextContent(type="text", text=formatted_text)],
                    structured_content=service_result,
                )

        # Return service result as-is (dict for unformatted actions)
        return service_result

    async def docker_container(
        self,
        action: Annotated[str | ContainerAction, Field(description="Action to perform")],
        container_id: Annotated[str, Field(default="", description="Container identifier")] = "",
        image_name: Annotated[str, Field(default="", description="Image name for pull action")] = "",
        all_containers: Annotated[
            bool, Field(default=False, description="Include all containers, not just running")
        ] = False,
        limit: Annotated[
            int, Field(default=20, ge=1, le=1000, description="Maximum number of results to return")
        ] = 20,
        offset: Annotated[int, Field(default=0, ge=0, description="Number of results to skip")] = 0,
        follow: Annotated[bool, Field(default=False, description="Follow log output")] = False,
        lines: Annotated[
            int, Field(default=100, ge=1, le=10000, description="Number of log lines to retrieve")
        ] = 100,
        force: Annotated[bool, Field(default=False, description="Force the operation")] = False,
        timeout: Annotated[
            int, Field(default=10, ge=1, le=300, description="Operation timeout in seconds")
        ] = 10,
        host_id: Annotated[str, Field(default="", description="Host identifier")] = "",
    ) -> ToolResult | dict[str, Any]:
        """Consolidated Docker container management tool.

        Actions:
        • list: List containers on a host
          - Required: host_id
          - Optional: all_containers, limit, offset
        • info: Get container information
          - Required: container_id, host_id
        • start: Start a container
          - Required: container_id, host_id
          - Optional: force, timeout
        • stop: Stop a container
          - Required: container_id, host_id
          - Optional: force, timeout
        • restart: Restart a container
          - Required: container_id, host_id
          - Optional: force, timeout
        • remove: Remove a container
          - Required: container_id, host_id
          - Optional: force
        • logs: Get container logs
          - Required: container_id, host_id
          - Optional: follow, lines
        • pull: Pull a container image
          - Required: image_name, host_id
        """
        # Parse and validate parameters using the parameter model
        try:
            # Convert string action to enum
            if isinstance(action, str):
                action_enum = ContainerAction(action)
            else:
                action_enum = action

            params = DockerContainerParams(
                action=action_enum,
                container_id=container_id,
                image_name=image_name,
                all_containers=all_containers,
                limit=limit,
                offset=offset,
                follow=follow,
                lines=lines,
                force=force,
                timeout=timeout,
                host_id=host_id,
            )
            # Use validated enum from parameter model
            action = params.action
        except Exception as e:
            return {
                "success": False,
                "error": f"Parameter validation failed: {str(e)}",
                "action": str(action) if action else "unknown",
            }

        # Delegate to service layer for business logic
        return await self.container_service.handle_action(
            action, **params.model_dump(exclude={"action"})
        )

    async def docker_compose(
        self,
        action: Annotated[str | ComposeAction, Field(description="Action to perform")],
        stack_name: Annotated[str, Field(default="", description="Stack name")] = "",
        compose_content: Annotated[
            str, Field(default="", description="Docker Compose file content")
        ] = "",
        environment: Annotated[
            dict[str, str] | None, Field(default=None, description="Environment variables")
        ] = None,
        pull_images: Annotated[
            bool, Field(default=True, description="Pull images before deploying")
        ] = True,
        recreate: Annotated[bool, Field(default=False, description="Recreate containers")] = False,
        follow: Annotated[bool, Field(default=False, description="Follow log output")] = False,
        lines: Annotated[
            int, Field(default=100, ge=1, le=10000, description="Number of log lines to retrieve")
        ] = 100,
        dry_run: Annotated[
            bool, Field(default=False, description="Perform a dry run without making changes")
        ] = False,
        options: Annotated[
            dict[str, str] | None,
            Field(default=None, description="Additional options for the operation"),
        ] = None,
        target_host_id: Annotated[
            str, Field(default="", description="Target host ID for migration operations")
        ] = "",
        remove_source: Annotated[
            bool, Field(default=False, description="Remove source stack after migration")
        ] = False,
        skip_stop_source: Annotated[
            bool, Field(default=False, description="Skip stopping source stack before migration")
        ] = False,
        start_target: Annotated[
            bool, Field(default=True, description="Start target stack after migration")
        ] = True,
        host_id: Annotated[str, Field(default="", description="Host identifier")] = "",
    ) -> ToolResult | dict[str, Any]:
        """Consolidated Docker Compose stack management tool.

        Actions:
        • list: List stacks on a host
          - Required: host_id
        • view: View the compose file for a stack
          - Required: stack_name, host_id
        • deploy: Deploy a stack
          - Required: stack_name, compose_content, host_id
          - Optional: environment, pull_images, recreate
        • up/down/restart/build/pull: Manage stack lifecycle
          - Required: stack_name, host_id
          - Optional: options
        • ps: List services in a stack
          - Required: stack_name, host_id
          - Optional: options
        • discover: Discover compose paths on a host
          - Required: host_id
        • logs: Get stack logs
          - Required: stack_name, host_id
          - Optional: follow, lines
        • migrate: Migrate stack between hosts
          - Required: stack_name, target_host_id, host_id
          - Optional: remove_source, skip_stop_source, start_target, dry_run
        """
        # Parse and validate parameters using the parameter model
        try:
            # Convert string action to enum
            if isinstance(action, str):
                action_enum = ComposeAction(action)
            else:
                action_enum = action

            params = DockerComposeParams(
                action=action_enum,
                stack_name=stack_name,
                compose_content=compose_content,
                environment=environment or {},
                pull_images=pull_images,
                recreate=recreate,
                follow=follow,
                lines=lines,
                dry_run=dry_run,
                options=options or {},
                target_host_id=target_host_id,
                remove_source=remove_source,
                skip_stop_source=skip_stop_source,
                start_target=start_target,
                host_id=host_id,
            )
            # Use validated enum from parameter model
            action = params.action
        except Exception as e:
            return {
                "success": False,
                "error": f"Parameter validation failed: {str(e)}",
                "action": str(action) if action else "unknown",
            }

        # Delegate to service layer for business logic
        return await self.stack_service.handle_action(
            action, **params.model_dump(exclude={"action"})
        )

    async def add_docker_host(
        self,
        host_id: str,
        ssh_host: str,
        ssh_user: str,
        ssh_port: int = 22,
        ssh_key_path: str | None = None,
        description: str = "",
        tags: list[str] | None = None,
        compose_path: str | None = None,
        enabled: bool = True,
    ) -> dict[str, Any]:
        """Add a new Docker host for management."""
        return await self.host_service.add_docker_host(
            host_id,
            ssh_host,
            ssh_user,
            ssh_port,
            ssh_key_path,
            description,
            tags,
            compose_path,
            enabled,
        )

    async def list_docker_hosts(self) -> dict[str, Any]:
        """List all configured Docker hosts."""
        return await self.host_service.list_docker_hosts()

    async def list_containers(
        self, host_id: str, all_containers: bool = False, limit: int = 20, offset: int = 0
    ) -> ToolResult:
        """List containers on a specific Docker host with pagination."""
        return await self.container_service.list_containers(host_id, all_containers, limit, offset)

    async def get_container_info(self, host_id: str, container_id: str) -> ToolResult:
        """Get detailed information about a specific container."""
        return await self.container_service.get_container_info(host_id, container_id)

    async def get_container_logs(
        self, host_id: str, container_id: str, lines: int = 100, follow: bool = False
    ) -> dict[str, Any]:
        """Get logs from a container.

        Args:
            host_id: Target Docker host identifier
            container_id: Container ID or name
            lines: Number of log lines to retrieve
            follow: Stream logs in real-time

        Returns:
            Container logs
        """
        try:
            if host_id not in self.config.hosts:
                return {"success": False, "error": f"Host {host_id} not found"}

            # Use logs service to get logs
            logs_result = await self.logs_service.get_container_logs(
                host_id=host_id,
                container_id=container_id,
                lines=lines,
                since=None,
                timestamps=False,
            )

            # Extract logs array from ContainerLogs model for cleaner API
            if isinstance(logs_result, dict) and "logs" in logs_result:
                logs = logs_result["logs"]  # This is the list[str] of actual log lines
                truncated = logs_result.get("truncated", False)
            else:
                logs = []
                truncated = False

            return {
                "success": True,
                "host_id": host_id,
                "container_id": container_id,
                "logs": logs,  # Now this is list[str] of actual log lines
                "lines_requested": lines,
                "lines_returned": len(logs),
                "truncated": truncated,
                "follow": follow,
            }
        except Exception as e:
            self.logger.error(
                "Failed to get container logs",
                host_id=host_id,
                container_id=container_id,
                error=str(e),
            )
            return {
                "success": False,
                "error": str(e),
                "host_id": host_id,
                "container_id": container_id,
            }

    async def manage_container(
        self, host_id: str, container_id: str, action: str, force: bool = False, timeout: int = 10
    ) -> ToolResult:
        """Unified container action management."""
        return await self.container_service.manage_container(
            host_id, container_id, action, force, timeout
        )

    async def pull_image(self, host_id: str, image_name: str) -> ToolResult:
        """Pull a Docker image on a remote host."""
        return await self.container_service.pull_image(host_id, image_name)

    async def list_host_ports(self, host_id: str, include_stopped: bool = False) -> ToolResult:
        """List all ports currently in use by containers on a Docker host."""
        # Note: ContainerService.list_host_ports only takes host_id (includes stopped containers by default)
        return await self.container_service.list_host_ports(host_id)

    async def deploy_stack(
        self,
        host_id: str,
        stack_name: str,
        compose_content: str,
        environment: dict[str, str] | None = None,
        pull_images: bool = True,
        recreate: bool = False,
    ) -> ToolResult:
        """Deploy a Docker Compose stack to a remote host."""
        return await self.stack_service.deploy_stack(
            host_id, stack_name, compose_content, environment, pull_images, recreate
        )

    async def manage_stack(
        self, host_id: str, stack_name: str, action: str, options: dict[str, Any] | None = None
    ) -> ToolResult:
        """Unified stack lifecycle management."""
        return await self.stack_service.manage_stack(host_id, stack_name, action, options)

    async def list_stacks(self, host_id: str) -> ToolResult:
        """List Docker Compose stacks on a host."""
        return await self.stack_service.list_stacks(host_id)

    async def update_host_config(self, host_id: str, compose_path: str) -> ToolResult:
        """Update host configuration with compose file path."""
        return await self.config_service.update_host_config(host_id, compose_path)

    async def import_ssh_config(
        self,
        ssh_config_path: str | None = None,
        selected_hosts: str | None = None,
        compose_path_overrides: dict[str, str] | None = None,
        auto_confirm: bool = False,
    ) -> ToolResult:
        """Import hosts from SSH config with interactive selection and compose path discovery."""
        return await self.config_service.import_ssh_config(
            ssh_config_path, selected_hosts, self._config_path
        )

    def _to_dict(self, result: Any, fallback_msg: str = "No structured content") -> dict[str, Any]:
        """Convert ToolResult to dictionary for programmatic access."""
        if hasattr(result, "structured_content"):
            return result.structured_content or {"success": True, "data": fallback_msg}
        return result

    # Removed unused _normalize_protocol helper; reintroduce with tests when needed.

    def update_configuration(self, new_config: DockerMCPConfig) -> None:
        """Update server configuration and reinitialize components."""
        self.config = new_config

        # Update managers with new config
        self.context_manager.config = new_config

        # Update service classes with new config
        self.host_service.config = new_config
        self.container_service.config = new_config
        self.stack_service.config = new_config
        self.config_service.config = new_config
        self.cleanup_service.config = new_config

        # Recreate logs service with updated config
        from .services.logs import LogsService

        self.logs_service = LogsService(new_config, self.context_manager)

        # Propagate the new logs service to dependent services
        try:
            self.container_service.logs_service = self.logs_service
        except Exception as e:
            self.logger.debug("Failed to set logs_service on container_service", error=str(e))
        try:
            self.stack_service.logs_service = self.logs_service
        except Exception as e:
            self.logger.debug("Failed to set logs_service on stack_service", error=str(e))

        self.logger.info("Configuration updated", hosts=list(new_config.hosts.keys()))

    async def start_hot_reload(self) -> None:
        """Start hot reload watcher if configured."""
        await self.hot_reload_manager.start_hot_reload()

    async def stop_hot_reload(self) -> None:
        """Stop hot reload watcher."""
        await self.hot_reload_manager.stop_hot_reload()

    def run(self) -> None:
        """Run the FastMCP server."""
        try:
            # Initialize FastMCP app and run once
            self._initialize_app()

            self.logger.info(
                "Starting Docker MCP Server",
                host=self.config.server.host,
                port=self.config.server.port,
            )

            # FastMCP.run() is synchronous and manages its own event loop
            if self.app is None:
                raise RuntimeError("FastMCP app not initialized")
            self.app.run(
                transport="http",
                host=self.config.server.host,
                port=self.config.server.port,
            )
        except Exception as e:
            self.logger.error("Server startup failed", error=str(e))
            raise


def parse_args() -> argparse.Namespace:
    """Parse command line arguments."""
    try:
        from dotenv import load_dotenv

        load_dotenv()
    except ImportError:
        # dotenv is optional - continue without it if not available
        pass
    except Exception as e:
        # Log unexpected errors but continue - environment loading shouldn't block startup
        import logging

        logging.getLogger("docker_mcp").debug("Failed to load .env file: %s", str(e))

    default_host = os.getenv("FASTMCP_HOST", "127.0.0.1")  # nosec B104 - Use 0.0.0.0 for container deployment
    default_port = int(os.getenv("FASTMCP_PORT", "8000"))
    default_log_level = os.getenv("LOG_LEVEL", "INFO")
    default_config = os.getenv("DOCKER_HOSTS_CONFIG", str(get_config_dir() / "hosts.yml"))

    parser = argparse.ArgumentParser(description="FastMCP Docker SSH Manager")
    parser.add_argument("--host", default=default_host, help="Server host")
    parser.add_argument("--port", type=int, default=default_port, help="Server port")
    parser.add_argument("--config", default=default_config, help="Configuration file path")
    parser.add_argument(
        "--log-level",
        default=default_log_level,
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="Logging level",
    )
    parser.add_argument(
        "--validate-config", action="store_true", help="Validate configuration and exit"
    )
    return parser.parse_args()


def main() -> None:
    """Main entry point."""
    args = parse_args()

    # Setup logging
    log_dir = _setup_log_directory()
    logger = _setup_logging_system(args, log_dir)

    # Load and configure application
    config, config_path_for_reload = _load_and_configure(args, logger)
    if config is None:
        # Validation-only mode
        return

    # Create server and setup hot reload
    server = DockerMCPServer(config, config_path=config_path_for_reload)
    _setup_hot_reload(server, logger)

    # Run server with error handling
    _run_server(server, logger)


def _setup_log_directory() -> str | None:
    """Setup log directory with fallback options."""
    log_dir_candidates = [
        os.getenv("LOG_DIR"),  # Explicit environment override
        str(get_data_dir() / "logs"),  # Primary data directory
        str(Path.home() / ".local" / "share" / "docker-mcp" / "logs"),  # User fallback
        str(Path(tempfile.gettempdir()) / "docker-mcp-logs"),  # System fallback
    ]
    for candidate in log_dir_candidates:
        if candidate:
            try:
                candidate_path = Path(candidate)
                candidate_path.mkdir(parents=True, exist_ok=True)
                if candidate_path.is_dir() and os.access(candidate_path, os.W_OK):
                    return str(candidate_path)
            except (OSError, PermissionError):
                continue
    print("Warning: Unable to create log directory, using console-only logging")
    return None


def _setup_logging_system(args, log_dir: str | None):
    """Setup logging system with error handling."""
    from docker_mcp.core.logging_config import get_server_logger, setup_logging

    # Parse log file size with validation
    try:
        max_file_size_mb = int(os.getenv("LOG_FILE_SIZE_MB", "10"))
        if max_file_size_mb < 1 or max_file_size_mb > 100:
            max_file_size_mb = 10  # Reset to default if out of range
    except ValueError:
        max_file_size_mb = 10

    # Setup logging with error handling
    try:
        setup_logging(
            log_dir=log_dir or "/tmp", log_level=args.log_level, max_file_size_mb=max_file_size_mb
        )
        logger = get_server_logger()
        # Log successful initialization with configuration details
        logger.info(
            "Logging system initialized",
            log_dir=log_dir,
            log_level=args.log_level,
            max_file_size_mb=max_file_size_mb,
            console_logging=True,
            file_logging=log_dir is not None,
        )
        return logger
    except Exception as e:
        print(f"Logging setup failed ({e}), using basic console logging")
        import logging

        logging.basicConfig(
            level=getattr(logging, args.log_level.upper(), logging.INFO),
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        )
        return logging.getLogger("docker_mcp")


def _load_and_configure(args, logger) -> tuple[DockerMCPConfig | None, str]:
    """Load and configure application, returning None for validation-only mode."""
    # Load configuration
    config = load_config(args.config)

    # Override server config from CLI args
    config.server.host = args.host
    config.server.port = args.port
    config.server.log_level = args.log_level

    # Validate configuration only
    if args.validate_config:
        logger.info("Configuration validation successful")
        logger.info("✅ Configuration is valid")
        return None, ""

    # Determine config path for reload
    config_path_for_reload = args.config or os.getenv(
        "DOCKER_HOSTS_CONFIG", str(get_config_dir() / "hosts.yml")
    )
    logger.info(
        "Hot reload configuration",
        config_path=config_path_for_reload,
        args_config=args.config,
        env_config=os.getenv("DOCKER_HOSTS_CONFIG"),
    )
    return config, config_path_for_reload


def _setup_hot_reload(server: "DockerMCPServer", logger) -> None:
    """Setup hot reload in background thread with robust error handling."""
    import asyncio
    import threading
    import time

    async def start_hot_reload():
        # Wait longer to ensure FastMCP is completely initialized
        await asyncio.sleep(5.0)

        # Attempt hot reload with extensive error handling
        max_retries = 3
        for attempt in range(max_retries):
            try:
                # Pre-verify FastMCP modules are available
                import fastmcp
                import fastmcp.settings

                # Verify the specific attribute exists before proceeding
                if not hasattr(fastmcp.settings, "mask_error_details"):
                    logger.warning(
                        f"FastMCP settings module missing mask_error_details (attempt {attempt + 1}/{max_retries})"
                    )
                    if attempt < max_retries - 1:
                        await asyncio.sleep(2.0 ** attempt)  # Exponential backoff
                        continue
                    else:
                        logger.error("Hot reload disabled due to FastMCP module incompatibility")
                        return

                logger.debug("FastMCP modules verified successfully in hot reload thread")
                await server.start_hot_reload()
                return
            except Exception as e:
                logger.warning(
                    f"Hot reload initialization attempt {attempt + 1}/{max_retries} failed",
                    error=str(e),
                )
                if attempt < max_retries - 1:
                    await asyncio.sleep(2.0 ** attempt)  # Exponential backoff
                else:
                    logger.error("Hot reload disabled after multiple failures", error=str(e))
                    return

    def run_hot_reload():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(start_hot_reload())
            # Keep the loop running to handle file changes
            loop.run_forever()
        except Exception as e:
            logger.error("Hot reload thread crashed", error=str(e))

    hot_reload_thread = threading.Thread(target=run_hot_reload, daemon=True, name="HotReloadThread")
    hot_reload_thread.start()
    logger.info("Hot reload enabled for configuration changes")


def _run_server(server: "DockerMCPServer", logger) -> None:
    """Run server with error handling."""
    try:
        server.run()
    except KeyboardInterrupt:
        logger.info("Server shutdown requested")
    except Exception as e:
        logger.error("Server error", error=str(e))
        sys.exit(1)


# Note: FastMCP dev mode not used - we run our own server with hot reload

# Module-level app instance for FastMCP configuration file support
_app_instance = None


def create_app():
    """Create and return FastMCP app instance for configuration file usage."""
    import argparse
    from pathlib import Path

    # Create minimal args for config loading.
    # _load_and_configure() reads args.host, args.port, and args.validate_config,
    # so those attributes must be present here as well (the original namespace
    # only defined validate_only, which _load_and_configure never reads).
    args = argparse.Namespace(
        config=os.getenv("DOCKER_HOSTS_CONFIG", "config/hosts.yml"),
        host=os.getenv("FASTMCP_HOST", "127.0.0.1"),
        port=int(os.getenv("FASTMCP_PORT", "8000")),
        validate_config=False,
        log_level=os.getenv("LOG_LEVEL", "INFO"),
        hot_reload=False,
        log_file_size="10MB",
        log_file_count=5,
        log_dir=None,
    )

    # Setup minimal logging for app creation
    log_dir = _setup_log_directory()
    logger = _setup_logging_system(args, log_dir)

    # Load configuration
    config, _ = _load_and_configure(args, logger)
    if config is None:
        raise RuntimeError("Failed to load configuration")

    # Create server instance and initialize app
    server = DockerMCPServer(config)
    server._initialize_app()
    return server.app


def get_app():
    """Get the FastMCP app instance, creating it if necessary."""
    global _app_instance
    if _app_instance is None:
        _app_instance = create_app()
    return _app_instance


# Direct access for FastMCP (creates app on first access)
def __getattr__(name):
    if name == "app":
        return get_app()
    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")


if __name__ == "__main__":
    main()
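
For reference, a minimal sketch of how this module's entry points might be exercised. The config path, host, and port values are illustrative, and the `docker_mcp.server` import path assumes this file lives at `docker_mcp/server.py`:

# Run from the command line (flags mirror parse_args() above):
#   python -m docker_mcp.server --host 127.0.0.1 --port 8000 \
#       --config config/hosts.yml --log-level INFO
#
# Or embed it programmatically via create_app():
import os

# Point configuration loading at a hosts file before create_app() runs (example path).
os.environ["DOCKER_HOSTS_CONFIG"] = "config/hosts.yml"

from docker_mcp.server import create_app  # assumed package layout

app = create_app()  # builds DockerMCPServer, middleware, tools, and resources
# Same call DockerMCPServer.run() makes internally:
app.run(transport="http", host="127.0.0.1", port=8000)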
