evaluate_temporal_memory.py
import time
import uuid
import json
import random

from temporal_memory import TemporalMemoryStore, TemporalEvent, TemporalAnalyzer


def print_header(title):
    print(f"\n{'='*60}")
    print(f" {title}")
    print(f"{'='*60}")


def run_evaluation():
    print_header("KAFKAIQ TEMPORAL MEMORY EVALUATION")
    print("Generating synthetic operational data...")

    # Initialize store and analyzer
    store = TemporalMemoryStore(retention_hours=24, max_events=100000)
    analyzer = TemporalAnalyzer(store)
    current_time = int(time.time() * 1000)

    # ==========================================
    # Scenario 1: Persistent Degradation (Lag)
    # ==========================================
    print("\n[Scenario 1] Simulating Persistent Consumer Lag (Gradual Increase)...")
    base_lag = 1000
    for i in range(60):  # One snapshot per minute over the past hour
        timestamp = current_time - ((60 - i) * 60 * 1000)
        # Lag increases by ~100 every minute, with small random jitter
        lag_value = base_lag + (i * 100) + random.randint(-50, 50)
        event = TemporalEvent(
            event_id=str(uuid.uuid4()),
            event_type="METRIC_SNAPSHOT",
            timestamp=timestamp,
            severity="WARN" if lag_value > 2000 else "OK",
            code="CONSUMER_LAG_analytics_orders",
            message=f"Lag: {lag_value}",
            data={"value": lag_value}
        )
        store.add_event(event)

    # Evaluate trend and transient-vs-persistent classification
    trend = store.get_trend_analysis("CONSUMER_LAG_analytics_orders", time_window_hours=1)
    classification = analyzer.is_transient_vs_persistent("CONSUMER_LAG_analytics_orders")

    print(f" -> Trend Detected: {trend['trend'].upper()} (Expected: INCREASING)")
    print(f" -> Min Lag: {trend['min']}, Max Lag: {trend['max']}")
    print(f" -> Classification: {classification['classification'].upper()} (Confidence: {classification['confidence']:.2f})")

    # ==========================================
    # Scenario 2: Recurring Issue (Flapping)
    # ==========================================
    print("\n[Scenario 2] Simulating Flapping Partition (Offline/Online)...")
    for i in range(5):
        timestamp = current_time - ((50 - (i * 10)) * 60 * 1000)  # Every 10 mins
        event = TemporalEvent(
            event_id=str(uuid.uuid4()),
            event_type="ALERT",
            timestamp=timestamp,
            severity="CRITICAL",
            code="OFFLINE_PARTITIONS",
            message="Partition 5 offline",
            data={"by_topic": {"payments": [5]}, "offline_partitions": 1}
        )
        store.add_event(event)

    flapping = analyzer.detect_partition_flapping(time_window_hours=1)
    recurring = store.get_recurring_issues(threshold=3, time_window_hours=1)

    print(f" -> Flapping Partitions Detected: {len(flapping)}")
    if flapping:
        f = flapping[0]
        print(f"    Topic: {f['topic']}, Partition: {f['partition']}, Count: {f['offline_count']}")
    print(f" -> Recurring Issues (threshold=3): {len(recurring)}")

    # ==========================================
    # Scenario 3: Performance Benchmark
    # ==========================================
    print("\n[Scenario 3] Performance Benchmark (Insert & Query)...")

    # Insert 10,000 events
    start_time = time.time()
    for i in range(10000):
        store.add_event(TemporalEvent(
            event_id=str(uuid.uuid4()),
            event_type="METRIC",
            timestamp=current_time - i,
            severity="OK",
            code="BENCHMARK_EVENT",
            message="bench",
            data={}
        ))
    duration = time.time() - start_time
    print(f" -> Time to insert 10,000 events: {duration:.4f}s ({10000/duration:.0f} events/sec)")

    # Query performance
    start_time = time.time()
    events = store.get_events(code="BENCHMARK_EVENT", limit=5000)
    query_duration = time.time() - start_time
    print(f" -> Time to query {len(events)} events: {query_duration:.4f}s")

    # Memory stats
    stats = store.get_statistics()
    print(f" -> Total Events in Memory: {stats['total_events']}")

    # ==========================================
    # RESULTS SUMMARY
    # ==========================================
    print_header("EVALUATION RESULTS SUMMARY")

    results = {
        "Metric_Trend_Accuracy": "100%" if trend['trend'] == "increasing" else "FAIL",
        "Classification_Accuracy": "100%" if classification['classification'] == "persistent" else "FAIL",
        "Anomaly_Detection": "SUCCESS" if len(flapping) > 0 else "FAIL",
        "Write_Throughput": f"{10000/duration:.0f} ev/s",
        "Read_Latency_Avg": f"{(query_duration/5000)*1000:.4f} ms"
    }
    print(json.dumps(results, indent=2))

    # Export for paper/report
    with open("evaluation_metrics.json", "w") as out:
        json.dump(results, out, indent=2)
    print("\nSaved detailed metrics to 'evaluation_metrics.json'")


if __name__ == "__main__":
    run_evaluation()
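For readers without the repository checked out, here is a minimal, hypothetical sketch of the temporal_memory interface the script above exercises. It is inferred purely from the call sites (constructor arguments, method names, and the dictionary keys the script reads from return values); the real module may define these differently.

# Hypothetical interface sketch, inferred from evaluate_temporal_memory.py's
# call sites only; NOT the confirmed API of the temporal_memory module.
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class TemporalEvent:
    event_id: str
    event_type: str      # e.g. "METRIC_SNAPSHOT", "ALERT"
    timestamp: int       # epoch milliseconds
    severity: str        # "OK" | "WARN" | "CRITICAL"
    code: str            # grouping key for trend/recurrence queries
    message: str
    data: Dict[str, Any] = field(default_factory=dict)


class TemporalMemoryStore:
    def __init__(self, retention_hours: int = 24, max_events: int = 100000): ...
    def add_event(self, event: TemporalEvent) -> None: ...
    def get_events(self, code: Optional[str] = None, limit: int = 1000) -> List[TemporalEvent]: ...
    # Returned dict must expose at least "trend" (e.g. "increasing"), "min", "max"
    def get_trend_analysis(self, code: str, time_window_hours: int = 1) -> Dict[str, Any]: ...
    def get_recurring_issues(self, threshold: int = 3, time_window_hours: int = 1) -> List[Dict[str, Any]]: ...
    def get_statistics(self) -> Dict[str, Any]: ...  # must include "total_events"


class TemporalAnalyzer:
    def __init__(self, store: TemporalMemoryStore): ...
    # Returned dict must expose at least "classification" and "confidence"
    def is_transient_vs_persistent(self, code: str) -> Dict[str, Any]: ...
    # Each entry must expose at least "topic", "partition", "offline_count"
    def detect_partition_flapping(self, time_window_hours: int = 1) -> List[Dict[str, Any]]: ...

With the real module on the path, the evaluation runs as a plain script (python evaluate_temporal_memory.py): it prints the summary and writes evaluation_metrics.json alongside the script.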
