"""
Agent Orchestration Platform - Comprehensive Error Handling
This module provides comprehensive error handling for the Agent Orchestration Platform,
implementing security-preserving error handling with detailed logging and recovery mechanisms.
Architecture Integration:
- Design Patterns: Chain of Responsibility for error handlers, Strategy for different error types
- Security Model: Error handling that preserves security boundaries and prevents information leakage
- Performance Profile: O(1) error handling with efficient error categorization and logging
Technical Decisions:
- Security-First Errors: Error messages that don't leak sensitive information
- Structured Error Hierarchy: Comprehensive error categorization for intelligent handling
- Recovery Mechanisms: Automatic recovery strategies for different error types
- Audit Integration: Complete error audit trail with security context
Dependencies & Integration:
- External: None beyond standard library for maximum reliability
- Internal: Audit logging for error persistence, security contracts for access control
Quality Assurance:
- Test Coverage: Property-based testing for error handling and recovery mechanisms
- Error Handling: Meta-error handling for error handling system failures
Author: Adder_5 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
import builtins
import functools
import inspect
import logging
import sys
import traceback
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional, Type, Union

# Import audit logging for error persistence
from src.boundaries.audit import AuditCategory, AuditLevel, get_audit_logger

# Import OperationError from communication types
from src.models.communication import OperationError

from .contracts_shim import ensure, require
class ErrorSeverity(Enum):
    """Severity tiers for errors, declared from least to most severe."""

    # Explicit values match what auto() assigns: 1, 2, 3, 4 in declaration order.
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
class ErrorCategory(Enum):
    """Closed set of error categories; each member's value is its string label."""

    # Problems with the caller's input or identity
    VALIDATION = "validation"
    AUTHENTICATION = "authentication"
    AUTHORIZATION = "authorization"

    # Problems with infrastructure or the runtime environment
    RESOURCE = "resource"
    NETWORK = "network"
    SYSTEM = "system"

    # Problems with application rules, dependencies or setup
    BUSINESS_LOGIC = "business_logic"
    EXTERNAL_SERVICE = "external_service"
    CONFIGURATION = "configuration"

    # Fallback when no other category applies
    UNKNOWN = "unknown"
@dataclass
class ErrorContext:
    """
    Structured record describing one error occurrence.

    Holds identification, classification, caller identity and technical
    details, and can render itself either as a generic user-facing message
    or as a dictionary for audit logging.
    """

    # Identification and classification
    error_id: str
    timestamp: datetime
    severity: ErrorSeverity
    category: ErrorCategory
    operation: str
    component: str
    # Caller identity, when known
    user_id: Optional[str]
    session_id: Optional[str]
    request_id: Optional[str]
    # Technical details (internal use only)
    exception_type: str
    error_message: str
    stack_trace: Optional[str]
    # Free-form extra context
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_user_message(self) -> str:
        """
        Return a generic, category-based message for end users.

        The text is chosen from fixed strings and never includes the
        exception message or stack trace. For HIGH and CRITICAL severities
        the error ID is appended.
        """
        generic_text = {
            ErrorCategory.VALIDATION: "Invalid input provided. Please check your request and try again.",
            ErrorCategory.AUTHENTICATION: "Authentication failed. Please verify your credentials.",
            ErrorCategory.AUTHORIZATION: "Access denied. You don't have permission for this operation.",
            ErrorCategory.RESOURCE: "Resource temporarily unavailable. Please try again later.",
            ErrorCategory.NETWORK: "Network error occurred. Please check your connection and retry.",
            ErrorCategory.SYSTEM: "System error occurred. Please try again later.",
            ErrorCategory.BUSINESS_LOGIC: "Operation cannot be completed. Please check your request.",
            ErrorCategory.EXTERNAL_SERVICE: "External service unavailable. Please try again later.",
            ErrorCategory.CONFIGURATION: "Service configuration error. Please contact support.",
            ErrorCategory.UNKNOWN: "An unexpected error occurred. Please try again later.",
        }

        # Unrecognised categories fall back to the UNKNOWN wording
        text = generic_text.get(self.category)
        if text is None:
            text = generic_text[ErrorCategory.UNKNOWN]

        include_reference = self.severity in (
            ErrorSeverity.HIGH,
            ErrorSeverity.CRITICAL,
        )
        return f"{text} Error ID: {self.error_id}" if include_reference else text

    def to_audit_dict(self) -> Dict[str, Any]:
        """
        Return this context as a plain dictionary for audit logging.

        The stack trace itself is not included; only a ``has_stack_trace``
        flag is. ``metadata`` is passed through as the same dict object.
        """
        return dict(
            error_id=self.error_id,
            timestamp=self.timestamp.isoformat(),
            severity=self.severity.name,
            category=self.category.value,
            operation=self.operation,
            component=self.component,
            user_id=self.user_id,
            session_id=self.session_id,
            request_id=self.request_id,
            exception_type=self.exception_type,
            error_message=self.error_message,
            has_stack_trace=self.stack_trace is not None,
            metadata=self.metadata,
        )
