Skip to main content
Glama
backup.py (11.7 kB)
"""Backup and restore operations for migration rollback capability.""" import asyncio import shlex import subprocess from datetime import UTC, datetime from pathlib import Path import structlog from pydantic import BaseModel, Field from ..utils import build_ssh_command, format_size from .config_loader import DockerHost from .exceptions import DockerMCPError from .safety import MigrationSafety logger = structlog.get_logger() # Timeout constants for backup operations BACKUP_TIMEOUT_SECONDS = 300 # 5 minutes for backup operations CHECK_TIMEOUT_SECONDS = 30 # 30 seconds for status checks class BackupInfo(BaseModel): """Backup information with validation and schema guarantees.""" success: bool = Field(description="Whether backup was successful") type: str = Field(description="Type of backup (directory)") host_id: str = Field(description="Host identifier where backup was created") source_path: str | None = Field( default=None, description="Source directory path for directory backups" ) backup_path: str | None = Field( default=None, description="Path to backup file (for directory backups)" ) backup_size: int = Field(description="Size of backup in bytes") backup_size_human: str = Field(description="Human-readable backup size") timestamp: str = Field(description="Timestamp when backup was created") reason: str = Field(description="Reason for creating the backup") stack_name: str = Field(description="Stack name associated with the backup") created_at: str = Field(description="ISO 8601 creation timestamp") class BackupError(DockerMCPError): """Backup operation failed.""" pass class BackupManager: """Manager for creating and managing backups during migrations.""" def __init__(self): self.logger = logger.bind(component="backup_manager") self.safety = MigrationSafety() self.backups: list[BackupInfo] = [] async def backup_directory( self, host: DockerHost, source_path: str, stack_name: str, backup_reason: str = "Pre-migration backup", ) -> BackupInfo: """Create a backup of a 
directory using tar. Args: host: Host configuration source_path: Directory path to backup stack_name: Stack name for backup naming backup_reason: Reason for backup (for audit trail) Returns: Backup information dictionary """ timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") backup_filename = f"backup_{stack_name}_{timestamp}.tar.gz" remote_tmp_dir = "/tmp/docker_mcp_backups" # noqa: S108 - Remote temp dir, not local backup_path = f"{remote_tmp_dir}/{backup_filename}" ssh_cmd = build_ssh_command(host) # Check if source path exists check_cmd = ssh_cmd + [ "sh", "-lc", f"test -d {shlex.quote(source_path)} && echo 'EXISTS' || echo 'NOT_FOUND'", ] try: result = await asyncio.to_thread( subprocess.run, # nosec B603 check_cmd, capture_output=True, text=True, check=False, timeout=CHECK_TIMEOUT_SECONDS, ) except subprocess.TimeoutExpired: logger.error( "Source path check timed out", host_id=host.hostname, source_path=source_path, timeout_seconds=CHECK_TIMEOUT_SECONDS, ) raise BackupError( f"Source path check timed out after {CHECK_TIMEOUT_SECONDS} seconds" ) from None if "NOT_FOUND" in result.stdout: self.logger.info("Source path does not exist, no backup needed", path=source_path) return BackupInfo( success=True, type="directory", host_id=host.hostname, source_path=source_path, backup_path=None, backup_size=0, backup_size_human="0 B", timestamp=datetime.now(UTC).strftime("%Y%m%d_%H%M%S"), reason=backup_reason, stack_name=stack_name, created_at=datetime.now(UTC).isoformat(), ) # Create backup using tar backup_cmd = ssh_cmd + [ "sh", "-lc", ( f"mkdir -p {shlex.quote(remote_tmp_dir)} && " f"cd {shlex.quote(str(Path(source_path).parent))} && " f"tar czf {shlex.quote(backup_path)} {shlex.quote(Path(source_path).name)} " "2>/dev/null && echo 'BACKUP_SUCCESS' || echo 'BACKUP_FAILED'" ), ] self.logger.info( "Creating directory backup", source=source_path, backup=backup_path, host=host.hostname, reason=backup_reason, ) try: result = await asyncio.to_thread( subprocess.run, # 
nosec B603 backup_cmd, capture_output=True, text=True, check=False, timeout=BACKUP_TIMEOUT_SECONDS, ) except subprocess.TimeoutExpired: logger.error( "Backup operation timed out", host_id=host.hostname, source_path=source_path, backup_path=backup_path, timeout_seconds=BACKUP_TIMEOUT_SECONDS, ) # Try to clean up partial backup cleanup_cmd = ssh_cmd + ["rm", "-f", shlex.quote(backup_path)] try: await asyncio.to_thread( subprocess.run, # nosec B603 cleanup_cmd, capture_output=True, check=False, timeout=CHECK_TIMEOUT_SECONDS, ) except Exception as cleanup_err: logger.warning("Failed to cleanup partial backup", error=str(cleanup_err)) raise BackupError( f"Backup operation timed out after {BACKUP_TIMEOUT_SECONDS} seconds" ) from None if "BACKUP_FAILED" in result.stdout or result.returncode != 0: raise BackupError(f"Failed to create backup: {result.stderr}") # Get backup size size_cmd = ssh_cmd + [ "sh", "-lc", f"stat -c%s {shlex.quote(backup_path)} 2>/dev/null || echo '0'", ] backup_size = 0 # Initialize to prevent UnboundLocalError try: size_result = await asyncio.to_thread( subprocess.run, # nosec B603 size_cmd, capture_output=True, text=True, check=False, timeout=CHECK_TIMEOUT_SECONDS, ) except subprocess.TimeoutExpired: logger.warning( "Backup size check timed out", host_id=host.hostname, backup_path=backup_path, timeout_seconds=CHECK_TIMEOUT_SECONDS, ) backup_size = 0 # Default to 0 if we can't check size except Exception as e: logger.warning( "Backup size check failed", host_id=host.hostname, backup_path=backup_path, error=str(e), ) backup_size = 0 # Default to 0 on any other error else: backup_size = ( int(size_result.stdout.strip()) if size_result.stdout.strip().isdigit() else 0 ) # Create backup record backup_info = BackupInfo( success=True, type="directory", host_id=host.hostname, source_path=source_path, backup_path=backup_path, backup_size=backup_size, backup_size_human=format_size(backup_size), timestamp=timestamp, reason=backup_reason, 
stack_name=stack_name, created_at=datetime.now(UTC).isoformat(), ) self.backups.append(backup_info) self.logger.info( "Directory backup created successfully", backup=backup_path, size=backup_info.backup_size_human, host=host.hostname, ) return backup_info async def restore_directory_backup( self, host: DockerHost, backup_info: BackupInfo ) -> tuple[bool, str]: """Restore a directory from backup. Args: host: Host configuration backup_info: Backup information from backup_directory() Returns: Tuple of (success: bool, message: str) """ if backup_info.type != "directory": return False, f"Not a directory backup: {backup_info.type}" backup_path = backup_info.backup_path source_path = backup_info.source_path if not backup_path: return True, "No backup to restore (directory didn't exist)" if not source_path: return False, "No source path specified in backup info" ssh_cmd = build_ssh_command(host) # Remove current directory and restore from backup restore_cmd = ssh_cmd + [ "sh", "-c", ( f"rm -rf {shlex.quote(source_path)} && " f"cd {shlex.quote(str(Path(source_path).parent))} && " f"tar xzf {shlex.quote(backup_path)} && " f"echo 'RESTORE_SUCCESS' || echo 'RESTORE_FAILED'" ), ] self.logger.info( "Restoring directory from backup", backup=backup_path, target=source_path, host=host.hostname, ) try: result = await asyncio.to_thread( subprocess.run, # nosec B603 restore_cmd, capture_output=True, text=True, check=False, timeout=BACKUP_TIMEOUT_SECONDS, ) except subprocess.TimeoutExpired: logger.error( "Backup restore operation timed out", host_id=host.hostname, backup_path=backup_path, source_path=source_path, timeout_seconds=BACKUP_TIMEOUT_SECONDS, ) raise BackupError( f"Backup restore operation timed out after {BACKUP_TIMEOUT_SECONDS} seconds" ) from None if "RESTORE_FAILED" in result.stdout or result.returncode != 0: return False, f"Failed to restore backup: {result.stderr}" return True, f"Directory restored from backup: {backup_path}" async def cleanup_backup(self, host: 
DockerHost, backup_info: BackupInfo) -> tuple[bool, str]: """Clean up a backup after successful migration. Args: host: Host configuration backup_info: Backup information Returns: Tuple of (success: bool, message: str) """ if backup_info.type == "directory": if not backup_info.backup_path: return True, "No backup file to clean up" success, message = await self.safety.safe_delete_file( build_ssh_command(host), backup_info.backup_path, f"Cleanup backup for {backup_info.stack_name}", ) return success, message return False, f"Unknown backup type: {backup_info.type}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/docker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.