# self_healing.py • 34.9 kB
"""
Enhanced Error Recovery and Self-Healing System - Phase 3
Advanced self-healing capabilities with predictive error prevention,
automatic recovery, and adaptive resilience strategies.
"""
import asyncio
import hashlib
import json
import traceback
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple, Callable
from dataclasses import dataclass, field
from enum import Enum
import uuid
import logging
import statistics
from .data_models import (
FeedbackEvent, PerformanceSnapshot, CapabilityProfile,
LearningRecord, AdaptationProposal
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class ErrorSeverity(Enum):
    """Severity levels that can be assigned to a reported error.

    Each value is the lowercase string form of the member name; that string is
    what SelfHealingEngine.report_error stores under the "severity" key.
    """

    LOW = "low"
    MEDIUM = "medium"
    # report_error schedules immediate healing for HIGH and CRITICAL errors.
    HIGH = "high"
    CRITICAL = "critical"
class RecoveryStrategy(Enum):
    """Recovery strategies the healing engine knows how to apply.

    SelfHealingEngine._register_healing_strategies maps every member to a
    handler coroutine on the engine.
    """

    RETRY = "retry"
    RESTART = "restart"
    FAILOVER = "failover"
    DEGRADE = "degrade"
    CIRCUIT_BREAKER = "circuit_breaker"
    RESOURCE_SCALE = "resource_scale"
    CONFIG_ROLLBACK = "config_rollback"
    CACHE_CLEAR = "cache_clear"
class HealingMethod(Enum):
    """How a healing action relates in time to the error it addresses."""

    REACTIVE = "reactive"  # Heal after error occurs
    PREDICTIVE = "predictive"  # Heal before error occurs
    PREVENTIVE = "preventive"  # Prevent errors from occurring
    ADAPTIVE = "adaptive"  # Adapt to prevent future errors
@dataclass
class ErrorPattern:
    """A recurring error shape that can be recognized and healed.

    Every field has a default, so an instance can be created with no
    arguments; ``pattern_id`` is then a fresh UUID4 string and ``created_at``
    the current naive local time.
    """

    pattern_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: datetime = field(default_factory=datetime.now)

    # Pattern definition
    error_type: str = ""
    error_signature: str = ""  # Signature string identifying the error
    context_patterns: Dict[str, Any] = field(default_factory=dict)

    # Pattern characteristics
    frequency_threshold: int = 3  # Minimum occurrences to recognize pattern
    time_window: int = 300  # Time window in seconds
    severity_distribution: Dict[str, int] = field(default_factory=dict)  # severity string -> count

    # Healing strategy
    recommended_strategy: RecoveryStrategy = RecoveryStrategy.RETRY
    healing_confidence: float = 0.0  # 0-1
    auto_heal_enabled: bool = False  # off by default; toggled explicitly

    # Pattern statistics
    occurrence_count: int = 0
    successful_heals: int = 0
    failed_heals: int = 0
    last_seen: Optional[datetime] = None  # None until the pattern has been observed

    # Learning data
    contributing_factors: List[str] = field(default_factory=list)
    prevention_methods: List[str] = field(default_factory=list)

    # Status
    status: str = "active"  # active, deprecated, resolved

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class HealingAction:
    """A single healing action: what to run, why, and how it turned out.

    Every field has a default; ``action_id`` defaults to a fresh UUID4 string
    and ``timestamp`` to the current naive local time.
    """

    action_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)

    # Action details
    strategy: RecoveryStrategy = RecoveryStrategy.RETRY
    method: HealingMethod = HealingMethod.REACTIVE
    target_capability_id: str = ""
    target_component: str = ""

    # Action parameters
    parameters: Dict[str, Any] = field(default_factory=dict)  # strategy-specific options
    timeout: float = 30.0  # NOTE(review): unit presumably seconds - confirm
    max_attempts: int = 3

    # Execution context
    trigger_error_id: str = ""
    trigger_pattern_id: Optional[str] = None  # None when not triggered by a pattern
    execution_context: Dict[str, Any] = field(default_factory=dict)

    # Results
    status: str = "pending"  # pending, executing, completed, failed, cancelled
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    execution_time: float = 0.0  # seconds between started_at and completed_at

    # Outcome
    success: bool = False
    error_message: Optional[str] = None
    side_effects: List[str] = field(default_factory=list)

    # Verification
    verification_required: bool = True
    verification_passed: Optional[bool] = None  # None until verification has run
    verification_metrics: Dict[str, Any] = field(default_factory=dict)

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class HealthStatus:
    """Point-in-time health snapshot of a capability, component or system.

    Every field has a default; a default-constructed snapshot describes a
    fully healthy target (``overall_health`` 1.0, ``status_level`` "healthy").
    """

    status_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)

    # Target
    target_type: str = ""  # capability, component, system
    target_id: str = ""

    # Health indicators
    overall_health: float = 1.0  # 0-1, 1 = perfectly healthy
    availability: float = 1.0  # 0-1
    performance: float = 1.0  # 0-1
    error_rate: float = 0.0  # 0-1
    resource_utilization: float = 0.5  # 0-1

    # Health trends
    health_trend: str = "stable"  # improving, stable, degrading
    performance_trend: str = "stable"
    error_trend: str = "stable"

    # Recent issues (lists of identifiers)
    recent_errors: List[str] = field(default_factory=list)
    recent_healing_actions: List[str] = field(default_factory=list)
    active_alerts: List[str] = field(default_factory=list)

    # Predictions
    predicted_health: Optional[float] = None
    time_to_degradation: Optional[float] = None  # minutes
    risk_factors: List[str] = field(default_factory=list)

    # Status classification
    status_level: str = "healthy"  # healthy, warning, critical, failed

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ResiliencePolicy:
    """Declarative resilience requirements for a capability.

    Every field has a default; ``policy_id`` defaults to a fresh UUID4 string
    and ``created_at`` to the current naive local time.
    """

    policy_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: datetime = field(default_factory=datetime.now)

    # Policy scope
    target_capability_id: str = ""
    target_components: List[str] = field(default_factory=list)

    # Resilience requirements
    availability_target: float = 0.99  # 99% availability
    recovery_time_objective: float = 5.0  # RTO in minutes
    recovery_point_objective: float = 1.0  # RPO in minutes

    # Healing strategies
    allowed_strategies: List[RecoveryStrategy] = field(default_factory=list)
    strategy_priorities: Dict[RecoveryStrategy, int] = field(default_factory=dict)
    auto_heal_threshold: ErrorSeverity = ErrorSeverity.HIGH

    # Prevention measures
    preventive_actions: List[str] = field(default_factory=list)
    monitoring_requirements: List[str] = field(default_factory=list)

    # Adaptation rules
    adaptation_enabled: bool = True
    learning_enabled: bool = True
    strategy_evolution: bool = True

    # Status
    status: str = "active"  # active, disabled, deprecated

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)
class SelfHealingEngine:
"""Advanced self-healing and error recovery system"""
def __init__(self):
    """Create an engine with empty registries and default tuning values.

    The strategy table is filled in by ``_register_healing_strategies`` before
    the constructor returns.
    """
    # Tunables
    self.health_check_interval = 60  # seconds
    self.max_concurrent_healings = 3
    self.default_retry_attempts = 3
    self.default_retry_delay = 1.0
    self.circuit_breaker_threshold = 5
    self.circuit_breaker_timeout = 300  # seconds

    # Registries of domain objects, keyed by their ids
    self.error_patterns: Dict[str, ErrorPattern] = {}
    self.healing_actions: Dict[str, HealingAction] = {}
    self.health_status: Dict[str, HealthStatus] = {}
    self.resilience_policies: Dict[str, ResiliencePolicy] = {}

    # Error bookkeeping: per-capability history plus errors indexed by id
    self.error_history: Dict[str, List[Tuple[datetime, Dict[str, Any]]]] = {}
    self.active_errors: Dict[str, Dict[str, Any]] = {}

    # In-flight healing work and circuit-breaker state
    self.healing_lock = asyncio.Lock()
    self.active_healing: Dict[str, asyncio.Task] = {}
    self.circuit_breakers: Dict[str, Dict[str, Any]] = {}

    # One monitoring task per watched capability
    self.health_monitors: Dict[str, asyncio.Task] = {}

    # Strategy enum member -> handler coroutine
    self.healing_strategies: Dict[RecoveryStrategy, Callable] = {}
    self._register_healing_strategies()

    logger.info("SelfHealingEngine initialized")
