import time
import uuid
import sys
import json
import random
from temporal_memory import TemporalMemoryStore, TemporalEvent, TemporalAnalyzer
def print_header(title: str, width: int = 60) -> None:
    """Print *title* framed above and below by a rule of ``=`` characters.

    Args:
        title: Text shown between the two rules (printed with one leading
            space).
        width: Number of ``=`` characters in each rule. Defaults to 60.
    """
    rule = "=" * width
    # The leading newline separates the banner from any preceding output.
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)
def run_evaluation(output_path: str = "evaluation_metrics.json") -> None:
    """Run the temporal-memory evaluation scenarios and export a summary.

    Three scenarios are executed against one ``TemporalMemoryStore``:

    1. Sixty per-minute consumer-lag snapshots whose value rises by roughly
       100 per minute, followed by trend analysis and transient/persistent
       classification.
    2. Five ``OFFLINE_PARTITIONS`` alerts spaced ten minutes apart, followed
       by partition-flapping detection.
    3. An insert benchmark of 10,000 events and a query with ``limit=5000``.

    A results dictionary is printed as JSON and written to *output_path*.

    Args:
        output_path: File that receives the JSON summary. Defaults to
            ``"evaluation_metrics.json"`` in the current working directory.
    """
    print_header("KAFKAIQ TEMPORAL MEMORY EVALUATION")
    print("Generating synthetic operational data...")

    # Initialize store
    store = TemporalMemoryStore(retention_hours=24, max_events=100000)
    analyzer = TemporalAnalyzer(store)
    current_time = int(time.time() * 1000)  # "now" in epoch milliseconds
    minute_ms = 60 * 1000

    # ==========================================
    # Scenario 1: Persistent Degradation (Lag)
    # ==========================================
    print("\n[Scenario 1] Simulating Persistent Consumer Lag (Gradual Increase)...")
    lag_code = "CONSUMER_LAG_analytics_orders"
    base_lag = 1000
    for i in range(60):  # Over 60 minutes, oldest snapshot first
        timestamp = current_time - (60 - i) * minute_ms
        # Lag increases by ~100 every minute.
        # NOTE: the jitter comes from the unseeded global RNG, so the exact
        # values differ between runs.
        lag_value = base_lag + (i * 100) + random.randint(-50, 50)
        store.add_event(TemporalEvent(
            event_id=str(uuid.uuid4()),
            event_type="METRIC_SNAPSHOT",
            timestamp=timestamp,
            severity="WARN" if lag_value > 2000 else "OK",
            code=lag_code,
            message=f"Lag: {lag_value}",
            data={"value": lag_value},
        ))

    # Evaluate Trend
    trend = store.get_trend_analysis(lag_code, time_window_hours=1)
    classification = analyzer.is_transient_vs_persistent(lag_code)
    print(f" -> Trend Detected: {trend['trend'].upper()} (Expected: INCREASING)")
    print(f" -> Min Lag: {trend['min']}, Max Lag: {trend['max']}")
    print(
        f" -> Classification: {classification['classification'].upper()} "
        f"(Confidence: {classification['confidence']:.2f})"
    )

    # ==========================================
    # Scenario 2: Recurring Issue (Flapping)
    # ==========================================
    print("\n[Scenario 2] Simulating Flapping Partition (Offline/Online)...")
    for i in range(5):
        timestamp = current_time - (50 - (i * 10)) * minute_ms  # Every 10 mins
        store.add_event(TemporalEvent(
            event_id=str(uuid.uuid4()),
            event_type="ALERT",
            timestamp=timestamp,
            severity="CRITICAL",
            code="OFFLINE_PARTITIONS",
            message="Partition 5 offline",
            data={"by_topic": {"payments": [5]}, "offline_partitions": 1},
        ))

    flapping = analyzer.detect_partition_flapping(time_window_hours=1)
    # The recurring-issue query is still exercised, but its result is not
    # part of the reported metrics, so it is not bound to a name.
    store.get_recurring_issues(threshold=3, time_window_hours=1)
    print(f" -> Flapping Partitions Detected: {len(flapping)}")
    if flapping:
        first_flap = flapping[0]
        print(
            f" Topic: {first_flap['topic']}, Partition: {first_flap['partition']}, "
            f"Count: {first_flap['offline_count']}"
        )

    # ==========================================
    # Scenario 3: Performance Benchmark
    # ==========================================
    print("\n[Scenario 3] Performance Benchmark (Insert & Query)...")
    bench_count = 10000
    query_limit = 5000

    # Insert benchmark. perf_counter() is used for interval timing because it
    # is monotonic and high-resolution, unlike the wall clock time.time().
    started = time.perf_counter()
    for i in range(bench_count):
        store.add_event(TemporalEvent(
            event_id=str(uuid.uuid4()),
            event_type="METRIC",
            timestamp=current_time - i,
            severity="OK",
            code="BENCHMARK_EVENT",
            message="bench",
            data={},
        ))
    duration = time.perf_counter() - started
    # Guard against a zero-length interval instead of raising ZeroDivisionError.
    throughput = bench_count / duration if duration > 0 else float("inf")
    print(
        f" -> Time to insert {bench_count:,} events: {duration:.4f}s "
        f"({throughput:.0f} events/sec)"
    )

    # Query performance (only the elapsed time is reported; the returned
    # events themselves are not inspected).
    started = time.perf_counter()
    store.get_events(code="BENCHMARK_EVENT", limit=query_limit)
    query_duration = time.perf_counter() - started
    print(f" -> Time to query {query_limit:,} events: {query_duration:.4f}s")

    # Memory Stats
    stats = store.get_statistics()
    print(f" -> Total Events in Memory: {stats['total_events']}")

    # ==========================================
    # RESULTS SUMMARY
    # ==========================================
    print_header("EVALUATION RESULTS SUMMARY")
    results = {
        "Metric_Trend_Accuracy": "100%" if trend['trend'] == "increasing" else "FAIL",
        "Classification_Accuracy": (
            "100%" if classification['classification'] == "persistent" else "FAIL"
        ),
        "Anomaly_Detection": "SUCCESS" if len(flapping) > 0 else "FAIL",
        "Write_Throughput": f"{throughput:.0f} ev/s",
        # Average is taken over the requested limit, in milliseconds per event.
        "Read_Latency_Avg": f"{(query_duration / query_limit) * 1000:.4f} ms",
    }
    print(json.dumps(results, indent=2))

    # Export for paper/report
    with open(output_path, "w", encoding="utf-8") as out_file:
        json.dump(results, out_file, indent=2)
    print(f"\nSaved detailed metrics to '{output_path}'")
# Script entry point: run the evaluation only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run_evaluation()