Skip to main content
Glama
evaluate_kafkaiq_suite.py (8.02 kB)
"""Benchmark and sanity-check suite for the KafkaIQ MCP server.

Runs ``KafkaInspector`` against in-memory mock Kafka clients, measures the
latency of a handful of operations, checks that an offline partition is
reported as critical, measures temporal trend-query throughput and the
memory growth of the temporal event store, and writes every result to
``comprehensive_evaluation.json``.
"""
import time
import json
import uuid
import random
import sys
from typing import Dict, List, Any
from unittest.mock import MagicMock, patch

# The kafka-python modules must be stubbed out BEFORE mcp_server is imported,
# because importing it may import (or initialise) the real client modules.
sys.modules["kafka"] = MagicMock()
sys.modules["kafka.admin"] = MagicMock()
sys.modules["kafka.errors"] = MagicMock()

import mcp_server
from mcp_server import KafkaInspector, TemporalMemoryStore, TemporalAnalyzer
from temporal_memory import TemporalEvent


# --- Helper Classes for Mocking ---

class MockKafkaAdmin:
    """In-memory stand-in for the Kafka admin client.

    Holds 50 topics (``topic-0`` .. ``topic-49``), each recorded with
    3 partitions and a replication factor of 2.
    """

    def __init__(self):
        self.topics = {
            f"topic-{i}": {"partitions": 3, "replication": 2}
            for i in range(50)
        }

    def describe_topics(self, topics=None):
        """Return one description dict per topic.

        Args:
            topics: Topic names to describe; every known topic is described
                when this is ``None`` or empty.

        Returns:
            A list of ``{"topic": name, "partitions": [...]}`` dicts. Unknown
            topic names are described with a single partition.
        """
        topic_list = topics if topics else list(self.topics.keys())
        return [
            {
                "topic": t,
                "partitions": [
                    {"partition": p, "leader": 1, "replicas": [1, 2], "isr": [1, 2]}
                    for p in range(self.topics.get(t, {}).get("partitions", 1))
                ],
            }
            for t in topic_list
        ]

    def list_topics(self):
        """Return the names of all known topics."""
        return list(self.topics.keys())

    def create_topics(self, new_topics):
        """Register each topic in ``new_topics``.

        Each item must expose ``name``, ``num_partitions`` and
        ``replication_factor`` attributes.
        """
        time.sleep(0.01)  # Simulate network latency
        for t in new_topics:
            self.topics[t.name] = {
                "partitions": t.num_partitions,
                "replication": t.replication_factor,
            }

    def delete_topics(self, topics):
        """Forget each named topic; unknown names are ignored."""
        time.sleep(0.01)  # Simulate network latency
        for t in topics:
            self.topics.pop(t, None)

    def describe_cluster(self):
        """Return a fixed three-broker cluster description."""
        return {
            "cluster_id": "mock-cluster-uuid",
            "controller_id": 1,
            "brokers": [
                {"node_id": 1, "host": "broker1", "port": 9092},
                {"node_id": 2, "host": "broker2", "port": 9092},
                {"node_id": 3, "host": "broker3", "port": 9092},
            ],
        }


class MockKafkaConsumer:
    """Minimal consumer stand-in that reports no metrics and no topics."""

    def metrics(self):
        return {}

    def list_topics(self):
        return {}


# --- Evaluation Suite ---