def _register_healing_strategies(self):
    """Populate ``self.healing_strategies`` with one handler per strategy."""
    handlers = {
        RecoveryStrategy.RETRY: self._retry_strategy,
        RecoveryStrategy.RESTART: self._restart_strategy,
        RecoveryStrategy.FAILOVER: self._failover_strategy,
        RecoveryStrategy.DEGRADE: self._degrade_strategy,
        RecoveryStrategy.CIRCUIT_BREAKER: self._circuit_breaker_strategy,
        RecoveryStrategy.RESOURCE_SCALE: self._resource_scale_strategy,
        RecoveryStrategy.CONFIG_ROLLBACK: self._config_rollback_strategy,
        RecoveryStrategy.CACHE_CLEAR: self._cache_clear_strategy,
    }
    self.healing_strategies.update(handlers)
async def report_error(
    self,
    capability_id: str,
    error_type: str,
    error_message: str,
    context: Optional[Dict[str, Any]] = None,
    severity: ErrorSeverity = ErrorSeverity.MEDIUM
) -> str:
    """Record an error and schedule pattern analysis (and healing if severe).

    Must be awaited from a running event loop because background tasks are
    created with ``asyncio.create_task``.

    Args:
        capability_id: Capability the error belongs to.
        error_type: Short classification of the error.
        error_message: Human-readable description.
        context: Optional extra data; stored as ``{}`` when omitted.
        severity: Severity of the error; HIGH and CRITICAL additionally
            schedule immediate healing.

    Returns:
        The generated id (UUID4 string) of the recorded error.
    """
    error_id = str(uuid.uuid4())
    timestamp = datetime.now()

    # format_exc() renders "NoneType: None" when no exception is being
    # handled; format once and map that placeholder to None.
    stack_trace = traceback.format_exc()
    if stack_trace.strip() == "NoneType: None":
        stack_trace = None

    error_data = {
        "error_id": error_id,
        "capability_id": capability_id,
        "error_type": error_type,
        "error_message": error_message,
        "severity": severity.value,
        "context": context or {},
        "timestamp": timestamp,
        "stack_trace": stack_trace
    }

    # Store error
    history = self.error_history.setdefault(capability_id, [])
    history.append((timestamp, error_data))
    self.active_errors[error_id] = error_data

    # Keep history manageable
    if len(history) > 1000:
        self.error_history[capability_id] = history[-500:]

    # The event loop only keeps weak references to tasks, so hold a strong
    # reference until each background task finishes; otherwise it may be
    # garbage-collected before it completes.
    background = getattr(self, "_background_tasks", None)
    if background is None:
        background = self._background_tasks = set()

    def _spawn(coro):
        task = asyncio.create_task(coro)
        background.add(task)
        task.add_done_callback(background.discard)

    # Analyze for patterns
    _spawn(self._analyze_error_pattern(error_data))

    # Trigger immediate healing if critical
    if severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL]:
        _spawn(self._trigger_immediate_healing(error_data))

    logger.info(f"Error reported: {error_type} for {capability_id} (severity: {severity.value})")
    return error_id
