Skip to main content
Glama
validation.py28.5 kB
""" Stack Validation Module All validation and checking logic for Docker Compose stacks. Handles compose syntax validation, resource checks, conflict detection, etc. """ import asyncio import re import shlex import subprocess from typing import Any import structlog from ...core.config_loader import DockerHost from ...utils import build_ssh_command, format_size class StackValidation: """Validation and resource checking for Docker Compose stacks.""" def __init__(self): self.logger = structlog.get_logger() def validate_compose_syntax( self, compose_content: str, stack_name: str ) -> tuple[bool, list[str], dict]: """Validate Docker Compose file syntax and configuration. Args: compose_content: Docker Compose YAML content stack_name: Name of the stack Returns: Tuple of (is_valid: bool, issues: list[str], details: dict) """ issues: list[str] = [] details: dict[str, Any] = { "stack_name": stack_name, "validation_checks": {}, "syntax_valid": False, "services_found": 0, "issues": [], } # Validate stack name early (format, length, reserved names) if not self._validate_stack_name(stack_name, issues, details): details["issues"] = issues return False, issues, details try: # Basic YAML syntax validation compose_data = self._validate_yaml_syntax(compose_content, issues, details) if compose_data is None: return False, issues, details # Validate compose structure structure_valid = self._validate_compose_structure(compose_data, issues, details) if not structure_valid: return False, issues, details # Validate individual services self._validate_services(compose_data.get("services", {}), issues, details) details["issues"] = issues return len(issues) == 0, issues, details except Exception as e: error_msg = f"Failed to validate compose file: {str(e)}" issues.append(error_msg) details["validation_checks"]["general_error"] = {"passed": False, "error": error_msg} details["issues"] = issues return False, issues, details def _validate_yaml_syntax( self, compose_content: str, issues: list[str], details: dict ) -> Any | None: """Validate YAML syntax and return parsed data.""" import yaml try: compose_data = yaml.safe_load(compose_content) details["syntax_valid"] = True details["validation_checks"]["yaml_syntax"] = {"passed": True} return compose_data except yaml.YAMLError as e: issues.append(f"YAML syntax error: {str(e)}") details["validation_checks"]["yaml_syntax"] = {"passed": False, "error": str(e)} details["issues"] = issues return None def _validate_compose_structure( self, compose_data: Any, issues: list[str], details: dict[str, Any] ) -> bool: """Validate basic compose file structure.""" if not isinstance(compose_data, dict): issues.append("Compose file must be a YAML object") details["validation_checks"]["structure"] = { "passed": False, "error": "Not a YAML object", } details["issues"] = issues return False # Check for required sections if "services" not in compose_data: issues.append("No 'services' section found") details["validation_checks"]["services_section"] = { "passed": False, "error": "Missing services section", } return False services = compose_data["services"] if not isinstance(services, dict) or len(services) == 0: issues.append("'services' section is empty or invalid") details["validation_checks"]["services_section"] = { "passed": False, "error": "Empty or invalid services", } return False details["services_found"] = len(services) details["validation_checks"]["services_section"] = { "passed": True, "count": len(services), } return True def _validate_services(self, services: dict, issues: list[str], details: dict) -> None: """Validate individual services configuration.""" service_issues = [] for service_name, service_config in services.items(): if not isinstance(service_config, dict): service_issues.append( f"Service '{service_name}': Invalid configuration (not an object)" ) continue # Check for image or build if "image" not in service_config and "build" not in service_config: service_issues.append( f"Service '{service_name}': Missing 'image' or 'build' directive" ) # Validate port and volume specifications self._validate_service_ports(service_name, service_config.get("ports"), service_issues) self._validate_service_volumes( service_name, service_config.get("volumes"), service_issues ) issues.extend(service_issues) details["validation_checks"]["service_validation"] = { "passed": len(service_issues) == 0, "issues_found": len(service_issues), } def _validate_service_ports( self, service_name: str, ports: Any, service_issues: list[str] ) -> None: """Validate service port specifications.""" if ports is None or not isinstance(ports, list): return for port_spec in ports: if isinstance(port_spec, str): # Allow container-only form like "80" (no host port); validate others via shared parser if ":" not in port_spec: continue if self._parse_port_string(port_spec) is None: service_issues.append( f"Service '{service_name}': Invalid port specification '{port_spec}'" ) elif isinstance(port_spec, dict): if "target" not in port_spec: service_issues.append( f"Service '{service_name}': Port object missing 'target' field" ) def _validate_service_volumes( self, service_name: str, volumes: Any, service_issues: list[str] ) -> None: """Validate service volume specifications.""" if volumes is None or not isinstance(volumes, list): return for volume_spec in volumes: if isinstance(volume_spec, str): if ":" not in volume_spec and not volume_spec.startswith("/"): service_issues.append( f"Service '{service_name}': Invalid volume specification '{volume_spec}'" ) async def check_disk_space( self, host: DockerHost, estimated_size: int ) -> tuple[bool, str, dict]: """Check if target host has sufficient disk space for migration. Args: host: Target host configuration estimated_size: Estimated size needed in bytes Returns: Tuple of (has_space: bool, message: str, details: dict) """ try: # Get disk space information for the appdata directory appdata_path = host.appdata_path or "/opt/docker-appdata" ssh_cmd = build_ssh_command(host) # Use df to get disk space in bytes df_cmd = ssh_cmd + [ f"df -B1 {shlex.quote(appdata_path)} | tail -1 | awk '{{print $2,$3,$4}}'" ] try: result = await asyncio.to_thread( subprocess.run, # nosec B603 df_cmd, capture_output=True, text=True, check=False, timeout=30, ) except subprocess.TimeoutExpired: return ( False, f"Disk space check timed out after 30s on {host.hostname}", { "host": host.hostname, "path_checked": appdata_path, "operation": "check_disk_space", "timed_out": True, "timeout_seconds": 30, }, ) if result.returncode == 0 and result.stdout.strip(): total, used, available = map(int, result.stdout.strip().split()) # Add 20% safety margin required_with_margin = int(estimated_size * 1.2) has_space = available >= required_with_margin details = { "total_space": total, "used_space": used, "available_space": available, "estimated_need": estimated_size, "required_with_margin": required_with_margin, "usage_percentage": (used / total * 100) if total > 0 else 0, "has_sufficient_space": has_space, "path_checked": appdata_path, } if has_space: message = f"✅ Sufficient disk space: {format_size(available)} available, {format_size(required_with_margin)} needed (with 20% margin)" else: shortfall = required_with_margin - available message = f"❌ Insufficient disk space: {format_size(available)} available, {format_size(required_with_margin)} needed (shortfall: {format_size(shortfall)})" return has_space, message, details else: return False, f"Failed to check disk space on {host.hostname}: {result.stderr}", {} except Exception as e: return False, f"Error checking disk space: {str(e)}", {} async def check_tool_availability( self, host: DockerHost, tools: list[str] ) -> tuple[bool, list[str], dict]: """Check if required tools are available on host. Args: host: Host configuration to check tools: List of tool names to check (e.g., ['rsync', 'tar', 'docker']) Returns: Tuple of (all_available: bool, missing_tools: list[str], details: dict) """ ssh_cmd = build_ssh_command(host) tool_status = {} missing_tools = [] for tool in tools: try: # Use 'which' to check if tool is available check_cmd = ssh_cmd + [ f"which {shlex.quote(tool)} >/dev/null 2>&1 && echo 'AVAILABLE' || echo 'MISSING'" ] try: result = await asyncio.to_thread( subprocess.run, # nosec B603 check_cmd, capture_output=True, text=True, check=False, timeout=30, ) except subprocess.TimeoutExpired: self.logger.error( "Tool availability check timed out", hostname=host.hostname, tool=tool, timeout_seconds=30, ) # Create fallback result indicating timeout (treat as missing) result = subprocess.CompletedProcess( args=check_cmd, returncode=1, stdout="MISSING", stderr="Tool check command timed out after 30 seconds", ) except asyncio.CancelledError: self.logger.warning( "Tool availability check cancelled", hostname=host.hostname, tool=tool ) raise is_available = result.returncode == 0 and "AVAILABLE" in result.stdout tool_status[tool] = { "available": is_available, "check_result": result.stdout.strip(), "error": result.stderr if result.stderr else None, } if not is_available: missing_tools.append(tool) except Exception as e: tool_status[tool] = {"available": False, "check_result": None, "error": str(e)} missing_tools.append(tool) all_available = len(missing_tools) == 0 details = { "host": host.hostname, "tools_checked": tools, "tool_status": tool_status, "all_tools_available": all_available, "missing_tools": missing_tools, } return all_available, missing_tools, details def extract_ports_from_compose(self, compose_content: str) -> list[int]: """Extract exposed ports from compose file. Args: compose_content: Docker Compose YAML content Returns: List of port numbers that will be exposed """ try: import yaml compose_data = yaml.safe_load(compose_content) if not compose_data: return [] exposed_ports = [] services = compose_data.get("services", {}) for _service_name, service_config in services.items(): service_ports = self._extract_service_ports(service_config) exposed_ports.extend(service_ports) return sorted(list(set(exposed_ports))) # Remove duplicates and sort except Exception as e: self.logger.warning("Failed to parse ports from compose file", error=str(e)) return [] def _extract_service_ports(self, service_config: dict) -> list[int]: """Extract ports from a single service configuration.""" ports = service_config.get("ports", []) service_ports = [] for port_spec in ports: parsed_port = self._parse_port_specification(port_spec) if parsed_port is not None: service_ports.append(parsed_port) return service_ports def _parse_port_specification(self, port_spec) -> int | None: """Parse a single port specification into a port number.""" if isinstance(port_spec, str): return self._parse_port_string(port_spec) elif isinstance(port_spec, int): return port_spec elif isinstance(port_spec, dict): return self._parse_port_dict(port_spec) else: return None def _parse_port_string(self, port_spec: str) -> int | None: """Parse string format ports and extract the published host port. Returns the host port (int) for published ports, or None for container-only ports. Extracts host port from formats: - 'host_port:container_port' -> host_port - 'ip:host_port:container_port' -> host_port - 'ip:host_port:container_port/proto' -> host_port - '[::1]:host_port:container_port' -> host_port (IPv6) Returns None for container-only formats: - 'port' or 'container_port' -> None (no host mapping) """ try: # Remove protocol suffix if present (e.g., /tcp, /udp) if "/" in port_spec: port_spec = port_spec.split("/")[0] # Handle IPv6 format [ip]:host:container if port_spec.startswith("[") and "]:" in port_spec: # Extract everything after the IPv6 part: [::1]:8080:80 -> 8080:80 _, port_part = port_spec.split("]", 1) if port_part.startswith(":"): port_part = port_part[1:] # Remove leading colon port_spec = port_part # Split on colons to handle different formats parts = port_spec.split(":") if len(parts) == 1: # Simple format: just 'port' (container-only port, no host mapping) return None elif len(parts) == 2: # Format: 'host_port:container_port' return int(parts[0]) elif len(parts) == 3: # Format: 'ip:host_port:container_port' - use middle part as host port return int(parts[1]) elif len(parts) >= 4: # Complex format like host:ip:hostport:containerport - use second-to-last as host port return int(parts[-2]) else: return None except (ValueError, IndexError): return None def _parse_port_dict(self, port_spec: dict) -> int | None: """Parse dictionary format ports with robust field handling. Handles formats like: - {target: 80, published: 8080} - {target: 80, published: "8080"} - {containerPort: 80, hostPort: 8080} """ # Try different field names for published/host port for field_name in ["published", "hostPort", "host_port", "external"]: port_value = port_spec.get(field_name) if port_value is not None: try: if isinstance(port_value, str): # Handle string values that might be complex port specs parsed = self._parse_port_string(port_value) if parsed is not None: return parsed else: return int(port_value) except (ValueError, TypeError): continue return None async def check_port_conflicts( self, host: DockerHost, ports: list[int] ) -> tuple[bool, list[int], dict]: """Check if ports are already in use on host. Args: host: Host configuration to check ports: List of port numbers to check Returns: Tuple of (all_available: bool, conflicting_ports: list[int], details: dict) """ if not ports: return True, [], {"ports_checked": [], "conflicts": {}} ssh_cmd = build_ssh_command(host) conflicting_ports = [] port_details = {} for port in ports: try: # Check if port is in use using netstat or ss check_cmd = ssh_cmd + [ f"(netstat -tuln 2>/dev/null | grep ':{port} ' || ss -tuln 2>/dev/null | grep ':{port} ') && echo 'IN_USE' || echo 'AVAILABLE'" ] try: result = await asyncio.to_thread( subprocess.run, # nosec B603 check_cmd, capture_output=True, text=True, check=False, timeout=30, ) except subprocess.TimeoutExpired: self.logger.error( "Port availability check timed out", hostname=host.hostname, port=port, timeout_seconds=30, ) # Create fallback result indicating timeout result = subprocess.CompletedProcess( args=check_cmd, returncode=1, stdout="", stderr="Port check command timed out after 30 seconds", ) is_in_use = result.returncode == 0 and "IN_USE" in result.stdout port_details[port] = { "in_use": is_in_use, "check_result": result.stdout.strip(), "error": result.stderr if result.stderr else None, } if is_in_use: conflicting_ports.append(port) except Exception as e: port_details[port] = {"in_use": True, "check_result": None, "error": str(e)} conflicting_ports.append(port) all_available = len(conflicting_ports) == 0 details = { "host": host.hostname, "ports_checked": ports, "port_details": port_details, "all_ports_available": all_available, "conflicting_ports": conflicting_ports, } return all_available, conflicting_ports, details async def find_available_port( self, host: DockerHost, starting_port: int, avoid_ports: set[int] | None = None, max_attempts: int = 50, ) -> int: """Find the next available host port on the target machine. Args: host: Host configuration to inspect starting_port: Port number to start searching from (inclusive) avoid_ports: Optional set of ports that should be skipped even if available max_attempts: Maximum number of sequential ports to probe Returns: First available port number greater than or equal to ``starting_port`` Raises: RuntimeError: If no available port is found within the attempt window """ candidate = max(1, starting_port) skip_ports = avoid_ports or set() for _ in range(max_attempts): if candidate in skip_ports: candidate += 1 continue available, _conflicts, _details = await self.check_port_conflicts(host, [candidate]) if available: return candidate candidate += 1 raise RuntimeError( f"Unable to find available port after probing {max_attempts} candidates starting at {starting_port}" ) def extract_names_from_compose(self, compose_content: str) -> tuple[list[str], list[str]]: """Extract service and network names from compose file. Args: compose_content: Docker Compose YAML content Returns: Tuple of (service_names: list[str], network_names: list[str]) """ try: import yaml compose_data = yaml.safe_load(compose_content) service_names = [] network_names = [] # Extract service names services = compose_data.get("services", {}) service_names = list(services.keys()) # Extract network names networks = compose_data.get("networks", {}) network_names = list(networks.keys()) return service_names, network_names except Exception as e: self.logger.warning("Failed to parse names from compose file", error=str(e)) return [], [] async def check_name_conflicts( self, host: DockerHost, service_names: list[str], network_names: list[str] ) -> tuple[bool, list[str], dict]: """Check for container and network name conflicts. Args: host: Host configuration to check service_names: List of service names to check network_names: List of network names to check Returns: Tuple of (no_conflicts: bool, conflicting_names: list[str], details: dict) """ ssh_cmd = build_ssh_command(host) conflicting_names = [] name_details = {} # Check service/container name conflicts for service_name in service_names: try: check_cmd = ssh_cmd + [ f"docker ps -a --filter name=^{shlex.quote(service_name)}$ --format '{{{{.Names}}}}' | grep -x {shlex.quote(service_name)} && echo 'CONFLICT' || echo 'AVAILABLE'" ] result = await asyncio.to_thread( subprocess.run, # nosec B603 check_cmd, capture_output=True, text=True, check=False, timeout=30, ) has_conflict = result.returncode == 0 and "CONFLICT" in result.stdout name_details[f"container_{service_name}"] = { "type": "container", "has_conflict": has_conflict, "check_result": result.stdout.strip(), } if has_conflict: conflicting_names.append(f"container:{service_name}") except Exception as e: name_details[f"container_{service_name}"] = { "type": "container", "has_conflict": True, "error": str(e), } conflicting_names.append(f"container:{service_name}") # Check network name conflicts for network_name in network_names: try: check_cmd = ssh_cmd + [ f"docker network ls --filter name=^{shlex.quote(network_name)}$ --format '{{{{.Name}}}}' | grep -x {shlex.quote(network_name)} && echo 'CONFLICT' || echo 'AVAILABLE'" ] try: result = await asyncio.to_thread( subprocess.run, # nosec B603 check_cmd, capture_output=True, text=True, check=False, timeout=30, ) except subprocess.TimeoutExpired: self.logger.error( "Network conflict check timed out", hostname=host.hostname, network_name=network_name, timeout_seconds=30, ) # Create fallback result indicating timeout result = subprocess.CompletedProcess( args=check_cmd, returncode=1, stdout="", stderr="Command timed out after 30 seconds", ) has_conflict = result.returncode == 0 and "CONFLICT" in result.stdout name_details[f"network_{network_name}"] = { "type": "network", "has_conflict": has_conflict, "check_result": result.stdout.strip(), } if has_conflict: conflicting_names.append(f"network:{network_name}") except Exception as e: name_details[f"network_{network_name}"] = { "type": "network", "has_conflict": True, "error": str(e), } conflicting_names.append(f"network:{network_name}") no_conflicts = len(conflicting_names) == 0 details = { "host": host.hostname, "service_names_checked": service_names, "network_names_checked": network_names, "name_details": name_details, "no_conflicts": no_conflicts, "conflicting_names": conflicting_names, } return no_conflicts, conflicting_names, details def _validate_stack_name(self, stack_name: str, issues: list[str], details: dict) -> bool: """Validate stack name format, length, and reserved names.""" reserved = {"docker", "compose", "system", "network", "volume"} errs: list[str] = [] if len(stack_name) > 63: errs.append("Stack name exceeds 63 characters.") if not re.fullmatch(r"^[a-zA-Z0-9][a-zA-Z0-9_-]*$", stack_name or ""): errs.append("Stack name must match ^[a-zA-Z0-9][a-zA-Z0-9_-]*$.") if stack_name.lower() in reserved: errs.append(f"Stack name '{stack_name}' is reserved.") issues.extend(errs) details.setdefault("validation_checks", {})["stack_name"] = { "passed": len(errs) == 0, "errors": errs or None, } return len(errs) == 0

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/docker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server