class AgentOrchestrationError(Exception):
    """
    Base exception for the Agent Orchestration Platform.

    Carries an optional ErrorContext describing the failure and an optional
    underlying exception that caused it.

    Attributes:
        error_context: The ErrorContext passed in, or None.
        cause: The underlying exception passed in, or None.
    """

    def __init__(
        self,
        message: str,
        error_context: Optional["ErrorContext"] = None,
        cause: Optional[Exception] = None,
    ):
        """
        Initialize the error.

        Args:
            message: Text passed to ``Exception.__init__``.
            error_context: Context for this error. If its ``error_id`` is
                empty, a fresh UUID4 string is assigned to it in place.
            cause: Underlying exception. When given it is also set as
                ``__cause__`` so tracebacks show the original failure.
        """
        super().__init__(message)
        self.error_context = error_context
        self.cause = cause

        # Link the cause for standard exception chaining. Only done when a
        # cause exists: assigning __cause__ (even None) suppresses the
        # implicit exception context in tracebacks.
        if cause is not None:
            self.__cause__ = cause

        # Generate error ID if not provided
        if error_context is not None and not error_context.error_id:
            error_context.error_id = str(uuid.uuid4())
class ValidationError(AgentOrchestrationError):
    """Input validation failure, recorded with the field name and a value preview."""

    def __init__(
        self, message: str, field: Optional[str] = None, value: Optional[str] = None
    ):
        """
        Build a LOW-severity VALIDATION context and initialize the base error.

        Args:
            message: Description of the validation failure.
            field: Name of the offending field, if known.
            value: Offending value; only its first 50 characters are kept,
                and an empty or missing value is stored as None.
        """
        created_at = datetime.utcnow()
        value_preview = value[:50] if value else None
        details = {"field": field, "value": value_preview}

        context = ErrorContext(
            # Left empty so the base class assigns an ID
            error_id="",
            timestamp=created_at,
            severity=ErrorSeverity.LOW,
            category=ErrorCategory.VALIDATION,
            operation="validation",
            component="input_validator",
            user_id=None,
            session_id=None,
            request_id=None,
            exception_type="ValidationError",
            error_message=message,
            stack_trace=None,
            metadata=details,
        )
        super().__init__(message, context)
class AuthenticationError(AgentOrchestrationError):
    """Authentication failure carrying the user ID when one is known."""

    def __init__(self, message: str, user_id: Optional[str] = None):
        """
        Build a HIGH-severity AUTHENTICATION context and initialize the base error.

        Args:
            message: Description of the authentication failure.
            user_id: Identifier of the user that failed to authenticate, if any.
        """
        created_at = datetime.utcnow()
        context = ErrorContext(
            # Left empty so the base class assigns an ID
            error_id="",
            timestamp=created_at,
            severity=ErrorSeverity.HIGH,
            category=ErrorCategory.AUTHENTICATION,
            operation="authentication",
            component="auth_provider",
            user_id=user_id,
            session_id=None,
            request_id=None,
            exception_type="AuthenticationError",
            error_message=message,
            stack_trace=None,
        )
        super().__init__(message, context)