async def _analyze_error_pattern(self, error_data: Dict[str, Any]):
    """Match an error against known patterns or create a new pattern.

    If an active pattern with the same type and signature exists, its
    counters are updated and, once its threshold is met and auto-heal is on,
    pattern healing is triggered. Otherwise a new pattern is created when at
    least three similar errors for the capability lie within five minutes of
    this error.

    Args:
        error_data: Error record as built by ``report_error``.
    """
    pattern_to_heal = None

    async with self.healing_lock:
        capability_id = error_data["capability_id"]
        error_type = error_data["error_type"]
        severity = error_data["severity"]

        # Generate error signature
        signature = self._generate_error_signature(error_data)

        # Look for an existing active pattern
        matching_pattern = next(
            (
                pattern for pattern in self.error_patterns.values()
                if pattern.error_type == error_type
                and pattern.error_signature == signature
                and pattern.status == "active"
            ),
            None,
        )

        if matching_pattern is not None:
            # Update existing pattern
            matching_pattern.occurrence_count += 1
            matching_pattern.last_seen = error_data["timestamp"]
            matching_pattern.severity_distribution[severity] = (
                matching_pattern.severity_distribution.get(severity, 0) + 1
            )

            # Check if pattern meets healing threshold
            if (matching_pattern.occurrence_count >= matching_pattern.frequency_threshold and
                    matching_pattern.auto_heal_enabled):
                pattern_to_heal = matching_pattern
        else:
            # Similar errors for this capability within 5 minutes of this one
            similar_errors = [
                err for _, err in self.error_history.get(capability_id, [])
                if (error_data["timestamp"] - err["timestamp"]).total_seconds() < 300
                and err["error_type"] == error_type
                and self._generate_error_signature(err) == signature
            ]

            if len(similar_errors) >= 3:  # Minimum pattern threshold
                # Count severities over every contributing error so the
                # distribution adds up to occurrence_count.
                severity_distribution: Dict[str, int] = {}
                for err in similar_errors:
                    severity_distribution[err["severity"]] = (
                        severity_distribution.get(err["severity"], 0) + 1
                    )

                pattern = ErrorPattern(
                    error_type=error_type,
                    error_signature=signature,
                    occurrence_count=len(similar_errors),
                    last_seen=error_data["timestamp"],
                    severity_distribution=severity_distribution
                )

                # Determine recommended strategy
                pattern.recommended_strategy = await self._recommend_healing_strategy(similar_errors)
                pattern.healing_confidence = await self._calculate_healing_confidence(similar_errors)

                self.error_patterns[pattern.pattern_id] = pattern
                logger.info(f"Created new error pattern: {pattern.pattern_id}")

    # Trigger healing only after releasing the lock: the trigger may wait for
    # a free healing slot, and holding the lock meanwhile would stall the
    # analysis of every other reported error.
    if pattern_to_heal is not None:
        await self._trigger_pattern_healing(pattern_to_heal, error_data)
