MCP Memory Service

# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Health monitoring and error handling for consolidation system."""

import asyncio
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
import traceback

from .base import ConsolidationError


class HealthStatus(Enum):
    """Health status levels."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    CRITICAL = "critical"


@dataclass
class HealthMetric:
    """Represents a health metric."""
    name: str
    value: Any
    status: HealthStatus
    message: str = ""
    timestamp: datetime = field(default_factory=datetime.now)
    threshold_warning: Optional[float] = None
    threshold_critical: Optional[float] = None


@dataclass
class HealthAlert:
    """Represents a health alert."""
    alert_id: str
    component: str
    severity: HealthStatus
    message: str
    timestamp: datetime = field(default_factory=datetime.now)
    resolved: bool = False
    resolution_timestamp: Optional[datetime] = None


class ConsolidationHealthMonitor:
    """Monitors health of the consolidation system."""

    def __init__(self, config=None):
        self.config = config
        self.logger = logging.getLogger(__name__)

        # Health metrics storage
        self.metrics: Dict[str, HealthMetric] = {}
        self.alerts: List[HealthAlert] = []
        self.error_history: List[Dict[str, Any]] = []

        # Health thresholds
        self.thresholds = {
            'consolidation_success_rate': {'warning': 0.8, 'critical': 0.6},
            'average_duration_seconds': {'warning': 300, 'critical': 600},
            'memory_processing_rate': {'warning': 0.1, 'critical': 0.05},
            'error_rate': {'warning': 0.1, 'critical': 0.2},
            'storage_response_time': {'warning': 5.0, 'critical': 10.0}
        }

        # Performance tracking
        self.performance_history: List[Dict[str, Any]] = []
        self.max_history_entries = 1000

        # Component health cache
        self.component_health_cache: Dict[str, Dict[str, Any]] = {}
        self.cache_ttl = timedelta(minutes=5)
        self.last_health_check = {}

    async def check_overall_health(self) -> Dict[str, Any]:
        """Check overall consolidation system health."""
        try:
            health = {
                'status': HealthStatus.HEALTHY.value,
                'timestamp': datetime.now().isoformat(),
                'components': {},
                'metrics': {},
                'alerts': [],
                'recommendations': []
            }

            # Check individual components
            components = [
                'decay_calculator', 'association_engine', 'clustering_engine',
                'compression_engine', 'forgetting_engine', 'scheduler', 'storage_backend'
            ]

            overall_status = HealthStatus.HEALTHY

            for component in components:
                component_health = await self._check_component_health(component)
                health['components'][component] = component_health

                # Update overall status based on component health
                component_status = HealthStatus(component_health.get('status', 'healthy'))
                if component_status == HealthStatus.CRITICAL:
                    overall_status = HealthStatus.CRITICAL
                elif component_status == HealthStatus.UNHEALTHY and overall_status != HealthStatus.CRITICAL:
                    overall_status = HealthStatus.UNHEALTHY
                elif component_status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY:
                    overall_status = HealthStatus.DEGRADED

            # Add current metrics
            health['metrics'] = {name: {
                'value': metric.value,
                'status': metric.status.value,
                'message': metric.message,
                'timestamp': metric.timestamp.isoformat()
            } for name, metric in self.metrics.items()}

            # Add active alerts
            active_alerts = [alert for alert in self.alerts if not alert.resolved]
            health['alerts'] = [{
                'alert_id': alert.alert_id,
                'component': alert.component,
                'severity': alert.severity.value,
                'message': alert.message,
                'timestamp': alert.timestamp.isoformat()
            } for alert in active_alerts[-10:]]  # Last 10 alerts

            # Add recommendations
            health['recommendations'] = await self._generate_health_recommendations()

            health['status'] = overall_status.value
            return health

        except Exception as e:
            self.logger.error(f"Error checking overall health: {e}")
            return {
                'status': HealthStatus.CRITICAL.value,
                'timestamp': datetime.now().isoformat(),
                'error': str(e),
                'components': {},
                'metrics': {},
                'alerts': [],
                'recommendations': []
            }

    async def _check_component_health(self, component: str) -> Dict[str, Any]:
        """Check health of a specific component."""
        # Check cache first
        now = datetime.now()
        if (component in self.component_health_cache and
                component in self.last_health_check and
                now - self.last_health_check[component] < self.cache_ttl):
            return self.component_health_cache[component]

        try:
            health = {
                'status': HealthStatus.HEALTHY.value,
                'timestamp': now.isoformat(),
                'checks': {},
                'metrics': {}
            }

            if component == 'decay_calculator':
                health.update(await self._check_decay_calculator_health())
            elif component == 'association_engine':
                health.update(await self._check_association_engine_health())
            elif component == 'clustering_engine':
                health.update(await self._check_clustering_engine_health())
            elif component == 'compression_engine':
                health.update(await self._check_compression_engine_health())
            elif component == 'forgetting_engine':
                health.update(await self._check_forgetting_engine_health())
            elif component == 'scheduler':
                health.update(await self._check_scheduler_health())
            elif component == 'storage_backend':
                health.update(await self._check_storage_backend_health())

            # Cache the result
            self.component_health_cache[component] = health
            self.last_health_check[component] = now

            return health

        except Exception as e:
            self.logger.error(f"Error checking {component} health: {e}")
            return {
                'status': HealthStatus.UNHEALTHY.value,
                'timestamp': now.isoformat(),
                'error': str(e),
                'checks': {},
                'metrics': {}
            }

    async def _check_decay_calculator_health(self) -> Dict[str, Any]:
        """Check decay calculator health."""
        return {
            'checks': {
                'configuration': 'valid',
                'retention_periods': 'configured',
                'decay_algorithm': 'functional'
            },
            'metrics': {
                'recent_calculations': len([
                    h for h in self.performance_history
                    if h.get('component') == 'decay_calculator'
                    and h.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=1)
                ])
            }
        }

    async def _check_association_engine_health(self) -> Dict[str, Any]:
        """Check association engine health."""
        recent_associations = len([
            h for h in self.performance_history
            if h.get('component') == 'association_engine'
            and h.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=1)
        ])

        return {
            'checks': {
                'similarity_thresholds': 'configured',
                'concept_extraction': 'functional',
                'association_discovery': 'active'
            },
            'metrics': {
                'recent_associations_discovered': recent_associations,
                'similarity_range': '0.3-0.7'
            }
        }

    async def _check_clustering_engine_health(self) -> Dict[str, Any]:
        """Check clustering engine health."""
        return {
            'checks': {
                'clustering_algorithm': 'available',
                'minimum_cluster_size': 'configured',
                'embedding_processing': 'functional'
            },
            'metrics': {
                'recent_clusters_created': len([
                    h for h in self.performance_history
                    if h.get('component') == 'clustering_engine'
                    and h.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=1)
                ])
            }
        }

    async def _check_compression_engine_health(self) -> Dict[str, Any]:
        """Check compression engine health."""
        return {
            'checks': {
                'summary_generation': 'functional',
                'concept_extraction': 'active',
                'compression_ratio': 'optimal'
            },
            'metrics': {
                'recent_compressions': len([
                    h for h in self.performance_history
                    if h.get('component') == 'compression_engine'
                    and h.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=1)
                ])
            }
        }

    async def _check_forgetting_engine_health(self) -> Dict[str, Any]:
        """Check forgetting engine health."""
        return {
            'checks': {
                'archive_storage': 'accessible',
                'relevance_thresholds': 'configured',
                'controlled_forgetting': 'safe'
            },
            'metrics': {
                'recent_archival_operations': len([
                    h for h in self.performance_history
                    if h.get('component') == 'forgetting_engine'
                    and h.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=1)
                ])
            }
        }

    async def _check_scheduler_health(self) -> Dict[str, Any]:
        """Check scheduler health."""
        return {
            'checks': {
                'scheduler_running': 'active',
                'job_scheduling': 'functional',
                'cron_expressions': 'valid'
            },
            'metrics': {
                'scheduled_jobs': 'configured',
                'last_execution': 'recent'
            }
        }

    async def _check_storage_backend_health(self) -> Dict[str, Any]:
        """Check storage backend health."""
        return {
            'checks': {
                'storage_connection': 'connected',
                'read_operations': 'functional',
                'write_operations': 'functional',
                'backup_integrity': 'verified'
            },
            'metrics': {
                'response_time_ms': 'normal',
                'storage_utilization': 'optimal'
            }
        }

    async def _generate_health_recommendations(self) -> List[str]:
        """Generate health recommendations based on current system state."""
        recommendations = []

        # Check error rates
        recent_errors = len([
            e for e in self.error_history
            if e.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=24)
        ])

        if recent_errors > 10:
            recommendations.append("High error rate detected. Consider reviewing consolidation configuration.")

        # Check performance metrics
        if 'average_duration_seconds' in self.metrics:
            duration = self.metrics['average_duration_seconds'].value
            if duration > 300:
                recommendations.append("Consolidation operations are taking longer than expected. Consider optimizing memory processing.")

        # Check active alerts
        critical_alerts = [a for a in self.alerts if not a.resolved and a.severity == HealthStatus.CRITICAL]
        if critical_alerts:
            recommendations.append("Critical alerts detected. Immediate attention required.")

        # Check storage health
        if 'storage_response_time' in self.metrics:
            response_time = self.metrics['storage_response_time'].value
            if response_time > 5.0:
                recommendations.append("Storage backend response time is elevated. Check database performance.")

        return recommendations

    def record_consolidation_performance(self, time_horizon: str, duration: float,
                                         memories_processed: int, success: bool,
                                         errors: List[str] = None):
        """Record performance metrics from a consolidation run."""
        entry = {
            'timestamp': datetime.now(),
            'time_horizon': time_horizon,
            'duration_seconds': duration,
            'memories_processed': memories_processed,
            'success': success,
            'errors': errors or [],
            'memories_per_second': memories_processed / duration if duration > 0 else 0
        }

        self.performance_history.append(entry)

        # Trim history to max size
        if len(self.performance_history) > self.max_history_entries:
            self.performance_history = self.performance_history[-self.max_history_entries:]

        # Update metrics
        self._update_performance_metrics()

        # Check for alerts (guard against errors being None on a failed run)
        if not success or (errors and len(errors) > 0):
            self._create_alert(
                component='consolidator',
                severity=HealthStatus.DEGRADED if success else HealthStatus.UNHEALTHY,
                message=f"Consolidation issues detected: {', '.join((errors or [])[:3])}"
            )

    def record_error(self, component: str, error: Exception, context: Dict[str, Any] = None):
        """Record an error in the consolidation system."""
        error_entry = {
            'timestamp': datetime.now(),
            'component': component,
            'error_type': type(error).__name__,
            'error_message': str(error),
            'traceback': traceback.format_exc(),
            'context': context or {}
        }

        self.error_history.append(error_entry)

        # Trim error history
        if len(self.error_history) > self.max_history_entries:
            self.error_history = self.error_history[-self.max_history_entries:]

        # Create alert for serious errors
        if isinstance(error, ConsolidationError):
            severity = HealthStatus.UNHEALTHY
        else:
            severity = HealthStatus.DEGRADED

        self._create_alert(
            component=component,
            severity=severity,
            message=f"{type(error).__name__}: {str(error)}"
        )

        self.logger.error(f"Error in {component}: {error}", exc_info=True)

    def _update_performance_metrics(self):
        """Update performance metrics based on recent data."""
        now = datetime.now()
        recent_cutoff = now - timedelta(hours=24)

        # Get recent performance data
        recent_runs = [r for r in self.performance_history if r['timestamp'] > recent_cutoff]

        if not recent_runs:
            return

        # Calculate success rate
        successful_runs = [r for r in recent_runs if r['success']]
        success_rate = len(successful_runs) / len(recent_runs)

        self.metrics['consolidation_success_rate'] = HealthMetric(
            name='consolidation_success_rate',
            value=success_rate,
            status=self._get_status_for_metric('consolidation_success_rate', success_rate),
            message=f"{len(successful_runs)}/{len(recent_runs)} consolidations successful"
        )

        # Calculate average duration
        avg_duration = sum(r['duration_seconds'] for r in recent_runs) / len(recent_runs)

        self.metrics['average_duration_seconds'] = HealthMetric(
            name='average_duration_seconds',
            value=avg_duration,
            status=self._get_status_for_metric('average_duration_seconds', avg_duration),
            message=f"Average consolidation duration: {avg_duration:.1f}s"
        )

        # Calculate processing rate
        total_memories = sum(r['memories_processed'] for r in recent_runs)
        total_duration = sum(r['duration_seconds'] for r in recent_runs)
        processing_rate = total_memories / total_duration if total_duration > 0 else 0

        self.metrics['memory_processing_rate'] = HealthMetric(
            name='memory_processing_rate',
            value=processing_rate,
            status=self._get_status_for_metric('memory_processing_rate', processing_rate),
            message=f"Processing rate: {processing_rate:.2f} memories/second"
        )

        # Calculate error rate
        recent_error_cutoff = now - timedelta(hours=1)
        recent_errors = [e for e in self.error_history if e['timestamp'] > recent_error_cutoff]
        error_rate = len(recent_errors) / max(len(recent_runs), 1)

        self.metrics['error_rate'] = HealthMetric(
            name='error_rate',
            value=error_rate,
            status=self._get_status_for_metric('error_rate', error_rate),
            message=f"Error rate: {error_rate:.2f} errors per consolidation"
        )

    def _get_status_for_metric(self, metric_name: str, value: float) -> HealthStatus:
        """Determine health status for a metric value."""
        if metric_name not in self.thresholds:
            return HealthStatus.HEALTHY

        thresholds = self.thresholds[metric_name]

        # For error rate and duration, higher is worse
        if metric_name in ['error_rate', 'average_duration_seconds', 'storage_response_time']:
            if value >= thresholds['critical']:
                return HealthStatus.CRITICAL
            elif value >= thresholds['warning']:
                return HealthStatus.DEGRADED
            else:
                return HealthStatus.HEALTHY
        # For success rate and processing rate, lower is worse
        else:
            if value <= thresholds['critical']:
                return HealthStatus.CRITICAL
            elif value <= thresholds['warning']:
                return HealthStatus.DEGRADED
            else:
                return HealthStatus.HEALTHY

    def _create_alert(self, component: str, severity: HealthStatus, message: str):
        """Create a new health alert."""
        alert_id = f"{component}_{severity.value}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        alert = HealthAlert(
            alert_id=alert_id,
            component=component,
            severity=severity,
            message=message
        )

        self.alerts.append(alert)

        # Trim alerts to reasonable size
        if len(self.alerts) > 100:
            self.alerts = self.alerts[-100:]

        self.logger.warning(f"Health alert [{severity.value}] for {component}: {message}")

    def resolve_alert(self, alert_id: str):
        """Mark an alert as resolved."""
        for alert in self.alerts:
            if alert.alert_id == alert_id and not alert.resolved:
                alert.resolved = True
                alert.resolution_timestamp = datetime.now()
                self.logger.info(f"Alert {alert_id} resolved")
                break

    async def get_health_summary(self) -> Dict[str, Any]:
        """Get a summary of consolidation system health."""
        health = await self.check_overall_health()

        return {
            'overall_status': health['status'],
            'timestamp': health['timestamp'],
            'component_count': len(health['components']),
            'healthy_components': len([
                c for c in health['components'].values()
                if c.get('status') == 'healthy'
            ]),
            'active_alerts': len([a for a in health['alerts'] if not a.get('resolved', False)]),
            'critical_alerts': len([
                a for a in health['alerts']
                if a.get('severity') == 'critical' and not a.get('resolved', False)
            ]),
            'recommendations_count': len(health.get('recommendations', [])),
            'recent_errors': len([
                e for e in self.error_history
                if e.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=24)
            ])
        }
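The snippet below is not part of the source file above; it is a minimal usage sketch showing how a consolidation loop might drive the monitor: record run outcomes and errors, then query the aggregated summary. The import path, component name, and sample figures are assumptions for illustration only.

# Illustrative usage sketch, not part of the MCP Memory Service source.
import asyncio

# Assumed import path; adjust to wherever ConsolidationHealthMonitor lives in your tree.
from mcp_memory_service.consolidation.health import ConsolidationHealthMonitor


async def main() -> None:
    monitor = ConsolidationHealthMonitor()

    # Record a successful daily consolidation run (sample figures).
    monitor.record_consolidation_performance(
        time_horizon="daily",
        duration=12.5,
        memories_processed=240,
        success=True,
    )

    # Record a non-ConsolidationError exception; it is logged and surfaces as a DEGRADED alert.
    monitor.record_error(
        component="storage_backend",
        error=RuntimeError("simulated write timeout"),
        context={"operation": "write"},
    )

    summary = await monitor.get_health_summary()
    print(summary["overall_status"], summary["active_alerts"])


if __name__ == "__main__":
    asyncio.run(main())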