class AuthorizationError(AgentOrchestrationError):
    """Authorization failure carrying the user ID and the permission that was required."""

    def __init__(
        self,
        message: str,
        user_id: Optional[str] = None,
        required_permission: Optional[str] = None,
    ):
        """
        Build a HIGH-severity AUTHORIZATION context and initialize the base error.

        Args:
            message: Description of the authorization failure.
            user_id: Identifier of the user that was denied, if known.
            required_permission: Permission that was missing; stored in the
                context metadata under ``required_permission``.
        """
        created_at = datetime.utcnow()
        details = {"required_permission": required_permission}
        context = ErrorContext(
            # Left empty so the base class assigns an ID
            error_id="",
            timestamp=created_at,
            severity=ErrorSeverity.HIGH,
            category=ErrorCategory.AUTHORIZATION,
            operation="authorization",
            component="security_contracts",
            user_id=user_id,
            session_id=None,
            request_id=None,
            exception_type="AuthorizationError",
            error_message=message,
            stack_trace=None,
            metadata=details,
        )
        super().__init__(message, context)
class ResourceError(AgentOrchestrationError):
    """Resource failure carrying the type and ID of the affected resource."""

    def __init__(
        self,
        message: str,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ):
        """
        Build a MEDIUM-severity RESOURCE context and initialize the base error.

        Args:
            message: Description of the resource failure.
            resource_type: Kind of resource involved, if known.
            resource_id: Identifier of the resource involved, if known.
        """
        created_at = datetime.utcnow()
        details = {"resource_type": resource_type, "resource_id": resource_id}
        context = ErrorContext(
            # Left empty so the base class assigns an ID
            error_id="",
            timestamp=created_at,
            severity=ErrorSeverity.MEDIUM,
            category=ErrorCategory.RESOURCE,
            operation="resource_management",
            component="resource_manager",
            user_id=None,
            session_id=None,
            request_id=None,
            exception_type="ResourceError",
            error_message=message,
            stack_trace=None,
            metadata=details,
        )
        super().__init__(message, context)
class SystemError(AgentOrchestrationError):
    """
    System-level error with CRITICAL severity.

    NOTE: within this module the class name shadows Python's builtin
    ``SystemError``; the name is kept for backward compatibility.
    """

    def __init__(
        self,
        message: str,
        component: str,
        operation: str,
        cause: Optional[Exception] = None,
    ):
        """
        Build a CRITICAL-severity SYSTEM context and initialize the base error.

        Args:
            message: Description of the system failure.
            component: Name of the component in which the failure occurred.
            operation: Name of the operation that failed.
            cause: Underlying exception, if any. Its own traceback is
                captured as the context's stack trace.
        """
        # Format the traceback of `cause` itself. traceback.format_exc()
        # reports whatever exception is currently being handled, which may
        # be a different exception or none at all ("NoneType: None").
        if cause is not None:
            stack_trace = "".join(
                traceback.format_exception(type(cause), cause, cause.__traceback__)
            )
        else:
            stack_trace = None

        error_context = ErrorContext(
            # Left empty so the base class assigns an ID
            error_id="",
            timestamp=datetime.utcnow(),
            severity=ErrorSeverity.CRITICAL,
            category=ErrorCategory.SYSTEM,
            operation=operation,
            component=component,
            user_id=None,
            session_id=None,
            request_id=None,
            exception_type="SystemError",
            error_message=message,
            stack_trace=stack_trace,
        )
        super().__init__(message, error_context, cause)