def _generate_error_signature(self, error_data: Dict[str, Any]) -> str:
"""Generate a unique signature for an error"""
# Use error type and key context elements
signature_parts = [
error_data["error_type"],
error_data.get("capability_id", ""),
]
# Add context signature if available
context = error_data.get("context", {})
if context:
context_signature = json.dumps(context, sort_keys=True)
signature_parts.append(context_signature)
# Create hash
signature_string = "|".join(signature_parts)
return hashlib.sha256(signature_string.encode()).hexdigest()[:16]
async def _recommend_healing_strategy(self, errors: List[Dict[str, Any]]) -> RecoveryStrategy:
    """Pick a recovery strategy for a group of error records.

    Error types are checked first (exact matches), then severities, then the
    number of errors; RETRY is the fallback, including for an empty list.
    """
    if not errors:
        return RecoveryStrategy.RETRY

    seen_types = {record["error_type"] for record in errors}
    seen_severities = [record["severity"] for record in errors]

    # Rules ordered by precedence: the first matching type group wins.
    type_rules = (
        (("timeout", "connection"), RecoveryStrategy.RETRY),
        (("memory", "resource"), RecoveryStrategy.RESOURCE_SCALE),
        (("crash", "fatal"), RecoveryStrategy.RESTART),
    )
    for keywords, strategy in type_rules:
        if any(keyword in seen_types for keyword in keywords):
            return strategy

    for level in seen_severities:
        if level in ("critical", "high"):
            return RecoveryStrategy.FAILOVER

    if len(errors) > 5:  # High frequency errors
        return RecoveryStrategy.CIRCUIT_BREAKER
    return RecoveryStrategy.RETRY
