Skip to main content
Glama

Katamari MCP Server

by ciphernaut
self_healing.py • 34.9 kB
"""
Enhanced Error Recovery and Self-Healing System - Phase 3

Advanced self-healing capabilities with predictive error prevention,
automatic recovery, and adaptive resilience strategies.
"""

import asyncio
import hashlib
import json
import logging
import statistics
import traceback
import uuid
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Coroutine, Dict, List, Optional, Set, Tuple

from .data_models import (
    FeedbackEvent,
    PerformanceSnapshot,
    CapabilityProfile,
    LearningRecord,
    AdaptationProposal,
)

logger = logging.getLogger(__name__)


class ErrorSeverity(Enum):
    """Severity levels for errors"""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class RecoveryStrategy(Enum):
    """Types of recovery strategies"""

    RETRY = "retry"
    RESTART = "restart"
    FAILOVER = "failover"
    DEGRADE = "degrade"
    CIRCUIT_BREAKER = "circuit_breaker"
    RESOURCE_SCALE = "resource_scale"
    CONFIG_ROLLBACK = "config_rollback"
    CACHE_CLEAR = "cache_clear"


class HealingMethod(Enum):
    """Self-healing methods"""

    REACTIVE = "reactive"      # Heal after error occurs
    PREDICTIVE = "predictive"  # Heal before error occurs
    PREVENTIVE = "preventive"  # Prevent errors from occurring
    ADAPTIVE = "adaptive"      # Adapt to prevent future errors


@dataclass
class ErrorPattern:
    """Pattern of errors that can be recognized and healed"""

    pattern_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: datetime = field(default_factory=datetime.now)

    # Pattern definition
    error_type: str = ""
    error_signature: str = ""  # Unique signature of the error
    context_patterns: Dict[str, Any] = field(default_factory=dict)

    # Pattern characteristics
    frequency_threshold: int = 3  # Minimum occurrences to recognize pattern
    time_window: int = 300  # Time window in seconds
    severity_distribution: Dict[str, int] = field(default_factory=dict)

    # Healing strategy
    recommended_strategy: RecoveryStrategy = RecoveryStrategy.RETRY
    healing_confidence: float = 0.0  # 0-1
    auto_heal_enabled: bool = False

    # Pattern statistics
    occurrence_count: int = 0
    successful_heals: int = 0
    failed_heals: int = 0
    last_seen: Optional[datetime] = None

    # Learning data
    contributing_factors: List[str] = field(default_factory=list)
    prevention_methods: List[str] = field(default_factory=list)

    # Status
    status: str = "active"  # active, deprecated, resolved

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class HealingAction:
    """A specific healing action to be taken"""

    action_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)

    # Action details
    strategy: RecoveryStrategy = RecoveryStrategy.RETRY
    method: HealingMethod = HealingMethod.REACTIVE
    target_capability_id: str = ""
    target_component: str = ""

    # Action parameters
    parameters: Dict[str, Any] = field(default_factory=dict)
    timeout: float = 30.0
    max_attempts: int = 3

    # Execution context
    trigger_error_id: str = ""
    trigger_pattern_id: Optional[str] = None
    execution_context: Dict[str, Any] = field(default_factory=dict)

    # Results
    status: str = "pending"  # pending, executing, completed, failed, cancelled
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    execution_time: float = 0.0

    # Outcome
    success: bool = False
    error_message: Optional[str] = None
    side_effects: List[str] = field(default_factory=list)

    # Verification
    verification_required: bool = True
    verification_passed: Optional[bool] = None
    verification_metrics: Dict[str, Any] = field(default_factory=dict)

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class HealthStatus:
    """Health status of a capability or component"""

    status_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)

    # Target
    target_type: str = ""  # capability, component, system
    target_id: str = ""

    # Health indicators
    overall_health: float = 1.0  # 0-1, 1 = perfectly healthy
    availability: float = 1.0  # 0-1
    performance: float = 1.0  # 0-1
    error_rate: float = 0.0  # 0-1
    resource_utilization: float = 0.5  # 0-1

    # Health trends
    health_trend: str = "stable"  # improving, stable, degrading
    performance_trend: str = "stable"
    error_trend: str = "stable"

    # Recent issues
    recent_errors: List[str] = field(default_factory=list)
    recent_healing_actions: List[str] = field(default_factory=list)
    active_alerts: List[str] = field(default_factory=list)

    # Predictions
    predicted_health: Optional[float] = None
    time_to_degradation: Optional[float] = None  # minutes
    risk_factors: List[str] = field(default_factory=list)

    # Status classification
    status_level: str = "healthy"  # healthy, warning, critical, failed

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ResiliencePolicy:
    """Policy for building resilience"""

    policy_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: datetime = field(default_factory=datetime.now)

    # Policy scope
    target_capability_id: str = ""
    target_components: List[str] = field(default_factory=list)

    # Resilience requirements
    availability_target: float = 0.99  # 99% availability
    recovery_time_objective: float = 5.0  # RTO in minutes
    recovery_point_objective: float = 1.0  # RPO in minutes

    # Healing strategies
    allowed_strategies: List[RecoveryStrategy] = field(default_factory=list)
    strategy_priorities: Dict[RecoveryStrategy, int] = field(default_factory=dict)
    auto_heal_threshold: ErrorSeverity = ErrorSeverity.HIGH

    # Prevention measures
    preventive_actions: List[str] = field(default_factory=list)
    monitoring_requirements: List[str] = field(default_factory=list)

    # Adaptation rules
    adaptation_enabled: bool = True
    learning_enabled: bool = True
    strategy_evolution: bool = True

    # Status
    status: str = "active"  # active, disabled, deprecated

    # Metadata
    metadata: Dict[str, Any] = field(default_factory=dict)


