Skip to main content
Glama
safety.py9.33 kB
"""Safety guards and validation for destructive operations.""" import asyncio import shlex import subprocess import tempfile from datetime import UTC, datetime from pathlib import Path from typing import Any import structlog from .exceptions import DockerMCPError logger = structlog.get_logger() # Timeout for remote deletion operations DELETE_TIMEOUT_SECONDS = 30 class SafetyError(DockerMCPError): """Safety validation failed.""" pass class MigrationSafety: """Safety guards for migration operations to prevent accidental data loss.""" # Allowed paths for safe deletion operations SAFE_DELETE_PATHS = [ tempfile.gettempdir(), # System temp directory "/var/tmp", # Standard UNIX temp directory # noqa: S108 "/opt/migration_temp", # Add more safe temporary directories as needed ] # Paths that should NEVER be deleted FORBIDDEN_PATHS = [ "/", "/bin", "/boot", "/dev", "/etc", "/lib", "/proc", "/root", "/sbin", "/sys", "/usr", "/var/log", "/var/lib", "/home", "/mnt", "/opt", # Top level - subdirs might be ok ] def __init__(self): self.logger = logger.bind(component="migration_safety") self.deletion_manifest: list[dict[str, Any]] = [] def validate_deletion_path(self, file_path: str) -> tuple[bool, str]: """Validate that a path is safe to delete. Args: file_path: Path to validate for deletion Returns: Tuple of (is_safe: bool, reason: str) """ try: # Resolve path to handle symlinks and relative paths resolved_path = str(Path(file_path).resolve()) # Check for parent directory traversal attempts first if any(part == ".." for part in Path(file_path).parts): return False, f"Path '{file_path}' contains parent directory traversal" # SECURITY: Check for forbidden paths BEFORE safe paths to prevent bypassing if forbidden_path := self._get_forbidden_path(resolved_path): return False, f"Path '{resolved_path}' is in forbidden directory '{forbidden_path}'" # Check if path is in safe deletion areas if self._is_in_safe_area(resolved_path): return True, f"Path in safe area: {resolved_path}" # Validate files outside safe areas return self._validate_file_outside_safe_area(file_path, resolved_path) except Exception as e: return False, f"Path validation error: {str(e)}" def _is_in_safe_area(self, resolved_path: str) -> bool: """Check if path is in a safe deletion area.""" res_path = Path(resolved_path) for safe_path in self.SAFE_DELETE_PATHS: safe_base = Path(safe_path).resolve() if res_path.is_relative_to(safe_base) and res_path != safe_base: return True return False def _get_forbidden_path(self, resolved_path: str) -> str | None: """Get forbidden path if resolved path is forbidden, None otherwise.""" res_path = Path(resolved_path) for forbidden in self.FORBIDDEN_PATHS: forb_base = Path(forbidden).resolve() if res_path == forb_base or res_path.is_relative_to(forb_base): return str(forb_base) return None def _validate_file_outside_safe_area( self, file_path: str, resolved_path: str ) -> tuple[bool, str]: """Validate files outside safe areas.""" # Use resolved path to prevent bypass via symlinks resolved_name = Path(resolved_path).name if resolved_name.endswith((".tar.gz", ".tar", ".zip", ".tmp", ".temp", ".migration")): return True, f"File type allowed: {resolved_name}" # Allow specific filenames filename = resolved_name if filename in ("docker-compose.yml", "docker-compose.yaml"): return True, f"Docker compose file allowed: {resolved_name}" return False, f"Path '{resolved_path}' is not in safe deletion area" def add_to_deletion_manifest(self, file_path: str, operation: str, reason: str) -> None: """Add a deletion operation to the manifest for audit trail. Args: file_path: Path to be deleted operation: Type of operation (rm, rm -f, rm -rf, etc.) reason: Reason for deletion """ manifest_entry = { "path": file_path, "operation": operation, "reason": reason, "timestamp": datetime.now(UTC).isoformat(), "validated": False, } # Validate the path is_safe, validation_reason = self.validate_deletion_path(file_path) manifest_entry["validated"] = is_safe manifest_entry["validation_reason"] = validation_reason self.deletion_manifest.append(manifest_entry) self.logger.info( "Added deletion to manifest", path=file_path, operation=operation, safe=is_safe, reason=validation_reason, ) def get_deletion_manifest(self) -> list[dict[str, Any]]: """Get a copy of the current deletion manifest to prevent external mutation.""" import copy return copy.deepcopy(self.deletion_manifest) def clear_deletion_manifest(self) -> None: """Clear the deletion manifest.""" self.deletion_manifest.clear() async def safe_delete_file( self, ssh_cmd: list[str], file_path: str, reason: str = "Migration cleanup" ) -> tuple[bool, str]: """Safely delete a file with validation and audit trail. Args: ssh_cmd: SSH command parts for remote execution file_path: Path to file to delete reason: Reason for deletion Returns: Tuple of (success: bool, message: str) """ # Add to manifest self.add_to_deletion_manifest(file_path, "rm -f", reason) # Validate path is_safe, validation_reason = self.validate_deletion_path(file_path) if not is_safe: error_msg = f"SAFETY BLOCK: {validation_reason}" self.logger.error( "File deletion blocked by safety check", path=file_path, reason=validation_reason ) raise SafetyError(error_msg) # Proceed with deletion with proper argument separation and path safety delete_cmd = ssh_cmd + ["rm", "-f", "--", shlex.quote(file_path)] try: result = await asyncio.to_thread( subprocess.run, # nosec B603 delete_cmd, check=False, capture_output=True, text=True, timeout=DELETE_TIMEOUT_SECONDS, ) except subprocess.TimeoutExpired: error_msg = f"Deletion timeout after {DELETE_TIMEOUT_SECONDS}s" self.logger.error( "File deletion timeout", path=file_path, timeout=DELETE_TIMEOUT_SECONDS ) return False, error_msg except Exception as e: error_msg = f"Deletion error: {str(e)}" self.logger.error("File deletion exception", path=file_path, error=str(e)) return False, error_msg if result.returncode == 0: self.logger.info("File deleted safely", path=file_path, reason=reason) return True, f"File deleted: {file_path}" else: error_msg = f"Deletion failed: {result.stderr}" self.logger.error("File deletion failed", path=file_path, error=result.stderr) return False, error_msg async def safe_cleanup_archive( self, ssh_cmd: list[str], archive_path: str, reason: str = "Migration cleanup" ) -> tuple[bool, str]: """Safely cleanup archive files with validation. Args: ssh_cmd: SSH command parts for remote execution archive_path: Path to archive file reason: Reason for cleanup Returns: Tuple of (success: bool, message: str) """ # Validate archive path if not archive_path.endswith((".tar.gz", ".tar", ".zip")): return False, f"Not an archive file: {archive_path}" # Use safe delete return await self.safe_delete_file(ssh_cmd, archive_path, reason) def create_safety_report(self) -> dict[str, Any]: """Create a safety report of all deletion operations.""" total_deletions = len(self.deletion_manifest) validated_deletions = len([m for m in self.deletion_manifest if m["validated"]]) blocked_deletions = total_deletions - validated_deletions return { "total_deletion_attempts": total_deletions, "validated_deletions": validated_deletions, "blocked_deletions": blocked_deletions, "safety_rate": validated_deletions / total_deletions * 100 if total_deletions > 0 else 100, "manifest": self.deletion_manifest.copy(), "safe_paths": self.SAFE_DELETE_PATHS, "forbidden_paths": self.FORBIDDEN_PATHS, }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/docker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server