async def _calculate_healing_confidence(self, errors: List[Dict[str, Any]]) -> float:
"""Calculate confidence in healing based on error patterns"""
if not errors:
return 0.0
# Base confidence on consistency
error_types = [err["error_type"] for err in errors]
type_consistency = max(error_types.count(t) for t in set(error_types)) / len(error_types)
# Adjust for severity consistency
severities = [err["severity"] for err in errors]
severity_consistency = max(severities.count(s) for s in set(severities)) / len(severities)
# Adjust for sample size
sample_factor = min(1.0, len(errors) / 10)
# Combined confidence
confidence = (type_consistency * 0.4 + severity_consistency * 0.3 + sample_factor * 0.3)
return min(1.0, confidence)
async def _trigger_immediate_healing(self, error_data: Dict[str, Any]):
    """Start a reactive healing action for a single severe error.

    Waits, polling once per second, until fewer than
    ``max_concurrent_healings`` healings are active, then creates the action
    and registers its task in ``self.active_healing``.

    Args:
        error_data: Error record as built by ``report_error``.
    """
    # Poll with a loop rather than calling this coroutine recursively: under
    # sustained saturation recursion nests one frame per second and would
    # eventually raise RecursionError.
    if len(self.active_healing) >= self.max_concurrent_healings:
        logger.warning("Maximum concurrent healings reached, queuing critical healing")
        while len(self.active_healing) >= self.max_concurrent_healings:
            await asyncio.sleep(1.0)

    # Determine strategy from the error itself
    strategy = await self._recommend_healing_strategy([error_data])

    # Create healing action
    action = HealingAction(
        strategy=strategy,
        method=HealingMethod.REACTIVE,
        target_capability_id=error_data["capability_id"],
        trigger_error_id=error_data["error_id"],
        execution_context=error_data["context"]
    )

    # Execute healing; the executor removes the entry again when it finishes
    healing_task = asyncio.create_task(self._execute_healing_action(action))
    self.active_healing[action.action_id] = healing_task
async def _trigger_pattern_healing(self, pattern: ErrorPattern, error_data: Dict[str, Any]):
    """Start an adaptive healing action using a pattern's recommended strategy.

    Does nothing when auto-healing is disabled for the pattern. Otherwise
    waits, polling once per second, until a healing slot is free, then creates
    the action and registers its task in ``self.active_healing``.

    Args:
        pattern: The recognized pattern that matched the error.
        error_data: Error record as built by ``report_error``.
    """
    if not pattern.auto_heal_enabled:
        return

    # Poll with a loop rather than calling this coroutine recursively: under
    # sustained saturation recursion nests one frame per second and would
    # eventually raise RecursionError.
    if len(self.active_healing) >= self.max_concurrent_healings:
        logger.warning("Maximum concurrent healings reached, queuing pattern healing")
        while len(self.active_healing) >= self.max_concurrent_healings:
            await asyncio.sleep(1.0)

    # Create healing action based on pattern
    action = HealingAction(
        strategy=pattern.recommended_strategy,
        method=HealingMethod.ADAPTIVE,
        target_capability_id=error_data["capability_id"],
        trigger_error_id=error_data["error_id"],
        trigger_pattern_id=pattern.pattern_id,
        execution_context=error_data["context"]
    )

    # Execute healing; the executor removes the entry again when it finishes
    healing_task = asyncio.create_task(self._execute_healing_action(action))
    self.active_healing[action.action_id] = healing_task
async def _execute_healing_action(self, action: HealingAction) -> bool:
"""Execute a healing action"""
action.status = "executing"
action.started_at = datetime.now()
try:
# Get the healing strategy function
if action.strategy not in self.healing_strategies:
raise ValueError(f"Unknown healing strategy: {action.strategy}")
strategy_func = self.healing_strategies[action.strategy]
# Execute the strategy
success = await strategy_func(action)
# Update action status
action.success = success
action.status = "completed" if success else "failed"
action.completed_at = datetime.now()
action.execution_time = (action.completed_at - action.started_at).total_seconds()
# Update pattern statistics if applicable
if action.trigger_pattern_id and action.trigger_pattern_id in self.error_patterns:
pattern = self.error_patterns[action.trigger_pattern_id]
if success:
pattern.successful_heals += 1
else:
pattern.failed_heals += 1
# Verify healing if required
if action.verification_required and success:
action.verification_passed = await self._verify_healing(action)
logger.info(f"Healing action {action.action_id} completed: {action.status}")
return success
except Exception as e:
action.status = "failed"
action.error_message = str(e)
action.completed_at = datetime.now()
logger.error(f"Healing action {action.action_id} failed: {e}")
return False
finally:
# Clean up
if action.action_id in self.active_healing:
del self.active_healing[action.action_id]
# Store action
self.healing_actions[action.action_id] = action
async def _retry_strategy(self, action: HealingAction) -> bool:
"""Execute retry healing strategy"""
max_attempts = action.parameters.get("max_attempts", self.default_retry_attempts)
delay = action.parameters.get("delay", self.default_retry_delay)
for attempt in range(max_attempts):
try:
# Simulate retry attempt
await asyncio.sleep(min(delay * (2 ** attempt), 5.0)) # Exponential backoff
# In a real implementation, this would retry the failed operation
# For now, simulate success after a few attempts
if attempt >= 1:
return True
except Exception as e:
logger.warning(f"Retry attempt {attempt + 1} failed: {e}")
continue
return False
async def _restart_strategy(self, action: HealingAction) -> bool:
    """Simulate restarting the target capability.

    Sleeps two seconds in place of a real restart, logs the capability id and
    always reports success.
    """
    simulated_duration = 2.0
    await asyncio.sleep(simulated_duration)

    # In a real implementation, this would restart the capability/component
    target = action.target_capability_id
    logger.info(f"Restarted capability {target}")
    return True
