"""
Temporal Operational Memory for KafkaIQ MCP Server
This module provides stateful temporal tracking capabilities to an otherwise
stateless MCP architecture, enabling trend-aware health assessment and
longitudinal system reasoning without machine learning.
Key Features:
- Event storage with timestamps
- Time-range queries
- Recurring issue detection
- Trend analysis
- Transient vs persistent failure classification
Academic Framing:
"We augment tool-based MCP interaction with temporal operational memory,
enabling trend-aware health assessment and longitudinal system reasoning."
"""
import json
import time
import threading
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional, Tuple
from collections import defaultdict
from pathlib import Path
import uuid
import logging
logger = logging.getLogger(__name__)
@dataclass
class TemporalEvent:
"""
Represents a single temporal event in the operational memory.
Events capture alerts, metrics, actions, and health checks over time,
enabling temporal reasoning about system behavior.
"""
event_id: str
event_type: str # ALERT | METRIC_SNAPSHOT | ACTION | HEALTH_CHECK
timestamp: int # Unix timestamp in milliseconds
severity: str # OK | WARN | CRITICAL | INFO
code: str # Event code (e.g., "CONSUMER_LAG", "OFFLINE_PARTITIONS")
message: str # Human-readable description
data: Dict[str, Any] # Event-specific data
metadata: Dict[str, Any] = field(default_factory=dict) # Optional context
def to_dict(self) -> Dict[str, Any]:
"""Convert event to dictionary for JSON serialization."""
return asdict(self)
@staticmethod
def from_dict(data: Dict[str, Any]) -> 'TemporalEvent':
"""Create event from dictionary."""
return TemporalEvent(**data)
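# Illustrative sketch (not executed): constructing an event and round-tripping it
# through its dict form. The message and lag value are hypothetical; "CONSUMER_LAG"
# is the example code cited in the field comments above.
#
#   event = TemporalEvent(
#       event_id=str(uuid.uuid4()),
#       event_type="ALERT",
#       timestamp=int(time.time() * 1000),
#       severity="WARN",
#       code="CONSUMER_LAG",
#       message="Consumer group lagging behind on its topic",
#       data={"total_lag": 12500},
#   )
#   assert TemporalEvent.from_dict(event.to_dict()) == event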
class TemporalMemoryStore:
"""
Lightweight operational state store for temporal event tracking.
Features:
- Thread-safe event storage
- Time-based retention policies
- Efficient time-range queries
- Event type and severity filtering
    - Retention-based cleanup of old events via cleanup_old_events()
"""
def __init__(
self,
retention_hours: int = 24,
max_events: int = 100000,
auto_cleanup_interval_seconds: int = 300 # 5 minutes
):
"""
Initialize temporal memory store.
Args:
retention_hours: How long to keep events (default: 24 hours)
max_events: Maximum number of events to store (default: 100k)
            auto_cleanup_interval_seconds: Suggested cleanup cadence in seconds.
                Callers are expected to invoke cleanup_old_events() on roughly
                this interval; no background thread is started (default: 300)
"""
self.retention_hours = retention_hours
self.max_events = max_events
self.auto_cleanup_interval = auto_cleanup_interval_seconds
# Event storage (ordered by timestamp)
self._events: List[TemporalEvent] = []
self._lock = threading.Lock()
# Index for fast lookups
self._event_by_code: Dict[str, List[TemporalEvent]] = defaultdict(list)
self._event_by_type: Dict[str, List[TemporalEvent]] = defaultdict(list)
# Statistics
self._total_events_added = 0
self._total_events_cleaned = 0
logger.info(f"Temporal memory store initialized: retention={retention_hours}h, max_events={max_events}")
def add_event(self, event: TemporalEvent) -> None:
"""
Add a new event to temporal memory.
Args:
event: TemporalEvent to add
"""
with self._lock:
self._events.append(event)
self._event_by_code[event.code].append(event)
self._event_by_type[event.event_type].append(event)
self._total_events_added += 1
# Enforce max events limit
if len(self._events) > self.max_events:
self._cleanup_oldest_events(count=1000)
logger.debug(f"Added event: type={event.event_type}, code={event.code}, severity={event.severity}")
def get_events(
self,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
event_type: Optional[str] = None,
severity: Optional[str] = None,
code: Optional[str] = None,
limit: Optional[int] = None
) -> List[TemporalEvent]:
"""
Query events with filters.
Args:
start_time: Start timestamp (ms) - inclusive
end_time: End timestamp (ms) - inclusive
event_type: Filter by event type
severity: Filter by severity level
code: Filter by event code
limit: Maximum number of events to return
Returns:
List of matching events, ordered by timestamp (newest first)
"""
with self._lock:
# Start with all events or filtered by code for efficiency
if code:
candidates = self._event_by_code.get(code, [])
elif event_type:
candidates = self._event_by_type.get(event_type, [])
else:
candidates = self._events
# Apply filters
results = []
for event in candidates:
                # Time range filter (explicit None checks so 0 is a valid bound)
                if start_time is not None and event.timestamp < start_time:
                    continue
                if end_time is not None and event.timestamp > end_time:
                    continue
# Type filter
if event_type and event.event_type != event_type:
continue
# Severity filter
if severity and event.severity != severity:
continue
# Code filter
if code and event.code != code:
continue
results.append(event)
# Sort by timestamp descending (newest first)
results.sort(key=lambda e: e.timestamp, reverse=True)
# Apply limit
if limit:
results = results[:limit]
return results
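    # Query sketch, assuming a populated store instance: fetch the most recent
    # CRITICAL alerts from the last hour (window and limit are example values).
    #
    #   now_ms = int(time.time() * 1000)
    #   recent_critical = store.get_events(
    #       start_time=now_ms - 3600 * 1000,
    #       event_type="ALERT",
    #       severity="CRITICAL",
    #       limit=50,
    #   )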
def get_event_frequency(
self,
code: str,
time_window_hours: int = 1
) -> int:
"""
Count occurrences of a specific event code within a time window.
Args:
code: Event code to count
time_window_hours: Time window in hours
Returns:
Number of occurrences
"""
now = int(time.time() * 1000)
start_time = now - (time_window_hours * 3600 * 1000)
events = self.get_events(
start_time=start_time,
end_time=now,
code=code
)
return len(events)
def get_recurring_issues(
self,
threshold: int = 3,
time_window_hours: int = 1,
severity_filter: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Identify issues that occurred multiple times within a time window.
Args:
threshold: Minimum occurrences to be considered recurring
time_window_hours: Time window to analyze
severity_filter: Optional severity filter (WARN, CRITICAL)
Returns:
List of recurring issues with statistics
"""
now = int(time.time() * 1000)
start_time = now - (time_window_hours * 3600 * 1000)
# Get all events in window
events = self.get_events(
start_time=start_time,
end_time=now,
severity=severity_filter
)
# Group by code
code_groups = defaultdict(list)
for event in events:
# Only consider alerts and health checks for recurring issues
if event.event_type in ["ALERT", "HEALTH_CHECK"]:
code_groups[event.code].append(event)
# Find recurring issues
recurring = []
for code, code_events in code_groups.items():
if len(code_events) >= threshold:
# Sort by timestamp
code_events.sort(key=lambda e: e.timestamp)
# Calculate severity distribution
severity_dist = defaultdict(int)
for event in code_events:
severity_dist[event.severity] += 1
recurring.append({
"code": code,
"count": len(code_events),
"first_seen": code_events[0].timestamp,
"last_seen": code_events[-1].timestamp,
"severity_distribution": dict(severity_dist),
"sample_message": code_events[-1].message
})
# Sort by count descending
recurring.sort(key=lambda x: x["count"], reverse=True)
return recurring
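    # Hypothetical shape of a single entry returned by get_recurring_issues()
    # (timestamps and counts are illustrative):
    #
    #   {"code": "CONSUMER_LAG", "count": 5,
    #    "first_seen": 1700000000000, "last_seen": 1700003300000,
    #    "severity_distribution": {"WARN": 3, "CRITICAL": 2},
    #    "sample_message": "Consumer group lagging behind on its topic"}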
def get_trend_analysis(
self,
metric_code: str,
time_window_hours: int = 6
) -> Dict[str, Any]:
"""
Analyze trends for a specific metric over time.
Args:
metric_code: Code of metric to analyze
time_window_hours: Time window for analysis
Returns:
Trend analysis with min, max, avg, current value, and direction
"""
now = int(time.time() * 1000)
start_time = now - (time_window_hours * 3600 * 1000)
events = self.get_events(
start_time=start_time,
end_time=now,
code=metric_code,
event_type="METRIC_SNAPSHOT"
)
if not events:
return {
"code": metric_code,
"sample_count": 0,
"message": "No data available for trend analysis"
}
# Extract values (assuming data contains a 'value' field)
values = []
timestamps = []
for event in events:
if "value" in event.data:
values.append(event.data["value"])
timestamps.append(event.timestamp)
if not values:
return {
"code": metric_code,
"sample_count": len(events),
"message": "Events found but no numeric values to analyze"
}
# Calculate statistics
min_val = min(values)
max_val = max(values)
avg_val = sum(values) / len(values)
current_val = values[0] # Most recent (events are sorted newest first)
        # Determine trend direction: compare the older half of the window
        # against the newer half (values are ordered newest first)
        if len(values) >= 4:
            mid = len(values) // 2
            older_half_avg = sum(values[mid:]) / len(values[mid:])
            newer_half_avg = sum(values[:mid]) / mid
            if newer_half_avg > older_half_avg * 1.1:
                trend = "increasing"
            elif newer_half_avg < older_half_avg * 0.9:
                trend = "decreasing"
            else:
                trend = "stable"
else:
trend = "insufficient_data"
        return {
            "code": metric_code,
            "sample_count": len(values),
            "min": min_val,
            "max": max_val,
            "avg": avg_val,
            "current": current_val,
            "trend": trend,
            "time_window_hours": time_window_hours,
            "first_timestamp": timestamps[-1],  # oldest sample (lists are newest first)
            "last_timestamp": timestamps[0]     # newest sample
        }
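    # Usage sketch, assuming a store that receives periodic METRIC_SNAPSHOT events;
    # the metric code below is hypothetical.
    #
    #   trend = store.get_trend_analysis("UNDER_REPLICATED_PARTITIONS",
    #                                    time_window_hours=6)
    #   if trend.get("trend") == "increasing":
    #       ...  # flag the metric in the health summary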
def cleanup_old_events(self) -> int:
"""
Remove events older than retention period.
Returns:
Number of events removed
"""
now = int(time.time() * 1000)
cutoff_time = now - (self.retention_hours * 3600 * 1000)
with self._lock:
initial_count = len(self._events)
# Remove old events
self._events = [e for e in self._events if e.timestamp >= cutoff_time]
# Rebuild indexes
self._event_by_code.clear()
self._event_by_type.clear()
for event in self._events:
self._event_by_code[event.code].append(event)
self._event_by_type[event.event_type].append(event)
removed = initial_count - len(self._events)
self._total_events_cleaned += removed
if removed > 0:
logger.info(f"Cleaned {removed} old events, {len(self._events)} remaining")
return removed
    def _cleanup_oldest_events(self, count: int) -> None:
        """
        Remove the oldest N events (called when max_events is exceeded).
        Caller must already hold self._lock.
        Args:
            count: Number of events to remove
        """
        count = min(count, len(self._events))
        if count <= 0:
            return
        # Drop the oldest events (self._events is ordered oldest to newest)
        self._events = self._events[count:]
        # Rebuild indexes
        self._event_by_code.clear()
        self._event_by_type.clear()
        for event in self._events:
            self._event_by_code[event.code].append(event)
            self._event_by_type[event.event_type].append(event)
        self._total_events_cleaned += count
        logger.info(f"Removed {count} oldest events to enforce max_events limit")
def export_state(self, filepath: str) -> bool:
"""
Export temporal memory to JSON file.
Args:
filepath: Path to save file
Returns:
True if successful, False otherwise
"""
try:
with self._lock:
data = {
"version": "1.0",
"exported_at": int(time.time() * 1000),
"retention_hours": self.retention_hours,
"total_events": len(self._events),
"events": [event.to_dict() for event in self._events],
"statistics": {
"total_events_added": self._total_events_added,
"total_events_cleaned": self._total_events_cleaned
}
}
Path(filepath).write_text(json.dumps(data, indent=2))
logger.info(f"Exported {len(self._events)} events to {filepath}")
return True
except Exception as e:
logger.error(f"Failed to export state: {e}")
return False
def import_state(self, filepath: str) -> bool:
"""
Import temporal memory from JSON file.
Args:
filepath: Path to load file
Returns:
True if successful, False otherwise
"""
try:
data = json.loads(Path(filepath).read_text())
with self._lock:
# Clear existing events
self._events.clear()
self._event_by_code.clear()
self._event_by_type.clear()
# Import events
for event_dict in data.get("events", []):
event = TemporalEvent.from_dict(event_dict)
self._events.append(event)
self._event_by_code[event.code].append(event)
self._event_by_type[event.event_type].append(event)
# Restore statistics
stats = data.get("statistics", {})
self._total_events_added = stats.get("total_events_added", len(self._events))
self._total_events_cleaned = stats.get("total_events_cleaned", 0)
logger.info(f"Imported {len(self._events)} events from {filepath}")
return True
except Exception as e:
logger.error(f"Failed to import state: {e}")
return False
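    # Persistence sketch: export before shutdown, re-import on startup so temporal
    # context survives restarts. The file path is an example only.
    #
    #   store.export_state("/var/lib/kafkaiq/temporal_memory.json")
    #   ...
    #   restored = TemporalMemoryStore(retention_hours=24)
    #   restored.import_state("/var/lib/kafkaiq/temporal_memory.json")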
def get_statistics(self) -> Dict[str, Any]:
"""Get memory store statistics."""
with self._lock:
return {
"total_events": len(self._events),
"events_by_type": {k: len(v) for k, v in self._event_by_type.items()},
"unique_codes": len(self._event_by_code),
"total_events_added": self._total_events_added,
"total_events_cleaned": self._total_events_cleaned,
"retention_hours": self.retention_hours,
"max_events": self.max_events
}
def clear(self, confirm: bool = False) -> bool:
"""
Clear all temporal memory (requires confirmation).
Args:
confirm: Must be True to actually clear
Returns:
True if cleared, False if not confirmed
"""
if not confirm:
return False
with self._lock:
count = len(self._events)
self._events.clear()
self._event_by_code.clear()
self._event_by_type.clear()
self._total_events_cleaned += count
logger.warning(f"Cleared all temporal memory ({count} events)")
return True
class TemporalAnalyzer:
"""
Advanced temporal analysis capabilities for detecting patterns and anomalies.
"""
def __init__(self, store: TemporalMemoryStore):
"""
Initialize analyzer with a temporal store.
Args:
store: TemporalMemoryStore instance
"""
self.store = store
def detect_lag_spikes(
self,
group_id: str,
topic_name: str,
time_window_hours: int = 6,
spike_threshold_multiplier: float = 2.0
) -> Dict[str, Any]:
"""
Detect consumer lag spikes for a specific group/topic.
Args:
group_id: Consumer group ID
topic_name: Topic name
time_window_hours: Analysis window
            spike_threshold_multiplier: Multiplier over the window-average lag;
                samples above average * multiplier are counted as spikes
Returns:
Analysis with detected spikes
"""
code = f"CONSUMER_LAG_{group_id}_{topic_name}"
now = int(time.time() * 1000)
start_time = now - (time_window_hours * 3600 * 1000)
events = self.store.get_events(
start_time=start_time,
code=code,
event_type="METRIC_SNAPSHOT"
)
if len(events) < 3:
return {
"group_id": group_id,
"topic_name": topic_name,
"spikes_detected": 0,
"message": "Insufficient data for spike detection"
}
# Extract lag values
lags = [(e.timestamp, e.data.get("total_lag", 0)) for e in events]
lags.sort(key=lambda x: x[0]) # Sort by timestamp
avg_lag = sum(lag for _, lag in lags) / len(lags)
spike_threshold = avg_lag * spike_threshold_multiplier
# Detect spikes
spikes = [(ts, lag) for ts, lag in lags if lag > spike_threshold]
return {
"group_id": group_id,
"topic_name": topic_name,
"spikes_detected": len(spikes),
"average_lag": avg_lag,
"spike_threshold": spike_threshold,
"spike_timestamps": [ts for ts, _ in spikes],
"spike_values": [lag for _, lag in spikes],
"time_window_hours": time_window_hours
}
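    # Sketch of the input this analysis expects, assuming a store/analyzer pair:
    # METRIC_SNAPSHOT events whose code follows the CONSUMER_LAG_<group>_<topic>
    # convention above and whose data carries "total_lag". Group/topic names and
    # the lag value are examples.
    #
    #   store.add_event(TemporalEvent(
    #       event_id=str(uuid.uuid4()),
    #       event_type="METRIC_SNAPSHOT",
    #       timestamp=int(time.time() * 1000),
    #       severity="INFO",
    #       code="CONSUMER_LAG_orders-svc_orders",
    #       message="Consumer lag snapshot",
    #       data={"total_lag": 4200},
    #   ))
    #   analyzer.detect_lag_spikes("orders-svc", "orders")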
def detect_partition_flapping(
self,
time_window_hours: int = 12,
flap_threshold: int = 3
) -> List[Dict[str, Any]]:
"""
Identify partitions that go offline/online repeatedly.
Args:
time_window_hours: Analysis window
flap_threshold: Min offline events to consider flapping
Returns:
List of flapping partitions
"""
now = int(time.time() * 1000)
start_time = now - (time_window_hours * 3600 * 1000)
events = self.store.get_events(
start_time=start_time,
code="OFFLINE_PARTITIONS"
)
        # Group by topic/partition. Each event's data is expected to carry an
        # "offline_partitions" count plus a "by_topic" mapping of topic ->
        # list of offline partition ids.
partition_events = defaultdict(list)
for event in events:
data = event.data
if "offline_partitions" in data:
for topic, partitions in data.get("by_topic", {}).items():
for partition in partitions:
key = f"{topic}:{partition}"
partition_events[key].append(event.timestamp)
# Find flapping partitions
flapping = []
for key, timestamps in partition_events.items():
if len(timestamps) >= flap_threshold:
topic, partition = key.split(":", 1)
flapping.append({
"topic": topic,
"partition": int(partition),
"offline_count": len(timestamps),
"first_offline": min(timestamps),
"last_offline": max(timestamps)
})
flapping.sort(key=lambda x: x["offline_count"], reverse=True)
return flapping
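    # Illustrative OFFLINE_PARTITIONS payload this analysis expects (topic name
    # and partition ids are examples):
    #
    #   data = {"offline_partitions": 2, "by_topic": {"orders": [0, 3]}}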
def is_transient_vs_persistent(
self,
code: str,
persistence_threshold_minutes: int = 30
) -> Dict[str, Any]:
"""
        Classify whether an issue is transient or persistent.
Args:
code: Event code to analyze
            persistence_threshold_minutes: Minimum span, in minutes, from first
                to last occurrence for the issue to be classified as persistent
Returns:
Classification with confidence
"""
# Get all occurrences
events = self.store.get_events(code=code, limit=1000)
if len(events) < 2:
return {
"code": code,
"classification": "unknown",
"confidence": 0.0,
"message": "Insufficient occurrences"
}
# Sort by timestamp
events.sort(key=lambda e: e.timestamp)
# Calculate duration from first to last occurrence
duration_ms = events[-1].timestamp - events[0].timestamp
duration_minutes = duration_ms / (1000 * 60)
# Calculate average time between occurrences
if len(events) > 1:
intervals = [events[i+1].timestamp - events[i].timestamp
for i in range(len(events) - 1)]
avg_interval_minutes = (sum(intervals) / len(intervals)) / (1000 * 60)
else:
avg_interval_minutes = 0
# Classification logic
if duration_minutes >= persistence_threshold_minutes:
classification = "persistent"
confidence = min(0.95, duration_minutes / (persistence_threshold_minutes * 2))
else:
classification = "transient"
confidence = min(0.95, persistence_threshold_minutes / max(duration_minutes, 1))
return {
"code": code,
"classification": classification,
"confidence": confidence,
"occurrences": len(events),
"duration_minutes": duration_minutes,
"avg_interval_minutes": avg_interval_minutes,
"first_seen": events[0].timestamp,
"last_seen": events[-1].timestamp
}
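# Minimal self-contained demo, assuming the module is executed directly; the
# group/topic names, lag values, and timings below are illustrative only.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    store = TemporalMemoryStore(retention_hours=1)
    analyzer = TemporalAnalyzer(store)
    now_ms = int(time.time() * 1000)
    samples = [100, 120, 90, 800, 110]  # simulated total_lag readings
    for i, lag in enumerate(samples):
        store.add_event(TemporalEvent(
            event_id=str(uuid.uuid4()),
            event_type="METRIC_SNAPSHOT",
            timestamp=now_ms - (len(samples) - i) * 60_000,  # one sample per minute
            severity="INFO",
            code="CONSUMER_LAG_example-group_example-topic",
            message="Simulated consumer lag snapshot",
            data={"total_lag": lag, "value": lag},
        ))
    print(store.get_statistics())
    print(store.get_trend_analysis("CONSUMER_LAG_example-group_example-topic",
                                   time_window_hours=1))
    print(analyzer.detect_lag_spikes("example-group", "example-topic",
                                     time_window_hours=1))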