class KafkaIQEvaluator:
    """Runs each evaluation step and collects the measurements in ``results``."""

    def __init__(self):
        # Metric name -> measured value (float) or "PASS"/"FAIL" verdict.
        self.results: Dict[str, Any] = {}

    def print_header(self, title):
        """Print a banner announcing the evaluation section ``title``."""
        print(f"\n{'='*60}")
        print(f" EVALUATING: {title}")
        print(f"{'='*60}")

    def setup_mocks(self):
        """Create the inspector on mock clients and reset temporal state."""
        # Patch the KafkaInspector to use our mocks
        self.inspector = KafkaInspector("localhost:9092")
        self.inspector._admin = MockKafkaAdmin()
        self.inspector._consumer = MockKafkaConsumer()

        # Initialize temporal memory for full context
        mcp_server.temporal_store = TemporalMemoryStore()
        mcp_server.temporal_analyzer = TemporalAnalyzer(mcp_server.temporal_store)

    def eval_core_operations(self):
        """Time topic listing, creation and description (milliseconds)."""
        self.print_header("Core Kafka Operations")

        # perf_counter is monotonic and high-resolution, so short intervals
        # are not rounded down to zero the way time.time() can be.

        # 1. Topic Listing Performance
        start = time.perf_counter()
        topics = self.inspector.list_topics()
        duration = time.perf_counter() - start
        self.results["Core_ListTopics_Latency_ms"] = duration * 1000
        print(f"-> Listed {len(topics)} topics in {duration*1000:.2f} ms")

        # 2. Topic Creation Latency
        start = time.perf_counter()
        self.inspector.create_topic("bench-topic", num_partitions=3, replication_factor=2)
        duration = time.perf_counter() - start
        self.results["Core_CreateTopic_Latency_ms"] = duration * 1000
        print(f"-> Created topic in {duration*1000:.2f} ms")

        # 3. Topic Description Overhead
        start = time.perf_counter()
        self.inspector.describe_topic("topic-1")
        duration = time.perf_counter() - start
        self.results["Core_DescribeTopic_Latency_ms"] = duration * 1000
        print(f"-> Described topic in {duration*1000:.2f} ms")

    def eval_intelligent_inspection(self):
        """Time the cluster report and check offline-partition alerting."""
        self.print_header("Intelligent Inspection & Alerting")

        # 1. Cluster Details (Health Check)
        start = time.perf_counter()
        self.inspector.get_cluster_details()
        duration = time.perf_counter() - start
        self.results["Insp_ClusterHealth_Latency_ms"] = duration * 1000
        print(f"-> Generated full cluster health report in {duration*1000:.2f} ms")

        # 2. Mock 'Unhealthy' State for Accuracy Test
        # Inject an offline partition (no leader, empty ISR)
        def mock_describe_topics_unhealthy(topics=None):
            return [{
                "topic": "critical-topic",
                "partitions": [{"partition": 0, "leader": -1, "replicas": [1], "isr": []}]
            }]

        with patch.object(self.inspector._admin, 'describe_topics',
                          side_effect=mock_describe_topics_unhealthy):
            summary = self.inspector.alert_summary()

        is_critical = summary['status'] == 'CRITICAL'
        has_signal = any(s['code'] == 'OFFLINE_PARTITIONS' for s in summary['signals'])
        verdict = "PASS" if is_critical and has_signal else "FAIL"

        self.results["Insp_Alert_Accuracy_Offline"] = verdict
        print(f"-> Offline Partition Detection: {verdict}")

    def eval_analytics_throughput(self):
        """Measure how many temporal trend queries run per second."""
        self.print_header("Advanced Analytics throughput")

        # Store one metric event, then issue the same trend query repeatedly
        # so the measurement covers the analysis path, not any network I/O.
        query_count = 1000

        start = time.perf_counter()

        # Call valid temporal analysis methods
        mcp_server.temporal_store.add_event(TemporalEvent(
            str(uuid.uuid4()), "METRIC", int(time.time()*1000),
            "OK", "TEST", "msg", {"val": 100}
        ))

        for _ in range(query_count):
            mcp_server.get_lag_trend_analysis("group1", "topic1", 1)

        duration = time.perf_counter() - start
        ops_sec = query_count / duration
        self.results["Analytics_TrendQuery_Throughput_ops"] = ops_sec
        print(f"-> Temporal Trend Analysis Throughput: {ops_sec:.0f} ops/sec")

    def eval_temporal_overhead(self):
        """Measure resident-memory growth after storing 50,000 events."""
        self.print_header("Temporal Memory Overhead")

        # Imported here because psutil may only have been installed by the
        # __main__ guard just before run() was called.
        import psutil
        import os

        # The result key and the printed message below name "50k" explicitly,
        # so keep this count in sync with them.
        event_count = 50_000
        bytes_per_mb = 1024 * 1024

        process = psutil.Process(os.getpid())
        base_mem = process.memory_info().rss / 1024 / 1024

        # Inject 50,000 events
        for _ in range(event_count):
            mcp_server.temporal_store.add_event(TemporalEvent(
                str(uuid.uuid4()), "ALERT", int(time.time()*1000),
                "WARN", "TEST_CODE", "Test message", {}
            ))

        final_mem = process.memory_info().rss / 1024 / 1024
        growth = final_mem - base_mem
        bytes_per_event = (growth * bytes_per_mb) / event_count

        self.results["Temporal_Memory_Growth_MB_50k_Events"] = growth
        self.results["Temporal_Bytes_Per_Event"] = bytes_per_event

        print(f"-> Memory Growth for 50k events: {growth:.2f} MB")
        print(f"-> Avg Size per Event: {bytes_per_event:.2f} bytes")

    def run(self):
        """Run every evaluation step and save the results as JSON."""
        self.setup_mocks()

        self.eval_core_operations()
        self.eval_intelligent_inspection()
        self.eval_analytics_throughput()
        self.eval_temporal_overhead()

        # Save results
        with open("comprehensive_evaluation.json", "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2)
        print("\nSaved full results to 'comprehensive_evaluation.json'")


if __name__ == "__main__":
    # The standard library has no easy way to read process memory, so the
    # memory evaluation needs psutil; install it on the fly when missing.
    try:
        import psutil
    except ImportError:
        print("Installing psutil for memory profiling...")
        import subprocess
        subprocess.check_call([sys.executable, "-m", "pip", "install", "psutil"])

    KafkaIQEvaluator().run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ojhaayush03/kafka_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.