"""
Session Deletion Validation - Agent Orchestration Platform
This module implements comprehensive validation for session deletion operations with
security checks, critical operation detection, and work preservation analysis.
Security Implementation:
- Permission validation for session deletion
- Critical operation detection across all agents
- Work preservation requirement analysis
- Cascade deletion safety checks
Author: Adder_1 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
import functools
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

# Import security contracts
from src.contracts.security import ContractViolationError, validate_security_context
from src.contracts_compat import ensure, require
from src.models.agent import AgentState, AgentStatus

# Import type system
from src.models.ids import AgentId, SessionId, validate_session_id
from src.models.security import SecurityContext, SecurityError, SecurityLevel
from src.models.session import SessionState, SessionStatus
from src.models.validation import ValidationError, ValidationResult
@dataclass(frozen=True)
class SessionDeletionContext:
    """
    Immutable input bundle for a session deletion validation run.

    Groups the session state, the agents belonging to the session, and the
    caller's security context together with optional details about pending
    work. The three optional collections default to fresh empty containers.
    """

    # Session being considered for deletion.
    session_state: SessionState
    # Agents attached to the session.
    agents: List[AgentState]
    # Security context of the caller requesting the deletion.
    security_context: SecurityContext
    # Uncommitted changes, keyed by a string label, each mapping to a list of entries.
    uncommitted_changes: Dict[str, List[str]] = field(default_factory=dict)
    # Descriptions of operations currently running, keyed by agent id.
    active_operations: Dict[AgentId, List[str]] = field(default_factory=dict)
    # Files flagged as critical for this session.
    critical_files: Set[str] = field(default_factory=set)
@dataclass(frozen=True)
class SessionDeletionValidationResult:
    """
    Immutable outcome of a session deletion validation run.

    Carries the overall verdict flags plus the warnings, errors, critical
    operations and uncommitted work that were collected while validating.
    """

    # True when validation produced no errors.
    is_valid: bool
    # Session the result refers to.
    session_id: SessionId
    # Whether the deletion may proceed.
    can_delete: bool
    # Whether issues were found that call for a forced deletion.
    requires_force: bool
    # Whether uncommitted work was found that should be preserved first.
    preservation_required: bool
    # Non-blocking findings.
    warnings: List[str] = field(default_factory=list)
    # Findings that make the validation fail.
    errors: List[str] = field(default_factory=list)
    # Descriptions of critical operations detected on the session's agents.
    critical_operations: List[str] = field(default_factory=list)
    # Uncommitted work, keyed by a string label, each mapping to a list of entries.
    uncommitted_work: Dict[str, List[str]] = field(default_factory=dict)
    # Free-form extra information about the validation run.
    metadata: Dict[str, Any] = field(default_factory=dict)

    def get_failure_summary(self) -> str:
        """
        Build a one-line, human-readable summary of why validation failed.

        Returns:
            ``"Validation passed"`` when ``is_valid`` is true; otherwise the
            non-empty sections (errors, critical operation count, uncommitted
            file count) joined by ``" | "``.
        """
        if self.is_valid:
            return "Validation passed"

        sections = []
        if self.errors:
            sections.append("Errors: " + "; ".join(self.errors))
        if self.critical_operations:
            sections.append(f"Critical operations: {len(self.critical_operations)}")
        if self.uncommitted_work:
            # Count entries across every label, not the number of labels.
            file_count = sum(map(len, self.uncommitted_work.values()))
            sections.append(f"Uncommitted work: {file_count} files")
        return " | ".join(sections)
class SessionDeletionValidator:
    """
    Comprehensive validator for session deletion operations.

    Performs multi-stage validation including:
    - Permission and access control validation
    - Critical operation detection across agents
    - Uncommitted work analysis
    - Cascade deletion safety verification
    - Work preservation requirement analysis

    ``force`` only overrides safety findings (an already-terminated session,
    in-flight critical operations). It never overrides a permission denial or
    a mismatch between the requested session and the supplied context.
    """

    # Substrings, matched case-insensitively, that mark an operation as critical.
    _CRITICAL_KEYWORDS = (
        "deploy",
        "release",
        "migration",
        "backup",
        "security",
        "encryption",
        "payment",
        "transaction",
    )

    def __init__(self):
        """Initialize session deletion validator."""
        # NOTE(review): no method of this class reads or writes the cache or
        # its TTL; both are kept only so existing attribute access keeps working.
        self._validation_cache: Dict[SessionId, SessionDeletionValidationResult] = {}
        self._cache_ttl_seconds = 30  # Cache validation results briefly

    @require(lambda self, session_id: session_id is not None)
    @ensure(lambda result: isinstance(result, SessionDeletionValidationResult))
    async def validate_session_deletion(
        self,
        session_id: SessionId,
        context: SessionDeletionContext,
        cleanup_agents: bool = True,
        preserve_work: bool = True,
        force: bool = False,
    ) -> SessionDeletionValidationResult:
        """
        Validate session deletion request with comprehensive checks.

        Args:
            session_id: Session to validate for deletion
            context: Deletion context with session and agent states
            cleanup_agents: Whether agents will be deleted
            preserve_work: Whether work should be preserved
            force: Whether to allow forced deletion. Overrides safety findings
                only; permission failures and session ID mismatches still block.

        Returns:
            SessionDeletionValidationResult with validation outcome. If any
            stage raises, a failed result (``can_delete=False``) carrying the
            exception text is returned instead of propagating the exception.
        """
        errors: List[str] = []
        warnings: List[str] = []
        critical_operations: List[str] = []
        uncommitted_work: Dict[str, List[str]] = {}
        # Set by failures that ``force`` must never be able to override.
        blocked = False

        try:
            # Stage 1: Basic validation
            if context.session_state.session_id != session_id:
                # The context describes a different session, so none of the
                # remaining checks say anything about the requested one.
                blocked = True
                errors.append(
                    f"Session ID mismatch: {session_id} != {context.session_state.session_id}"
                )
            if context.session_state.status == SessionStatus.TERMINATED:
                errors.append("Session is already terminated")

            # Stage 2: Permission validation
            if not self._validate_deletion_permissions(context):
                blocked = True
                errors.append("Insufficient permissions to delete session")

            # Stage 3: Critical operation detection
            # Always detect when agents will be removed so a forced deletion
            # still reports what it interrupts; only unforced requests fail.
            if cleanup_agents:
                critical_operations.extend(
                    await self._detect_critical_operations(context)
                )
                if critical_operations and not force:
                    errors.append(
                        f"Session has {len(critical_operations)} critical operations"
                    )

            # Stage 4: Uncommitted work analysis
            # Only relevant when the caller is NOT going to preserve work.
            if not preserve_work:
                uncommitted = await self._analyze_uncommitted_work(context)
                if uncommitted:
                    uncommitted_work = uncommitted
                    warnings.append(
                        f"Session has uncommitted work in {len(uncommitted)} locations"
                    )

            if cleanup_agents:
                # Stage 5: Resource allocation checks
                agent_count = len(context.agents)
                if agent_count > 0:
                    warnings.append(f"Will delete {agent_count} agents")

                # Stage 6: Cascade safety validation
                warnings.extend(self._validate_cascade_safety(context))

            # Determine final validation result
            is_valid = not errors
            can_delete = not blocked and (is_valid or force)
            requires_force = bool(critical_operations) or bool(errors)
            # uncommitted_work is only populated when preserve_work is False.
            preservation_required = bool(uncommitted_work)

            return SessionDeletionValidationResult(
                is_valid=is_valid,
                session_id=session_id,
                can_delete=can_delete,
                requires_force=requires_force,
                preservation_required=preservation_required,
                warnings=warnings,
                errors=errors,
                critical_operations=critical_operations,
                uncommitted_work=uncommitted_work,
                metadata={
                    "session_name": context.session_state.name,
                    "agent_count": len(context.agents),
                    "security_level": context.session_state.security_level.value,
                    # NOTE(review): utcnow() yields a naive timestamp and is
                    # deprecated since Python 3.12; kept so the emitted string
                    # format does not change for existing consumers.
                    "validation_timestamp": datetime.utcnow().isoformat(),
                },
            )
        except Exception as e:
            # Validation process failure: fail closed.
            return SessionDeletionValidationResult(
                is_valid=False,
                session_id=session_id,
                can_delete=False,
                requires_force=True,
                preservation_required=True,
                errors=[f"Validation process failed: {str(e)}"],
                metadata={"exception": str(e)},
            )

    def _validate_deletion_permissions(self, context: SessionDeletionContext) -> bool:
        """
        Validate permissions for session deletion.

        Args:
            context: Deletion context with security information

        Returns:
            bool: Whether deletion is permitted
        """
        security = context.security_context

        # Maximum security sessions require elevated permissions
        if (
            context.session_state.security_level == SecurityLevel.MAXIMUM
            and not security.has_elevated_permissions
        ):
            return False

        # Otherwise the caller must own the session or be an admin
        return bool(security.is_session_owner or security.is_admin)

    async def _detect_critical_operations(
        self, context: SessionDeletionContext
    ) -> List[str]:
        """
        Detect critical operations across all session agents.

        An agent contributes one entry if its status is ``EXECUTING_CRITICAL``
        or ``DEPLOYING``, plus one entry per active operation whose
        description matches a critical keyword.

        Args:
            context: Deletion context with agent states

        Returns:
            List of critical operation descriptions
        """
        critical_statuses = (AgentStatus.EXECUTING_CRITICAL, AgentStatus.DEPLOYING)
        critical_operations = []

        for agent in context.agents:
            if agent.status in critical_statuses:
                critical_operations.append(f"{agent.name}: Status={agent.status.value}")

            for op in context.active_operations.get(agent.agent_id, ()):
                if self._is_critical_operation(op):
                    critical_operations.append(f"{agent.name}: {op}")

        return critical_operations

    def _is_critical_operation(self, operation: str) -> bool:
        """
        Determine if an operation is critical.

        Args:
            operation: Operation description

        Returns:
            bool: Whether the description contains any critical keyword
            (case-insensitive substring match)
        """
        operation_lower = operation.lower()
        return any(keyword in operation_lower for keyword in self._CRITICAL_KEYWORDS)

    async def _analyze_uncommitted_work(
        self, context: SessionDeletionContext
    ) -> Dict[str, List[str]]:
        """
        Analyze uncommitted work in the session.

        Args:
            context: Deletion context

        Returns:
            Dict mapping work types to file lists. The lists are fresh copies,
            so the result never aliases lists held by ``context``.
        """
        # Copy the lists so merging below cannot mutate the frozen context's data.
        uncommitted_work: Dict[str, List[str]] = {
            label: list(entries)
            for label, entries in context.uncommitted_changes.items()
        }

        # Merge rather than overwrite so a caller-supplied "critical_files"
        # entry is not lost; sort because set iteration order is not stable.
        if context.critical_files:
            uncommitted_work.setdefault("critical_files", []).extend(
                sorted(context.critical_files, key=str)
            )

        # Agent-specific uncommitted work; both attributes are optional.
        for agent in context.agents:
            agent_work = []
            if getattr(agent, "has_unsaved_work", False):
                agent_work.append(f"Unsaved work in {agent.name}")
            agent_work.extend(
                f"Editing: {f}" for f in getattr(agent, "editing_files", None) or ()
            )
            if agent_work:
                uncommitted_work[f"agent_{agent.name}"] = agent_work

        return uncommitted_work

    def _validate_cascade_safety(self, context: SessionDeletionContext) -> List[str]:
        """
        Validate safety of cascade agent deletion.

        Warns about agents communicating with agents outside this session and
        about resources used by more than one agent. The optional attributes
        ``communicating_with`` and ``resources`` are treated as empty when
        missing or ``None``.

        Args:
            context: Deletion context

        Returns:
            List of cascade safety warnings
        """
        warnings = []
        agent_names = {agent.name for agent in context.agents}

        # Inter-agent communications that leave the session
        for agent in context.agents:
            for other_agent in getattr(agent, "communicating_with", None) or ():
                if other_agent not in agent_names:
                    warnings.append(
                        f"{agent.name} is communicating with external agent {other_agent}"
                    )

        # Shared resources: resource -> names of the agents using it
        resource_usage: Dict[Any, List[str]] = {}
        for agent in context.agents:
            for resource in getattr(agent, "resources", None) or ():
                resource_usage.setdefault(resource, []).append(agent.name)

        for resource, users in resource_usage.items():
            if len(users) > 1:
                warnings.append(
                    f"Shared resource '{resource}' used by: {', '.join(users)}"
                )

        return warnings
def validate_session_deletion_request(func):
    """
    Decorator for session deletion request validation.

    Wraps an async callable so that its ``session_id`` keyword argument is
    checked and replaced by the value returned from ``validate_session_id``
    before the call proceeds. The wrapped callable's name, docstring and
    signature metadata are preserved via ``functools.wraps``.

    Args:
        func: Async callable to wrap. ``session_id`` must be passed by
            keyword; positional arguments are forwarded without inspection.

    Returns:
        Async wrapper that validates ``session_id`` and then awaits ``func``.

    Raises:
        ValidationError: Raised by the wrapper when ``session_id`` is missing
            or falsy, or when ``validate_session_id`` rejects it. In the
            latter case the original exception is chained as the cause.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        session_id = kwargs.get("session_id")

        # Basic parameter validation
        if not session_id:
            raise ValidationError("Session ID is required")

        try:
            # Validate session ID format and hand the validated value on
            kwargs["session_id"] = validate_session_id(session_id)
        except Exception as e:
            raise ValidationError(f"Invalid session ID format: {e}") from e

        # Additional validation would be performed by the tool implementation
        # after loading session state and checking permissions
        return await func(*args, **kwargs)

    return wrapper
# Export public interface: the names bound by a star-import of this module.
__all__ = [
"SessionDeletionValidator",
"SessionDeletionContext",
"SessionDeletionValidationResult",
"validate_session_deletion_request",
]