"""
Cascade Deletion Orchestration - Agent Orchestration Platform
This module implements comprehensive cascade deletion for sessions with proper
agent ordering, resource tracking, and recovery mechanisms.
Architecture:
- Pattern: Orchestrator pattern for coordinated deletion
- Error Handling: Transaction-like semantics with rollback
- Performance: Parallel deletion where safe, sequential where required
Author: Adder_1 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
from src.contracts_compat import ensure, require
from src.models.agent import AgentState, AgentStatus
# Import type system
from src.models.ids import AgentId, SessionId
from src.models.session import SessionState
class DeletionStatus(Enum):
    """Lifecycle states shared by agent-level and session-level deletion records."""

    # Recorded but not yet started.
    PENDING = "pending"
    # Deletion has started and has not finished yet.
    IN_PROGRESS = "in_progress"
    # Deletion finished without error.
    SUCCESS = "success"
    # Deletion was attempted and did not succeed.
    FAILED = "failed"
    # Rollback actions were executed for the operation.
    ROLLED_BACK = "rolled_back"
@dataclass
class AgentDeletionRecord:
    """
    Tracking entry for the deletion of a single agent.

    The four leading fields are required; the remaining ones start empty and
    are meant to be filled in as the deletion attempt progresses.
    """

    # Identity of the agent this entry tracks.
    agent_id: AgentId
    agent_name: str
    # Current lifecycle state of the attempt.
    status: DeletionStatus
    # When the attempt was recorded or started.
    start_time: datetime
    # When the attempt finished; None until then.
    end_time: Optional[datetime] = None
    # Error text for a failed attempt; None when no error was recorded.
    error: Optional[str] = None
    # Resources reported as freed; every instance gets its own dict.
    resources_freed: Dict[str, Any] = field(default_factory=dict)
@dataclass
class DeletionResult:
    """
    Outcome of one deletion operation for a single agent.

    NOTE(review): nothing else in the visible part of this module builds or
    reads this type, so the field notes below are inferred from the field
    names only; confirm them against the callers.
    """

    # Agent the result belongs to.
    agent_id: AgentId
    # True when the operation succeeded.
    success: bool
    # Presumably a single human-readable failure reason; None by default.
    error_message: Optional[str] = None
    # Presumably names of the resources touched by the operation.
    affected_resources: List[str] = field(default_factory=list)
    # Presumably details of what was released.
    resources_freed: Dict[str, Any] = field(default_factory=dict)
    # Presumably individual error strings collected along the way.
    errors: List[str] = field(default_factory=list)
    # Free-form extra data; every instance gets its own dict.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class SessionDeletionRecord:
    """
    Aggregate tracking record for the deletion of a whole session.

    The four leading fields are required; counters, flags and collections
    default to their empty values.
    """

    # Identity of the session being deleted.
    session_id: SessionId
    session_name: str
    # When the session deletion was recorded as started.
    start_time: datetime
    # Overall lifecycle state of the session deletion.
    status: DeletionStatus
    # When the session deletion finished; None until then.
    end_time: Optional[datetime] = None
    # Number of agents that belong to the session.
    agents_total: int = 0
    # Number of agents deleted successfully so far.
    agents_deleted: int = 0
    # Per-agent records of the deletions that failed.
    agents_failed: List[AgentDeletionRecord] = field(default_factory=list)
    # Outcome flag of the work-preservation step.
    preservation_completed: bool = False
    # Whether a backup was created, and where it was written.
    backup_created: bool = False
    backup_path: Optional[str] = None
    # Human-readable error messages collected during the deletion.
    errors: List[str] = field(default_factory=list)
class CascadeDeletionOrchestrator:
    """
    Orchestrates cascade deletion of sessions with all agents.

    The orchestrator performs no deletion itself. Callers supply the deletion
    and rollback callables; this class decides the agent order, tracks the
    outcome per agent and per session, and replays registered rollback
    actions newest-first.
    """

    def __init__(self):
        """Initialize cascade deletion orchestrator with empty tracking state."""
        self.deletion_record: Optional[SessionDeletionRecord] = None
        self.agent_records: Dict[AgentId, AgentDeletionRecord] = {}
        # Details handed to record_preservation (for example a "path" entry).
        self.preservation_details: Dict[str, Any] = {}
        self._deletion_order: List[AgentId] = []
        self._rollback_stack: List[Tuple[str, Callable]] = []

    @require(lambda self, session_state: session_state is not None)
    async def prepare_cascade_deletion(
        self, session_state: SessionState, agents: List[AgentState]
    ) -> List[AgentId]:
        """
        Prepare cascade deletion with proper agent ordering.

        Creates a fresh session record, computes the deletion order and
        (re)builds one PENDING record per agent.

        Args:
            session_state: Session being deleted
            agents: All agents in the session

        Returns:
            List of agent IDs in deletion order
        """
        self.deletion_record = SessionDeletionRecord(
            session_id=session_state.session_id,
            session_name=session_state.name,
            start_time=datetime.now(),
            status=DeletionStatus.PENDING,
            agents_total=len(agents),
        )
        self.preservation_details = {}

        deletion_order = self._determine_deletion_order(agents)
        self._deletion_order = deletion_order

        # The session record above is brand new, so the per-agent records
        # must be as well: entries left over from an earlier prepare call
        # would otherwise still be accepted by delete_agent_with_tracking.
        self.agent_records.clear()
        for agent in agents:
            self.agent_records[agent.agent_id] = AgentDeletionRecord(
                agent_id=agent.agent_id,
                agent_name=agent.name,
                status=DeletionStatus.PENDING,
                start_time=datetime.now(),
            )
        return deletion_order

    def _determine_deletion_order(self, agents: List[AgentState]) -> List[AgentId]:
        """
        Determine deletion order for agents.

        Agents are visited newest-first by ``created_at``; agents without a
        usable ``created_at`` sort as the oldest. Before an agent is emitted,
        the agents named in its ``depends_on`` are emitted. Every ID appears
        at most once in the result.

        NOTE(review): this places the agents an agent depends on *before*
        that agent in the deletion order. Confirm this is the intended
        teardown direction.

        Args:
            agents: List of agents to order

        Returns:
            Agent IDs in deletion order, without duplicates
        """
        newest_first = sorted(
            agents,
            # A missing or None created_at must not break the comparison.
            key=lambda a: getattr(a, "created_at", None) or datetime.min,
            reverse=True,
        )

        ordered_ids: List[AgentId] = []
        processed = set()
        for agent in newest_first:
            if agent.agent_id in processed:
                continue
            for dep_id in self._get_agent_dependencies(agent, agents):
                if dep_id not in processed:
                    ordered_ids.append(dep_id)
                    processed.add(dep_id)
            # An agent that lists itself in depends_on was already emitted by
            # the loop above; re-check so its ID is not added a second time.
            if agent.agent_id not in processed:
                ordered_ids.append(agent.agent_id)
                processed.add(agent.agent_id)
        return ordered_ids

    def _get_agent_dependencies(
        self, agent: AgentState, all_agents: List[AgentState]
    ) -> List[AgentId]:
        """
        Get the IDs of the agents that this agent depends on.

        Dependencies are read from the optional ``depends_on`` attribute, a
        collection of agent names, and resolved against ``all_agents`` by
        name. Names that match no agent are ignored; a missing or None
        ``depends_on`` yields an empty list.

        Args:
            agent: Agent whose dependencies are looked up
            all_agents: Candidate agents used to resolve names to IDs

        Returns:
            IDs of the matching agents, in ``depends_on`` order
        """
        dependencies: List[AgentId] = []
        for dep_name in getattr(agent, "depends_on", None) or ():
            dependencies.extend(
                other.agent_id for other in all_agents if other.name == dep_name
            )
        return dependencies

    @staticmethod
    async def _call_maybe_async(func: Callable, *args: Any, **kwargs: Any) -> Any:
        """Call ``func`` and await its result only when the result is awaitable."""
        # Local import keeps this block self-contained; the module does not
        # import inspect at file level.
        import inspect

        outcome = func(*args, **kwargs)
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome

    async def delete_agent_with_tracking(
        self, agent_id: AgentId, deletion_func: Callable, force: bool = False
    ) -> bool:
        """
        Delete agent with comprehensive tracking.

        Args:
            agent_id: Agent to delete; must have been registered by
                prepare_cascade_deletion
            deletion_func: Callable invoked as ``deletion_func(agent_id,
                force=force)``; it may be a coroutine function or a plain
                function. If it returns a dict, its "resources_freed" entry
                is stored on the agent record.
            force: Whether to force deletion

        Returns:
            bool: True on success; False when the agent is unknown or the
            deletion raised an exception
        """
        record = self.agent_records.get(agent_id)
        if record is None:
            return False

        record.status = DeletionStatus.IN_PROGRESS
        # Stamp the real start of the attempt; the value set during
        # preparation only reflects when the record was created.
        record.start_time = datetime.now()
        try:
            result = await self._call_maybe_async(
                deletion_func, agent_id, force=force
            )
            record.status = DeletionStatus.SUCCESS
            record.end_time = datetime.now()
            if isinstance(result, dict):
                record.resources_freed = result.get("resources_freed", {})
            if self.deletion_record:
                self.deletion_record.agents_deleted += 1
            return True
        except Exception as e:
            # Tracking boundary: a failed agent is recorded, not propagated,
            # so the caller can decide whether to continue.
            record.status = DeletionStatus.FAILED
            record.end_time = datetime.now()
            record.error = str(e)
            if self.deletion_record:
                self.deletion_record.agents_failed.append(record)
                self.deletion_record.errors.append(
                    f"Failed to delete {record.agent_name}: {e}"
                )
            return False

    def record_preservation(
        self, success: bool, details: Optional[Dict[str, Any]] = None
    ):
        """
        Record work preservation outcome.

        Does nothing when no deletion has been prepared.

        Args:
            success: Whether preservation completed
            details: Optional extra information (for example a "path" entry);
                a copy is kept in ``preservation_details``
        """
        if not self.deletion_record:
            return
        self.deletion_record.preservation_completed = success
        if details:
            self.preservation_details = dict(details)

    def record_backup(self, backup_path: Optional[str]):
        """
        Record backup creation.

        Args:
            backup_path: Location of the backup, or None when none was made
        """
        if self.deletion_record:
            self.deletion_record.backup_created = backup_path is not None
            self.deletion_record.backup_path = backup_path

    def add_rollback_action(self, description: str, action: Callable):
        """
        Add action to rollback stack.

        Args:
            description: Label used in the rollback report
            action: Zero-argument callable (sync or async) that undoes a step
        """
        self._rollback_stack.append((description, action))

    async def execute_rollback(self) -> List[str]:
        """
        Execute rollback actions in reverse order of registration.

        Each action is removed from the stack before it runs. A failing
        action is reported and does not stop the remaining ones. The session
        record, if any, is marked ROLLED_BACK afterwards regardless of
        individual action failures.

        Returns:
            List of rollback action results, one line per action
        """
        results = []
        while self._rollback_stack:
            description, action = self._rollback_stack.pop()
            try:
                # Accept plain callables too: awaiting their non-awaitable
                # return value would report a rollback that already ran as
                # failed.
                await self._call_maybe_async(action)
                results.append(f"✓ Rolled back: {description}")
            except Exception as e:
                results.append(f"✗ Rollback failed: {description} - {e}")
        if self.deletion_record:
            self.deletion_record.status = DeletionStatus.ROLLED_BACK
        return results

    def get_deletion_summary(self) -> Dict[str, Any]:
        """
        Get comprehensive deletion summary.

        Returns:
            A dict describing the session deletion, or
            ``{"status": "no_deletion_record"}`` when nothing was prepared.
            ``duration_seconds`` is None until the record has an end time and
            ``agent_details`` lists failed agents only.
        """
        record = self.deletion_record
        if not record:
            return {"status": "no_deletion_record"}

        duration = None
        if record.end_time:
            duration = (record.end_time - record.start_time).total_seconds()

        return {
            "session_id": str(record.session_id),
            "session_name": record.session_name,
            "status": record.status.value,
            "duration_seconds": duration,
            "agents": {
                "total": record.agents_total,
                "deleted": record.agents_deleted,
                "failed": len(record.agents_failed),
            },
            "preservation_completed": record.preservation_completed,
            "backup_created": record.backup_created,
            "backup_path": record.backup_path,
            "errors": record.errors,
            "agent_details": [
                {
                    "agent_id": str(failed.agent_id),
                    "agent_name": failed.agent_name,
                    "status": failed.status.value,
                    "error": failed.error,
                }
                for failed in record.agents_failed
            ],
        }

    def mark_complete(self):
        """
        Mark deletion as complete.

        Stamps the end time and sets the session status to FAILED when any
        agent failed, otherwise to SUCCESS. Does nothing when no deletion has
        been prepared.
        """
        record = self.deletion_record
        if not record:
            return
        record.end_time = datetime.now()
        record.status = (
            DeletionStatus.FAILED if record.agents_failed else DeletionStatus.SUCCESS
        )

    def get_partial_deletions(self) -> List[Dict[str, Any]]:
        """
        Get details of partial deletions.

        Returns:
            One dict per failed agent (id, name, error, ISO start time);
            empty when nothing was prepared or nothing failed
        """
        if not self.deletion_record:
            return []
        return [
            {
                "agent_id": str(failed.agent_id),
                "agent_name": failed.agent_name,
                "error": failed.error,
                "attempted_at": failed.start_time.isoformat(),
            }
            for failed in self.deletion_record.agents_failed
        ]

    def should_continue_on_failure(self, force: bool) -> bool:
        """
        Determine if deletion should continue after failure.

        Args:
            force: Whether force mode is enabled

        Returns:
            bool: True in force mode or while no agent has failed;
            False once a failure is recorded in non-force mode
        """
        if force:
            return True
        return not (self.deletion_record and self.deletion_record.agents_failed)
class ParallelDeletionExecutor:
    """
    Executes parallel deletion operations with a concurrency limit.

    Agents are processed group by group: all agents of one group are started
    together (bounded by a semaphore) and the next group only starts once the
    current one has finished.
    """

    def __init__(self, max_concurrent: int = 3):
        """
        Initialize parallel deletion executor.

        Args:
            max_concurrent: Maximum concurrent deletions; must be at least 1

        Raises:
            ValueError: If ``max_concurrent`` is smaller than 1
        """
        # A semaphore created with 0 permits can never be acquired, which
        # would make every deletion wait forever; reject it up front.
        if max_concurrent < 1:
            raise ValueError(
                f"max_concurrent must be at least 1, got {max_concurrent}"
            )
        self.max_concurrent = max_concurrent
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def delete_agents_parallel(
        self,
        agent_groups: List[List[AgentId]],
        deletion_func: Callable,
        orchestrator: CascadeDeletionOrchestrator,
        force: bool = False,
    ) -> int:
        """
        Delete agents in parallel groups.

        After each group the orchestrator is asked whether to continue; the
        remaining groups are skipped when it says no.

        Args:
            agent_groups: Groups of agents that can be deleted in parallel
            deletion_func: Function to delete individual agent
            orchestrator: Deletion orchestrator for tracking
            force: Whether to force deletion

        Returns:
            Number of successfully deleted agents
        """
        total_deleted = 0
        for group in agent_groups:
            # return_exceptions=True keeps one failing task from cancelling
            # the rest of the group; exceptions come back as result values.
            results = await asyncio.gather(
                *(
                    self._delete_with_limit(
                        agent_id, deletion_func, orchestrator, force
                    )
                    for agent_id in group
                ),
                return_exceptions=True,
            )
            # Only a literal True counts; False and exception objects do not.
            total_deleted += sum(1 for outcome in results if outcome is True)
            if not orchestrator.should_continue_on_failure(force):
                break
        return total_deleted

    async def _delete_with_limit(
        self,
        agent_id: AgentId,
        deletion_func: Callable,
        orchestrator: CascadeDeletionOrchestrator,
        force: bool,
    ) -> bool:
        """Delete one agent via the orchestrator while holding a semaphore slot."""
        async with self._semaphore:
            return await orchestrator.delete_agent_with_tracking(
                agent_id, deletion_func, force
            )
# Names exported by ``from <module> import *``.
__all__ = [
    # Coordination classes
    "CascadeDeletionOrchestrator",
    "ParallelDeletionExecutor",
    # Status enum and record types
    "DeletionStatus",
    "DeletionResult",
    "AgentDeletionRecord",
    "SessionDeletionRecord",
]