class ExternalServiceError(AgentOrchestrationError):
    """Failure while talking to an external service."""

    def __init__(
        self,
        message: str,
        service_name: str,
        operation: str,
        status_code: Optional[int] = None,
    ):
        """
        Build a MEDIUM-severity EXTERNAL_SERVICE context and initialize the base error.

        Args:
            message: Description of the failure.
            service_name: Name of the external service; also used to form
                the component name ``external_service_<service_name>``.
            operation: Name of the operation that failed.
            status_code: Status code reported by the service, if any.
        """
        created_at = datetime.utcnow()
        component_name = f"external_service_{service_name}"
        details = {"service_name": service_name, "status_code": status_code}
        context = ErrorContext(
            # Left empty so the base class assigns an ID
            error_id="",
            timestamp=created_at,
            severity=ErrorSeverity.MEDIUM,
            category=ErrorCategory.EXTERNAL_SERVICE,
            operation=operation,
            component=component_name,
            user_id=None,
            session_id=None,
            request_id=None,
            exception_type="ExternalServiceError",
            error_message=message,
            stack_trace=None,
            metadata=details,
        )
        super().__init__(message, context)
class ErrorHandler:
    """
    Central error processor.

    For each handled exception it builds (or reuses) an ErrorContext,
    updates in-memory counters, writes an audit event, runs the handler
    registered for the exact exception type, and runs the recovery strategy
    registered for the error's category. Handlers and strategies may be
    plain callables or coroutine functions.
    """

    # Standard-library logger used when a handler, recovery strategy or the
    # audit logger itself fails, so those failures are visible rather than
    # silently dropped.
    _fallback_logger = logging.getLogger(__name__)

    # Substring rules applied to the lower-cased exception class name.
    # Order matters: the first matching rule wins, and the authorization
    # rule must precede the broader "auth" rule.
    _TYPE_RULES = (
        (("validation", "value"), ErrorCategory.VALIDATION),
        (("authoriz", "permission"), ErrorCategory.AUTHORIZATION),
        (("auth",), ErrorCategory.AUTHENTICATION),
        (("connection", "network"), ErrorCategory.NETWORK),
        (("resource", "memory"), ErrorCategory.RESOURCE),
        (("config", "setting"), ErrorCategory.CONFIGURATION),
    )

    # Substring rules applied to the lower-cased exception message, consulted
    # only when no type rule matched.
    _MESSAGE_RULES = (
        (("invalid", "validation", "format"), ErrorCategory.VALIDATION),
        (("authoriz", "permission"), ErrorCategory.AUTHORIZATION),
        (("auth", "token", "access"), ErrorCategory.AUTHENTICATION),
        (("connection", "network", "timeout"), ErrorCategory.NETWORK),
        (("resource", "memory", "disk", "cpu"), ErrorCategory.RESOURCE),
        (("config", "setting", "environment"), ErrorCategory.CONFIGURATION),
    )

    def __init__(self):
        """Initialize empty registries, an unresolved audit logger and zeroed counters."""
        self._error_handlers: Dict[Type[Exception], Callable] = {}
        self._recovery_strategies: Dict[ErrorCategory, Callable] = {}
        # Resolved on first use in _log_error
        self._audit_logger = None
        self._error_stats = {
            "total_errors": 0,
            "by_category": {category.value: 0 for category in ErrorCategory},
            "by_severity": {severity.name: 0 for severity in ErrorSeverity},
        }

    def register_handler(
        self, exception_type: Type[Exception], handler: Callable
    ) -> None:
        """
        Register a custom handler for an exact exception type.

        The handler is called as ``handler(exception, error_context)`` and
        may be synchronous or return an awaitable. Registering again for
        the same type replaces the previous handler.
        """
        self._error_handlers[exception_type] = handler

    def register_recovery_strategy(
        self, category: ErrorCategory, strategy: Callable
    ) -> None:
        """
        Register a recovery strategy for an error category.

        The strategy is called as ``strategy(error_context)`` and may be
        synchronous or return an awaitable. Registering again for the same
        category replaces the previous strategy.
        """
        self._recovery_strategies[category] = strategy

    @staticmethod
    async def _invoke_callback(callback: Callable, *args: Any) -> None:
        """Call *callback* with *args* and await the result only if it is awaitable."""
        outcome = callback(*args)
        if inspect.isawaitable(outcome):
            await outcome

    @require(lambda exception: isinstance(exception, Exception))
    async def handle_error(
        self,
        exception: Exception,
        operation: str = "unknown",
        component: str = "unknown",
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        request_id: Optional[str] = None,
    ) -> ErrorContext:
        """
        Handle exception with comprehensive error processing.

        Args:
            exception: The exception to process.
            operation: Name of the operation that failed.
            component: Name of the component in which it failed.
            user_id: Caller's user ID, if known.
            session_id: Caller's session ID, if known.
            request_id: Request ID, if known.

        Returns:
            The ErrorContext for the exception. If processing itself fails,
            a CRITICAL "MetaError" context is returned instead.

        Contracts:
            Preconditions:
                - Exception is valid exception object
            Postconditions:
                - Error context is created and populated
                - Error is logged to audit system
                - Recovery strategy is attempted if available
            Invariants:
                - Failures in the body are converted into a meta-error context
                - Security context is preserved
                - Audit trail is maintained
        """
        try:
            # Create or extract error context
            if (
                isinstance(exception, AgentOrchestrationError)
                and exception.error_context
            ):
                error_context = exception.error_context
                # Fill in caller identity only where the context has none
                if not error_context.user_id:
                    error_context.user_id = user_id
                if not error_context.session_id:
                    error_context.session_id = session_id
                if not error_context.request_id:
                    error_context.request_id = request_id
            else:
                error_context = self._create_error_context(
                    exception, operation, component, user_id, session_id, request_id
                )

            self._update_error_stats(error_context)
            await self._log_error(error_context)

            # Apply custom error handler if one is registered for this exact type
            handler = self._error_handlers.get(type(exception))
            if handler is not None:
                try:
                    await self._invoke_callback(handler, exception, error_context)
                except Exception:
                    # A failing handler must not abort error processing,
                    # but the failure should not vanish either.
                    self._fallback_logger.warning(
                        "Custom error handler failed for error %s",
                        error_context.error_id,
                        exc_info=True,
                    )

            await self._attempt_recovery(error_context)
            return error_context
        except Exception as meta_error:
            # Meta-error handling: handle errors in error handling
            return await self._handle_meta_error(
                meta_error, exception, operation, component
            )

    def _create_error_context(
        self,
        exception: Exception,
        operation: str,
        component: str,
        user_id: Optional[str],
        session_id: Optional[str],
        request_id: Optional[str],
    ) -> ErrorContext:
        """
        Create an ErrorContext for an exception that does not carry one.

        Category and severity are derived from the exception; the stack
        trace is formatted from the exception's own ``__traceback__`` so it
        is correct even when this runs outside the original ``except`` block.
        """
        category = self._categorize_error(exception)
        severity = self._determine_severity(exception, category)
        stack_trace = "".join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        )
        return ErrorContext(
            error_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow(),
            severity=severity,
            category=category,
            operation=operation,
            component=component,
            user_id=user_id,
            session_id=session_id,
            request_id=request_id,
            exception_type=type(exception).__name__,
            error_message=str(exception),
            stack_trace=stack_trace,
        )

    def _categorize_error(self, exception: Exception) -> ErrorCategory:
        """
        Categorize an exception by substring matching.

        The lower-cased class name is checked against the type rules first;
        if none match, the lower-cased message is checked against the
        message rules. Returns ``ErrorCategory.UNKNOWN`` when nothing matches.
        """
        type_name = type(exception).__name__.lower()
        message = str(exception).lower()

        for text, rules in (
            (type_name, self._TYPE_RULES),
            (message, self._MESSAGE_RULES),
        ):
            for needles, category in rules:
                if any(needle in text for needle in needles):
                    return category
        return ErrorCategory.UNKNOWN

    def _determine_severity(
        self, exception: Exception, category: ErrorCategory
    ) -> ErrorSeverity:
        """
        Determine severity from the category first, then the exception type.

        Returns HIGH for authentication/authorization categories and for
        connection/timeout errors, CRITICAL for the SYSTEM category and for
        system/OS errors, LOW for value/type errors, MEDIUM otherwise.
        """
        if category in (ErrorCategory.AUTHENTICATION, ErrorCategory.AUTHORIZATION):
            return ErrorSeverity.HIGH
        if category == ErrorCategory.SYSTEM:
            return ErrorSeverity.CRITICAL

        # ConnectionError and TimeoutError are subclasses of OSError, so
        # they must be tested before the OSError check to be reachable.
        if isinstance(exception, (ConnectionError, TimeoutError)):
            return ErrorSeverity.HIGH
        # The bare name SystemError refers to this module's class; the
        # builtin is checked explicitly as well.
        if isinstance(exception, (SystemError, builtins.SystemError, OSError)):
            return ErrorSeverity.CRITICAL
        if isinstance(exception, (ValueError, TypeError)):
            return ErrorSeverity.LOW
        return ErrorSeverity.MEDIUM

    def _update_error_stats(self, error_context: ErrorContext) -> None:
        """Increment the total, per-category and per-severity counters."""
        self._error_stats["total_errors"] += 1
        self._error_stats["by_category"][error_context.category.value] += 1
        self._error_stats["by_severity"][error_context.severity.name] += 1

    async def _log_error(self, error_context: ErrorContext) -> None:
        """
        Write the error to the audit logger.

        The audit logger is obtained via ``get_audit_logger()`` on first
        use. Any failure here is reported to the standard logger and then
        suppressed so that error handling can continue.
        """
        try:
            if not self._audit_logger:
                self._audit_logger = get_audit_logger()

            audit_level = {
                ErrorSeverity.LOW: AuditLevel.INFO,
                ErrorSeverity.MEDIUM: AuditLevel.WARNING,
                ErrorSeverity.HIGH: AuditLevel.ERROR,
                ErrorSeverity.CRITICAL: AuditLevel.CRITICAL,
            }[error_context.severity]

            await self._audit_logger.log_event(
                level=audit_level,
                category=AuditCategory.ERROR_HANDLING,
                operation=error_context.operation,
                resource_type=error_context.component,
                resource_id=error_context.error_id,
                success=False,
                user_id=error_context.user_id,
                session_id=error_context.session_id,
                error_message=error_context.error_message,
                metadata=error_context.to_audit_dict(),
            )
        except Exception:
            # Don't fail error handling due to audit logging failures
            self._fallback_logger.warning(
                "Audit logging failed for error %s",
                error_context.error_id,
                exc_info=True,
            )

    async def _attempt_recovery(self, error_context: ErrorContext) -> None:
        """
        Run the recovery strategy registered for the error's category, if any.

        A failing strategy is reported to the standard logger and suppressed.
        """
        strategy = self._recovery_strategies.get(error_context.category)
        if strategy is None:
            return
        try:
            await self._invoke_callback(strategy, error_context)
        except Exception:
            # Don't fail on recovery strategy errors
            self._fallback_logger.warning(
                "Recovery strategy failed for error %s",
                error_context.error_id,
                exc_info=True,
            )

    async def _handle_meta_error(
        self,
        meta_error: Exception,
        original_exception: Exception,
        operation: str,
        component: str,
    ) -> ErrorContext:
        """
        Build a fallback context when error handling itself fails.

        Returns a CRITICAL SYSTEM context attributed to the
        ``error_handler`` component, with the original exception and the
        meta-error stringified into the metadata. The meta-error is also
        logged through the standard ``logging`` module. The ``component``
        argument is accepted but not used.
        """
        error_context = ErrorContext(
            error_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow(),
            severity=ErrorSeverity.CRITICAL,
            category=ErrorCategory.SYSTEM,
            operation=operation,
            component="error_handler",
            user_id=None,
            session_id=None,
            request_id=None,
            exception_type="MetaError",
            error_message=f"Error handling failed: {meta_error}",
            stack_trace=None,
            metadata={
                "original_exception": str(original_exception),
                "meta_error": str(meta_error),
            },
        )
        # Log to standard logger as fallback
        logging.error("Meta-error in error handling: %s", meta_error)
        return error_context

    def get_error_stats(self) -> Dict[str, Any]:
        """
        Return a snapshot of the error counters.

        The nested per-category and per-severity dicts are copied too, so
        mutating the returned value does not affect the handler's counters.
        """
        return {
            key: dict(value) if isinstance(value, dict) else value
            for key, value in self._error_stats.items()
        }
