migration_executor.py (44.7 kB)
""" Stack Migration Executor Module Core migration execution logic for Docker Compose stacks. Handles compose file operations, data transfer, deployment, and verification. """ import asyncio import shlex import subprocess import tempfile from collections.abc import Callable from datetime import datetime from typing import Any, cast import structlog from ...core.backup import BackupInfo, BackupManager from ...core.config_loader import DockerHost, DockerMCPConfig from ...core.docker_context import DockerContextManager from ...core.migration.manager import MigrationManager from ...tools.stacks import StackTools from ...utils import build_ssh_command class StackMigrationExecutor: """Executes the core migration steps for Docker Compose stacks.""" def __init__(self, config: DockerMCPConfig, context_manager: DockerContextManager): self.config = config self.context_manager = context_manager self.stack_tools = StackTools(config, context_manager) self.migration_manager = MigrationManager( transfer_method=config.transfer.method, docker_image=config.transfer.docker_image ) self.backup_manager = BackupManager() self.logger = structlog.get_logger() async def retrieve_compose_file(self, host_id: str, stack_name: str) -> tuple[bool, str, str]: """Retrieve compose file from source host. Args: host_id: Source host ID stack_name: Stack name Returns: Tuple of (success: bool, compose_content: str, compose_path: str) """ try: # Get compose file path compose_file_path = await self.stack_tools.compose_manager.get_compose_file_path( host_id, stack_name ) # Build SSH command for source source_host = self.config.hosts[host_id] ssh_cmd_source = build_ssh_command(source_host) # Read compose file read_cmd = ssh_cmd_source + [f"cat {shlex.quote(compose_file_path)}"] try: result = await asyncio.to_thread( subprocess.run, # nosec B603 read_cmd, capture_output=True, text=True, check=False, timeout=30, ) except subprocess.TimeoutExpired: self.logger.error("Compose read timed out", host_id=host_id, stack_name=stack_name) return False, "", compose_file_path if result.returncode != 0: return False, "", compose_file_path return True, result.stdout, compose_file_path except Exception as e: self.logger.error( "Failed to retrieve compose file", error=str(e), host_id=host_id, stack_name=stack_name, ) return False, "", "" async def validate_host_compatibility( self, source_host: DockerHost, target_host: DockerHost, stack_name: str ) -> tuple[bool, dict[str, Any]]: """Validate source and target host compatibility for migration. 
Args: source_host: Source host configuration target_host: Target host configuration stack_name: Stack name for migration Returns: Tuple of (success: bool, validation_results: dict) """ validation_results = { "compatibility_checks": {}, "warnings": [], "errors": [] } try: source_ssh = build_ssh_command(source_host) target_ssh = build_ssh_command(target_host) target_appdata = target_host.appdata_path or "/opt/docker-appdata" target_stack_path = f"{target_appdata}/{stack_name}" # Run all validation checks await self._validate_docker_version(source_ssh, target_ssh, validation_results) await self._validate_target_storage(target_ssh, target_appdata, validation_results) await self._validate_network_connectivity(source_ssh, target_host.hostname, validation_results) await self._validate_target_permissions(target_ssh, target_stack_path, validation_results) # Determine overall compatibility overall_success = self._determine_overall_compatibility(validation_results) validation_results["overall_compatible"] = overall_success self._log_validation_results( overall_success, source_host.hostname, target_host.hostname, stack_name, validation_results ) return overall_success, validation_results except Exception as e: validation_results["errors"].append(f"Compatibility validation failed: {str(e)}") validation_results["overall_compatible"] = False self.logger.error( "Host compatibility validation error", source_host=source_host.hostname, target_host=target_host.hostname, error=str(e) ) return False, validation_results async def _validate_docker_version( self, source_ssh: list[str], target_ssh: list[str], validation_results: dict[str, Any] ) -> None: """Validate Docker version compatibility between hosts.""" source_version_cmd = source_ssh + ["docker", "version", "--format", "json"] target_version_cmd = target_ssh + ["docker", "version", "--format", "json"] try: source_result = await asyncio.to_thread( subprocess.run, source_version_cmd, capture_output=True, text=True, check=False, timeout=30 ) target_result = await asyncio.to_thread( subprocess.run, target_version_cmd, capture_output=True, text=True, check=False, timeout=30 ) if source_result.returncode == 0 and target_result.returncode == 0: validation_results["compatibility_checks"]["docker_version"] = { "source_accessible": True, "target_accessible": True, "status": "compatible" } else: validation_results["errors"].append("Failed to verify Docker version compatibility") validation_results["compatibility_checks"]["docker_version"] = { "source_accessible": source_result.returncode == 0, "target_accessible": target_result.returncode == 0, "status": "failed" } except subprocess.TimeoutExpired: validation_results["errors"].append("Docker version check timed out") validation_results["compatibility_checks"]["docker_version"] = {"status": "timeout"} async def _validate_target_storage( self, target_ssh: list[str], target_appdata: str, validation_results: dict[str, Any] ) -> None: """Validate target storage availability.""" storage_check_cmd = target_ssh + ["df", "-h", target_appdata] try: storage_result = await asyncio.to_thread( subprocess.run, storage_check_cmd, capture_output=True, text=True, check=False, timeout=30 ) if storage_result.returncode == 0: validation_results["compatibility_checks"]["storage"] = { "target_path_accessible": True, "status": "available" } else: validation_results["errors"].append(f"Target storage path {target_appdata} not accessible") validation_results["compatibility_checks"]["storage"] = { "target_path_accessible": False, "status": 
"failed" } except subprocess.TimeoutExpired: validation_results["warnings"].append("Storage check timed out, continuing with migration") validation_results["compatibility_checks"]["storage"] = {"status": "timeout"} async def _validate_network_connectivity( self, source_ssh: list[str], target_hostname: str, validation_results: dict[str, Any] ) -> None: """Validate network connectivity between hosts.""" network_check_cmd = source_ssh + ["ping", "-c", "1", "-W", "5", target_hostname] try: network_result = await asyncio.to_thread( subprocess.run, network_check_cmd, capture_output=True, text=True, check=False, timeout=30 ) validation_results["compatibility_checks"]["network"] = { "accessible": network_result.returncode == 0, "status": "reachable" if network_result.returncode == 0 else "unreachable" } if network_result.returncode != 0: validation_results["warnings"].append("Network connectivity test failed, but migration may still work") except subprocess.TimeoutExpired: validation_results["warnings"].append("Network check timed out") validation_results["compatibility_checks"]["network"] = {"status": "timeout"} async def _validate_target_permissions( self, target_ssh: list[str], target_stack_path: str, validation_results: dict[str, Any] ) -> None: """Validate target directory permissions.""" mkdir_cmd = target_ssh + ["mkdir", "-p", target_stack_path] try: mkdir_result = await asyncio.to_thread( subprocess.run, mkdir_cmd, capture_output=True, text=True, check=False, timeout=30 ) if mkdir_result.returncode == 0: validation_results["compatibility_checks"]["permissions"] = { "directory_creation": True, "status": "writable" } else: validation_results["errors"].append(f"Cannot create target directory {target_stack_path}") validation_results["compatibility_checks"]["permissions"] = { "directory_creation": False, "status": "failed" } except subprocess.TimeoutExpired: validation_results["errors"].append("Directory creation check timed out") validation_results["compatibility_checks"]["permissions"] = {"status": "timeout"} def _determine_overall_compatibility(self, validation_results: dict[str, Any]) -> bool: """Determine overall compatibility based on all validation checks.""" has_errors = len(validation_results["errors"]) > 0 critical_failures = any( check.get("status") == "failed" for check in validation_results["compatibility_checks"].values() if isinstance(check, dict) ) return not has_errors and not critical_failures def _log_validation_results( self, overall_success: bool, source_hostname: str, target_hostname: str, stack_name: str, validation_results: dict[str, Any] ) -> None: """Log validation results.""" if overall_success: self.logger.info( "Host compatibility validation passed", source_host=source_hostname, target_host=target_hostname, stack_name=stack_name, warnings=len(validation_results["warnings"]) ) else: self.logger.error( "Host compatibility validation failed", source_host=source_hostname, target_host=target_hostname, stack_name=stack_name, errors=validation_results["errors"] ) async def create_backup_archive( self, source_host: DockerHost, volume_paths: list[str], stack_name: str, dry_run: bool = False, ) -> tuple[bool, str, dict]: """Create archive of volume data for BACKUP purposes only. WARNING: This method is for backup operations only, not migration! Migrations use direct transfer methods (rsync). 
Args: source_host: Source host configuration volume_paths: List of volume paths to archive stack_name: Stack name for archive naming dry_run: Whether this is a dry run Returns: Tuple of (success: bool, archive_path: str, metadata: dict) """ if dry_run: # For dry run, create a fake small archive temp_dir = tempfile.mkdtemp(prefix="docker_mcp_dryrun_") archive_path = f"{temp_dir}/{stack_name}_migration_DRYRUN.tar.gz" return ( True, archive_path, { "dry_run": True, "estimated_size": len(volume_paths) * 1024 * 1024 * 100, # 100MB per path estimate "paths_included": volume_paths, }, ) try: ssh_cmd_source = build_ssh_command(source_host) # Create archive using migration manager archive_path = await self.migration_manager.archive_utils.create_archive( ssh_cmd_source, volume_paths, f"{stack_name}_migration" ) # Verify archive integrity is_valid = await self.migration_manager.archive_utils.verify_archive( ssh_cmd_source, archive_path ) if not is_valid: return ( False, archive_path, {"success": False, "error": "Archive integrity verification failed"}, ) # Get archive metadata metadata = { "archive_verified": True, "paths_included": volume_paths, "archive_size": 0, # Would be populated by actual archive creation } return True, archive_path, metadata except Exception as e: self.logger.error( "Failed to create data archive", error=str(e), host_id=source_host.hostname, stack_name=stack_name, ) return False, "", {"success": False, "error": str(e)} async def execute_migration_with_progress( self, source_host: DockerHost, target_host: DockerHost, stack_name: str, volume_paths: list[str], compose_content: str, dry_run: bool = False, progress_callback: Callable[[dict[str, Any]], None] | None = None ) -> tuple[bool, dict[str, Any]]: """Execute migration with detailed progress reporting. 
Args: source_host: Source host configuration target_host: Target host configuration stack_name: Stack name to migrate volume_paths: List of volume paths to transfer compose_content: Updated compose file content dry_run: Whether this is a dry run progress_callback: Optional callback for progress updates Returns: Tuple of (success: bool, migration_results: dict) """ migration_context = self._initialize_migration_context( source_host, target_host, stack_name ) update_progress = self._create_progress_updater( migration_context, progress_callback ) try: # Execute migration steps sequentially success = await self._execute_migration_steps( migration_context, update_progress, source_host, target_host, stack_name, volume_paths, compose_content, dry_run ) if success: self._finalize_successful_migration(migration_context) return success, migration_context except Exception as e: return self._handle_migration_exception( e, migration_context, update_progress ) def _initialize_migration_context( self, source_host: DockerHost, target_host: DockerHost, stack_name: str ) -> dict[str, Any]: """Initialize migration context with all required fields.""" migration_steps = [ {"name": "validate_compatibility", "description": "Validating host compatibility"}, {"name": "stop_source", "description": "Stopping source stack"}, {"name": "create_backup", "description": "Creating backup of target data"}, {"name": "transfer_data", "description": "Transferring stack data"}, {"name": "deploy_target", "description": "Deploying stack on target"}, {"name": "verify_deployment", "description": "Verifying deployment success"} ] return { "migration_id": f"{source_host.hostname}_{target_host.hostname}_{stack_name}", "total_steps": len(migration_steps), "completed_steps": 0, "current_step": None, "step_results": {}, "overall_success": False, "start_time": datetime.now().isoformat(), "end_time": None, "errors": [], "warnings": [] } def _create_progress_updater( self, migration_context: dict[str, Any], progress_callback: Callable[[dict[str, Any]], None] | None ) -> Callable[[str, str, dict | None], None]: """Create progress update function with logging and callback.""" def update_progress(step_name: str, status: str, details: dict | None = None): migration_context["current_step"] = {"name": step_name, "status": status} if details: migration_context["step_results"][step_name] = details if status == "completed": migration_context["completed_steps"] += 1 # Log progress self.logger.info( "Migration progress update", migration_id=migration_context["migration_id"], step=step_name, status=status, progress=f"{migration_context['completed_steps']}/{migration_context['total_steps']}" ) # Call progress callback if provided if progress_callback: try: progress_callback(migration_context.copy()) except Exception as e: self.logger.warning("Progress callback failed", error=str(e)) return update_progress async def _execute_migration_steps( self, migration_context: dict[str, Any], update_progress: Callable, source_host: DockerHost, target_host: DockerHost, stack_name: str, volume_paths: list[str], compose_content: str, dry_run: bool ) -> bool: """Execute all migration steps in sequence.""" # Step 1: Validate compatibility if not await self._execute_compatibility_step( update_progress, source_host, target_host, stack_name, migration_context, dry_run ): return False # Step 2: Stop source stack if not await self._execute_stop_source_step( update_progress, source_host, stack_name, migration_context, dry_run ): return False # Step 3: Create backup await 
self._execute_backup_step( update_progress, target_host, stack_name, migration_context, dry_run ) # Step 4: Transfer data if not await self._execute_transfer_step( update_progress, source_host, target_host, volume_paths, stack_name, migration_context, dry_run ): return False # Step 5: Deploy on target if not await self._execute_deploy_step( update_progress, target_host, stack_name, compose_content, migration_context, dry_run ): return False # Step 6: Verify deployment await self._execute_verify_step( update_progress, target_host, stack_name, volume_paths, migration_context, dry_run ) return True async def _execute_compatibility_step( self, update_progress: Callable, source_host: DockerHost, target_host: DockerHost, stack_name: str, migration_context: dict[str, Any], dry_run: bool ) -> bool: """Execute compatibility validation step.""" update_progress("validate_compatibility", "in_progress") compat_success, compat_results = await self.validate_host_compatibility( source_host, target_host, stack_name ) if not compat_success and not dry_run: update_progress("validate_compatibility", "failed", compat_results) migration_context["errors"].append("Host compatibility validation failed") return False update_progress("validate_compatibility", "completed", compat_results) return True async def _execute_stop_source_step( self, update_progress: Callable, source_host: DockerHost, stack_name: str, migration_context: dict[str, Any], dry_run: bool ) -> bool: """Execute source stack stop step.""" update_progress("stop_source", "in_progress") if not dry_run: stop_result = await self.stack_tools.manage_stack( source_host.hostname.replace(".", "_"), stack_name, "down" ) if not stop_result.get("success", False): update_progress("stop_source", "failed", stop_result) migration_context["errors"].append(f"Failed to stop source stack: {stop_result.get('error')}") return False else: stop_result = {"success": True, "dry_run": True} update_progress("stop_source", "completed", stop_result) return True async def _execute_backup_step( self, update_progress: Callable, target_host: DockerHost, stack_name: str, migration_context: dict[str, Any], dry_run: bool ) -> None: """Execute backup creation step.""" update_progress("create_backup", "in_progress") target_appdata = target_host.appdata_path or "/opt/docker-appdata" target_path = f"{target_appdata}/{stack_name}" backup_success, backup_info = await self.create_backup( target_host, target_path, stack_name, dry_run ) backup_results = backup_info if isinstance(backup_info, dict) else {"backup_path": backup_info} if not backup_success and not dry_run: migration_context["warnings"].append("Backup creation failed, continuing with migration") update_progress("create_backup", "completed", backup_results) async def _execute_transfer_step( self, update_progress: Callable, source_host: DockerHost, target_host: DockerHost, volume_paths: list[str], stack_name: str, migration_context: dict[str, Any], dry_run: bool ) -> bool: """Execute data transfer step.""" update_progress("transfer_data", "in_progress") transfer_success, transfer_results = await self.transfer_data( source_host, target_host, volume_paths, stack_name, None, dry_run ) if not transfer_success: update_progress("transfer_data", "failed", transfer_results) migration_context["errors"].append(f"Data transfer failed: {transfer_results.get('error')}") return False update_progress("transfer_data", "completed", transfer_results) return True async def _execute_deploy_step( self, update_progress: Callable, target_host: DockerHost, 
stack_name: str, compose_content: str, migration_context: dict[str, Any], dry_run: bool ) -> bool: """Execute deployment step.""" update_progress("deploy_target", "in_progress") deploy_success, deploy_results = await self.deploy_stack_on_target( target_host.hostname.replace(".", "_"), stack_name, compose_content, True, dry_run ) if not deploy_success: update_progress("deploy_target", "failed", deploy_results) migration_context["errors"].append(f"Target deployment failed: {deploy_results.get('error')}") return False update_progress("deploy_target", "completed", deploy_results) return True async def _execute_verify_step( self, update_progress: Callable, target_host: DockerHost, stack_name: str, volume_paths: list[str], migration_context: dict[str, Any], dry_run: bool ) -> None: """Execute deployment verification step.""" update_progress("verify_deployment", "in_progress") verify_success, verify_results = await self.verify_deployment( target_host.hostname.replace(".", "_"), stack_name, volume_paths, None, dry_run ) if not verify_success: update_progress("verify_deployment", "failed", verify_results) migration_context["warnings"].append(f"Deployment verification failed: {verify_results.get('error')}") else: update_progress("verify_deployment", "completed", verify_results) def _finalize_successful_migration(self, migration_context: dict[str, Any]) -> None: """Finalize successful migration context.""" migration_context["overall_success"] = True migration_context["end_time"] = datetime.now().isoformat() migration_context["current_step"] = {"name": "completed", "status": "success"} self.logger.info( "Migration completed successfully", migration_id=migration_context["migration_id"], duration_steps=migration_context["completed_steps"], warnings=len(migration_context["warnings"]) ) def _handle_migration_exception( self, exception: Exception, migration_context: dict[str, Any], update_progress: Callable ) -> tuple[bool, dict[str, Any]]: """Handle unexpected migration exceptions.""" current_step = migration_context.get("current_step", {}).get("name", "unknown") update_progress(current_step, "failed", {"error": str(exception)}) migration_context["errors"].append(f"Migration failed at step {current_step}: {str(exception)}") migration_context["overall_success"] = False migration_context["end_time"] = datetime.now().isoformat() self.logger.error( "Migration failed with exception", migration_id=migration_context["migration_id"], step=current_step, error=str(exception) ) return False, migration_context async def transfer_data( self, source_host: DockerHost, target_host: DockerHost, volume_paths: list[str], stack_name: str, path_mappings: dict[str, str] | None = None, dry_run: bool = False, ) -> tuple[bool, dict]: """Transfer volume data directly between hosts (no archiving). 
Uses rsync for direct directory synchronization Args: source_host: Source host configuration target_host: Target host configuration volume_paths: List of volume paths to transfer path_mappings: Optional mapping of source paths to target paths stack_name: Stack name dry_run: Whether this is a dry run Returns: Tuple of (success: bool, transfer_results: dict) """ if dry_run: return True, { "success": True, "dry_run": True, "transfer_type": "simulated", "estimated_time": "5-10 minutes", } try: # Use migration manager for optimal direct transfer transfer_result = await self.migration_manager.transfer_data( source_host=source_host, target_host=target_host, source_paths=volume_paths, target_path=f"{target_host.appdata_path or '/opt/docker-appdata'}/{stack_name}", stack_name=stack_name, path_mappings=path_mappings, dry_run=dry_run, ) return transfer_result["success"], transfer_result except Exception as e: self.logger.error( "Failed to transfer data", error=str(e), host_id=source_host.hostname, stack_name=stack_name, ) return False, {"success": False, "error": str(e)} async def extract_archive( self, target_host: DockerHost, archive_path: str, target_path: str, dry_run: bool = False ) -> tuple[bool, dict]: """Extract archive on target host for BACKUP/RESTORE operations only. WARNING: This method is for backup/restore operations only, not migration! Migrations use direct transfer methods without archiving. Args: target_host: Target host configuration archive_path: Path to archive file target_path: Target extraction path dry_run: Whether this is a dry run Returns: Tuple of (success: bool, extraction_results: dict) """ if dry_run: return True, { "success": True, "dry_run": True, "extraction_path": target_path, "files_extracted": "simulated", } try: ssh_cmd_target = build_ssh_command(target_host) success = await self.migration_manager.archive_utils.extract_archive( ssh_cmd_target, archive_path, target_path ) if success: return True, { "success": True, "extraction_successful": True, "extraction_path": target_path, } else: return False, {"success": False, "error": "Archive extraction failed"} except Exception as e: self.logger.error("Failed to extract archive", error=str(e)) return False, {"success": False, "error": str(e)} async def deploy_stack_on_target( self, host_id: str, stack_name: str, compose_content: str, start_stack: bool = True, dry_run: bool = False, ) -> tuple[bool, dict]: """Deploy stack on target host. 
Args: host_id: Target host ID stack_name: Stack name compose_content: Updated compose file content start_stack: Whether to start the stack after deployment dry_run: Whether this is a dry run Returns: Tuple of (success: bool, deployment_results: dict) """ if dry_run: return True, { "success": True, "dry_run": True, "deployment_simulated": True, "stack_would_start": start_stack, } try: # Deploy using stack tools result = await self.stack_tools.deploy_stack( host_id=host_id, stack_name=stack_name, compose_content=compose_content, environment=None, pull_images=True, recreate=False, ) if not result["success"]: return False, result container_ready = False # Start stack if requested if start_stack: start_result = await self.stack_tools.manage_stack( host_id=host_id, stack_name=stack_name, action="up", options=None, ) if not start_result["success"]: return False, { "deploy_success": True, "start_success": False, "start_error": start_result.get("error"), } # Wait for containers to fully start after deployment try: await asyncio.sleep(2) # Initial delay for deployment to settle # Poll for container readiness for attempt in range(10): # Up to 10 seconds # Check if container exists and is running target_host = self.config.hosts[host_id] ssh_cmd = build_ssh_command(target_host) check_cmd = ssh_cmd + [ "sh", "-c", ( "docker ps --filter " f"'label=com.docker.compose.project={shlex.quote(stack_name)}' " "--format '{{{{.Names}}}}' | grep -q . && " "echo 'RUNNING' || echo 'NOT_READY'" ), ] # typing.cast import at module level (line 10) result = cast( subprocess.CompletedProcess[str], await asyncio.to_thread( subprocess.run, # nosec B603 check_cmd, capture_output=True, text=True, check=False, timeout=10, ), ) # Safely check if result has the expected stdout attribute and contains "RUNNING" if ( result.returncode == 0 and hasattr(result, "stdout") and isinstance(result.stdout, str) and "RUNNING" in result.stdout ): container_ready = True self.logger.info( "Container ready for verification", stack_name=stack_name, host_id=host_id, attempt=attempt + 1, ) break await asyncio.sleep(1) else: self.logger.warning( "Container may not be fully ready for verification", stack_name=stack_name, host_id=host_id, ) except Exception as e: self.logger.warning("Container readiness check failed", error=str(e)) # Continue anyway - verification will handle missing containers return True, { "success": True, "deploy_success": True, "start_success": start_stack, "stack_deployed": True, "container_ready": container_ready, } except Exception as e: self.logger.error("Failed to deploy stack on target", error=str(e)) return False, {"success": False, "error": str(e)} async def verify_deployment( self, host_id: str, stack_name: str, expected_mounts: list[str], source_inventory: dict[str, Any] | None = None, dry_run: bool = False, ) -> tuple[bool, dict]: """Verify deployment success and data integrity. 
Args: host_id: Target host ID stack_name: Stack name expected_mounts: List of expected mount points source_inventory: Source data inventory for comparison dry_run: Whether this is a dry run Returns: Tuple of (success: bool, verification_results: dict) """ if dry_run: return True, { "success": True, "dry_run": True, "verification_simulated": True, "data_integrity": "would_be_verified", } try: target_host = self.config.hosts[host_id] ssh_cmd_target = build_ssh_command(target_host) target_appdata = target_host.appdata_path or "/opt/docker-appdata" # Verify container integration container_verification = await self.migration_manager.verify_container_integration( ssh_cmd_target, stack_name, target_appdata, expected_mounts ) # Verify data completeness if source inventory available data_verification = {"success": True, "details": "No source inventory provided"} if source_inventory: data_verification = await self.migration_manager.verify_migration_completeness( ssh_cmd_target, source_inventory, f"{target_appdata}/{stack_name}" ) # Extract the actual success from nested structure container_success: bool = bool( container_verification.get("container_integration", {}).get("success", False) ) # Handle both top-level and nested success fields data_success: bool = bool( data_verification.get("success") or data_verification.get("data_transfer", {}).get("success", False) ) overall_success: bool = container_success and data_success return overall_success, { "success": overall_success, "container_integration": container_verification, "data_verification": data_verification, "overall_verification": overall_success, } except Exception as e: self.logger.error("Failed to verify deployment", error=str(e)) return False, {"success": False, "error": str(e)} async def cleanup_source( self, host_id: str, stack_name: str, compose_path: str, remove_data: bool = False, dry_run: bool = False, ) -> tuple[bool, dict]: """Clean up source stack after successful migration. 
Args: host_id: Source host ID stack_name: Stack name compose_path: Path to source compose file remove_data: Whether to remove data as well dry_run: Whether this is a dry run Returns: Tuple of (success: bool, cleanup_results: dict) """ if dry_run: return True, { "success": True, "dry_run": True, "cleanup_simulated": True, "would_remove_compose": True, "would_remove_data": remove_data, } try: source_host = self.config.hosts[host_id] ssh_cmd_source = build_ssh_command(source_host) # Stop stack first stop_result = await self.stack_tools.manage_stack( host_id=host_id, stack_name=stack_name, action="down", options=None, ) # Remove compose file (safe operation) remove_cmd = ssh_cmd_source + [f"rm -f {shlex.quote(compose_path)}"] result = await asyncio.to_thread( subprocess.run, # nosec B603 remove_cmd, check=False, timeout=10, ) cleanup_results = { "stack_stopped": stop_result.get("success", False), "compose_removed": result.returncode == 0, "data_removed": False, } # Remove data if requested (more dangerous) if remove_data: source_appdata = source_host.appdata_path or "/opt/docker-appdata" stack_data_path = f"{source_appdata}/{stack_name}" # Use safety-validated removal ( remove_success, remove_message, ) = await self.migration_manager.safety.safe_delete_file( ssh_cmd_source, stack_data_path, f"Cleanup source data for {stack_name}" ) cleanup_results["data_removed"] = remove_success cleanup_results["data_removal_message"] = remove_message overall_success = cleanup_results["compose_removed"] if remove_data: overall_success = overall_success and cleanup_results["data_removed"] return overall_success, cleanup_results except Exception as e: self.logger.error("Failed to cleanup source", error=str(e)) return False, {"success": False, "error": str(e)} async def create_backup( self, target_host: DockerHost, target_path: str, stack_name: str, dry_run: bool = False ) -> tuple[bool, dict]: """Create backup of existing target data before migration. Args: target_host: Target host configuration target_path: Path to backup stack_name: Stack name for backup naming dry_run: Whether this is a dry run Returns: Tuple of (success: bool, backup_info: dict) """ if dry_run: return True, { "success": True, "dry_run": True, "backup_simulated": True, "backup_path": f"simulated_backup_{stack_name}", } try: backup_info = await self.backup_manager.backup_directory( host=target_host, source_path=target_path, stack_name=stack_name, backup_reason="Pre-migration backup", ) # Convert BackupInfo to dict and determine success backup_dict = backup_info.model_dump() # Consider backup successful if backup_path is not None success = backup_info.backup_path is not None return success, backup_dict except Exception as e: self.logger.error("Failed to create backup", error=str(e)) return False, {"success": False, "error": str(e)} async def restore_from_backup( self, target_host: DockerHost, backup_info: dict, dry_run: bool = False ) -> tuple[bool, str]: """Restore from backup in case of migration failure. 
Args: target_host: Target host configuration backup_info: Backup information from create_backup dry_run: Whether this is a dry run Returns: Tuple of (success: bool, message: str) """ if dry_run: return True, "Dry run - backup restore would be performed" try: # Convert dict back to BackupInfo object backup_obj = BackupInfo.model_validate(backup_info) success, message = await self.backup_manager.restore_directory_backup( host=target_host, backup_info=backup_obj ) return success, message except Exception as e: error_msg = f"Failed to restore from backup: {str(e)}" self.logger.error("Backup restore failed", error=str(e)) return False, error_msg def update_compose_for_target( self, compose_content: str, old_paths: dict[str, str], target_appdata: str, stack_name: str ) -> str: """Update compose file paths for target environment. Args: compose_content: Original compose content old_paths: Mapping of old to new paths target_appdata: Target appdata path stack_name: Stack name Returns: Updated compose content """ try: # Use migration manager's path update functionality updated_content = self.migration_manager.update_compose_for_migration( compose_content=compose_content, old_paths=old_paths, new_base_path=target_appdata, target_appdata_path=target_appdata, ) return updated_content except Exception as e: self.logger.error("Failed to update compose paths", error=str(e)) return compose_content # Return original on error
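
For orientation, here is a minimal usage sketch of StackMigrationExecutor, separate from the module above. It assumes an already-loaded DockerMCPConfig and DockerContextManager; the host IDs ("source", "target"), the stack name ("myapp"), the volume path, and the path mapping are illustrative placeholders, not values defined by this module.

# Illustrative driver only: host IDs, stack name, and paths below are assumptions.
async def migrate_example(config: DockerMCPConfig, context_manager: DockerContextManager) -> None:
    executor = StackMigrationExecutor(config, context_manager)

    # Read the compose file from the source host.
    ok, compose_content, compose_path = await executor.retrieve_compose_file("source", "myapp")
    if not ok:
        raise RuntimeError(f"could not read compose file at {compose_path!r}")

    source_host = config.hosts["source"]
    target_host = config.hosts["target"]
    target_appdata = target_host.appdata_path or "/opt/docker-appdata"

    # Rewrite bind-mount paths for the target appdata layout (mapping is illustrative).
    updated_compose = executor.update_compose_for_target(
        compose_content,
        old_paths={"/srv/appdata/myapp": f"{target_appdata}/myapp"},
        target_appdata=target_appdata,
        stack_name="myapp",
    )

    # Simple progress printer fed by the executor's progress callback.
    def on_progress(ctx: dict) -> None:
        step = ctx.get("current_step") or {}
        print(f"[{ctx['completed_steps']}/{ctx['total_steps']}] {step.get('name')}: {step.get('status')}")

    success, results = await executor.execute_migration_with_progress(
        source_host=source_host,
        target_host=target_host,
        stack_name="myapp",
        volume_paths=["/srv/appdata/myapp"],
        compose_content=updated_compose,
        dry_run=True,  # start with a dry run before moving real data
        progress_callback=on_progress,
    )
    if not success:
        print("migration failed:", results["errors"])

Because the sketch runs with dry_run=True, the compatibility checks still execute over SSH, while the stop, backup, transfer, deploy, and verify steps are simulated, which makes it a reasonable first pass before committing to a real transfer.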