class SelfHealingEngine:
    """Advanced self-healing and error recovery system.

    Errors are submitted through :meth:`report_error`. Each report is stored
    in a per-capability history, analysed in the background for recurring
    patterns, and (for HIGH/CRITICAL severity) healed immediately using one
    of the strategies registered in ``healing_strategies``. Capabilities can
    additionally be polled by a periodic health monitor that launches
    preventive healing when health becomes critical.
    """

    def __init__(self):
        # Storage
        self.error_patterns: Dict[str, ErrorPattern] = {}
        self.healing_actions: Dict[str, HealingAction] = {}
        self.health_status: Dict[str, HealthStatus] = {}
        self.resilience_policies: Dict[str, ResiliencePolicy] = {}

        # Error tracking
        self.error_history: Dict[str, List[Tuple[datetime, Dict[str, Any]]]] = {}
        self.active_errors: Dict[str, Dict[str, Any]] = {}

        # Healing state
        self.healing_lock = asyncio.Lock()
        self.active_healing: Dict[str, asyncio.Task] = {}
        self.circuit_breakers: Dict[str, Dict[str, Any]] = {}

        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks: Set[asyncio.Task] = set()

        # Health monitoring
        self.health_monitors: Dict[str, asyncio.Task] = {}
        self.health_check_interval = 60  # seconds

        # Configuration
        self.max_concurrent_healings = 3
        self.default_retry_attempts = 3
        self.default_retry_delay = 1.0
        self.circuit_breaker_threshold = 5
        self.circuit_breaker_timeout = 300  # seconds

        # Pattern detection: how many similar errors within how many seconds
        # are needed before a new ErrorPattern is created.
        self.pattern_time_window = 300  # seconds
        self.pattern_min_occurrences = 3

        # Healing strategies registry
        self.healing_strategies: Dict[RecoveryStrategy, Callable] = {}
        self._register_healing_strategies()

        logger.info("SelfHealingEngine initialized")

    def _register_healing_strategies(self):
        """Register available healing strategies"""
        self.healing_strategies.update({
            RecoveryStrategy.RETRY: self._retry_strategy,
            RecoveryStrategy.RESTART: self._restart_strategy,
            RecoveryStrategy.FAILOVER: self._failover_strategy,
            RecoveryStrategy.DEGRADE: self._degrade_strategy,
            RecoveryStrategy.CIRCUIT_BREAKER: self._circuit_breaker_strategy,
            RecoveryStrategy.RESOURCE_SCALE: self._resource_scale_strategy,
            RecoveryStrategy.CONFIG_ROLLBACK: self._config_rollback_strategy,
            RecoveryStrategy.CACHE_CLEAR: self._cache_clear_strategy,
        })

    def _spawn_background(
        self, coro: Coroutine[Any, Any, Any], description: str
    ) -> "asyncio.Task":
        """Schedule ``coro`` as a task that is kept alive until it finishes.

        The task is held in ``self._background_tasks`` while running and any
        exception it raises is logged instead of being silently dropped.
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)

        def _finalize(done: "asyncio.Task") -> None:
            self._background_tasks.discard(done)
            if done.cancelled():
                return
            exc = done.exception()
            if exc is not None:
                logger.error(f"Background task '{description}' failed: {exc}")

        task.add_done_callback(_finalize)
        return task

    async def report_error(
        self,
        capability_id: str,
        error_type: str,
        error_message: str,
        context: Optional[Dict[str, Any]] = None,
        severity: ErrorSeverity = ErrorSeverity.MEDIUM
    ) -> str:
        """Report an error for healing analysis.

        The error is appended to the capability's history, registered as an
        active error and analysed for patterns in a background task. HIGH and
        CRITICAL errors additionally start an immediate healing task.

        Returns:
            The generated id of the recorded error.
        """
        error_id = str(uuid.uuid4())
        timestamp = datetime.now()

        # format_exc() yields "NoneType: None" when no exception is being
        # handled; store None in that case.
        stack_trace: Optional[str] = traceback.format_exc()
        if stack_trace.strip() == "NoneType: None":
            stack_trace = None

        error_data = {
            "error_id": error_id,
            "capability_id": capability_id,
            "error_type": error_type,
            "error_message": error_message,
            "severity": severity.value,
            "context": context or {},
            "timestamp": timestamp,
            "stack_trace": stack_trace,
        }

        # Store error
        history = self.error_history.setdefault(capability_id, [])
        history.append((timestamp, error_data))
        self.active_errors[error_id] = error_data

        # Keep history manageable
        if len(history) > 1000:
            self.error_history[capability_id] = history[-500:]

        # Analyze for patterns
        self._spawn_background(
            self._analyze_error_pattern(error_data), "error pattern analysis"
        )

        # Trigger immediate healing if critical
        if severity in (ErrorSeverity.HIGH, ErrorSeverity.CRITICAL):
            self._spawn_background(
                self._trigger_immediate_healing(error_data), "immediate healing"
            )

        logger.info(f"Error reported: {error_type} for {capability_id} (severity: {severity.value})")
        return error_id

    async def _analyze_error_pattern(self, error_data: Dict[str, Any]):
        """Analyze error for pattern recognition.

        Updates the matching active pattern if one exists, otherwise creates
        a new pattern once enough similar errors were seen recently. Healing
        for an auto-heal pattern is started after the lock is released so
        that waiting for a free healing slot does not block other analyses.
        """
        pattern_to_heal: Optional[ErrorPattern] = None

        async with self.healing_lock:
            error_type = error_data["error_type"]

            # Generate error signature
            signature = self._generate_error_signature(error_data)

            # Look for existing patterns
            matching_pattern = next(
                (
                    candidate
                    for candidate in self.error_patterns.values()
                    if candidate.error_type == error_type
                    and candidate.error_signature == signature
                    and candidate.status == "active"
                ),
                None,
            )

            if matching_pattern is not None:
                # Update existing pattern
                matching_pattern.occurrence_count += 1
                matching_pattern.last_seen = error_data["timestamp"]

                # Update severity distribution
                severity = error_data["severity"]
                matching_pattern.severity_distribution[severity] = (
                    matching_pattern.severity_distribution.get(severity, 0) + 1
                )

                # Check if pattern meets healing threshold
                if (matching_pattern.occurrence_count >= matching_pattern.frequency_threshold
                        and matching_pattern.auto_heal_enabled):
                    pattern_to_heal = matching_pattern
            else:
                await self._create_pattern_if_recurring(error_data, signature)

        if pattern_to_heal is not None:
            await self._trigger_pattern_healing(pattern_to_heal, error_data)

    async def _create_pattern_if_recurring(
        self, error_data: Dict[str, Any], signature: str
    ) -> Optional[ErrorPattern]:
        """Create and store a new pattern if the error recurred often enough.

        Looks at the capability's history inside ``pattern_time_window`` and
        creates a pattern when at least ``pattern_min_occurrences`` entries
        share the error type and signature. Returns the new pattern or None.
        """
        capability_id = error_data["capability_id"]
        error_type = error_data["error_type"]
        reference_time = error_data["timestamp"]

        similar_errors = [
            err
            for _, err in self.error_history.get(capability_id, [])
            if (reference_time - err["timestamp"]).total_seconds() < self.pattern_time_window
            and err["error_type"] == error_type
            and self._generate_error_signature(err) == signature
        ]

        if len(similar_errors) < self.pattern_min_occurrences:
            return None

        # Create new pattern; the severity distribution covers every error
        # that is counted in occurrence_count so the two stay consistent.
        pattern = ErrorPattern(
            error_type=error_type,
            error_signature=signature,
            frequency_threshold=self.pattern_min_occurrences,
            time_window=self.pattern_time_window,
            occurrence_count=len(similar_errors),
            last_seen=reference_time,
            severity_distribution=dict(
                Counter(err["severity"] for err in similar_errors)
            ),
        )

        # Determine recommended strategy
        pattern.recommended_strategy = await self._recommend_healing_strategy(similar_errors)
        pattern.healing_confidence = await self._calculate_healing_confidence(similar_errors)

        self.error_patterns[pattern.pattern_id] = pattern
        logger.info(f"Created new error pattern: {pattern.pattern_id}")
        return pattern

    def _generate_error_signature(self, error_data: Dict[str, Any]) -> str:
        """Generate a unique signature for an error.

        The signature is the first 16 hex characters of the SHA-256 of the
        error type, the capability id and (when non-empty) the context
        serialised as key-sorted JSON.
        """
        # Use error type and key context elements
        signature_parts = [
            error_data["error_type"],
            error_data.get("capability_id", ""),
        ]

        # Add context signature if available
        context = error_data.get("context", {})
        if context:
            try:
                # default=repr keeps the output identical for JSON-compatible
                # contexts while tolerating values such as datetimes.
                context_signature = json.dumps(context, sort_keys=True, default=repr)
            except (TypeError, ValueError):
                # e.g. keys that cannot be sorted/serialised or circular refs
                context_signature = repr(context)
            signature_parts.append(context_signature)

        # Create hash
        signature_string = "|".join(signature_parts)
        return hashlib.sha256(signature_string.encode()).hexdigest()[:16]

    async def _recommend_healing_strategy(self, errors: List[Dict[str, Any]]) -> RecoveryStrategy:
        """Recommend healing strategy based on error patterns.

        Error types are compared by exact string equality; the first rule
        that matches wins. An empty list yields RETRY.
        """
        if not errors:
            return RecoveryStrategy.RETRY

        # Analyze error characteristics
        error_types = {err["error_type"] for err in errors}
        severities = [err["severity"] for err in errors]

        # Strategy selection logic
        if error_types & {"timeout", "connection"}:
            return RecoveryStrategy.RETRY
        if error_types & {"memory", "resource"}:
            return RecoveryStrategy.RESOURCE_SCALE
        if error_types & {"crash", "fatal"}:
            return RecoveryStrategy.RESTART
        if any(level in ("critical", "high") for level in severities):
            return RecoveryStrategy.FAILOVER
        if len(errors) > 5:  # High frequency errors
            return RecoveryStrategy.CIRCUIT_BREAKER
        return RecoveryStrategy.RETRY

    async def _calculate_healing_confidence(self, errors: List[Dict[str, Any]]) -> float:
        """Calculate confidence in healing based on error patterns.

        Weighted blend of error-type consistency (0.4), severity consistency
        (0.3) and a sample-size factor that saturates at 10 errors (0.3).
        Returns 0.0 for an empty list.
        """
        if not errors:
            return 0.0

        total = len(errors)

        # Base confidence on consistency: share of the most common value
        type_consistency = max(Counter(err["error_type"] for err in errors).values()) / total
        severity_consistency = max(Counter(err["severity"] for err in errors).values()) / total

        # Adjust for sample size
        sample_factor = min(1.0, total / 10)

        # Combined confidence
        confidence = (type_consistency * 0.4 + severity_consistency * 0.3 + sample_factor * 0.3)
        return min(1.0, confidence)

    async def _wait_for_healing_slot(self, healing_kind: str) -> None:
        """Wait until fewer than ``max_concurrent_healings`` are running.

        Polls once per second and logs a warning on every poll.
        """
        while len(self.active_healing) >= self.max_concurrent_healings:
            logger.warning(f"Maximum concurrent healings reached, queuing {healing_kind} healing")
            await asyncio.sleep(1.0)

    def _launch_healing(self, action: HealingAction) -> "asyncio.Task":
        """Start ``action`` as a task and register it in ``active_healing``."""
        healing_task = asyncio.create_task(self._execute_healing_action(action))
        self.active_healing[action.action_id] = healing_task
        return healing_task

    async def _trigger_immediate_healing(self, error_data: Dict[str, Any]):
        """Trigger immediate healing for critical errors"""
        await self._wait_for_healing_slot("critical")

        # Determine strategy based on the single triggering error
        strategy = await self._recommend_healing_strategy([error_data])

        # Create healing action
        action = HealingAction(
            strategy=strategy,
            method=HealingMethod.REACTIVE,
            target_capability_id=error_data["capability_id"],
            trigger_error_id=error_data["error_id"],
            execution_context=error_data["context"]
        )

        # Execute healing
        self._launch_healing(action)

    async def _trigger_pattern_healing(self, pattern: ErrorPattern, error_data: Dict[str, Any]):
        """Trigger healing based on recognized pattern"""
        if not pattern.auto_heal_enabled:
            return

        await self._wait_for_healing_slot("pattern")

        # Create healing action based on pattern
        action = HealingAction(
            strategy=pattern.recommended_strategy,
            method=HealingMethod.ADAPTIVE,
            target_capability_id=error_data["capability_id"],
            trigger_error_id=error_data["error_id"],
            trigger_pattern_id=pattern.pattern_id,
            execution_context=error_data["context"]
        )

        # Execute healing
        self._launch_healing(action)

    @staticmethod
    def _stamp_completion(action: HealingAction) -> None:
        """Record completion time and elapsed seconds on ``action``."""
        action.completed_at = datetime.now()
        if action.started_at is not None:
            action.execution_time = (action.completed_at - action.started_at).total_seconds()

    async def _execute_healing_action(self, action: HealingAction) -> bool:
        """Execute a healing action.

        Runs the strategy registered for ``action.strategy``, records the
        outcome on the action and on its triggering pattern (if any), and
        verifies successful heals when required. Whatever happens, the action
        is removed from ``active_healing`` and stored in ``healing_actions``.

        Returns:
            True if the strategy reported success, False if it reported
            failure or raised. Cancellation is re-raised.
        """
        action.status = "executing"
        action.started_at = datetime.now()

        try:
            # Get the healing strategy function
            strategy_func = self.healing_strategies.get(action.strategy)
            if strategy_func is None:
                raise ValueError(f"Unknown healing strategy: {action.strategy}")

            # Execute the strategy
            success = await strategy_func(action)

            # Update action status
            action.success = success
            action.status = "completed" if success else "failed"
            self._stamp_completion(action)

            # Update pattern statistics if applicable
            pattern = (
                self.error_patterns.get(action.trigger_pattern_id)
                if action.trigger_pattern_id
                else None
            )
            if pattern is not None:
                if success:
                    pattern.successful_heals += 1
                else:
                    pattern.failed_heals += 1

            # Verify healing if required
            if action.verification_required and success:
                action.verification_passed = await self._verify_healing(action)

            logger.info(f"Healing action {action.action_id} completed: {action.status}")
            return success

        except asyncio.CancelledError:
            # Do not leave a cancelled action looking like it is still running
            action.status = "cancelled"
            self._stamp_completion(action)
            raise

        except Exception as e:
            action.status = "failed"
            action.success = False
            action.error_message = str(e)
            self._stamp_completion(action)
            logger.error(f"Healing action {action.action_id} failed: {e}")
            return False

        finally:
            # Clean up
            self.active_healing.pop(action.action_id, None)

            # Store action
            self.healing_actions[action.action_id] = action

    async def _retry_strategy(self, action: HealingAction) -> bool:
        """Execute retry healing strategy.

        Placeholder implementation: sleeps with exponential backoff (capped
        at 5 seconds) and reports success from the second attempt onwards.
        """
        max_attempts = action.parameters.get("max_attempts", self.default_retry_attempts)
        delay = action.parameters.get("delay", self.default_retry_delay)

        for attempt in range(max_attempts):
            try:
                # Simulate retry attempt
                await asyncio.sleep(min(delay * (2 ** attempt), 5.0))  # Exponential backoff

                # In a real implementation, this would retry the failed operation
                # For now, simulate success after a few attempts
                if attempt >= 1:
                    return True
            except Exception as e:
                logger.warning(f"Retry attempt {attempt + 1} failed: {e}")
                continue

        return False

    async def _restart_strategy(self, action: HealingAction) -> bool:
        """Execute restart healing strategy"""
        # Simulate restart
        await asyncio.sleep(2.0)

        # In a real implementation, this would restart the capability/component
        logger.info(f"Restarted capability {action.target_capability_id}")
        return True

    async def _failover_strategy(self, action: HealingAction) -> bool:
        """Execute failover healing strategy"""
        # Simulate failover
        await asyncio.sleep(1.0)

        # In a real implementation, this would switch to backup/standby
        logger.info(f"Failed over capability {action.target_capability_id}")
        return True

    async def _degrade_strategy(self, action: HealingAction) -> bool:
        """Execute degrade healing strategy"""
        # Simulate degradation
        await asyncio.sleep(0.5)

        # In a real implementation, this would switch to degraded mode
        logger.info(f"Degraded capability {action.target_capability_id}")
        return True

    async def _circuit_breaker_strategy(self, action: HealingAction) -> bool:
        """Execute circuit breaker healing strategy.

        Records one more failure for the target capability and opens its
        breaker once ``circuit_breaker_threshold`` failures are reached.
        """
        capability_id = action.target_capability_id

        # Create the breaker on first use
        breaker = self.circuit_breakers.setdefault(
            capability_id,
            {
                "state": "closed",  # closed, open, half_open
                "failure_count": 0,
                "last_failure": None,
                "success_count": 0,
            },
        )

        # Update failure count
        breaker["failure_count"] += 1
        breaker["last_failure"] = datetime.now()

        # Open circuit if threshold exceeded
        if breaker["failure_count"] >= self.circuit_breaker_threshold:
            breaker["state"] = "open"
            logger.info(f"Circuit breaker opened for {capability_id}")

        return True

    async def _resource_scale_strategy(self, action: HealingAction) -> bool:
        """Execute resource scaling healing strategy"""
        # Simulate resource scaling
        await asyncio.sleep(3.0)

        # In a real implementation, this would scale up resources
        logger.info(f"Scaled resources for {action.target_capability_id}")
        return True

    async def _config_rollback_strategy(self, action: HealingAction) -> bool:
        """Execute configuration rollback healing strategy"""
        # Simulate config rollback
        await asyncio.sleep(1.5)

        # In a real implementation, this would rollback configuration
        logger.info(f"Rolled back configuration for {action.target_capability_id}")
        return True

    async def _cache_clear_strategy(self, action: HealingAction) -> bool:
        """Execute cache clearing healing strategy"""
        # Simulate cache clearing
        await asyncio.sleep(0.2)

        # In a real implementation, this would clear caches
        logger.info(f"Cleared cache for {action.target_capability_id}")
        return True

    async def _verify_healing(self, action: HealingAction) -> bool:
        """Verify that healing was successful"""
        # Simulate verification
        await asyncio.sleep(1.0)

        # In a real implementation, this would check if the capability is working
        # For now, assume healing is successful if the action reported success
        return action.success

    async def start_health_monitoring(self, capability_id: str):
        """Start health monitoring for a capability"""
        if capability_id in self.health_monitors:
            return  # Already monitoring

        monitor_task = asyncio.create_task(self._health_monitor_loop(capability_id))
        self.health_monitors[capability_id] = monitor_task
        logger.info(f"Started health monitoring for {capability_id}")

    async def stop_health_monitoring(self, capability_id: str):
        """Stop health monitoring for a capability"""
        monitor_task = self.health_monitors.pop(capability_id, None)
        if monitor_task is None:
            return

        monitor_task.cancel()
        logger.info(f"Stopped health monitoring for {capability_id}")

    async def _health_monitor_loop(self, capability_id: str):
        """Health monitoring loop.

        Every ``health_check_interval`` seconds: compute the capability's
        health, store the snapshot and request preventive healing when the
        level is "warning" or "critical". Ends when the task is cancelled.
        """
        while True:
            try:
                # Check health
                health_status = await self._check_capability_health(capability_id)
                snapshot_key = f"{capability_id}_{health_status.timestamp.isoformat()}"
                self.health_status[snapshot_key] = health_status

                # Trigger preventive healing if needed
                if health_status.status_level in ("warning", "critical"):
                    await self._trigger_preventive_healing(health_status)

                # Wait for next check
                await asyncio.sleep(self.health_check_interval)

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Health monitoring error for {capability_id}: {e}")
                await asyncio.sleep(self.health_check_interval)

    async def _check_capability_health(self, capability_id: str) -> HealthStatus:
        """Check the health of a capability.

        Health is derived from the number of errors and completed healing
        actions recorded for the capability during the last five minutes.
        """
        now = datetime.now()

        # Get recent errors
        cutoff_time = now - timedelta(minutes=5)
        recent_errors = [
            err
            for timestamp, err in self.error_history.get(capability_id, [])
            if timestamp > cutoff_time
        ]

        # Get recent healing actions
        recent_healings = [
            action_id
            for action_id, action in self.healing_actions.items()
            if action.target_capability_id == capability_id
            and action.completed_at
            and (now - action.completed_at).total_seconds() < 300
        ]

        # Calculate health metrics
        error_rate = len(recent_errors) / 5.0  # errors per minute
        healing_rate = len(recent_healings) / 5.0  # healings per minute

        # Determine overall health
        overall_health = max(0.0, 1.0 - (error_rate * 0.2) - (healing_rate * 0.1))

        # Determine status level
        if overall_health >= 0.9:
            status_level = "healthy"
        elif overall_health >= 0.7:
            status_level = "warning"
        elif overall_health >= 0.5:
            status_level = "critical"
        else:
            status_level = "failed"

        # Create health status
        return HealthStatus(
            target_type="capability",
            target_id=capability_id,
            overall_health=overall_health,
            availability=max(0.0, 1.0 - error_rate * 0.3),
            performance=max(0.0, 1.0 - healing_rate * 0.2),
            error_rate=min(1.0, error_rate / 10.0),  # Normalize
            recent_errors=[err["error_id"] for err in recent_errors[-5:]],
            recent_healing_actions=recent_healings[-5:],
            status_level=status_level
        )

    async def _trigger_preventive_healing(self, health_status: HealthStatus):
        """Trigger preventive healing based on health status.

        Only acts when the status level is "critical"; other levels are
        ignored.
        """
        if health_status.status_level != "critical":
            return

        # Determine preventive strategy
        if health_status.error_rate > 0.5:
            strategy = RecoveryStrategy.CIRCUIT_BREAKER
        elif health_status.performance < 0.5:
            strategy = RecoveryStrategy.RESOURCE_SCALE
        else:
            strategy = RecoveryStrategy.RESTART

        # Create preventive healing action
        action = HealingAction(
            strategy=strategy,
            method=HealingMethod.PREVENTIVE,
            target_capability_id=health_status.target_id,
            execution_context={"health_status": health_status.status_id}
        )

        # Execute healing
        self._launch_healing(action)

    async def create_resilience_policy(self, policy: ResiliencePolicy) -> str:
        """Create a resilience policy.

        Stores the policy and starts health monitoring for its target
        capability if that capability is not monitored yet.

        Returns:
            The id of the stored policy.
        """
        self.resilience_policies[policy.policy_id] = policy

        # Start health monitoring if required
        if policy.target_capability_id and policy.target_capability_id not in self.health_monitors:
            await self.start_health_monitoring(policy.target_capability_id)

        logger.info(f"Created resilience policy: {policy.policy_id}")
        return policy.policy_id

    async def get_healing_analytics(self) -> Dict[str, Any]:
        """Get comprehensive healing analytics"""
        now = datetime.now()
        patterns = list(self.error_patterns.values())
        actions = list(self.healing_actions.values())

        # Healing statistics
        completed_actions = [action for action in actions if action.status == "completed"]
        successful_healings = sum(1 for action in completed_actions if action.success)
        healing_success_rate = (
            successful_healings / len(completed_actions) if completed_actions else 0
        )

        # Strategy effectiveness
        strategy_stats: Dict[str, Dict[str, Any]] = {}
        for action in completed_actions:
            stats = strategy_stats.setdefault(
                action.strategy.value, {"total": 0, "successful": 0}
            )
            stats["total"] += 1
            if action.success:
                stats["successful"] += 1

        # Calculate success rates by strategy
        for stats in strategy_stats.values():
            stats["success_rate"] = (
                stats["successful"] / stats["total"] if stats["total"] > 0 else 0
            )

        return {
            "summary": {
                "total_patterns": len(patterns),
                "total_actions": len(actions),
                "total_policies": len(self.resilience_policies),
                "healing_success_rate": healing_success_rate,
                "monitored_capabilities": len(self.health_monitors)
            },
            "strategies": strategy_stats,
            "patterns": {
                "active": sum(1 for p in patterns if p.status == "active"),
                "auto_heal_enabled": sum(1 for p in patterns if p.auto_heal_enabled),
                "total_occurrences": sum(p.occurrence_count for p in patterns)
            },
            "health": {
                "health_checks_performed": len(self.health_status),
                "currently_monitoring": list(self.health_monitors.keys())
            },
            "recent_activity": {
                "actions_last_hour": sum(
                    1
                    for a in actions
                    if a.started_at and (now - a.started_at).total_seconds() < 3600
                ),
                "errors_last_hour": sum(
                    1
                    for errors in self.error_history.values()
                    for t, _ in errors
                    if (now - t).total_seconds() < 3600
                )
            }
        }

    async def get_capability_health(self, capability_id: str) -> Optional[HealthStatus]:
        """Get current health status for a capability.

        Returns the stored snapshot with the latest timestamp for the
        capability, or None when no snapshot exists.
        """
        return max(
            (
                health
                for health in self.health_status.values()
                if health.target_id == capability_id
            ),
            key=lambda health: health.timestamp,
            default=None,
        )

    async def enable_auto_healing(self, pattern_id: str) -> bool:
        """Enable auto-healing for a pattern.

        Returns False when the pattern id is unknown, True otherwise.
        """
        pattern = self.error_patterns.get(pattern_id)
        if pattern is None:
            return False

        pattern.auto_heal_enabled = True
        logger.info(f"Enabled auto-healing for pattern {pattern_id}")
        return True

    async def disable_auto_healing(self, pattern_id: str) -> bool:
        """Disable auto-healing for a pattern.

        Returns False when the pattern id is unknown, True otherwise.
        """
        pattern = self.error_patterns.get(pattern_id)
        if pattern is None:
            return False

        pattern.auto_heal_enabled = False
        logger.info(f"Disabled auto-healing for pattern {pattern_id}")
        return True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server