async def _failover_strategy(self, action: HealingAction) -> bool:
    """Simulate failing the target capability over to a standby.

    Sleeps one second in place of a real failover, logs the capability id and
    always reports success.
    """
    simulated_duration = 1.0
    await asyncio.sleep(simulated_duration)

    # In a real implementation, this would switch to backup/standby
    target = action.target_capability_id
    logger.info(f"Failed over capability {target}")
    return True
async def _degrade_strategy(self, action: HealingAction) -> bool:
    """Simulate switching the target capability into degraded mode.

    Sleeps half a second in place of the real switch, logs the capability id
    and always reports success.
    """
    simulated_duration = 0.5
    await asyncio.sleep(simulated_duration)

    # In a real implementation, this would switch to degraded mode
    target = action.target_capability_id
    logger.info(f"Degraded capability {target}")
    return True
async def _circuit_breaker_strategy(self, action: HealingAction) -> bool:
"""Execute circuit breaker healing strategy"""
capability_id = action.target_capability_id
# Check if circuit breaker already exists
if capability_id not in self.circuit_breakers:
self.circuit_breakers[capability_id] = {
"state": "closed", # closed, open, half_open
"failure_count": 0,
"last_failure": None,
"success_count": 0
}
breaker = self.circuit_breakers[capability_id]
# Update failure count
breaker["failure_count"] += 1
breaker["last_failure"] = datetime.now()
# Open circuit if threshold exceeded
if breaker["failure_count"] >= self.circuit_breaker_threshold:
breaker["state"] = "open"
logger.info(f"Circuit breaker opened for {capability_id}")
return True
async def _resource_scale_strategy(self, action: HealingAction) -> bool:
    """Simulate scaling up resources for the target capability.

    Sleeps three seconds in place of real scaling, logs the capability id and
    always reports success.
    """
    simulated_duration = 3.0
    await asyncio.sleep(simulated_duration)

    # In a real implementation, this would scale up resources
    target = action.target_capability_id
    logger.info(f"Scaled resources for {target}")
    return True
async def _config_rollback_strategy(self, action: HealingAction) -> bool:
    """Simulate rolling back the target capability's configuration.

    Sleeps one and a half seconds in place of a real rollback, logs the
    capability id and always reports success.
    """
    simulated_duration = 1.5
    await asyncio.sleep(simulated_duration)

    # In a real implementation, this would rollback configuration
    target = action.target_capability_id
    logger.info(f"Rolled back configuration for {target}")
    return True
async def _cache_clear_strategy(self, action: HealingAction) -> bool:
    """Simulate clearing caches for the target capability.

    Sleeps 0.2 seconds in place of real cache clearing, logs the capability id
    and always reports success.
    """
    simulated_duration = 0.2
    await asyncio.sleep(simulated_duration)

    # In a real implementation, this would clear caches
    target = action.target_capability_id
    logger.info(f"Cleared cache for {target}")
    return True
async def _verify_healing(self, action: HealingAction) -> bool:
    """Simulate verifying that a healing action took effect.

    Sleeps one second in place of a real check and then echoes the success
    flag already recorded on the action.
    """
    simulated_duration = 1.0
    await asyncio.sleep(simulated_duration)

    # In a real implementation, this would check if the capability is working.
    # For now the verdict simply mirrors what the action itself reported.
    verdict = action.success
    return verdict
