stack_service.py (33.7 kB)
""" Stack Management Service Thin facade that delegates to specialized stack management modules. Provides a clean interface while maintaining backward compatibility. """ import re from collections.abc import Awaitable, Callable from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from docker_mcp.core.docker_context import DockerContextManager from docker_mcp.models.enums import ComposeAction import structlog from fastmcp.tools.tool import ToolResult from docker_mcp.models.enums import ComposeAction from ..core.config_loader import DockerMCPConfig from .logs import LogsService from .stack.migration_orchestrator import StackMigrationOrchestrator from .stack.operations import StackOperations from .stack.validation import StackValidation class StackService: """Facade service for Docker Compose stack management operations.""" def __init__( self, config: DockerMCPConfig, context_manager: "DockerContextManager", logs_service: LogsService, ): self.config = config self.context_manager = context_manager self.logger = structlog.get_logger() # Initialize specialized modules self.operations = StackOperations(config, context_manager) self.migration_orchestrator = StackMigrationOrchestrator(config, context_manager) self.validation = StackValidation() self.logs_service = logs_service def _validate_host(self, host_id: str) -> tuple[bool, str]: """Validate host exists in configuration.""" if host_id not in self.config.hosts: return False, f"Host '{host_id}' not found" return True, "" def _unwrap(self, result: ToolResult) -> dict[str, Any]: """Unwrap ToolResult for consistent structured content access.""" if hasattr(result, "structured_content") and result.structured_content is not None: structured = result.structured_content if not isinstance(structured, dict): structured = dict(structured) else: structured = dict(structured) formatted_text = "" if hasattr(result, "content") and result.content: first_content = result.content[0] formatted_text = getattr(first_content, "text", "") or "" if formatted_text and "formatted_output" not in structured: structured["formatted_output"] = formatted_text if "formatted_output" in structured: formatted_value = structured["formatted_output"] ordered = {"formatted_output": formatted_value} for key, value in structured.items(): if key == "formatted_output": continue ordered[key] = value return ordered return structured return {"success": False, "error": "Invalid result format"} # Core Operations - Delegate to StackOperations async def deploy_stack( self, host_id: str, stack_name: str, compose_content: str, environment: dict[str, str] | None = None, pull_images: bool = True, recreate: bool = False, ) -> ToolResult: """Deploy a Docker Compose stack to a remote host.""" return await self.operations.deploy_stack( host_id, stack_name, compose_content, environment, pull_images, recreate ) async def manage_stack( self, host_id: str, stack_name: str, action: str, options: dict[str, Any] | None = None ) -> ToolResult: """Unified stack lifecycle management.""" return await self.operations.manage_stack(host_id, stack_name, action, options) async def list_stacks(self, host_id: str) -> ToolResult: """List Docker Compose stacks on a host.""" return await self.operations.list_stacks(host_id) async def get_stack_compose_file(self, host_id: str, stack_name: str) -> ToolResult: """Get the docker-compose.yml content for a specific stack.""" return await self.operations.get_stack_compose_file(host_id, stack_name) # Migration Operations - Delegate to StackMigrationOrchestrator async def 
migrate_stack( self, source_host_id: str, target_host_id: str, stack_name: str, skip_stop_source: bool = False, start_target: bool = True, remove_source: bool = False, dry_run: bool = False, ) -> ToolResult: """Migrate a Docker Compose stack between hosts with data integrity protection.""" return await self.migration_orchestrator.migrate_stack( source_host_id=source_host_id, target_host_id=target_host_id, stack_name=stack_name, skip_stop_source=skip_stop_source, start_target=start_target, remove_source=remove_source, dry_run=dry_run, ) # Validation Operations - Direct access for backward compatibility def validate_compose_syntax( self, compose_content: str, stack_name: str ) -> tuple[bool, list[str], dict]: """Validate Docker Compose file syntax and configuration.""" return self.validation.validate_compose_syntax(compose_content, stack_name) async def check_disk_space(self, host_id: str, estimated_size: int) -> tuple[bool, str, dict]: """Check if target host has sufficient disk space.""" is_valid, error_msg = self._validate_host(host_id) if not is_valid: return False, error_msg, {} host = self.config.hosts[host_id] return await self.validation.check_disk_space(host, estimated_size) async def check_tool_availability( self, host_id: str, tools: list[str] ) -> tuple[bool, list[str], dict]: """Check if required tools are available on host.""" is_valid, error_msg = self._validate_host(host_id) if not is_valid: return False, [f"Host validation failed: {error_msg}"], {} host = self.config.hosts[host_id] return await self.validation.check_tool_availability(host, tools) def extract_ports_from_compose(self, compose_content: str) -> list[int]: """Extract exposed ports from compose file.""" return self.validation.extract_ports_from_compose(compose_content) async def check_port_conflicts( self, host_id: str, ports: list[int] ) -> tuple[bool, list[int], dict]: """Check if ports are already in use on host.""" is_valid, error_msg = self._validate_host(host_id) if not is_valid: return False, ports, {"success": False, "error": error_msg} host = self.config.hosts[host_id] return await self.validation.check_port_conflicts(host, ports) def extract_names_from_compose(self, compose_content: str) -> tuple[list[str], list[str]]: """Extract service and network names from compose file.""" return self.validation.extract_names_from_compose(compose_content) async def check_name_conflicts( self, host_id: str, service_names: list[str], network_names: list[str] ) -> tuple[bool, list[str], dict]: """Check for container and network name conflicts.""" is_valid, error_msg = self._validate_host(host_id) if not is_valid: return False, [f"Host validation failed: {error_msg}"], {} host = self.config.hosts[host_id] return await self.validation.check_name_conflicts(host, service_names, network_names) # Legacy method aliases for backward compatibility def _format_stack_action_result( self, result: dict[str, Any], stack_name: str, action: str ) -> list[str]: """Legacy method - delegate to operations module.""" return self.operations._format_stack_action_result(result, stack_name, action) def _format_stacks_list(self, result: dict[str, Any], host_id: str) -> list[str]: """Legacy method - delegate to operations module.""" return self.operations._format_stacks_list(result, host_id) def _format_deploy_result(self, result: dict[str, Any], stack_name: str, host_id: str) -> list[str]: """Delegate to operations module for deployment result formatting.""" return self.operations._format_deploy_result(result, stack_name, host_id) def 
_format_ps_result(self, result: dict[str, Any], stack_name: str) -> list[str]: """Delegate to operations module for ps result formatting.""" return self.operations._format_ps_result(result, stack_name) def _format_migrate_result(self, result: dict[str, Any], stack_name: str, source_host: str, target_host: str) -> list[str]: """Delegate to operations module for migration result formatting.""" return self.operations._format_migrate_result(result, stack_name, source_host, target_host) # Utility methods that access specialized modules async def test_network_connectivity( self, source_host_id: str, target_host_id: str ) -> tuple[bool, dict]: """Test network connectivity between hosts.""" # Validate both hosts for host_id in [source_host_id, target_host_id]: is_valid, error_msg = self._validate_host(host_id) if not is_valid: return False, {"success": False, "error": error_msg} source_host = self.config.hosts[source_host_id] target_host = self.config.hosts[target_host_id] # Use network module from .stack import StackNetwork network = StackNetwork() return await network.test_network_connectivity(source_host, target_host) def assess_migration_risks( self, stack_name: str, data_size_bytes: int, estimated_downtime: float, source_inventory: dict = None, compose_content: str = "", ) -> dict: """Assess risks associated with migration.""" from .stack import StackRiskAssessment risk_assessment = StackRiskAssessment() return risk_assessment.assess_migration_risks( stack_name, data_size_bytes, estimated_downtime, source_inventory, compose_content ) def extract_expected_mounts( self, compose_content: str, target_appdata: str, stack_name: str ) -> list[str]: """Extract expected volume mounts from compose file.""" from .stack import StackVolumeUtils volume_utils = StackVolumeUtils() return volume_utils.extract_expected_mounts(compose_content, target_appdata, stack_name) def normalize_volume_entry( self, volume: Any, target_appdata: str, stack_name: str ) -> str | None: """Normalize a single volume entry to source:destination format.""" from .stack import StackVolumeUtils volume_utils = StackVolumeUtils() return volume_utils.normalize_volume_entry(volume, target_appdata, stack_name) async def _validate_compose_file_syntax(self, host_id: str, compose_content: str, environment: dict[str, str] | None = None) -> dict[str, Any]: """Validate compose file syntax using docker compose config.""" # Validate host exists is_valid, error_msg = self._validate_host(host_id) if not is_valid: return { "valid": False, "errors": [f"Host validation failed: {error_msg}"], "details": {"host_error": error_msg} } try: # Create temporary files and perform validation return await self._perform_remote_compose_validation(host_id, compose_content, environment) except Exception as e: return { "valid": False, "errors": [f"Validation process failed: {str(e)}"], "details": {"exception": str(e), "exception_type": type(e).__name__} } async def _perform_remote_compose_validation(self, host_id: str, compose_content: str, environment: dict[str, str] | None) -> dict[str, Any]: """Perform compose validation on remote host.""" host = self.config.hosts[host_id] # Create temporary files for compose content and environment temp_files = await self._create_temp_files(compose_content, environment) try: # Transfer files to remote host and validate remote_paths = await self._transfer_files_to_remote_host(host, temp_files) validation_result = await self._execute_remote_validation(host, remote_paths) await self._cleanup_remote_files(host, remote_paths) return 
validation_result finally: # Clean up local temporary files self._cleanup_local_temp_files(temp_files) async def _create_temp_files(self, compose_content: str, environment: dict[str, str] | None) -> dict[str, str]: """Create temporary files for compose content and environment.""" import tempfile temp_files = {} # Create compose file with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as compose_file: compose_file.write(compose_content) temp_files['compose'] = compose_file.name # Create environment file if needed if environment: with tempfile.NamedTemporaryFile(mode='w', suffix='.env', delete=False) as env_file: for key, value in environment.items(): env_file.write(f"{key}={value}\n") temp_files['env'] = env_file.name return temp_files async def _transfer_files_to_remote_host(self, host, temp_files: dict[str, str]) -> dict[str, str]: """Transfer temporary files to remote host for validation.""" import os remote_paths = { 'compose': f"/tmp/docker-mcp-validate-{os.path.basename(temp_files['compose'])}" # noqa: S108 - Using /tmp with unique identifiers for compose validation only } if 'env' in temp_files: remote_paths['env'] = f"/tmp/docker-mcp-validate-{os.path.basename(temp_files['env'])}" # noqa: S108 - Using /tmp with unique identifiers for compose validation only # Copy compose file to remote host copy_result = await self._copy_file_to_remote(host, temp_files['compose'], remote_paths['compose']) if not copy_result['success']: raise Exception(f"Failed to copy compose file: {copy_result['error']}") # Copy environment file if exists if 'env' in temp_files: await self._copy_file_to_remote(host, temp_files['env'], remote_paths['env']) return remote_paths async def _copy_file_to_remote(self, host, local_path: str, remote_path: str) -> dict[str, Any]: """Copy a single file to remote host via SCP.""" import asyncio copy_cmd = ["scp"] if host.port != 22: copy_cmd.extend(["-P", str(host.port)]) if host.identity_file: copy_cmd.extend(["-i", host.identity_file]) copy_cmd.extend([local_path, f"{host.user}@{host.hostname}:{remote_path}"]) copy_process = await asyncio.create_subprocess_exec( *copy_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) _, copy_stderr = await copy_process.communicate() if copy_process.returncode != 0: return { "success": False, "error": copy_stderr.decode().strip() } return {"success": True} async def _execute_remote_validation(self, host, remote_paths: dict[str, str]) -> dict[str, Any]: """Execute docker compose config validation on remote host.""" import asyncio from ..utils import build_ssh_command # Build validation command ssh_cmd = build_ssh_command(host) validate_cmd = ssh_cmd + [ f"cd /tmp && docker compose -f {remote_paths['compose']}" ] if 'env' in remote_paths: validate_cmd.extend(["--env-file", remote_paths['env']]) validate_cmd.extend(["config", "--quiet"]) # Execute validation validate_process = await asyncio.create_subprocess_exec( *validate_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await validate_process.communicate() if validate_process.returncode == 0: return { "valid": True, "errors": [], "details": {"message": "Compose file syntax is valid"} } else: error_output = stderr.decode().strip() validation_errors = self._parse_compose_validation_errors(error_output) return { "valid": False, "errors": validation_errors, "details": { "raw_error": error_output, "docker_exit_code": validate_process.returncode } } async def _cleanup_remote_files(self, host, remote_paths: dict[str, str]) 
-> None: """Clean up temporary files on remote host.""" import asyncio from ..utils import build_ssh_command ssh_cmd = build_ssh_command(host) cleanup_cmd = ssh_cmd + [f"rm -f {remote_paths['compose']}"] if 'env' in remote_paths: cleanup_cmd.extend([f"; rm -f {remote_paths['env']}"]) cleanup_process = await asyncio.create_subprocess_exec( *cleanup_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await cleanup_process.communicate() def _cleanup_local_temp_files(self, temp_files: dict[str, str]) -> None: """Clean up local temporary files.""" import os for file_path in temp_files.values(): try: os.unlink(file_path) except OSError: pass # Files may have already been cleaned up def _parse_compose_validation_errors(self, error_output: str) -> list[str]: """Parse docker compose validation errors into user-friendly messages.""" if not error_output: return ["Unknown validation error"] errors = [] lines = error_output.split('\n') for raw_line in lines: line = raw_line.strip() if not line: continue # Parse error line using helper method parsed_error = self._parse_single_error_line(line) errors.append(parsed_error) # If no specific errors were parsed, return the original output if not errors: errors.append(f"Docker compose validation failed: {error_output}") return errors def _parse_single_error_line(self, line: str) -> str: """Parse a single error line and categorize it.""" line_lower = line.lower() # Define error patterns and their categories error_patterns = [ ("yaml:", "YAML syntax error"), (["invalid", "service"], "Invalid service configuration"), ("missing", "Missing required field"), (["unknown", "field"], "Unknown configuration field"), ("environment variable", "Environment variable issue"), ] # Check for specific patterns for pattern, category in error_patterns: if self._line_matches_pattern(line_lower, pattern): return f"{category}: {line}" # Check for resource-specific errors resource_error = self._parse_resource_error(line_lower, line) if resource_error: return resource_error # Generic error return f"Validation error: {line}" def _line_matches_pattern(self, line_lower: str, pattern) -> bool: """Check if line matches a pattern (string or list of strings).""" if isinstance(pattern, str): return pattern in line_lower elif isinstance(pattern, list): return all(term in line_lower for term in pattern) return False def _parse_resource_error(self, line_lower: str, line: str) -> str | None: """Parse resource-specific errors (volume, network, port).""" resource_errors = [ ("volume", "Volume configuration error"), ("network", "Network configuration error"), ("port", "Port configuration error"), ] for resource, error_type in resource_errors: if resource in line_lower and ("invalid" in line_lower or "error" in line_lower): return f"{error_type}: {line}" return None async def handle_action(self, action: ComposeAction | str, **params) -> dict[str, Any]: """Unified action handler for all stack operations. This method consolidates all dispatcher logic from server.py into the service layer. 
""" try: return await self._dispatch_action(action, **params) except Exception as e: self.logger.error( "stack service action error", action=action, host_id=params.get("host_id", ""), stack_name=params.get("stack_name", ""), error=str(e), ) return { "success": False, "error": f"Service action failed: {str(e)}", "action": action, "host_id": params.get("host_id", ""), "stack_name": params.get("stack_name", ""), } async def _dispatch_action(self, action: ComposeAction | str, **params) -> dict[str, Any]: """Dispatch action to appropriate handler method.""" # Normalize strings to enum when possible normalized_action = self._normalize_action(action) # Create dispatch mapping dispatch_map: dict[ComposeAction, Callable[..., Awaitable[dict[str, Any]]]] = { ComposeAction.LIST: self._handle_list_action, ComposeAction.VIEW: self._handle_view_action, ComposeAction.DEPLOY: self._handle_deploy_action, ComposeAction.LOGS: self._handle_logs_action, ComposeAction.DISCOVER: self._handle_discover_action, ComposeAction.MIGRATE: self._handle_migrate_action, ComposeAction.UP: self._handle_lifecycle_action, ComposeAction.DOWN: self._handle_lifecycle_action, ComposeAction.RESTART: self._handle_lifecycle_action, ComposeAction.BUILD: self._handle_lifecycle_action, ComposeAction.PULL: self._handle_lifecycle_action, ComposeAction.PS: self._handle_manage_action, } # Handle string actions that map to manage action if normalized_action in ["up", "down", "restart", "build", "pull", "ps"]: return await self._handle_manage_action(normalized_action, **params) # Dispatch to appropriate handler if isinstance(normalized_action, ComposeAction): handler = dispatch_map.get(normalized_action) if handler: if normalized_action in [ComposeAction.UP, ComposeAction.DOWN, ComposeAction.RESTART, ComposeAction.BUILD, ComposeAction.PULL, ComposeAction.PS]: return await handler(normalized_action, **params) return await handler(**params) return { "success": False, "error": f"Unsupported action: {normalized_action.value if hasattr(normalized_action, 'value') else normalized_action}", "supported_actions": [a.value for a in ComposeAction], } def _normalize_action(self, action: ComposeAction | str) -> ComposeAction | str: """Normalize action string to enum when possible.""" if isinstance(action, str): try: return ComposeAction(action.lower().strip()) except ValueError: return action.lower().strip() return action def _error_response(self, message: str, **extra: Any) -> dict[str, Any]: formatted_text = f"❌ {message}" response: dict[str, Any] = { "success": False, "error": message, "formatted_output": formatted_text, } response.update(extra) return response async def _handle_list_action(self, **params) -> dict[str, Any]: """Handle LIST action.""" host_id = params.get("host_id", "") if not host_id: return self._error_response("host_id is required for list action") result = await self.list_stacks(host_id) return self._unwrap(result) async def _handle_view_action(self, **params) -> dict[str, Any]: """Handle VIEW action.""" host_id = params.get("host_id", "") stack_name = params.get("stack_name", "") if not host_id: return self._error_response("host_id is required for view action") if not stack_name: return self._error_response("stack_name is required for view action") result = await self.get_stack_compose_file(host_id, stack_name) return self._unwrap(result) async def _handle_deploy_action(self, **params) -> dict[str, Any]: """Handle DEPLOY action.""" host_id = params.get("host_id", "") stack_name = params.get("stack_name", "") compose_content = 
params.get("compose_content", "") environment = params.get("environment", {}) pull_images = params.get("pull_images", True) recreate = params.get("recreate", False) if not host_id: return self._error_response("host_id is required for deploy action") if not stack_name: return self._error_response("stack_name is required for deploy action") if not compose_content: return self._error_response("compose_content is required for deploy action") # Validate stack name format (allow underscores per IMPLEMENT_ME.md) if not re.match(r"^[a-zA-Z0-9][a-zA-Z0-9_-]*$", stack_name): return self._error_response( "stack_name must contain only letters, numbers, underscores, and hyphens, starting with alphanumeric" ) # Validate stack name length and reserved names if len(stack_name) > 63: return self._error_response("stack_name must be 63 characters or fewer") reserved_names = {"docker", "compose", "system", "network", "volume"} if stack_name.lower() in reserved_names: return self._error_response(f"stack_name '{stack_name}' is reserved") # Validate compose file syntax before deployment validation_result = await self._validate_compose_file_syntax(host_id, compose_content, environment) if not validation_result["valid"]: return { "success": False, "error": "Compose file validation failed", "validation_errors": validation_result["errors"], "validation_details": validation_result.get("details", {}), "formatted_output": "❌ Compose file validation failed", } result = await self.deploy_stack( host_id, stack_name, compose_content, environment, pull_images, recreate ) return self._unwrap(result) async def _handle_manage_action(self, action: ComposeAction | str, **params) -> dict[str, Any]: """Handle string-based manage actions.""" # Convert enum to string if needed action_str = action.value if hasattr(action, "value") else str(action) host_id = params.get("host_id", "") stack_name = params.get("stack_name", "") options = params.get("options", {}) if not host_id: return self._error_response(f"host_id is required for {action_str} action") if not stack_name: return self._error_response(f"stack_name is required for {action_str} action") result = await self.manage_stack(host_id, stack_name, action_str, options) return self._unwrap(result) async def _handle_logs_action(self, **params) -> dict[str, Any]: """Handle LOGS action.""" host_id = params.get("host_id", "") stack_name = params.get("stack_name", "") follow = params.get("follow", False) lines = params.get("lines", 100) if not host_id: return self._error_response("host_id is required for logs action") if not stack_name: return self._error_response("stack_name is required for logs action") if lines < 1 or lines > 10000: return self._error_response("lines must be between 1 and 10000") try: if host_id not in self.config.hosts: return {"success": False, "error": f"Host {host_id} not found"} logs_options = {"tail": str(lines), "follow": follow} result = await self.manage_stack(host_id, stack_name, "logs", logs_options) logs_data = self._unwrap(result) if logs_data.get("success", False): if "output" in logs_data: logs_lines = logs_data["output"].split("\n") if logs_data["output"] else [] header = f"Stack Logs: {stack_name} on {host_id} ({len(logs_lines)} lines)" formatted_lines = [header] if logs_lines: formatted_lines.append("") formatted_lines.extend(logs_lines) formatted_text = "\n".join(formatted_lines) return { "success": True, "host_id": host_id, "stack_name": stack_name, "logs": logs_lines, "lines_requested": lines, "lines_returned": len(logs_lines), "follow": follow, 
"formatted_output": formatted_text, } logs_data.setdefault("formatted_output", "❌ Failed to retrieve stack logs") return logs_data return self._error_response("Failed to retrieve stack logs") except Exception as e: self.logger.error( "stack logs error", host_id=host_id, stack_name=stack_name, error=str(e) ) return self._error_response(f"Failed to get stack logs: {str(e)}") async def _handle_discover_action(self, **params) -> dict[str, Any]: """Handle DISCOVER action.""" host_id = params.get("host_id", "") if not host_id: return {"success": False, "error": "host_id is required for discover action"} try: compose_manager = self.operations.stack_tools.compose_manager discovery = await compose_manager.discover_compose_locations(host_id) return {"success": True, "host_id": host_id, "compose_discovery": discovery} except Exception as e: self.logger.error("compose discover error", host_id=host_id, error=str(e)) return { "success": False, "error": f"Failed to discover compose paths: {str(e)}", "host_id": host_id, } async def _handle_migrate_action(self, **params) -> dict[str, Any]: """Handle MIGRATE action.""" host_id = params.get("host_id", "") target_host_id = params.get("target_host_id", "") stack_name = params.get("stack_name", "") skip_stop_source = params.get("skip_stop_source", False) start_target = params.get("start_target", True) remove_source = params.get("remove_source", False) dry_run = params.get("dry_run", False) if not host_id: return {"success": False, "error": "host_id (source) is required for migrate action"} if not target_host_id: return {"success": False, "error": "target_host_id is required for migrate action"} if not stack_name: return {"success": False, "error": "stack_name is required for migrate action"} result = await self.migrate_stack( source_host_id=host_id, target_host_id=target_host_id, stack_name=stack_name, skip_stop_source=skip_stop_source, start_target=start_target, remove_source=remove_source, dry_run=dry_run, ) migration_result = self._unwrap(result) if migration_result.get("success", False): # Create a copy to avoid modifying the original migration_result = migration_result.copy() if "overall_success" in migration_result: migration_result["success"] = migration_result["overall_success"] return migration_result return migration_result async def _handle_lifecycle_action(self, action, **params) -> dict[str, Any]: """Handle ComposeAction enum lifecycle actions.""" host_id = params.get("host_id", "") stack_name = params.get("stack_name", "") options = params.get("options", {}) if not host_id: return {"success": False, "error": "host_id is required for stack lifecycle actions"} if not stack_name: return {"success": False, "error": "stack_name is required for stack lifecycle actions"} result = await self.manage_stack( host_id=host_id, stack_name=stack_name, action=action.value, options=options ) return self._unwrap(result)
