temporal_memory.py

"""
Temporal Operational Memory for KafkaIQ MCP Server

This module provides stateful temporal tracking capabilities to an otherwise
stateless MCP architecture, enabling trend-aware health assessment and
longitudinal system reasoning without machine learning.

Key Features:
- Event storage with timestamps
- Time-range queries
- Recurring issue detection
- Trend analysis
- Transient vs persistent failure classification

Academic Framing:
"We augment tool-based MCP interaction with temporal operational memory,
enabling trend-aware health assessment and longitudinal system reasoning."
"""

import json
import time
import threading
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional, Tuple
from collections import defaultdict
from pathlib import Path
import uuid
import logging

logger = logging.getLogger(__name__)


@dataclass
class TemporalEvent:
    """
    Represents a single temporal event in the operational memory.

    Events capture alerts, metrics, actions, and health checks over time,
    enabling temporal reasoning about system behavior.
    """
    event_id: str
    event_type: str  # ALERT | METRIC_SNAPSHOT | ACTION | HEALTH_CHECK
    timestamp: int  # Unix timestamp in milliseconds
    severity: str  # OK | WARN | CRITICAL | INFO
    code: str  # Event code (e.g., "CONSUMER_LAG", "OFFLINE_PARTITIONS")
    message: str  # Human-readable description
    data: Dict[str, Any]  # Event-specific data
    metadata: Dict[str, Any] = field(default_factory=dict)  # Optional context

    def to_dict(self) -> Dict[str, Any]:
        """Convert event to dictionary for JSON serialization."""
        return asdict(self)

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> 'TemporalEvent':
        """Create event from dictionary."""
        return TemporalEvent(**data)


class TemporalMemoryStore:
    """
    Lightweight operational state store for temporal event tracking.

    Features:
    - Thread-safe event storage
    - Time-based retention policies
    - Efficient time-range queries
    - Event type and severity filtering
    - Automatic cleanup of old events
    """

    def __init__(
        self,
        retention_hours: int = 24,
        max_events: int = 100000,
        auto_cleanup_interval_seconds: int = 300  # 5 minutes
    ):
        """
        Initialize temporal memory store.

        Args:
            retention_hours: How long to keep events (default: 24 hours)
            max_events: Maximum number of events to store (default: 100k)
            auto_cleanup_interval_seconds: How often to run cleanup (default: 5 min)
        """
        self.retention_hours = retention_hours
        self.max_events = max_events
        # Stored for reference; this module does not start a background cleanup thread itself
        self.auto_cleanup_interval = auto_cleanup_interval_seconds

        # Event storage (ordered by timestamp)
        self._events: List[TemporalEvent] = []
        self._lock = threading.Lock()

        # Index for fast lookups
        self._event_by_code: Dict[str, List[TemporalEvent]] = defaultdict(list)
        self._event_by_type: Dict[str, List[TemporalEvent]] = defaultdict(list)

        # Statistics
        self._total_events_added = 0
        self._total_events_cleaned = 0

        logger.info(f"Temporal memory store initialized: retention={retention_hours}h, max_events={max_events}")

    def add_event(self, event: TemporalEvent) -> None:
        """
        Add a new event to temporal memory.

        Args:
            event: TemporalEvent to add
        """
        with self._lock:
            self._events.append(event)
            self._event_by_code[event.code].append(event)
            self._event_by_type[event.event_type].append(event)
            self._total_events_added += 1

            # Enforce max events limit
            if len(self._events) > self.max_events:
                self._cleanup_oldest_events(count=1000)

        logger.debug(f"Added event: type={event.event_type}, code={event.code}, severity={event.severity}")

    def get_events(
        self,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
        event_type: Optional[str] = None,
        severity: Optional[str] = None,
        code: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[TemporalEvent]:
        """
        Query events with filters.

        Args:
            start_time: Start timestamp (ms) - inclusive
            end_time: End timestamp (ms) - inclusive
            event_type: Filter by event type
            severity: Filter by severity level
            code: Filter by event code
            limit: Maximum number of events to return

        Returns:
            List of matching events, ordered by timestamp (newest first)
        """
        with self._lock:
            # Start with all events or filtered by code for efficiency
            if code:
                candidates = self._event_by_code.get(code, [])
            elif event_type:
                candidates = self._event_by_type.get(event_type, [])
            else:
                candidates = self._events

            # Apply filters
            results = []
            for event in candidates:
                # Time range filter
                if start_time and event.timestamp < start_time:
                    continue
                if end_time and event.timestamp > end_time:
                    continue

                # Type filter
                if event_type and event.event_type != event_type:
                    continue

                # Severity filter
                if severity and event.severity != severity:
                    continue

                # Code filter
                if code and event.code != code:
                    continue

                results.append(event)

            # Sort by timestamp descending (newest first)
            results.sort(key=lambda e: e.timestamp, reverse=True)

            # Apply limit
            if limit:
                results = results[:limit]

            return results

    def get_event_frequency(
        self,
        code: str,
        time_window_hours: int = 1
    ) -> int:
        """
        Count occurrences of a specific event code within a time window.

        Args:
            code: Event code to count
            time_window_hours: Time window in hours

        Returns:
            Number of occurrences
        """
        now = int(time.time() * 1000)
        start_time = now - (time_window_hours * 3600 * 1000)

        events = self.get_events(
            start_time=start_time,
            end_time=now,
            code=code
        )

        return len(events)
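
    # Example time-range query (sketch): given a TemporalMemoryStore instance
    # `store`, timestamps are Unix milliseconds, so a "last 15 minutes" window is
    # built as shown below; the event code is illustrative.
    #
    #     now = int(time.time() * 1000)
    #     recent_lag_alerts = store.get_events(
    #         start_time=now - 15 * 60 * 1000,
    #         event_type="ALERT",
    #         code="CONSUMER_LAG",
    #     )
    #     lag_alerts_last_hour = store.get_event_frequency("CONSUMER_LAG", time_window_hours=1)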

    def get_recurring_issues(
        self,
        threshold: int = 3,
        time_window_hours: int = 1,
        severity_filter: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Identify issues that occurred multiple times within a time window.

        Args:
            threshold: Minimum occurrences to be considered recurring
            time_window_hours: Time window to analyze
            severity_filter: Optional severity filter (WARN, CRITICAL)

        Returns:
            List of recurring issues with statistics
        """
        now = int(time.time() * 1000)
        start_time = now - (time_window_hours * 3600 * 1000)

        # Get all events in window
        events = self.get_events(
            start_time=start_time,
            end_time=now,
            severity=severity_filter
        )

        # Group by code
        code_groups = defaultdict(list)
        for event in events:
            # Only consider alerts and health checks for recurring issues
            if event.event_type in ["ALERT", "HEALTH_CHECK"]:
                code_groups[event.code].append(event)

        # Find recurring issues
        recurring = []
        for code, code_events in code_groups.items():
            if len(code_events) >= threshold:
                # Sort by timestamp
                code_events.sort(key=lambda e: e.timestamp)

                # Calculate severity distribution
                severity_dist = defaultdict(int)
                for event in code_events:
                    severity_dist[event.severity] += 1

                recurring.append({
                    "code": code,
                    "count": len(code_events),
                    "first_seen": code_events[0].timestamp,
                    "last_seen": code_events[-1].timestamp,
                    "severity_distribution": dict(severity_dist),
                    "sample_message": code_events[-1].message
                })

        # Sort by count descending
        recurring.sort(key=lambda x: x["count"], reverse=True)

        return recurring

    def get_trend_analysis(
        self,
        metric_code: str,
        time_window_hours: int = 6
    ) -> Dict[str, Any]:
        """
        Analyze trends for a specific metric over time.

        Args:
            metric_code: Code of metric to analyze
            time_window_hours: Time window for analysis

        Returns:
            Trend analysis with min, max, avg, current value, and direction
        """
        now = int(time.time() * 1000)
        start_time = now - (time_window_hours * 3600 * 1000)

        events = self.get_events(
            start_time=start_time,
            end_time=now,
            code=metric_code,
            event_type="METRIC_SNAPSHOT"
        )

        if not events:
            return {
                "code": metric_code,
                "sample_count": 0,
                "message": "No data available for trend analysis"
            }

        # Extract values (assuming data contains a 'value' field)
        values = []
        timestamps = []
        for event in events:
            if "value" in event.data:
                values.append(event.data["value"])
                timestamps.append(event.timestamp)

        if not values:
            return {
                "code": metric_code,
                "sample_count": len(events),
                "message": "Events found but no numeric values to analyze"
            }

        # Calculate statistics
        min_val = min(values)
        max_val = max(values)
        avg_val = sum(values) / len(values)
        current_val = values[0]  # Most recent (events are sorted newest first)

        # Determine trend direction (simple: compare first half vs second half)
        if len(values) >= 4:
            mid = len(values) // 2
            first_half_avg = sum(values[mid:]) / len(values[mid:])
            second_half_avg = sum(values[:mid]) / mid

            if second_half_avg > first_half_avg * 1.1:
                trend = "increasing"
            elif second_half_avg < first_half_avg * 0.9:
                trend = "decreasing"
            else:
                trend = "stable"
        else:
            trend = "insufficient_data"

        return {
            "code": metric_code,
            "sample_count": len(values),
            "min": min_val,
            "max": max_val,
            "avg": avg_val,
            "current": current_val,
            "trend": trend,
            "time_window_hours": time_window_hours,
            "first_timestamp": timestamps[-1],
            "last_timestamp": timestamps[0]
        }

    def cleanup_old_events(self) -> int:
        """
        Remove events older than retention period.

        Returns:
            Number of events removed
        """
        now = int(time.time() * 1000)
        cutoff_time = now - (self.retention_hours * 3600 * 1000)

        with self._lock:
            initial_count = len(self._events)

            # Remove old events
            self._events = [e for e in self._events if e.timestamp >= cutoff_time]

            # Rebuild indexes
            self._event_by_code.clear()
            self._event_by_type.clear()
            for event in self._events:
                self._event_by_code[event.code].append(event)
                self._event_by_type[event.event_type].append(event)

            removed = initial_count - len(self._events)
            self._total_events_cleaned += removed

            if removed > 0:
                logger.info(f"Cleaned {removed} old events, {len(self._events)} remaining")

            return removed

    def _cleanup_oldest_events(self, count: int) -> None:
        """
        Remove the oldest N events (called when max_events exceeded).

        Args:
            count: Number of events to remove
        """
        if count >= len(self._events):
            return

        # Remove the oldest `count` events, keeping the rest
        self._events = self._events[count:]

        # Rebuild indexes
        self._event_by_code.clear()
        self._event_by_type.clear()
        for event in self._events:
            self._event_by_code[event.code].append(event)
            self._event_by_type[event.event_type].append(event)

        self._total_events_cleaned += count

        logger.info(f"Removed {count} oldest events to enforce max_events limit")

    def export_state(self, filepath: str) -> bool:
        """
        Export temporal memory to JSON file.

        Args:
            filepath: Path to save file

        Returns:
            True if successful, False otherwise
        """
        try:
            with self._lock:
                data = {
                    "version": "1.0",
                    "exported_at": int(time.time() * 1000),
                    "retention_hours": self.retention_hours,
                    "total_events": len(self._events),
                    "events": [event.to_dict() for event in self._events],
                    "statistics": {
                        "total_events_added": self._total_events_added,
                        "total_events_cleaned": self._total_events_cleaned
                    }
                }

            Path(filepath).write_text(json.dumps(data, indent=2))
            logger.info(f"Exported {len(self._events)} events to {filepath}")
            return True

        except Exception as e:
            logger.error(f"Failed to export state: {e}")
            return False

    def import_state(self, filepath: str) -> bool:
        """
        Import temporal memory from JSON file.

        Args:
            filepath: Path to load file

        Returns:
            True if successful, False otherwise
        """
        try:
            data = json.loads(Path(filepath).read_text())

            with self._lock:
                # Clear existing events
                self._events.clear()
                self._event_by_code.clear()
                self._event_by_type.clear()

                # Import events
                for event_dict in data.get("events", []):
                    event = TemporalEvent.from_dict(event_dict)
                    self._events.append(event)
                    self._event_by_code[event.code].append(event)
                    self._event_by_type[event.event_type].append(event)

                # Restore statistics
                stats = data.get("statistics", {})
                self._total_events_added = stats.get("total_events_added", len(self._events))
                self._total_events_cleaned = stats.get("total_events_cleaned", 0)

            logger.info(f"Imported {len(self._events)} events from {filepath}")
            return True

        except Exception as e:
            logger.error(f"Failed to import state: {e}")
            return False

    def get_statistics(self) -> Dict[str, Any]:
        """Get memory store statistics."""
        with self._lock:
            return {
                "total_events": len(self._events),
                "events_by_type": {k: len(v) for k, v in self._event_by_type.items()},
                "unique_codes": len(self._event_by_code),
                "total_events_added": self._total_events_added,
                "total_events_cleaned": self._total_events_cleaned,
                "retention_hours": self.retention_hours,
                "max_events": self.max_events
            }

    def clear(self, confirm: bool = False) -> bool:
        """
        Clear all temporal memory (requires confirmation).

        Args:
            confirm: Must be True to actually clear

        Returns:
            True if cleared, False if not confirmed
        """
        if not confirm:
            return False

        with self._lock:
            count = len(self._events)
            self._events.clear()
            self._event_by_code.clear()
            self._event_by_type.clear()
            self._total_events_cleaned += count

        logger.warning(f"Cleared all temporal memory ({count} events)")
        return True
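
# Usage sketch for the store (the event fields, window sizes, metric code, and
# export path below are illustrative examples; trend analysis only considers
# METRIC_SNAPSHOT events whose data carries a numeric "value"):
#
#     store = TemporalMemoryStore(retention_hours=24)
#     store.add_event(TemporalEvent(
#         event_id=str(uuid.uuid4()),
#         event_type="ALERT",
#         timestamp=int(time.time() * 1000),
#         severity="WARN",
#         code="CONSUMER_LAG",
#         message="Consumer group lagging behind on a monitored topic",
#         data={"total_lag": 12000},
#     ))
#     recurring = store.get_recurring_issues(threshold=3, time_window_hours=1)
#     trend = store.get_trend_analysis("UNDER_REPLICATED_PARTITIONS", time_window_hours=6)
#     store.cleanup_old_events()
#     store.export_state("/tmp/kafkaiq_memory.json")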


class TemporalAnalyzer:
    """
    Advanced temporal analysis capabilities for detecting patterns and anomalies.
    """

    def __init__(self, store: TemporalMemoryStore):
        """
        Initialize analyzer with a temporal store.

        Args:
            store: TemporalMemoryStore instance
        """
        self.store = store

    def detect_lag_spikes(
        self,
        group_id: str,
        topic_name: str,
        time_window_hours: int = 6,
        spike_threshold_multiplier: float = 2.0
    ) -> Dict[str, Any]:
        """
        Detect consumer lag spikes for a specific group/topic.

        Args:
            group_id: Consumer group ID
            topic_name: Topic name
            time_window_hours: Analysis window
            spike_threshold_multiplier: How many times above average = spike

        Returns:
            Analysis with detected spikes
        """
        code = f"CONSUMER_LAG_{group_id}_{topic_name}"
        now = int(time.time() * 1000)
        start_time = now - (time_window_hours * 3600 * 1000)

        events = self.store.get_events(
            start_time=start_time,
            code=code,
            event_type="METRIC_SNAPSHOT"
        )

        if len(events) < 3:
            return {
                "group_id": group_id,
                "topic_name": topic_name,
                "spikes_detected": 0,
                "message": "Insufficient data for spike detection"
            }

        # Extract lag values
        lags = [(e.timestamp, e.data.get("total_lag", 0)) for e in events]
        lags.sort(key=lambda x: x[0])  # Sort by timestamp

        avg_lag = sum(lag for _, lag in lags) / len(lags)
        spike_threshold = avg_lag * spike_threshold_multiplier

        # Detect spikes
        spikes = [(ts, lag) for ts, lag in lags if lag > spike_threshold]

        return {
            "group_id": group_id,
            "topic_name": topic_name,
            "spikes_detected": len(spikes),
            "average_lag": avg_lag,
            "spike_threshold": spike_threshold,
            "spike_timestamps": [ts for ts, _ in spikes],
            "spike_values": [lag for _, lag in spikes],
            "time_window_hours": time_window_hours
        }

    def detect_partition_flapping(
        self,
        time_window_hours: int = 12,
        flap_threshold: int = 3
    ) -> List[Dict[str, Any]]:
        """
        Identify partitions that go offline/online repeatedly.

        Args:
            time_window_hours: Analysis window
            flap_threshold: Min offline events to consider flapping

        Returns:
            List of flapping partitions
        """
        now = int(time.time() * 1000)
        start_time = now - (time_window_hours * 3600 * 1000)

        events = self.store.get_events(
            start_time=start_time,
            code="OFFLINE_PARTITIONS"
        )

        # Group by topic/partition
        partition_events = defaultdict(list)
        for event in events:
            data = event.data
            if "offline_partitions" in data:
                for topic, partitions in data.get("by_topic", {}).items():
                    for partition in partitions:
                        key = f"{topic}:{partition}"
                        partition_events[key].append(event.timestamp)

        # Find flapping partitions
        flapping = []
        for key, timestamps in partition_events.items():
            if len(timestamps) >= flap_threshold:
                topic, partition = key.split(":", 1)
                flapping.append({
                    "topic": topic,
                    "partition": int(partition),
                    "offline_count": len(timestamps),
                    "first_offline": min(timestamps),
                    "last_offline": max(timestamps)
                })

        flapping.sort(key=lambda x: x["offline_count"], reverse=True)
        return flapping

    def is_transient_vs_persistent(
        self,
        code: str,
        persistence_threshold_minutes: int = 30
    ) -> Dict[str, Any]:
        """
        Classify if an issue is transient or persistent.

        Args:
            code: Event code to analyze
            persistence_threshold_minutes: Duration threshold

        Returns:
            Classification with confidence
        """
        # Get all occurrences
        events = self.store.get_events(code=code, limit=1000)

        if len(events) < 2:
            return {
                "code": code,
                "classification": "unknown",
                "confidence": 0.0,
                "message": "Insufficient occurrences"
            }

        # Sort by timestamp
        events.sort(key=lambda e: e.timestamp)

        # Calculate duration from first to last occurrence
        duration_ms = events[-1].timestamp - events[0].timestamp
        duration_minutes = duration_ms / (1000 * 60)

        # Calculate average time between occurrences
        if len(events) > 1:
            intervals = [events[i + 1].timestamp - events[i].timestamp for i in range(len(events) - 1)]
            avg_interval_minutes = (sum(intervals) / len(intervals)) / (1000 * 60)
        else:
            avg_interval_minutes = 0

        # Classification logic
        if duration_minutes >= persistence_threshold_minutes:
            classification = "persistent"
            confidence = min(0.95, duration_minutes / (persistence_threshold_minutes * 2))
        else:
            classification = "transient"
            confidence = min(0.95, persistence_threshold_minutes / max(duration_minutes, 1))

        return {
            "code": code,
            "classification": classification,
            "confidence": confidence,
            "occurrences": len(events),
            "duration_minutes": duration_minutes,
            "avg_interval_minutes": avg_interval_minutes,
            "first_seen": events[0].timestamp,
            "last_seen": events[-1].timestamp
        }
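

# ---------------------------------------------------------------------------
# End-to-end sketch: feed synthetic consumer-lag snapshots into the store and
# let the analyzer flag the spike and classify the issue. The group, topic, and
# lag values below are illustrative assumptions, not part of the server wiring.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    store = TemporalMemoryStore(retention_hours=1)
    analyzer = TemporalAnalyzer(store)

    now_ms = int(time.time() * 1000)
    for i in range(10):
        # One synthetic lag snapshot per minute, spiking in the last two samples
        store.add_event(TemporalEvent(
            event_id=str(uuid.uuid4()),
            event_type="METRIC_SNAPSHOT",
            timestamp=now_ms - (9 - i) * 60_000,
            severity="OK" if i < 8 else "WARN",
            code="CONSUMER_LAG_payments_orders",
            message="consumer lag snapshot",
            data={"total_lag": 100 if i < 8 else 5000},
        ))

    print(analyzer.detect_lag_spikes(group_id="payments", topic_name="orders"))
    print(analyzer.is_transient_vs_persistent("CONSUMER_LAG_payments_orders"))
    print(store.get_statistics())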
