Skip to main content
Glama
manager.py (15.2 kB)
"""Main migration orchestrator for Docker stack transfers.""" import asyncio import json import shlex import subprocess from typing import Any import structlog from ..config_loader import DockerHost from ..exceptions import DockerMCPError from ..transfer import ArchiveUtils, ContainerizedRsyncTransfer, RsyncTransfer from .verification import MigrationVerifier from .volume_parser import VolumeParser logger = structlog.get_logger() class MigrationError(DockerMCPError): """Migration operation failed.""" pass class MigrationManager: """Orchestrates Docker stack migrations between hosts using modular components.""" def __init__(self, transfer_method: str = "rsync", docker_image: str = "instrumentisto/rsync-ssh:latest"): self.logger = logger.bind(component="migration_manager") self.transfer_method = transfer_method self.docker_image = docker_image # Initialize focused components self.volume_parser = VolumeParser() self.verifier = MigrationVerifier() # Initialize transfer methods self.archive_utils = ArchiveUtils() self.rsync_transfer = RsyncTransfer() self.containerized_rsync_transfer = ContainerizedRsyncTransfer(docker_image) async def choose_transfer_method( self, source_host: DockerHost, target_host: DockerHost ) -> tuple[str, Any]: """Choose transfer method based on configuration. Args: source_host: Source host configuration target_host: Target host configuration Returns: Tuple of (transfer_type: str, transfer_instance) """ if self.transfer_method == "containerized": self.logger.info( "Using containerized rsync transfer for permission handling", docker_image=self.docker_image ) return "containerized_rsync", self.containerized_rsync_transfer else: self.logger.info("Using standard rsync transfer for universal compatibility") return "rsync", self.rsync_transfer async def verify_containers_stopped( self, ssh_cmd: list[str], stack_name: str, force_stop: bool = False, ) -> tuple[bool, list[str]]: """Verify all containers in a stack are stopped. 
Args: ssh_cmd: SSH command parts for remote execution stack_name: Stack name to check force_stop: Force stop running containers Returns: Tuple of (all_stopped, list_of_running_containers) """ compose_cmd = ( "docker compose " "--ansi never " f"--project-name {shlex.quote(stack_name)} " "ps --format json" ) check_cmd = ssh_cmd + [compose_cmd] result = await asyncio.to_thread( subprocess.run, # nosec B603 check_cmd, check=False, capture_output=True, text=True, timeout=300, ) if result.returncode != 0: error_message = result.stderr.strip() or result.stdout.strip() or "unknown error" self.logger.error( "docker compose ps verification failed", stack=stack_name, error=error_message, ) raise MigrationError( f"docker compose ps failed while verifying shutdown: {error_message}" ) running_containers: list[str] = [] for line in result.stdout.splitlines(): payload = line.strip() if not payload: continue try: entry = json.loads(payload) except json.JSONDecodeError: self.logger.warning( "Failed to parse docker compose ps output line", line=payload ) continue state_value = str(entry.get("State") or entry.get("state") or "").lower() if state_value.startswith("running") or state_value.startswith("up"): name = entry.get("Name") or entry.get("name") or entry.get("ID") if name: running_containers.append(str(name)) if not running_containers: return True, [] if force_stop: self.logger.info( "Force stopping containers", stack=stack_name, containers=running_containers, ) # Force stop each container for container in running_containers: stop_cmd = ssh_cmd + [f"docker kill {shlex.quote(container)}"] await asyncio.to_thread( subprocess.run, # nosec B603 stop_cmd, check=False, capture_output=True, text=True, timeout=60, ) # Wait for containers to stop and processes to fully terminate await asyncio.sleep(10) # Increased from 3s to ensure complete shutdown # Re-check return await self.verify_containers_stopped(ssh_cmd, stack_name, force_stop=False) return False, running_containers async def 
prepare_target_directories( self, ssh_cmd: list[str], appdata_path: str, stack_name: str, ) -> str: """Prepare target directories for migration. Args: ssh_cmd: SSH command parts for remote execution appdata_path: Base appdata path on target host stack_name: Stack name for directory organization Returns: Path to stack-specific appdata directory """ # Create stack-specific directory stack_dir = f"{appdata_path}/{stack_name}" mkdir_cmd = f"mkdir -p {shlex.quote(stack_dir)}" full_cmd = ssh_cmd + [mkdir_cmd] result = await asyncio.to_thread( subprocess.run, # nosec B603 full_cmd, check=False, capture_output=True, text=True, timeout=300, ) if result.returncode != 0: raise MigrationError(f"Failed to create target directory: {result.stderr}") self.logger.info( "Prepared target directory", path=stack_dir, ) return stack_dir async def transfer_data( self, source_host: DockerHost, target_host: DockerHost, source_paths: list[str], target_path: str, stack_name: str, path_mappings: dict[str, str] | None = None, dry_run: bool = False, ) -> dict[str, Any]: """Transfer data between hosts using the optimal method. 
Args: source_host: Source host configuration target_host: Target host configuration source_paths: List of paths to transfer from source target_path: Target path on destination stack_name: Stack name for organization path_mappings: Optional mapping of specific source paths to target paths dry_run: Whether this is a dry run Returns: Transfer result dictionary """ if not source_paths: return {"success": True, "message": "No data to transfer", "transfer_type": "none"} # Choose transfer method transfer_type, transfer_instance = await self.choose_transfer_method( source_host, target_host ) self.logger.info( "Selected transfer method for migration", transfer_type=transfer_type, source_host=source_host.hostname, target_host=target_host.hostname, source_paths_count=len(source_paths) ) # Use rsync transfer - direct directory synchronization # Rsync transfer - direct directory synchronization (no archiving) if dry_run: return { "success": True, "message": f"Dry run - would transfer via {transfer_type}", "transfer_type": transfer_type, } # For rsync, directly sync each source path to target transfer_results = [] overall_success = True target_dirs_created: set[str] = set() ssh_cmd_target = self.rsync_transfer.build_ssh_cmd(target_host) for source_path in source_paths: normalized_source_path = self._normalize_source_path(source_path, source_host) try: desired_target_path = ( path_mappings.get(source_path) if path_mappings and source_path in path_mappings else target_path ) if desired_target_path and desired_target_path not in target_dirs_created: await self._ensure_remote_directory(ssh_cmd_target, desired_target_path) target_dirs_created.add(desired_target_path) result = await transfer_instance.transfer( source_host=source_host, target_host=target_host, source_path=normalized_source_path, target_path=desired_target_path, compress=True, delete=False, # Safety: don't delete target files ) result.setdefault("metadata", {})["original_source_path"] = source_path 
transfer_results.append(result) if not result.get("success", False): overall_success = False except Exception as e: overall_success = False transfer_results.append( {"success": False, "error": str(e), "source_path": source_path} ) final_result = { "success": overall_success, "transfer_type": transfer_type, "transfers": transfer_results, "paths_transferred": len([r for r in transfer_results if r.get("success", False)]), "total_paths": len(source_paths), } if not overall_success: # Extract first error for detailed reporting first_error = next( (r.get("error") for r in transfer_results if r.get("error")), "Unknown transfer error" ) final_result["error"] = first_error final_result["message"] = f"Transfer failed: {first_error}" else: final_result["message"] = ( f"Successfully transferred {final_result['paths_transferred']} paths via {transfer_type}" ) return final_result async def _ensure_remote_directory(self, ssh_cmd: list[str], directory: str) -> None: """Ensure a remote directory exists before data transfer.""" mkdir_cmd = ssh_cmd + [f"mkdir -p {shlex.quote(directory)}"] result = await asyncio.to_thread( subprocess.run, # nosec B603 mkdir_cmd, check=False, capture_output=True, text=True, timeout=120, ) if result.returncode != 0: error_message = result.stderr.strip() or result.stdout.strip() or "unknown error" raise MigrationError( f"Failed to prepare remote directory {directory}: {error_message}" ) def _normalize_source_path(self, source_path: str, source_host: DockerHost) -> str: """Normalize rsync source paths to local form when prefixed with the host name. Rsync accepts paths such as ``host:/remote/path`` to identify remote sources. During migration we execute rsync *on* the source host itself, so the source segment must be a local filesystem path. Some call sites pass values like ``"squirts:/mnt/data"`` which causes rsync to treat the path as a secondary remote target. 
This helper strips prefixes that simply repeat the current source host identifier while leaving genuine remote mounts (e.g. an NFS path ``nfs-server:/exports``) untouched. Args: source_path: Path string that may include an SCP-style host prefix. source_host: Host configuration for the migration source. Returns: Sanitized local path suitable for rsync execution on the source host. """ if not source_path or ":" not in source_path: return source_path prefix, remainder = source_path.split(":", 1) # Only normalize absolute paths (rsync remote syntax). Leave anything else untouched. if not remainder.startswith("/"): return source_path normalized_prefix = prefix.strip("'\"") if "@" in normalized_prefix: normalized_prefix = normalized_prefix.split("@", 1)[1] normalized_prefix = normalized_prefix.strip("[]") host_aliases = {source_host.hostname} if "." in source_host.hostname: host_aliases.add(source_host.hostname.split(".", 1)[0]) host_aliases.update({"localhost", "127.0.0.1"}) if normalized_prefix in host_aliases: return remainder return source_path # Delegate methods to focused components async def parse_compose_volumes( self, compose_content: str, source_appdata_path: str = None ) -> dict[str, Any]: """Delegate to VolumeParser.""" return await self.volume_parser.parse_compose_volumes(compose_content, source_appdata_path) async def get_volume_locations( self, ssh_cmd: list[str], named_volumes: list[str] ) -> dict[str, str]: """Delegate to VolumeParser.""" return await self.volume_parser.get_volume_locations(ssh_cmd, named_volumes) def update_compose_for_migration( self, compose_content: str, old_paths: dict[str, str], new_base_path: str, target_appdata_path: str = None, ) -> str: """Delegate to VolumeParser.""" return self.volume_parser.update_compose_for_migration( compose_content, old_paths, new_base_path, target_appdata_path ) async def create_source_inventory( self, ssh_cmd: list[str], volume_paths: list[str] ) -> dict[str, Any]: """Delegate to MigrationVerifier.""" 
return await self.verifier.create_source_inventory(ssh_cmd, volume_paths) async def verify_migration_completeness( self, ssh_cmd: list[str], source_inventory: dict[str, Any], target_path: str ) -> dict[str, Any]: """Delegate to MigrationVerifier.""" return await self.verifier.verify_migration_completeness( ssh_cmd, source_inventory, target_path ) async def verify_container_integration( self, ssh_cmd: list[str], stack_name: str, expected_appdata_path: str, expected_volumes: list[str], ) -> dict[str, Any]: """Delegate to MigrationVerifier.""" return await self.verifier.verify_container_integration( ssh_cmd, stack_name, expected_appdata_path, expected_volumes )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/docker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.