async def start_health_monitoring(self, capability_id: str):
    """Start a background health-monitoring task for a capability.

    Does nothing if the capability already has a monitor registered.
    """
    if capability_id not in self.health_monitors:
        loop_coro = self._health_monitor_loop(capability_id)
        self.health_monitors[capability_id] = asyncio.create_task(loop_coro)
        logger.info(f"Started health monitoring for {capability_id}")
async def stop_health_monitoring(self, capability_id: str):
    """Cancel and unregister the monitoring task of a capability.

    Does nothing if the capability is not being monitored.
    """
    if capability_id not in self.health_monitors:
        return

    monitor_task = self.health_monitors[capability_id]
    monitor_task.cancel()
    del self.health_monitors[capability_id]
    logger.info(f"Stopped health monitoring for {capability_id}")
async def _health_monitor_loop(self, capability_id: str):
"""Health monitoring loop"""
while True:
try:
# Check health
health_status = await self._check_capability_health(capability_id)
self.health_status[f"{capability_id}_{health_status.timestamp.isoformat()}"] = health_status
# Trigger preventive healing if needed
if health_status.status_level in ["warning", "critical"]:
await self._trigger_preventive_healing(health_status)
# Wait for next check
await asyncio.sleep(self.health_check_interval)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Health monitoring error for {capability_id}: {e}")
await asyncio.sleep(self.health_check_interval)
async def _check_capability_health(self, capability_id: str) -> HealthStatus:
    """Build a health snapshot from the last five minutes of activity.

    Health is derived from the per-minute rates of recorded errors and of
    completed healing actions for the capability over the last five minutes.
    """
    # Errors recorded for this capability within the last five minutes
    cutoff_time = datetime.now() - timedelta(minutes=5)
    recent_errors = [
        record
        for recorded_at, record in self.error_history.get(capability_id, [])
        if recorded_at > cutoff_time
    ]

    # Ids of healing actions for this capability completed in the same window
    recent_healings = []
    for action_id, action in self.healing_actions.items():
        if action.target_capability_id != capability_id:
            continue
        if action.completed_at and (datetime.now() - action.completed_at).total_seconds() < 300:
            recent_healings.append(action_id)

    # Per-minute rates over the five-minute window
    error_rate = len(recent_errors) / 5.0
    healing_rate = len(recent_healings) / 5.0

    # Errors weigh twice as much as healings; clamp at zero
    overall_health = max(0.0, 1.0 - (error_rate * 0.2) - (healing_rate * 0.1))

    # Classify: the first floor the score reaches decides the level
    for floor, label in ((0.9, "healthy"), (0.7, "warning"), (0.5, "critical")):
        if overall_health >= floor:
            status_level = label
            break
    else:
        status_level = "failed"

    return HealthStatus(
        target_type="capability",
        target_id=capability_id,
        overall_health=overall_health,
        availability=max(0.0, 1.0 - error_rate * 0.3),
        performance=max(0.0, 1.0 - healing_rate * 0.2),
        error_rate=min(1.0, error_rate / 10.0),  # Normalize
        recent_errors=[record["error_id"] for record in recent_errors[-5:]],
        recent_healing_actions=recent_healings[-5:],
        status_level=status_level
    )
async def _trigger_preventive_healing(self, health_status: HealthStatus):
"""Trigger preventive healing based on health status"""
if health_status.status_level != "critical":
return
# Determine preventive strategy
if health_status.error_rate > 0.5:
strategy = RecoveryStrategy.CIRCUIT_BREAKER
elif health_status.performance < 0.5:
strategy = RecoveryStrategy.RESOURCE_SCALE
else:
strategy = RecoveryStrategy.RESTART
# Create preventive healing action
action = HealingAction(
strategy=strategy,
method=HealingMethod.PREVENTIVE,
target_capability_id=health_status.target_id,
execution_context={"health_status": health_status.status_id}
)
# Execute healing
healing_task = asyncio.create_task(self._execute_healing_action(action))
self.active_healing[action.action_id] = healing_task
async def create_resilience_policy(self, policy: ResiliencePolicy) -> str:
    """Register a resilience policy and return its id.

    If the policy names a target capability that is not yet monitored, health
    monitoring is started for it.
    """
    policy_id = policy.policy_id
    self.resilience_policies[policy_id] = policy

    # Start health monitoring if required
    target = policy.target_capability_id
    already_monitored = target in self.health_monitors if target else True
    if not already_monitored:
        await self.start_health_monitoring(target)

    logger.info(f"Created resilience policy: {policy.policy_id}")
    return policy.policy_id