# Module-wide ErrorHandler, created on first request
_error_handler_instance: Optional[ErrorHandler] = None


def get_error_handler() -> ErrorHandler:
    """Return the module-wide ErrorHandler, creating it on the first call."""
    global _error_handler_instance
    if _error_handler_instance is not None:
        return _error_handler_instance
    _error_handler_instance = ErrorHandler()
    return _error_handler_instance
async def handle_error(
    exception: Exception,
    operation: str = "unknown",
    component: str = "unknown",
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    request_id: Optional[str] = None,
) -> ErrorContext:
    """
    Handle an exception through the module-wide ErrorHandler.

    All arguments are forwarded unchanged, in order, to
    ``ErrorHandler.handle_error`` on the instance returned by
    ``get_error_handler()``; its result is returned.
    """
    shared_handler = get_error_handler()
    context = await shared_handler.handle_error(
        exception, operation, component, user_id, session_id, request_id
    )
    return context
def error_boundary(
    operation: str, component: str, reraise: bool = True, return_on_error: Any = None
):
    """
    Decorator that reports exceptions from the wrapped callable via ``handle_error``.

    Works for both coroutine functions and plain functions and preserves the
    wrapped callable's metadata (name, docstring, signature) via
    ``functools.wraps``.

    Args:
        operation: Operation name passed to ``handle_error``.
        component: Component name passed to ``handle_error``.
        reraise: When True (default) the original exception is re-raised
            after being reported.
        return_on_error: Value returned instead when ``reraise`` is False.

    Returns:
        A decorator producing a wrapper with the same call signature.

    Notes:
        - For plain functions, reporting is scheduled as a background task
          when an event loop is running in the current thread; otherwise it
          is run to completion with ``asyncio.run``.
        - A failure while reporting is logged and never replaces the
          wrapped callable's own exception or fallback return value.
    """
    log = logging.getLogger(__name__)

    def decorator(func):
        if asyncio.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                try:
                    return await func(*args, **kwargs)
                except Exception as exc:
                    try:
                        await handle_error(exc, operation, component)
                    except Exception:
                        log.exception(
                            "Error reporting failed in error_boundary for %s/%s",
                            component,
                            operation,
                        )
                    if reraise:
                        # Bare raise re-raises `exc` with its original traceback
                        raise
                    return return_on_error

            return async_wrapper

        # Strong references to in-flight reporting tasks: the event loop
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        pending_reports = set()

        def _report_finished(task):
            """Drop the finished task and surface any failure it had."""
            pending_reports.discard(task)
            if not task.cancelled() and task.exception() is not None:
                log.error(
                    "Background error reporting failed for %s/%s",
                    component,
                    operation,
                    exc_info=task.exception(),
                )

        def _report(exc):
            """Report *exc* via handle_error, with or without a running loop."""
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None

            if loop is None:
                # No loop in this thread: run the reporting coroutine now
                asyncio.run(handle_error(exc, operation, component))
            else:
                task = loop.create_task(handle_error(exc, operation, component))
                pending_reports.add(task)
                task.add_done_callback(_report_finished)

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                try:
                    _report(exc)
                except Exception:
                    log.exception(
                        "Error reporting failed in error_boundary for %s/%s",
                        component,
                        operation,
                    )
                if reraise:
                    # Bare raise re-raises `exc` with its original traceback
                    raise
                return return_on_error

        return sync_wrapper

    return decorator