import time
import json
import uuid
import sys
import mcp_server
from temporal_memory import TemporalMemoryStore, TemporalAnalyzer, TemporalEvent
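
# Live end-to-end benchmark for the Kafka MCP server and its temporal memory layer.
# Requires a reachable Kafka broker on localhost:9092: measures core topic-operation
# latencies, exercises the temporal memory store, and writes the collected metrics
# to 'live_evaluation_metrics.json'.
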
def print_header(title):
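    """Print a banner-style section header to stdout."""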
    print(f"\n{'='*60}")
    print(f" {title}")
    print(f"{'='*60}")

class LiveSystemEvaluator:
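    """Runs latency benchmarks against a live Kafka broker and the temporal memory store."""
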
    def __init__(self):
        self.results = {}
        self.test_topic = f"bench-topic-{uuid.uuid4().hex[:8]}"
        self.inspector = None
        self.store = None
        self.analyzer = None

    def setup(self):
        """Connect to the live broker and initialize the temporal memory components."""
        print("Connecting to LIVE Kafka Broker (localhost:9092)...")
        try:
            self.inspector = mcp_server.KafkaInspector("localhost:9092")
            # Initialize temporal memory
            self.store = TemporalMemoryStore(retention_hours=24)
            self.analyzer = TemporalAnalyzer(self.store)
            # Point the mcp_server module's globals at these live instances
            mcp_server.kafka_inspector = self.inspector
            mcp_server.temporal_store = self.store
            mcp_server.temporal_analyzer = self.analyzer
            print("Connection successful and Temporal Memory initialized.")
        except Exception as e:
            print(f"[FATAL] Failed to connect: {e}")
            sys.exit(1)

    def eval_core_operations(self):
        """Benchmark create/list/describe topic latencies against the live broker."""
        print_header("BENCHMARK: Core Kafka Operations")
        # 1. Create Topic
        print(f"1. Creating topic '{self.test_topic}'...")
        start = time.time()
        try:
            res = self.inspector.create_topic(self.test_topic, num_partitions=3, replication_factor=1)
            duration = time.time() - start
            self.results["Core_CreateTopic_Latency_ms"] = duration * 1000
            print(f" -> Latency: {duration*1000:.2f} ms")
            print(f" -> Response: {res}")
        except Exception as e:
            print(f" -> FAILED: {e}")
        # 2. List Topics
        print("2. Listing topics...")
        start = time.time()
        topics = self.inspector.list_topics()
        duration = time.time() - start
        self.results["Core_ListTopics_Latency_ms"] = duration * 1000
        print(f" -> Found {len(topics)} topics")
        print(f" -> Latency: {duration*1000:.2f} ms")
        # 3. Describe Topic
        print(f"3. Describing '{self.test_topic}'...")
        start = time.time()
        self.inspector.describe_topic(self.test_topic)
        duration = time.time() - start
        self.results["Core_DescribeTopic_Latency_ms"] = duration * 1000
        print(f" -> Latency: {duration*1000:.2f} ms")

    def eval_temporal_intelligence(self):
        """Benchmark the temporal memory layer: event logging, querying, and statistics."""
        print_header("BENCHMARK: Temporal Intelligence")
        # 1. Generate Activity (Health Checks + Logging)
        print("1. Generating 5 Health Checks (populating memory)...")
        start = time.time()
        for _ in range(5):
            # Simulate the MCP tool logic: Get summary -> Log to Store
            summary = self.inspector.alert_summary()
            # Log the overall health-check snapshot
            self.store.add_event(TemporalEvent(
                str(uuid.uuid4()), "HEALTH_CHECK", int(time.time()*1000),
                summary.get("status", "OK"), "CLUSTER_HEALTH", "Check", summary.get("summary", {})
            ))
            # Log each individual alert signal
            for s in summary.get("signals", []):
                self.store.add_event(TemporalEvent(
                    str(uuid.uuid4()), "ALERT", int(time.time()*1000), s["level"], s["code"], s["message"], s["data"]
                ))
        duration = time.time() - start
        avg_latency = (duration / 5) * 1000
        self.results["Temporal_HealthCheck_Avg_Latency_ms"] = avg_latency
        print(f" -> Avg Latency per check (Fetch+Log): {avg_latency:.2f} ms")
        # 2. Query Temporal Trends
        print("2. Querying Temporal Health Summary...")
        start = time.time()
        # Direct query equivalent to get_temporal_health_summary tool
        health_events = self.store.get_events(event_type="HEALTH_CHECK")
        alerts = self.store.get_events(event_type="ALERT")
        duration = time.time() - start
        self.results["Temporal_Query_Latency_ms"] = duration * 1000
        print(f" -> Latency: {duration*1000:.2f} ms")
        print(f" -> Events Retrieved: {len(health_events) + len(alerts)}")
        # 3. Memory Stats
        stats = self.store.get_statistics()
        self.results["Temporal_Events_Stored"] = stats.get("total_events", 0)
        print(f" -> Total Events in RAM: {stats.get('total_events', 0)}")

    def cleanup(self):
        """Delete the benchmark topic created during the run."""
        print_header("CLEANUP")
        print(f"Deleting test topic '{self.test_topic}'...")
        try:
            self.inspector.delete_topic(self.test_topic)
        except Exception as e:
            print(f"Failed to delete topic: {e}")
        print("Done.")

    def run(self):
        """Execute the full benchmark, persist the metrics, and always clean up."""
        try:
            self.setup()
            self.eval_core_operations()
            self.eval_temporal_intelligence()
            # Save metrics
            with open("live_evaluation_metrics.json", "w") as f:
                json.dump(self.results, f, indent=2)
            print_header("SUCCESS")
            print("Saved metrics to 'live_evaluation_metrics.json'")
        except Exception as e:
            print(f"\n[ERROR] {e}")
            import traceback
            traceback.print_exc()
        finally:
            if self.inspector:
                self.cleanup()

if __name__ == "__main__":
    LiveSystemEvaluator().run()