"""
Secure Deletion Utilities
Provides comprehensive secure deletion capabilities for agent resources with
verification, audit trails, and recovery support.
Architecture Integration:
- Design Patterns: Command pattern for deletion operations, Memento for recovery
- Security Model: Defense-in-depth with cryptographic erasure and verification
- Performance Profile: O(n) for state deletion with verification passes
Technical Decisions:
- Multi-Pass Deletion: Secure overwrite for sensitive data
- Verification Steps: Post-deletion verification of resource cleanup
- Audit Integration: Complete audit trail for compliance
Dependencies & Integration:
- Internal: Crypto boundaries, audit logging, state management
- External: OS-level secure deletion utilities when available
Quality Assurance:
- Test Coverage: Unit tests for all deletion operations
- Security Testing: Verification of secure erasure
- Recovery Testing: Rollback capability validation
Author: ADDER_4 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
import os
import shutil
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from src.boundaries.audit import AuditCategory, AuditLevel, get_audit_logger
# Import boundaries
from src.boundaries.crypto import CryptoEngine
from src.models.agent import AgentState
# Import type system
from src.models.ids import AgentId, SessionId
from src.models.security import SecurityContext
# Import utilities
from src.utils.errors import OperationError
from .contracts_shim import ensure, require
class DeletionPhase(Enum):
    """Named stages that a secure deletion run is recorded against.

    Each member's value is the lowercase string used when a phase is
    serialized (for example in the handler's summary dictionary).
    """

    # Handler set-up for a specific agent.
    INITIALIZATION = "initialization"
    # Result of terminating the agent's process.
    PROCESS_TERMINATION = "process_termination"
    # Release of auxiliary resources (e.g. terminal tabs).
    RESOURCE_CLEANUP = "resource_cleanup"
    # Removal of on-disk state files.
    STATE_DELETION = "state_deletion"
    # Post-deletion verification checks.
    VERIFICATION = "verification"
    # Final bookkeeping once the run is finished.
    COMPLETION = "completion"
@dataclass
class DeletionRecord:
    """One audit entry describing the outcome of a single deletion phase."""

    # Phase this entry belongs to.
    phase: DeletionPhase
    # True when the phase (or the step within it) succeeded.
    success: bool
    # When the entry was created.
    timestamp: datetime
    # Free-form, phase-specific details; each instance gets its own dict.
    details: Dict[str, Any] = field(default_factory=dict)
    # Optional error message; None when no error was supplied.
    error: Optional[str] = None
@dataclass
class DeletionSummary:
    """Aggregated state of one agent's deletion run."""

    # Identity of the agent being deleted.
    agent_id: AgentId
    agent_name: str
    # When the run started.
    start_time: datetime
    # When the run ended; None until it has been set.
    end_time: Optional[datetime] = None
    # Phases with at least one successful record.
    phases_completed: List[DeletionPhase] = field(default_factory=list)
    # Phases with at least one failed record.
    phases_failed: List[DeletionPhase] = field(default_factory=list)
    # Every DeletionRecord, in the order it was appended.
    records: List[DeletionRecord] = field(default_factory=list)
    # What was released, keyed by resource kind.
    resources_freed: Dict[str, Any] = field(default_factory=dict)
    # Outcome of the verification step; False until it passes.
    verification_passed: bool = False
class SecureDeletionHandler:
    """
    Handles secure deletion of agent resources with verification.

    Provides comprehensive deletion capabilities with audit trails,
    verification steps, and recovery support for failed operations.

    Contracts:
        Preconditions:
            - Handler must be initialized before use
            - Resources must exist before deletion
        Postconditions:
            - All resources securely deleted or error raised
            - Audit trail created for all operations
            - Verification confirms deletion success
        Invariants:
            - Deletion operations are atomic per phase
            - Failed deletions leave audit trail
            - No partial deletions without record

    Every method that records a phase raises ``OperationError`` when called
    before ``initialize()``, so nothing destructive runs without a summary
    to record it in.
    """

    # Upper bound on a single write during overwrite passes, so peak memory
    # stays constant regardless of the size of the file being erased.
    _OVERWRITE_CHUNK_SIZE = 1024 * 1024

    def __init__(self):
        """Initialize secure deletion handler."""
        self._deletion_summary: Optional[DeletionSummary] = None
        self._audit_logger = None
        self._crypto_engine = None
        self._deleted_paths: Set[Path] = set()
        self._orphaned_resources: List[Dict[str, Any]] = []

    async def initialize(self, agent_id: AgentId, agent_name: str) -> None:
        """
        Initialize handler for specific agent deletion.

        Args:
            agent_id: ID of agent being deleted
            agent_name: Name of agent being deleted
        """
        self._audit_logger = get_audit_logger()
        self._crypto_engine = CryptoEngine()
        await self._crypto_engine.initialize()
        self._deletion_summary = DeletionSummary(
            agent_id=agent_id, agent_name=agent_name, start_time=datetime.utcnow()
        )
        self._record_phase(
            DeletionPhase.INITIALIZATION,
            True,
            {"agent_id": str(agent_id), "agent_name": agent_name},
        )

    def _require_summary(self) -> DeletionSummary:
        """
        Return the active deletion summary.

        Raises:
            OperationError: If ``initialize()`` has not been called yet.
        """
        if self._deletion_summary is None:
            raise OperationError(
                "SecureDeletionHandler.initialize() must be called before use"
            )
        return self._deletion_summary

    def record_termination(self, termination_result: Dict[str, Any]) -> None:
        """
        Record process termination results.

        Args:
            termination_result: Results of termination attempt

        Raises:
            OperationError: If the handler has not been initialized.
        """
        summary = self._require_summary()
        success = termination_result.get("success", False)
        self._record_phase(
            DeletionPhase.PROCESS_TERMINATION, success, termination_result
        )
        if success:
            summary.resources_freed["process"] = {
                "pid": termination_result.get("process_id"),
                "graceful": termination_result.get("graceful", False),
            }

    def record_tab_closure(self, tab_id: str, success: bool = True) -> None:
        """
        Record iTerm2 tab closure.

        Args:
            tab_id: ID of closed tab
            success: Whether closure succeeded

        Raises:
            OperationError: If the handler has not been initialized.
        """
        summary = self._require_summary()
        self._record_phase(
            DeletionPhase.RESOURCE_CLEANUP,
            success,
            {"tab_id": tab_id, "resource_type": "iterm_tab"},
        )
        if success:
            summary.resources_freed["iterm_tab"] = tab_id

    async def delete_agent_state(
        self,
        agent_id: AgentId,
        state_paths: List[Path],
        secure: bool = True,
        verify: bool = True,
    ) -> bool:
        """
        Securely delete agent state files.

        Performs multi-pass secure deletion of state files with
        optional verification of deletion success. A failure on one path
        is recorded and does not stop the remaining paths from being
        processed.

        Args:
            agent_id: Agent ID for state files
            state_paths: Paths to state files
            secure: Whether to use secure deletion
            verify: Whether to verify deletion

        Returns:
            bool: True if all deletions successful

        Raises:
            OperationError: If the handler has not been initialized.
        """
        # Fail before touching the filesystem: deleting files and only then
        # discovering there is nowhere to record the outcome would leave a
        # partial deletion without any record.
        summary = self._require_summary()

        all_success = True
        for state_path in state_paths:
            try:
                if secure:
                    await self._secure_delete_file(state_path)
                else:
                    await self._standard_delete_file(state_path)
                # Remembered so verify_deletion() can re-check the path later.
                self._deleted_paths.add(state_path)
                if verify and state_path.exists():
                    raise OperationError(
                        f"Verification failed: {state_path} still exists"
                    )
            except Exception as e:
                # Deliberately broad: deletion is best-effort per file, and
                # every failure is recorded rather than aborting the batch.
                all_success = False
                self._record_phase(
                    DeletionPhase.STATE_DELETION,
                    False,
                    {"path": str(state_path), "error": str(e), "secure": secure},
                    error=str(e),
                )

        if all_success:
            self._record_phase(
                DeletionPhase.STATE_DELETION,
                True,
                {
                    "files_deleted": len(state_paths),
                    "secure": secure,
                    "verified": verify,
                },
            )
            summary.resources_freed["state_files"] = len(state_paths)
        return all_success

    async def _secure_delete_file(self, file_path: Path) -> None:
        """
        Securely delete a file with multi-pass overwrite.

        A path that does not exist is silently skipped. The overwrite and
        unlink run in the event loop's default executor so the blocking
        write/fsync calls do not stall other coroutines.

        Args:
            file_path: Path to file to delete
        """
        if not file_path.exists():
            return
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._overwrite_and_unlink, file_path)

    def _overwrite_and_unlink(self, file_path: Path) -> None:
        """
        Overwrite a file three times (random, 0x00, 0xFF) and remove it.

        Data is written in chunks of at most ``_OVERWRITE_CHUNK_SIZE`` bytes,
        and each pass is flushed and fsync'ed before the next one starts.

        Args:
            file_path: Path to file to overwrite and delete
        """
        file_size = file_path.stat().st_size
        chunk_size = self._OVERWRITE_CHUNK_SIZE

        # One chunk factory per pass, in the order the passes are applied.
        pass_fillers = (
            os.urandom,  # Pass 1: random data
            lambda count: b"\x00" * count,  # Pass 2: zeros
            lambda count: b"\xFF" * count,  # Pass 3: ones
        )

        with open(file_path, "rb+") as f:
            for make_chunk in pass_fillers:
                f.seek(0)
                remaining = file_size
                while remaining > 0:
                    step = min(chunk_size, remaining)
                    f.write(make_chunk(step))
                    remaining -= step
                f.flush()
                os.fsync(f.fileno())

        # Finally, delete the file
        file_path.unlink()

    async def _standard_delete_file(self, file_path: Path) -> None:
        """
        Standard file deletion.

        A path that is already gone is not an error.

        Args:
            file_path: Path to file to delete
        """
        # missing_ok avoids the race between an exists() check and unlink().
        file_path.unlink(missing_ok=True)

    def record_state_deletion(self, agent_id: AgentId) -> None:
        """
        Record successful state deletion.

        Args:
            agent_id: ID of agent whose state was deleted

        Raises:
            OperationError: If the handler has not been initialized.
        """
        self._require_summary().resources_freed["agent_state"] = str(agent_id)

    async def verify_deletion(self, verification_checks: Dict[str, Any]) -> bool:
        """
        Verify all resources have been deleted.

        Re-checks every path handled by ``delete_agent_state`` and folds in
        the caller-supplied check results; any falsy result fails
        verification.

        Args:
            verification_checks: Dictionary of verification checks to perform
                (name -> result). ``None`` is treated as no extra checks.

        Returns:
            bool: True if all verifications pass

        Raises:
            OperationError: If the handler has not been initialized.
        """
        summary = self._require_summary()
        all_verified = True
        verification_results: Dict[str, Any] = {}

        # Verify deleted files don't exist
        for path in self._deleted_paths:
            if path.exists():
                all_verified = False
                verification_results[str(path)] = "still_exists"
            else:
                verification_results[str(path)] = "verified_deleted"

        # Additional verification checks from input
        for check_name, check_result in (verification_checks or {}).items():
            verification_results[check_name] = check_result
            if not check_result:
                all_verified = False

        self._record_phase(
            DeletionPhase.VERIFICATION, all_verified, verification_results
        )
        summary.verification_passed = all_verified
        return all_verified

    def get_summary(self) -> Dict[str, Any]:
        """
        Get deletion summary.

        Each call stamps the summary's ``end_time`` with the current time
        before computing the duration.

        Returns:
            Dict containing deletion summary information, or an empty dict
            when the handler has not been initialized.
        """
        summary = self._deletion_summary
        if not summary:
            return {}
        summary.end_time = datetime.utcnow()
        return {
            "agent_id": str(summary.agent_id),
            "agent_name": summary.agent_name,
            "duration_seconds": (
                summary.end_time - summary.start_time
            ).total_seconds(),
            "phases_completed": [p.value for p in summary.phases_completed],
            "phases_failed": [p.value for p in summary.phases_failed],
            "resources_freed": summary.resources_freed,
            "verification_passed": summary.verification_passed,
            "orphaned_resources": self._orphaned_resources,
        }

    async def cleanup_orphaned_resources(self) -> None:
        """
        Clean up any orphaned resources found during deletion.

        Each entry is a dict with ``type`` ("process", "file" or
        "directory") and ``id``. Failures are logged through the audit
        logger, when one is set, and never abort the loop.
        """
        for resource in self._orphaned_resources:
            try:
                resource_type = resource.get("type")
                resource_id = resource.get("id")
                if resource_type == "process":
                    # Kill orphaned process (signal number 9)
                    os.kill(resource_id, 9)
                elif resource_type == "file":
                    # Delete orphaned file
                    Path(resource_id).unlink(missing_ok=True)
                elif resource_type == "directory":
                    # Remove orphaned directory
                    shutil.rmtree(resource_id, ignore_errors=True)
            except Exception as e:
                # Log but don't fail on orphaned resource cleanup
                if self._audit_logger:
                    await self._audit_logger.log_event(
                        level=AuditLevel.WARNING,
                        category=AuditCategory.RESOURCE_MANAGEMENT,
                        operation="orphaned_resource_cleanup_failed",
                        resource_type=resource.get("type", "unknown"),
                        resource_id=str(resource.get("id", "unknown")),
                        success=False,
                        error_message=str(e),
                    )

    def _record_phase(
        self,
        phase: DeletionPhase,
        success: bool,
        details: Dict[str, Any],
        error: Optional[str] = None,
    ) -> None:
        """
        Record a deletion phase completion.

        Appends a ``DeletionRecord`` and adds the phase to the summary's
        completed or failed list (once per list). A phase can end up in
        both lists if it has successful and failed records.

        Args:
            phase: Phase that was completed
            success: Whether phase succeeded
            details: Phase execution details
            error: Error message if failed

        Raises:
            OperationError: If the handler has not been initialized.
        """
        summary = self._require_summary()
        summary.records.append(
            DeletionRecord(
                phase=phase,
                success=success,
                timestamp=datetime.utcnow(),
                details=details,
                error=error,
            )
        )
        bucket = summary.phases_completed if success else summary.phases_failed
        if phase not in bucket:
            bucket.append(phase)

    @require(lambda self: self._deletion_summary is not None)
    @ensure(lambda result: isinstance(result, bool))
    async def finalize(self) -> bool:
        """
        Finalize deletion and clean up handler.

        Cleans up orphaned resources, records the COMPLETION phase (its
        success mirrors the verification outcome), and emits the final
        summary through the audit logger when one is set.

        Returns:
            bool: True if deletion fully successful
        """
        # Clean up any orphaned resources
        await self.cleanup_orphaned_resources()

        summary = self._deletion_summary

        # Record completion
        self._record_phase(
            DeletionPhase.COMPLETION,
            summary.verification_passed,
            {
                "total_phases": len(DeletionPhase),
                "completed_phases": len(summary.phases_completed),
                "failed_phases": len(summary.phases_failed),
            },
        )

        # Log final summary
        if self._audit_logger:
            await self._audit_logger.log_event(
                level=AuditLevel.INFO,
                category=AuditCategory.RESOURCE_MANAGEMENT,
                operation="secure_deletion_completed",
                resource_type="agent",
                resource_id=str(summary.agent_id),
                success=summary.verification_passed,
                metadata=self.get_summary(),
            )
        return summary.verification_passed
# Public API: the handler first, then its supporting data types.
__all__ = [
    "SecureDeletionHandler",
    "DeletionPhase", "DeletionRecord", "DeletionSummary",
]