"""
Contract Violation Handling and Recovery System
This module provides comprehensive contract violation handling with automatic recovery
mechanisms, escalation paths, and comprehensive audit trails for the Agent Orchestration Platform.
Architecture Integration:
- Design Patterns: Chain of Responsibility for violation handling, Strategy for recovery mechanisms
- Security Model: Fail-safe defaults with automatic recovery and escalation for security violations
- Performance Profile: O(1) violation handling with async recovery processing
Technical Decisions:
- Recovery Strategies: Automatic recovery, manual intervention, system shutdown based on severity
- Escalation Paths: Progressive escalation from warnings to critical alerts with notification
- Circuit Breaker: Automatic system protection when violation rates exceed thresholds
- State Preservation: Full system state captured for forensic analysis of critical violations
Dependencies & Integration:
- External: None beyond standard library for maximum reliability during failure scenarios
- Internal: All security and audit modules for comprehensive violation tracking
Quality Assurance:
- Test Coverage: Comprehensive testing of violation scenarios and recovery mechanisms
- Error Handling: Graceful degradation with detailed error reporting and recovery tracking
Author: Adder_4 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
import threading
import traceback
import uuid
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional, Protocol, Set, Union
from src.boundaries.audit import AuditCategory, AuditLevel, get_audit_logger
from src.utils.contracts_shim import ContractNotRespected
from .invariants import InvariantSeverity, InvariantViolation
from .security import ContractViolationError, SecurityViolation
class ViolationType(Enum):
    """Categories of violation handled by the recovery system.

    The string values are emitted as ``resource_type`` in audit records and
    as the keys of ``violations_by_type`` in the statistics report.
    """

    # Assigned by ViolationEvent.from_security_violation.
    SECURITY_VIOLATION = "security_violation"
    # Assigned by ViolationEvent.from_contract_violation.
    CONTRACT_VIOLATION = "contract_violation"
    # Assigned by ViolationEvent.from_invariant_violation.
    INVARIANT_VIOLATION = "invariant_violation"
    RESOURCE_VIOLATION = "resource_violation"
    COMMUNICATION_VIOLATION = "communication_violation"
    # Used for any exception that is not one of the recognised violation types.
    SYSTEM_FAILURE = "system_failure"
class RecoveryStrategy(Enum):
    """Ways in which a violation can be recovered from.

    ``ViolationRecoveryManager`` registers handlers only for AUTOMATIC and
    GUIDED by default; an event carrying any other strategy yields a
    "No recovery handler available" result unless a handler is added.
    """

    AUTOMATIC = "automatic"  # Automatic recovery without intervention
    GUIDED = "guided"  # Automatic recovery with user notification
    MANUAL = "manual"  # Requires manual intervention
    ESCALATE = "escalate"  # Escalate to higher authority
    SHUTDOWN = "shutdown"  # Emergency system shutdown; also reported when the circuit breaker blocks handling
class ViolationSeverity(Enum):
    """Severity scale shared by every violation type, least to most severe.

    Members are numbered 1 through 5 in declaration order.
    """

    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
    EMERGENCY = 5
@dataclass(frozen=True)
class ViolationEvent:
    """Immutable, unified record describing a single violation of any kind.

    Instances are normally built through one of the ``from_*`` alternate
    constructors, each of which fills in the violation type, severity and
    default recovery strategy for its source object.
    """

    # Random UUID4 string identifying this event.
    event_id: str
    # When the violation occurred: copied from the source violation when it
    # carries a timestamp, otherwise ``datetime.utcnow()`` (naive UTC).
    timestamp: datetime
    violation_type: ViolationType
    severity: ViolationSeverity
    source_module: str
    source_function: str
    description: str
    affected_resources: List[str]
    # Caller-supplied context dict, stored by reference (not copied).
    context_data: Dict[str, Any]
    stack_trace: Optional[str] = None
    recovery_strategy: Optional[RecoveryStrategy] = None

    @staticmethod
    def _format_exception_trace(error: Any) -> str:
        """Return the formatted traceback belonging to *error* itself.

        ``traceback.format_exc()`` reports whatever exception is currently
        being handled, which is ``"NoneType: None"`` outside an ``except``
        block and may be an unrelated exception inside one. Formatting from
        the error's own ``__traceback__`` yields the same text in the usual
        "called from the matching except block" case and a meaningful one
        otherwise.
        """
        if isinstance(error, BaseException):
            return "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            )
        # Not an exception object: fall back to the currently handled one.
        return traceback.format_exc()

    @classmethod
    def from_security_violation(
        cls, violation: SecurityViolation, context: Dict[str, Any]
    ) -> "ViolationEvent":
        """Create a ViolationEvent from a SecurityViolation.

        Security violations are always recorded as CRITICAL and marked for
        escalation.

        Args:
            violation: Source violation; its ``timestamp``, ``operation``,
                ``violation_type``, ``details`` and ``resource_id`` are read.
            context: Extra data stored on the event as ``context_data``.

        Returns:
            A new event with a freshly generated ``event_id``.
        """
        return cls(
            event_id=str(uuid.uuid4()),
            timestamp=violation.timestamp,
            violation_type=ViolationType.SECURITY_VIOLATION,
            severity=ViolationSeverity.CRITICAL,
            source_module="security",
            source_function=violation.operation,
            description=f"{violation.violation_type}: {violation.details}",
            affected_resources=[violation.resource_id],
            context_data=context,
            recovery_strategy=RecoveryStrategy.ESCALATE,
        )

    @classmethod
    def from_contract_violation(
        cls, error: ContractViolationError, context: Dict[str, Any]
    ) -> "ViolationEvent":
        """Create a ViolationEvent from a ContractViolationError.

        Contract violations are recorded as HIGH severity with GUIDED
        recovery. The stack trace is taken from *error* itself, so it is
        correct even when this is called outside the ``except`` block that
        caught the error.

        Args:
            error: The contract error; ``str(error)`` becomes the description.
            context: Extra data; ``"function_name"`` and ``"resources"`` are
                read from it when present.

        Returns:
            A new event timestamped with the current naive UTC time.
        """
        return cls(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow(),
            violation_type=ViolationType.CONTRACT_VIOLATION,
            severity=ViolationSeverity.HIGH,
            source_module="contracts",
            source_function=context.get("function_name", "unknown"),
            description=str(error),
            affected_resources=context.get("resources", []),
            context_data=context,
            stack_trace=cls._format_exception_trace(error),
            recovery_strategy=RecoveryStrategy.GUIDED,
        )

    @classmethod
    def from_invariant_violation(
        cls, violation: InvariantViolation, context: Dict[str, Any]
    ) -> "ViolationEvent":
        """Create a ViolationEvent from an InvariantViolation.

        The invariant severity is translated to the unified scale (unknown
        values map to MEDIUM). Recovery is AUTOMATIC when the violation
        carries a truthy ``recovery_action`` and MANUAL otherwise.

        Args:
            violation: Source violation; its ``timestamp``, ``severity``,
                ``description``, ``affected_resources`` and
                ``recovery_action`` are read.
            context: Extra data stored on the event as ``context_data``.

        Returns:
            A new event with a freshly generated ``event_id``.
        """
        severity_mapping = {
            InvariantSeverity.INFO: ViolationSeverity.LOW,
            InvariantSeverity.WARNING: ViolationSeverity.MEDIUM,
            InvariantSeverity.ERROR: ViolationSeverity.HIGH,
            InvariantSeverity.CRITICAL: ViolationSeverity.CRITICAL,
        }
        return cls(
            event_id=str(uuid.uuid4()),
            timestamp=violation.timestamp,
            violation_type=ViolationType.INVARIANT_VIOLATION,
            severity=severity_mapping.get(violation.severity, ViolationSeverity.MEDIUM),
            source_module="invariants",
            source_function="system_validation",
            description=violation.description,
            affected_resources=violation.affected_resources,
            context_data=context,
            recovery_strategy=(
                RecoveryStrategy.AUTOMATIC
                if violation.recovery_action
                else RecoveryStrategy.MANUAL
            ),
        )
@dataclass
class RecoveryResult:
    """Outcome of one recovery attempt, as returned by recovery handlers."""

    # True when the attempt completed without error.
    success: bool
    # Strategy the result is reported under.
    strategy_used: RecoveryStrategy
    # Human-readable descriptions of the steps recorded during the attempt.
    actions_taken: List[str]
    # Elapsed time of the attempt, in seconds.
    time_taken: float
    # Failure detail; left as None on success.
    error_message: Optional[str] = None
    # Set when the attempt failed and a higher-level response is needed.
    requires_escalation: bool = False
class RecoveryHandler(Protocol):
    """Structural interface that every recovery handler must satisfy.

    Any object with a matching async ``handle_violation`` method can be
    registered in ``ViolationRecoveryManager.recovery_handlers``.
    """

    async def handle_violation(self, event: ViolationEvent) -> RecoveryResult:
        """Attempt recovery for *event* and report the outcome.

        Args:
            event: The violation to recover from.

        Returns:
            A RecoveryResult describing what was done and whether it worked.
        """
        ...
class CircuitBreakerState(Enum):
    """States of ``SystemCircuitBreaker``.

    The string value is reported as ``circuit_breaker_state`` in the
    violation statistics.
    """

    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failures exceed threshold, blocking operations
    HALF_OPEN = "half_open"  # Testing if system has recovered
@dataclass
class CircuitBreakerConfig:
    """Tunable thresholds and timings for ``SystemCircuitBreaker``."""

    # Number of CRITICAL/EMERGENCY violations inside ``violation_window`` at
    # which the breaker trips to OPEN.
    failure_threshold: int = 10
    # How long the breaker stays OPEN before ``can_proceed`` moves it to
    # HALF_OPEN.
    recovery_timeout: timedelta = timedelta(minutes=5)
    # Successful operations that must be recorded while HALF_OPEN before the
    # breaker closes again.
    half_open_max_calls: int = 3
    # Look-back period over which recent violations are counted.
    violation_window: timedelta = timedelta(minutes=1)
class SystemCircuitBreaker:
    """
    Circuit breaker for system protection during violation storms.

    Trips to OPEN when the number of CRITICAL/EMERGENCY violations recorded
    inside ``config.violation_window`` reaches ``config.failure_threshold``.
    After ``config.recovery_timeout`` it moves to HALF_OPEN, and closes again
    once ``config.half_open_max_calls`` successes have been recorded.

    All state changes happen under ``self.lock``.
    """

    def __init__(self, config: CircuitBreakerConfig):
        """Initialize the breaker in the CLOSED state.

        Args:
            config: Thresholds and timings controlling the breaker.
        """
        self.config = config
        self.state = CircuitBreakerState.CLOSED
        # Number of times the breaker has been tripped since it last closed.
        self.failure_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.last_success_time: Optional[datetime] = None
        self.half_open_calls = 0
        # Most recent violations of any severity as (timestamp, severity)
        # pairs; kept for inspection only.
        self.violation_history: deque = deque(maxlen=100)
        # Timestamps of CRITICAL/EMERGENCY violations only. Kept apart from
        # ``violation_history`` so that a burst of low-severity events cannot
        # evict the entries the trip decision depends on, and so thresholds
        # above the history size remain reachable.
        self._critical_times: deque = deque()
        self.lock = threading.Lock()

    def record_violation(self, severity: ViolationSeverity) -> None:
        """Record a violation and trip the breaker if the threshold is met.

        Every violation is appended to ``violation_history``; only CRITICAL
        and EMERGENCY ones count toward the threshold. The threshold check
        runs on every call, so a breaker that is already OPEN has its
        ``last_failure_time`` refreshed while the storm continues.

        Args:
            severity: Severity of the violation that just occurred.
        """
        tripping = (ViolationSeverity.CRITICAL, ViolationSeverity.EMERGENCY)
        with self.lock:
            now = datetime.utcnow()
            self.violation_history.append((now, severity))

            if severity in tripping:
                self._critical_times.append(now)
                # Only the newest ``failure_threshold`` entries can influence
                # the decision, so older ones are dropped to bound memory.
                keep = max(1, self.config.failure_threshold)
                while len(self._critical_times) > keep:
                    self._critical_times.popleft()

            cutoff_time = now - self.config.violation_window
            recent_critical = sum(
                1 for stamp in self._critical_times if stamp > cutoff_time
            )
            if recent_critical >= self.config.failure_threshold:
                self._trip_breaker()

    def _trip_breaker(self) -> None:
        """Move to OPEN and note the time; caller must hold ``self.lock``."""
        self.state = CircuitBreakerState.OPEN
        self.last_failure_time = datetime.utcnow()
        self.failure_count += 1

    def can_proceed(self) -> bool:
        """Report whether operations may proceed in the current state.

        An OPEN breaker whose ``recovery_timeout`` has elapsed is switched to
        HALF_OPEN as a side effect of this call.

        Returns:
            True when CLOSED, when OPEN has just timed out into HALF_OPEN, or
            when HALF_OPEN with fewer than ``half_open_max_calls`` successes
            recorded; False otherwise.
        """
        with self.lock:
            if self.state == CircuitBreakerState.CLOSED:
                return True

            if self.state == CircuitBreakerState.OPEN:
                timed_out = (
                    self.last_failure_time is not None
                    and datetime.utcnow() - self.last_failure_time
                    > self.config.recovery_timeout
                )
                if timed_out:
                    self.state = CircuitBreakerState.HALF_OPEN
                    self.half_open_calls = 0
                    return True
                return False

            if self.state == CircuitBreakerState.HALF_OPEN:
                return self.half_open_calls < self.config.half_open_max_calls

            return False

    def record_success(self) -> None:
        """Record a successful operation.

        While HALF_OPEN each success is counted, and the breaker closes (and
        ``failure_count`` resets) once ``half_open_max_calls`` is reached.
        ``last_success_time`` is updated in every state.
        """
        with self.lock:
            if self.state == CircuitBreakerState.HALF_OPEN:
                self.half_open_calls += 1
                if self.half_open_calls >= self.config.half_open_max_calls:
                    self.state = CircuitBreakerState.CLOSED
                    self.failure_count = 0
            self.last_success_time = datetime.utcnow()
class AutomaticRecoveryHandler:
    """Recovery handler for violations that need no human involvement.

    The handler records a description of the steps associated with each
    violation type; no other work is performed in this class.
    """

    async def handle_violation(self, event: ViolationEvent) -> RecoveryResult:
        """Run automatic recovery for *event*.

        Args:
            event: The violation to recover from.

        Returns:
            A successful AUTOMATIC result listing the recorded steps (which
            may be empty for violation types with no automatic steps), or a
            failed result requiring escalation if an exception is raised.
        """
        began = datetime.utcnow()
        steps = []
        try:
            kind = event.violation_type
            if kind == ViolationType.RESOURCE_VIOLATION:
                # Resource violations: clean up resources.
                steps += [
                    "Triggered garbage collection",
                    "Released cached resources",
                ]
            elif kind == ViolationType.COMMUNICATION_VIOLATION:
                # Communication violations: restore the channels.
                steps += [
                    "Reestablished communication channels",
                    "Cleared message queues",
                ]
            elif (
                kind == ViolationType.INVARIANT_VIOLATION
                and event.severity == ViolationSeverity.LOW
            ):
                # Only minor invariant violations get a state reset.
                steps += [
                    "Reset agent error counters",
                    "Refreshed agent heartbeats",
                ]

            elapsed = (datetime.utcnow() - began).total_seconds()
            return RecoveryResult(
                success=True,
                strategy_used=RecoveryStrategy.AUTOMATIC,
                actions_taken=steps,
                time_taken=elapsed,
            )
        except Exception as exc:
            elapsed = (datetime.utcnow() - began).total_seconds()
            return RecoveryResult(
                success=False,
                strategy_used=RecoveryStrategy.AUTOMATIC,
                actions_taken=steps,
                time_taken=elapsed,
                error_message=str(exc),
                requires_escalation=True,
            )
class GuidedRecoveryHandler:
    """Recovery handler that pairs recorded actions with recommendations.

    The handler collects recommendation texts and action descriptions into
    the result; no other work is performed in this class.
    """

    async def handle_violation(self, event: ViolationEvent) -> RecoveryResult:
        """Run guided recovery for *event*.

        Args:
            event: The violation to recover from.

        Returns:
            A successful GUIDED result whose actions start with the
            recommendations for the violation type, or a failed result
            requiring escalation if an exception is raised.
        """
        began = datetime.utcnow()
        steps = []
        try:
            # Recommendations come first in the action list.
            steps.extend(self._generate_recommendations(event))

            # Safe recovery actions apply to contract violations only.
            if event.violation_type == ViolationType.CONTRACT_VIOLATION:
                steps.extend(
                    ["Validated input parameters", "Applied sanitization filters"]
                )

            # Trailing entries recorded for every guided recovery.
            steps.extend(
                [
                    "Generated recovery recommendations",
                    "Notified system administrators",
                ]
            )

            elapsed = (datetime.utcnow() - began).total_seconds()
            return RecoveryResult(
                success=True,
                strategy_used=RecoveryStrategy.GUIDED,
                actions_taken=steps,
                time_taken=elapsed,
            )
        except Exception as exc:
            elapsed = (datetime.utcnow() - began).total_seconds()
            return RecoveryResult(
                success=False,
                strategy_used=RecoveryStrategy.GUIDED,
                actions_taken=steps,
                time_taken=elapsed,
                error_message=str(exc),
                requires_escalation=True,
            )

    def _generate_recommendations(self, event: ViolationEvent) -> List[str]:
        """Return the recommendation texts for the event's violation type.

        Args:
            event: The violation being handled.

        Returns:
            A new list of recommendations; empty for violation types that
            have none defined.
        """
        catalogue = {
            ViolationType.SECURITY_VIOLATION: (
                "Review user permissions and access controls",
                "Audit recent authentication attempts",
                "Check for suspicious activity patterns",
            ),
            ViolationType.CONTRACT_VIOLATION: (
                "Validate input data formats and ranges",
                "Review function preconditions and postconditions",
                "Check system state consistency",
            ),
            ViolationType.RESOURCE_VIOLATION: (
                "Monitor system resource usage",
                "Consider scaling system resources",
                "Review resource allocation policies",
            ),
        }
        return list(catalogue.get(event.violation_type, ()))
class ViolationRecoveryManager:
    """
    Comprehensive violation handling and recovery management system.

    Coordinates violation detection, recovery attempts, escalation, and audit
    logging with circuit breaker protection for system stability.

    Handlers are registered by default for the AUTOMATIC and GUIDED
    strategies only; events carrying any other strategy produce a failed
    result that requires escalation unless a handler is added to
    ``recovery_handlers``.
    """

    def __init__(self):
        """Initialize the manager with default handlers and circuit breaker."""
        # Most recent handled violations, capped at 1000 entries.
        self.violation_history: List[ViolationEvent] = []
        self.recovery_handlers: Dict[RecoveryStrategy, RecoveryHandler] = {}
        self.circuit_breaker = SystemCircuitBreaker(CircuitBreakerConfig())
        # In-flight recovery tasks keyed by event id.
        self.active_recoveries: Dict[str, asyncio.Task] = {}
        # Guards ``violation_history``.
        self.manager_lock = threading.Lock()

        # Register default recovery handlers
        self.recovery_handlers[RecoveryStrategy.AUTOMATIC] = AutomaticRecoveryHandler()
        self.recovery_handlers[RecoveryStrategy.GUIDED] = GuidedRecoveryHandler()

    @staticmethod
    def _format_stack_trace(violation: Any) -> str:
        """Return the traceback text that belongs to *violation*.

        ``traceback.format_exc()`` describes whichever exception is currently
        being handled, which is ``"NoneType: None"`` outside an ``except``
        block. Formatting from the exception's own ``__traceback__`` gives the
        same text in the usual case and a meaningful one otherwise.
        """
        if isinstance(violation, BaseException):
            return "".join(
                traceback.format_exception(
                    type(violation), violation, violation.__traceback__
                )
            )
        # Not an exception object: fall back to the currently handled one.
        return traceback.format_exc()

    def _build_event(self, violation: Any, context: Dict[str, Any]) -> ViolationEvent:
        """Convert any supported violation object into a ViolationEvent."""
        if isinstance(violation, SecurityViolation):
            return ViolationEvent.from_security_violation(violation, context)
        if isinstance(violation, ContractViolationError):
            return ViolationEvent.from_contract_violation(violation, context)
        if isinstance(violation, InvariantViolation):
            return ViolationEvent.from_invariant_violation(violation, context)

        # Generic exception handling
        return ViolationEvent(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow(),
            violation_type=ViolationType.SYSTEM_FAILURE,
            severity=ViolationSeverity.HIGH,
            source_module=context.get("module", "unknown"),
            source_function=context.get("function", "unknown"),
            description=str(violation),
            affected_resources=context.get("resources", []),
            context_data=context,
            stack_trace=self._format_stack_trace(violation),
            recovery_strategy=RecoveryStrategy.MANUAL,
        )

    async def handle_violation(
        self,
        violation: Union[
            SecurityViolation, ContractViolationError, InvariantViolation, Exception
        ],
        context: Optional[Dict[str, Any]] = None,
    ) -> RecoveryResult:
        """
        Handle violation with appropriate recovery strategy.

        Main entry point for all violation handling with automatic recovery
        attempt, escalation if needed, and comprehensive audit logging.

        Args:
            violation: The violation or exception to handle.
            context: Optional extra data attached to the event; ``None`` is
                treated as an empty dict.

        Returns:
            The recovery outcome. When the circuit breaker blocks handling a
            failed SHUTDOWN result is returned immediately.
        """
        context = context or {}

        # Convert to unified violation event
        event = self._build_event(violation, context)

        # Record violation in circuit breaker
        self.circuit_breaker.record_violation(event.severity)

        # Check if operations can proceed.
        # NOTE(review): a violation rejected here is neither stored in
        # ``violation_history`` nor sent to the audit log - confirm that
        # dropping it from the audit trail is intended.
        if not self.circuit_breaker.can_proceed():
            return RecoveryResult(
                success=False,
                strategy_used=RecoveryStrategy.SHUTDOWN,
                actions_taken=["Circuit breaker tripped - system protected"],
                time_taken=0.0,
                error_message="System under protection due to excessive violations",
                requires_escalation=True,
            )

        # Store violation in history
        with self.manager_lock:
            self.violation_history.append(event)
            # Keep only last 1000 violations
            if len(self.violation_history) > 1000:
                self.violation_history = self.violation_history[-1000:]

        try:
            # Attempt recovery based on strategy
            recovery_result = await self._attempt_recovery(event)

            # Only successes are reported to the circuit breaker
            if recovery_result.success:
                self.circuit_breaker.record_success()

            # Log violation and recovery attempt
            await self._log_violation_event(event, recovery_result)
            return recovery_result
        except Exception as e:
            # Recovery itself failed
            failure_result = RecoveryResult(
                success=False,
                strategy_used=event.recovery_strategy or RecoveryStrategy.MANUAL,
                actions_taken=["Recovery handler failed"],
                time_taken=0.0,
                error_message=f"Recovery handler exception: {e}",
                requires_escalation=True,
            )
            await self._log_violation_event(event, failure_result)
            return failure_result

    async def _attempt_recovery(self, event: ViolationEvent) -> RecoveryResult:
        """Run the handler for the event's strategy, bounded by a timeout.

        The timeout is 5 seconds for EMERGENCY events and 30 seconds for
        everything else. Exceptions raised by the handler propagate to the
        caller; the task is always removed from ``active_recoveries``.

        Args:
            event: The violation to recover from.

        Returns:
            The handler's result, or a failed result when no handler is
            registered or the handler timed out.
        """
        # Determine recovery strategy
        strategy = event.recovery_strategy or self._determine_recovery_strategy(event)

        # Get appropriate handler
        handler = self.recovery_handlers.get(strategy)
        if not handler:
            return RecoveryResult(
                success=False,
                strategy_used=strategy,
                actions_taken=["No recovery handler available"],
                time_taken=0.0,
                error_message=f"No handler for strategy: {strategy}",
                requires_escalation=True,
            )

        # Set timeout based on severity
        timeout = 30 if event.severity != ViolationSeverity.EMERGENCY else 5

        recovery_task = asyncio.create_task(handler.handle_violation(event))
        self.active_recoveries[event.event_id] = recovery_task
        try:
            return await asyncio.wait_for(recovery_task, timeout=timeout)
        except asyncio.TimeoutError:
            # Recovery timed out
            recovery_task.cancel()
            return RecoveryResult(
                success=False,
                strategy_used=strategy,
                actions_taken=["Recovery attempt timed out"],
                time_taken=float(timeout),
                error_message=f"Recovery timeout after {timeout} seconds",
                requires_escalation=True,
            )
        finally:
            # Runs on success, timeout, handler exception and cancellation
            # alike, so a failing handler cannot leave a stale entry behind.
            self.active_recoveries.pop(event.event_id, None)

    def _determine_recovery_strategy(self, event: ViolationEvent) -> RecoveryStrategy:
        """Choose a strategy for an event that does not carry one.

        Args:
            event: The violation being handled.

        Returns:
            ESCALATE for EMERGENCY severity or security violations, AUTOMATIC
            for LOW severity, and GUIDED for everything else.
        """
        # Emergency violations require immediate escalation
        if event.severity == ViolationSeverity.EMERGENCY:
            return RecoveryStrategy.ESCALATE
        # Security violations always escalate
        if event.violation_type == ViolationType.SECURITY_VIOLATION:
            return RecoveryStrategy.ESCALATE
        # Critical violations require guided recovery
        if event.severity == ViolationSeverity.CRITICAL:
            return RecoveryStrategy.GUIDED
        # Low severity violations can be handled automatically
        if event.severity == ViolationSeverity.LOW:
            return RecoveryStrategy.AUTOMATIC
        # Default to guided recovery
        return RecoveryStrategy.GUIDED

    async def _log_violation_event(
        self, event: ViolationEvent, recovery_result: RecoveryResult
    ) -> None:
        """Write the event and its recovery outcome to the audit system.

        Best-effort: a failure to audit is reported through the standard
        ``logging`` module and never propagates to the caller.

        Args:
            event: The violation that was handled.
            recovery_result: The outcome of the recovery attempt.
        """
        try:
            audit_logger = get_audit_logger()

            # Determine audit level based on severity
            audit_levels = {
                ViolationSeverity.LOW: AuditLevel.INFO,
                ViolationSeverity.MEDIUM: AuditLevel.WARNING,
                ViolationSeverity.HIGH: AuditLevel.ERROR,
                ViolationSeverity.CRITICAL: AuditLevel.CRITICAL,
                ViolationSeverity.EMERGENCY: AuditLevel.CRITICAL,
            }
            audit_level = audit_levels.get(event.severity, AuditLevel.ERROR)

            await audit_logger.log_event(
                level=audit_level,
                category=AuditCategory.ERROR_HANDLING,
                operation="violation_recovery",
                resource_type=event.violation_type.value,
                resource_id=event.event_id,
                success=recovery_result.success,
                error_message=(
                    event.description if not recovery_result.success else None
                ),
                metadata={
                    "violation_event": {
                        "event_id": event.event_id,
                        "timestamp": event.timestamp.isoformat(),
                        "source_module": event.source_module,
                        "source_function": event.source_function,
                        "affected_resources": event.affected_resources,
                        "severity": event.severity.name,
                        "context_data": event.context_data,
                    },
                    "recovery_result": {
                        "success": recovery_result.success,
                        "strategy_used": recovery_result.strategy_used.value,
                        "actions_taken": recovery_result.actions_taken,
                        "time_taken": recovery_result.time_taken,
                        "requires_escalation": recovery_result.requires_escalation,
                        "error_message": recovery_result.error_message,
                    },
                },
            )
        except Exception:
            # Don't fail violation handling if audit logging fails, but leave
            # a trace so a broken audit trail does not go unnoticed.
            import logging

            logging.getLogger(__name__).exception(
                "Failed to write audit record for violation %s", event.event_id
            )

    def get_violation_statistics(self) -> Dict[str, Any]:
        """Get statistics about recent violations and recovery attempts.

        Returns:
            A status/message dict when nothing has been recorded; otherwise
            totals, per-type and per-severity counts for the last 24 hours,
            the circuit breaker state, the number of in-flight recoveries and
            the timestamp of the latest violation.
        """
        with self.manager_lock:
            if not self.violation_history:
                return {"status": "no_violations", "message": "No violations recorded"}

            # Computed once so every event is compared with the same instant.
            cutoff = datetime.utcnow() - timedelta(hours=24)
            recent_violations = [
                v for v in self.violation_history if v.timestamp > cutoff
            ]

            # Count by type and severity
            by_type = defaultdict(int)
            by_severity = defaultdict(int)
            for violation in recent_violations:
                by_type[violation.violation_type.value] += 1
                by_severity[violation.severity.name] += 1

            return {
                "total_violations": len(self.violation_history),
                "recent_violations_24h": len(recent_violations),
                "violations_by_type": dict(by_type),
                "violations_by_severity": dict(by_severity),
                "circuit_breaker_state": self.circuit_breaker.state.value,
                "active_recoveries": len(self.active_recoveries),
                "last_violation": self.violation_history[-1].timestamp.isoformat(),
            }
# Process-wide manager, created lazily on first request.
_recovery_manager_instance: Optional[ViolationRecoveryManager] = None


def get_recovery_manager() -> ViolationRecoveryManager:
    """Return the shared ViolationRecoveryManager, creating it on first use."""
    global _recovery_manager_instance
    manager = _recovery_manager_instance
    if manager is None:
        manager = ViolationRecoveryManager()
        _recovery_manager_instance = manager
    return manager
# Convenience functions for violation handling
async def handle_security_violation(
    violation: SecurityViolation, context: Optional[Dict[str, Any]] = None
) -> RecoveryResult:
    """Route a security violation through the global recovery manager.

    Args:
        violation: The security violation to handle.
        context: Optional extra data attached to the resulting event;
            ``None`` is treated as an empty dict by the manager.

    Returns:
        The outcome reported by ``ViolationRecoveryManager.handle_violation``.
    """
    return await get_recovery_manager().handle_violation(violation, context)
async def handle_contract_violation(
    error: ContractViolationError, context: Optional[Dict[str, Any]] = None
) -> RecoveryResult:
    """Route a contract violation through the global recovery manager.

    Args:
        error: The contract violation error to handle.
        context: Optional extra data attached to the resulting event;
            ``None`` is treated as an empty dict by the manager.

    Returns:
        The outcome reported by ``ViolationRecoveryManager.handle_violation``.
    """
    return await get_recovery_manager().handle_violation(error, context)
async def handle_invariant_violation(
    violation: InvariantViolation, context: Optional[Dict[str, Any]] = None
) -> RecoveryResult:
    """Route an invariant violation through the global recovery manager.

    Args:
        violation: The invariant violation to handle.
        context: Optional extra data attached to the resulting event;
            ``None`` is treated as an empty dict by the manager.

    Returns:
        The outcome reported by ``ViolationRecoveryManager.handle_violation``.
    """
    return await get_recovery_manager().handle_violation(violation, context)
async def handle_system_exception(
    exception: Exception, context: Optional[Dict[str, Any]] = None
) -> RecoveryResult:
    """Route a general exception through the global recovery manager.

    Args:
        exception: The exception to handle.
        context: Optional extra data attached to the resulting event;
            ``None`` is treated as an empty dict by the manager.

    Returns:
        The outcome reported by ``ViolationRecoveryManager.handle_violation``.
    """
    return await get_recovery_manager().handle_violation(exception, context)
def get_system_violation_status() -> Dict[str, Any]:
    """Return the global recovery manager's violation statistics."""
    return get_recovery_manager().get_violation_statistics()