import time
import json
import uuid
import random
import sys
from typing import Dict, List, Any
from unittest.mock import MagicMock, patch
# Stub the kafka-python modules BEFORE importing mcp_server: the server module
# presumably imports (and may probe) them at import time, so the MagicMock
# stand-ins let it load without the real client library installed.
sys.modules["kafka"] = MagicMock()
sys.modules["kafka.admin"] = MagicMock()
sys.modules["kafka.errors"] = MagicMock()
import mcp_server
from mcp_server import KafkaInspector, TemporalMemoryStore, TemporalAnalyzer
from temporal_memory import TemporalEvent
# --- Helper Classes for Mocking ---
class MockKafkaAdmin:
    """In-memory stand-in for the Kafka admin client.

    Holds a dict of topic-name -> {"partitions", "replication"} and answers
    the subset of admin API calls the inspector exercises. Create/delete
    simulate network latency with a short sleep.
    """

    def __init__(self):
        # Seed 50 topics, each with 3 partitions and replication factor 2.
        self.topics = {f"topic-{i}": {"partitions": 3, "replication": 2} for i in range(50)}

    def describe_topics(self, topics=None):
        """Return a list of per-topic dicts mimicking the admin API response."""
        requested = topics if topics else list(self.topics.keys())
        return [
            {
                "topic": name,
                "partitions": [
                    {"partition": idx, "leader": 1, "replicas": [1, 2], "isr": [1, 2]}
                    for idx in range(self.topics.get(name, {}).get("partitions", 1))
                ],
            }
            for name in requested
        ]

    def list_topics(self):
        """Return all known topic names."""
        return list(self.topics.keys())

    def create_topics(self, new_topics):
        """Register new topics; objects must expose name/num_partitions/replication_factor."""
        time.sleep(0.01)  # Simulate network latency
        for spec in new_topics:
            self.topics[spec.name] = {
                "partitions": spec.num_partitions,
                "replication": spec.replication_factor,
            }

    def delete_topics(self, topics):
        """Drop the given topic names, ignoring unknown ones."""
        time.sleep(0.01)  # Simulate network
        for name in topics:
            self.topics.pop(name, None)

    def describe_cluster(self):
        """Return a static three-broker cluster description."""
        brokers = [
            {"node_id": broker_id, "host": f"broker{broker_id}", "port": 9092}
            for broker_id in (1, 2, 3)
        ]
        return {
            "cluster_id": "mock-cluster-uuid",
            "controller_id": 1,
            "brokers": brokers,
        }
class MockKafkaConsumer:
    """Minimal consumer stub: both calls the inspector makes return empty dicts."""

    def metrics(self):
        """Return an empty metrics mapping."""
        return {}

    def list_topics(self):
        """Return an empty topic mapping."""
        return {}
