"""Safety guards and validation for destructive operations."""
import asyncio
import shlex
import subprocess
import tempfile
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import structlog
from .exceptions import DockerMCPError
# Module-level structlog logger; MigrationSafety binds a "component" field onto it.
logger = structlog.get_logger()
# Timeout, in seconds, passed to the subprocess call that runs each remote deletion
DELETE_TIMEOUT_SECONDS = 30
class SafetyError(DockerMCPError):
    """Raised when a destructive operation is rejected by a safety check."""
class MigrationSafety:
"""Safety guards for migration operations to prevent accidental data loss."""
# Allowed paths for safe deletion operations
SAFE_DELETE_PATHS = [
tempfile.gettempdir(), # System temp directory
"/var/tmp", # Standard UNIX temp directory # noqa: S108
"/opt/migration_temp",
# Add more safe temporary directories as needed
]
# Paths that should NEVER be deleted
FORBIDDEN_PATHS = [
"/",
"/bin",
"/boot",
"/dev",
"/etc",
"/lib",
"/proc",
"/root",
"/sbin",
"/sys",
"/usr",
"/var/log",
"/var/lib",
"/home",
"/mnt",
"/opt", # Top level - subdirs might be ok
]
def __init__(self):
self.logger = logger.bind(component="migration_safety")
self.deletion_manifest: list[dict[str, Any]] = []
def validate_deletion_path(self, file_path: str) -> tuple[bool, str]:
"""Validate that a path is safe to delete.
Args:
file_path: Path to validate for deletion
Returns:
Tuple of (is_safe: bool, reason: str)
"""
try:
# Resolve path to handle symlinks and relative paths
resolved_path = str(Path(file_path).resolve())
# Check for parent directory traversal attempts first
if any(part == ".." for part in Path(file_path).parts):
return False, f"Path '{file_path}' contains parent directory traversal"
# SECURITY: Check for forbidden paths BEFORE safe paths to prevent bypassing
if forbidden_path := self._get_forbidden_path(resolved_path):
return False, f"Path '{resolved_path}' is in forbidden directory '{forbidden_path}'"
# Check if path is in safe deletion areas
if self._is_in_safe_area(resolved_path):
return True, f"Path in safe area: {resolved_path}"
# Validate files outside safe areas
return self._validate_file_outside_safe_area(file_path, resolved_path)
except Exception as e:
return False, f"Path validation error: {str(e)}"
def _is_in_safe_area(self, resolved_path: str) -> bool:
"""Check if path is in a safe deletion area."""
res_path = Path(resolved_path)
for safe_path in self.SAFE_DELETE_PATHS:
safe_base = Path(safe_path).resolve()
if res_path.is_relative_to(safe_base) and res_path != safe_base:
return True
return False
def _get_forbidden_path(self, resolved_path: str) -> str | None:
"""Get forbidden path if resolved path is forbidden, None otherwise."""
res_path = Path(resolved_path)
for forbidden in self.FORBIDDEN_PATHS:
forb_base = Path(forbidden).resolve()
if res_path == forb_base or res_path.is_relative_to(forb_base):
return str(forb_base)
return None
def _validate_file_outside_safe_area(
self, file_path: str, resolved_path: str
) -> tuple[bool, str]:
"""Validate files outside safe areas."""
# Use resolved path to prevent bypass via symlinks
resolved_name = Path(resolved_path).name
if resolved_name.endswith((".tar.gz", ".tar", ".zip", ".tmp", ".temp", ".migration")):
return True, f"File type allowed: {resolved_name}"
# Allow specific filenames
filename = resolved_name
if filename in ("docker-compose.yml", "docker-compose.yaml"):
return True, f"Docker compose file allowed: {resolved_name}"
return False, f"Path '{resolved_path}' is not in safe deletion area"
def add_to_deletion_manifest(self, file_path: str, operation: str, reason: str) -> None:
"""Add a deletion operation to the manifest for audit trail.
Args:
file_path: Path to be deleted
operation: Type of operation (rm, rm -f, rm -rf, etc.)
reason: Reason for deletion
"""
manifest_entry = {
"path": file_path,
"operation": operation,
"reason": reason,
"timestamp": datetime.now(UTC).isoformat(),
"validated": False,
}
# Validate the path
is_safe, validation_reason = self.validate_deletion_path(file_path)
manifest_entry["validated"] = is_safe
manifest_entry["validation_reason"] = validation_reason
self.deletion_manifest.append(manifest_entry)
self.logger.info(
"Added deletion to manifest",
path=file_path,
operation=operation,
safe=is_safe,
reason=validation_reason,
)
def get_deletion_manifest(self) -> list[dict[str, Any]]:
"""Get a copy of the current deletion manifest to prevent external mutation."""
import copy
return copy.deepcopy(self.deletion_manifest)
def clear_deletion_manifest(self) -> None:
"""Clear the deletion manifest."""
self.deletion_manifest.clear()
async def safe_delete_file(
self, ssh_cmd: list[str], file_path: str, reason: str = "Migration cleanup"
) -> tuple[bool, str]:
"""Safely delete a file with validation and audit trail.
Args:
ssh_cmd: SSH command parts for remote execution
file_path: Path to file to delete
reason: Reason for deletion
Returns:
Tuple of (success: bool, message: str)
"""
# Add to manifest
self.add_to_deletion_manifest(file_path, "rm -f", reason)
# Validate path
is_safe, validation_reason = self.validate_deletion_path(file_path)
if not is_safe:
error_msg = f"SAFETY BLOCK: {validation_reason}"
self.logger.error(
"File deletion blocked by safety check", path=file_path, reason=validation_reason
)
raise SafetyError(error_msg)
# Proceed with deletion with proper argument separation and path safety
delete_cmd = ssh_cmd + ["rm", "-f", "--", shlex.quote(file_path)]
try:
result = await asyncio.to_thread(
subprocess.run, # nosec B603
delete_cmd,
check=False,
capture_output=True,
text=True,
timeout=DELETE_TIMEOUT_SECONDS,
)
except subprocess.TimeoutExpired:
error_msg = f"Deletion timeout after {DELETE_TIMEOUT_SECONDS}s"
self.logger.error(
"File deletion timeout", path=file_path, timeout=DELETE_TIMEOUT_SECONDS
)
return False, error_msg
except Exception as e:
error_msg = f"Deletion error: {str(e)}"
self.logger.error("File deletion exception", path=file_path, error=str(e))
return False, error_msg
if result.returncode == 0:
self.logger.info("File deleted safely", path=file_path, reason=reason)
return True, f"File deleted: {file_path}"
else:
error_msg = f"Deletion failed: {result.stderr}"
self.logger.error("File deletion failed", path=file_path, error=result.stderr)
return False, error_msg
async def safe_cleanup_archive(
self, ssh_cmd: list[str], archive_path: str, reason: str = "Migration cleanup"
) -> tuple[bool, str]:
"""Safely cleanup archive files with validation.
Args:
ssh_cmd: SSH command parts for remote execution
archive_path: Path to archive file
reason: Reason for cleanup
Returns:
Tuple of (success: bool, message: str)
"""
# Validate archive path
if not archive_path.endswith((".tar.gz", ".tar", ".zip")):
return False, f"Not an archive file: {archive_path}"
# Use safe delete
return await self.safe_delete_file(ssh_cmd, archive_path, reason)
def create_safety_report(self) -> dict[str, Any]:
"""Create a safety report of all deletion operations."""
total_deletions = len(self.deletion_manifest)
validated_deletions = len([m for m in self.deletion_manifest if m["validated"]])
blocked_deletions = total_deletions - validated_deletions
return {
"total_deletion_attempts": total_deletions,
"validated_deletions": validated_deletions,
"blocked_deletions": blocked_deletions,
"safety_rate": validated_deletions / total_deletions * 100
if total_deletions > 0
else 100,
"manifest": self.deletion_manifest.copy(),
"safe_paths": self.SAFE_DELETE_PATHS,
"forbidden_paths": self.FORBIDDEN_PATHS,
}