async def get_healing_analytics(self) -> Dict[str, Any]:
    """Summarize patterns, healing actions, policies and monitoring state.

    Success rates are computed over all finished actions, i.e. those whose
    status is "completed" or "failed". Counting only "completed" actions
    would make every rate 1.0, because an action is marked "completed" only
    when it succeeded.

    Returns:
        A dict with the sections "summary", "strategies", "patterns",
        "health" and "recent_activity".
    """
    now = datetime.now()
    patterns = list(self.error_patterns.values())
    actions = list(self.healing_actions.values())

    # Healing statistics over every action that ran to an outcome
    finished_actions = [
        action for action in actions
        if action.status in ("completed", "failed")
    ]
    successful_healings = sum(1 for action in finished_actions if action.success)
    healing_success_rate = (
        successful_healings / len(finished_actions) if finished_actions else 0
    )

    # Strategy effectiveness
    strategy_stats: Dict[str, Dict[str, Any]] = {}
    for action in finished_actions:
        stats = strategy_stats.setdefault(
            action.strategy.value, {"total": 0, "successful": 0}
        )
        stats["total"] += 1
        if action.success:
            stats["successful"] += 1

    # Every entry has total >= 1 because it is created on first increment
    for stats in strategy_stats.values():
        stats["success_rate"] = stats["successful"] / stats["total"]

    analytics = {
        "summary": {
            "total_patterns": len(patterns),
            "total_actions": len(actions),
            "total_policies": len(self.resilience_policies),
            "healing_success_rate": healing_success_rate,
            "monitored_capabilities": len(self.health_monitors)
        },
        "strategies": strategy_stats,
        "patterns": {
            "active": sum(1 for p in patterns if p.status == "active"),
            "auto_heal_enabled": sum(1 for p in patterns if p.auto_heal_enabled),
            "total_occurrences": sum(p.occurrence_count for p in patterns)
        },
        "health": {
            "health_checks_performed": len(self.health_status),
            "currently_monitoring": list(self.health_monitors.keys())
        },
        "recent_activity": {
            "actions_last_hour": sum(
                1 for a in actions
                if a.started_at and (now - a.started_at).total_seconds() < 3600
            ),
            "errors_last_hour": sum(
                1
                for errors in self.error_history.values()
                for recorded_at, _ in errors
                if (now - recorded_at).total_seconds() < 3600
            )
        }
    }

    return analytics
async def get_capability_health(self, capability_id: str) -> Optional[HealthStatus]:
    """Return the newest stored health snapshot for a capability, or None."""
    snapshots = [
        snapshot
        for snapshot in self.health_status.values()
        if snapshot.target_id == capability_id
    ]
    # max() keeps the first of equally recent snapshots and yields None when
    # nothing has been recorded for the capability.
    return max(snapshots, key=lambda snapshot: snapshot.timestamp, default=None)
async def enable_auto_healing(self, pattern_id: str) -> bool:
    """Turn on auto-healing for a pattern.

    Returns:
        True if the pattern exists and was updated, False if it is unknown.
    """
    try:
        pattern = self.error_patterns[pattern_id]
    except KeyError:
        return False

    pattern.auto_heal_enabled = True
    logger.info(f"Enabled auto-healing for pattern {pattern_id}")
    return True
async def disable_auto_healing(self, pattern_id: str) -> bool:
    """Turn off auto-healing for a pattern.

    Returns:
        True if the pattern exists and was updated, False if it is unknown.
    """
    try:
        pattern = self.error_patterns[pattern_id]
    except KeyError:
        return False

    pattern.auto_heal_enabled = False
    logger.info(f"Disabled auto-healing for pattern {pattern_id}")
    return True