# --- Evaluation Suite ---
class KafkaIQEvaluator:
    """Benchmark/accuracy suite run against mocked Kafka backends.

    Each eval_* method appends metrics to ``self.results``; :meth:`run`
    executes all sections and dumps the results to
    ``comprehensive_evaluation.json``.
    """

    def __init__(self):
        # Metric name -> value (latency ms, ops/sec, MB, or PASS/FAIL).
        self.results = {}

    def print_header(self, title):
        """Print a banner separating evaluation sections."""
        print(f"\n{'='*60}")
        print(f" EVALUATING: {title}")
        print(f"{'='*60}")

    def setup_mocks(self):
        """Wire the inspector and temporal stores to in-process mocks."""
        # Patch the KafkaInspector to use our mocks instead of real brokers.
        self.inspector = KafkaInspector("localhost:9092")
        self.inspector._admin = MockKafkaAdmin()
        self.inspector._consumer = MockKafkaConsumer()
        # Fresh temporal memory so runs do not pollute each other.
        mcp_server.temporal_store = TemporalMemoryStore()
        mcp_server.temporal_analyzer = TemporalAnalyzer(mcp_server.temporal_store)

    def eval_core_operations(self):
        """Measure latency of the basic topic operations."""
        self.print_header("Core Kafka Operations")
        # perf_counter() is monotonic and high-resolution; time.time() is
        # wall-clock and can jump or return identical readings.
        # 1. Topic Listing Performance
        start = time.perf_counter()
        topics = self.inspector.list_topics()
        duration = time.perf_counter() - start
        self.results["Core_ListTopics_Latency_ms"] = duration * 1000
        print(f"-> Listed {len(topics)} topics in {duration*1000:.2f} ms")
        # 2. Topic Creation Latency
        start = time.perf_counter()
        self.inspector.create_topic("bench-topic", num_partitions=3, replication_factor=2)
        duration = time.perf_counter() - start
        self.results["Core_CreateTopic_Latency_ms"] = duration * 1000
        print(f"-> Created topic in {duration*1000:.2f} ms")
        # 3. Topic Description Overhead
        start = time.perf_counter()
        self.inspector.describe_topic("topic-1")
        duration = time.perf_counter() - start
        self.results["Core_DescribeTopic_Latency_ms"] = duration * 1000
        print(f"-> Described topic in {duration*1000:.2f} ms")

    def eval_intelligent_inspection(self):
        """Measure health-report latency and verify alerting accuracy."""
        self.print_header("Intelligent Inspection & Alerting")
        # 1. Cluster Details (Health Check)
        start = time.perf_counter()
        self.inspector.get_cluster_details()
        duration = time.perf_counter() - start
        self.results["Insp_ClusterHealth_Latency_ms"] = duration * 1000
        print(f"-> Generated full cluster health report in {duration*1000:.2f} ms")

        # 2. Accuracy: inject a partition with no leader (leader == -1 and an
        # empty ISR) and verify the summary flags it as CRITICAL.
        def mock_describe_topics_unhealthy(topics=None):
            return [{
                "topic": "critical-topic",
                "partitions": [{"partition": 0, "leader": -1, "replicas": [1], "isr": []}]
            }]

        with patch.object(self.inspector._admin, 'describe_topics',
                          side_effect=mock_describe_topics_unhealthy):
            summary = self.inspector.alert_summary()
        is_critical = summary['status'] == 'CRITICAL'
        has_signal = any(s['code'] == 'OFFLINE_PARTITIONS' for s in summary['signals'])
        verdict = "PASS" if is_critical and has_signal else "FAIL"
        self.results["Insp_Alert_Accuracy_Offline"] = verdict
        print(f"-> Offline Partition Detection: {verdict}")

    def eval_analytics_throughput(self):
        """Measure the pure analysis overhead of temporal trend queries."""
        self.print_header("Advanced Analytics throughput")
        # Seed one metric event BEFORE starting the clock so we measure only
        # the query loop, not the setup (the original timed the seeding too).
        mcp_server.temporal_store.add_event(TemporalEvent(
            str(uuid.uuid4()), "METRIC", int(time.time()*1000), "OK", "TEST", "msg", {"val": 100}
        ))
        start = time.perf_counter()
        for _ in range(1000):
            mcp_server.get_lag_trend_analysis("group1", "topic1", 1)
        duration = time.perf_counter() - start
        # Guard against a zero reading on coarse-resolution clocks.
        ops_sec = 1000 / duration if duration > 0 else float("inf")
        self.results["Analytics_TrendQuery_Throughput_ops"] = ops_sec
        print(f"-> Temporal Trend Analysis Throughput: {ops_sec:.0f} ops/sec")

    def eval_temporal_overhead(self):
        """Measure resident-memory growth from storing 50k temporal events."""
        self.print_header("Temporal Memory Overhead")
        try:
            import psutil
        except ImportError:
            # The __main__ guard attempts a pip install; if psutil is still
            # unavailable, skip this section instead of crashing the suite.
            print("-> psutil unavailable; skipping memory overhead measurement")
            self.results["Temporal_Memory_Growth_MB_50k_Events"] = None
            self.results["Temporal_Bytes_Per_Event"] = None
            return
        import os
        process = psutil.Process(os.getpid())
        base_mem = process.memory_info().rss / 1024 / 1024
        # Inject 50,000 events
        for _ in range(50000):
            mcp_server.temporal_store.add_event(TemporalEvent(
                str(uuid.uuid4()), "ALERT", int(time.time()*1000), "WARN", "TEST_CODE", "Test message", {}
            ))
        final_mem = process.memory_info().rss / 1024 / 1024
        growth = final_mem - base_mem
        self.results["Temporal_Memory_Growth_MB_50k_Events"] = growth
        self.results["Temporal_Bytes_Per_Event"] = (growth * 1024 * 1024) / 50000
        print(f"-> Memory Growth for 50k events: {growth:.2f} MB")
        print(f"-> Avg Size per Event: {(growth * 1024 * 1024) / 50000:.2f} bytes")

    def run(self):
        """Execute every evaluation section and persist the results as JSON."""
        self.setup_mocks()
        self.eval_core_operations()
        self.eval_intelligent_inspection()
        self.eval_analytics_throughput()
        self.eval_temporal_overhead()
        # Save results
        with open("comprehensive_evaluation.json", "w") as f:
            json.dump(self.results, f, indent=2)
        print("\nSaved full results to 'comprehensive_evaluation.json'")
if __name__ == "__main__":
    # eval_temporal_overhead needs psutil (third-party); probe for it and
    # install on the fly if missing so the memory section can run.
    try:
        import psutil  # noqa: F401 -- availability probe only
    except ImportError:
        print("Installing psutil for memory profiling...")
        import subprocess
        subprocess.check_call([sys.executable, "-m", "pip", "install", "psutil"])
        # A package installed mid-process may not be visible to the import
        # system until the path finder caches are invalidated.
        import importlib
        importlib.invalidate_caches()
    KafkaIQEvaluator().run()