Skip to main content
Glama
evaluate_live_system.py5.56 kB
import time import json import uuid import sys import mcp_server from temporal_memory import TemporalMemoryStore, TemporalAnalyzer, TemporalEvent def print_header(title): print(f"\n{'='*60}") print(f" {title}") print(f"{'='*60}") class LiveSystemEvaluator: def __init__(self): self.results = {} self.test_topic = f"bench-topic-{uuid.uuid4().hex[:8]}" self.inspector = None self.store = None self.analyzer = None def setup(self): print("Connecting to LIVE Kafka Broker (localhost:9092)...") try: self.inspector = mcp_server.KafkaInspector("localhost:9092") # Initialize temporal memory self.store = TemporalMemoryStore(retention_hours=24) self.analyzer = TemporalAnalyzer(self.store) # Inject into mcp_server module just in case mcp_server.kafka_inspector = self.inspector mcp_server.temporal_store = self.store mcp_server.temporal_analyzer = self.analyzer print("Connection successful and Temporal Memory initialized.") except Exception as e: print(f"[FATAL] Failed to connect: {e}") sys.exit(1) def eval_core_operations(self): print_header("BENCHMARK: Core Kafka Operations") # 1. Create Topic print(f"1. Creating topic '{self.test_topic}'...") start = time.time() try: res = self.inspector.create_topic(self.test_topic, num_partitions=3, replication_factor=1) duration = time.time() - start self.results["Core_CreateTopic_Latency_ms"] = duration * 1000 print(f" -> Latency: {duration*1000:.2f} ms") print(f" -> Response: {res}") except Exception as e: print(f" -> FAILED: {e}") # 2. List Topics print("2. Listing topics...") start = time.time() topics = self.inspector.list_topics() duration = time.time() - start self.results["Core_ListTopics_Latency_ms"] = duration * 1000 print(f" -> Found {len(topics)} topics") print(f" -> Latency: {duration*1000:.2f} ms") # 3. Describe Topic print(f"3. Describing '{self.test_topic}'...") start = time.time() self.inspector.describe_topic(self.test_topic) duration = time.time() - start self.results["Core_DescribeTopic_Latency_ms"] = duration * 1000 print(f" -> Latency: {duration*1000:.2f} ms") def eval_temporal_intelligence(self): print_header("BENCHMARK: Temporal Intelligence") # 1. Generate Activity (Health Checks + Logging) print("1. Generating 5 Health Checks (populating memory)...") start = time.time() for i in range(5): # Simulate the MCP tool logic: Get summary -> Log to Store summary = self.inspector.alert_summary() # Log overall self.store.add_event(TemporalEvent( str(uuid.uuid4()), "HEALTH_CHECK", int(time.time()*1000), summary.get("status", "OK"), "CLUSTER_HEALTH", "Check", summary.get("summary", {}) )) # Log signals for s in summary.get("signals", []): self.store.add_event(TemporalEvent( str(uuid.uuid4()), "ALERT", int(time.time()*1000), s["level"], s["code"], s["message"], s["data"] )) duration = time.time() - start avg_latency = (duration / 5) * 1000 self.results["Temporal_HealthCheck_Avg_Latency_ms"] = avg_latency print(f" -> Avg Latency per check (Fetch+Log): {avg_latency:.2f} ms") # 2. Query Temporal Trends print("2. Querying Temporal Health Summary...") start = time.time() # Direct query equivalent to get_temporal_health_summary tool health_events = self.store.get_events(event_type="HEALTH_CHECK") alerts = self.store.get_events(event_type="ALERT") duration = time.time() - start self.results["Temporal_Query_Latency_ms"] = duration * 1000 print(f" -> Latency: {duration*1000:.2f} ms") print(f" -> Events Retrieved: {len(health_events) + len(alerts)}") # 3. Memory Stats stats = self.store.get_statistics() self.results["Temporal_Events_Stored"] = stats.get("total_events", 0) print(f" -> Total Events in RAM: {stats.get('total_events', 0)}") def cleanup(self): print_header("CLEANUP") print(f"Deleting test topic '{self.test_topic}'...") try: self.inspector.delete_topic(self.test_topic) except Exception as e: print(f"Failed to delete topic: {e}") print("Done.") def run(self): try: self.setup() self.eval_core_operations() self.eval_temporal_intelligence() # Save metrics with open("live_evaluation_metrics.json", "w") as f: json.dump(self.results, f, indent=2) print_header("SUCCESS") print("Saved metrics to 'live_evaluation_metrics.json'") except Exception as e: print(f"\n[ERROR] {e}") import traceback traceback.print_exc() finally: if self.inspector: self.cleanup() if __name__ == "__main__": LiveSystemEvaluator().run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ojhaayush03/kafka_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server