Skip to main content
Glama
dashboard_server.py (6.88 kB)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
KafkaIQ Web Dashboard Server
Serves the HTML dashboard and provides REST API for Kafka metrics
"""

import json
import os
import sys
from http.server import HTTPServer, SimpleHTTPRequestHandler
from urllib.parse import urlsplit

from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient

# Fix Windows console encoding. stdout may have been replaced (or be None
# under pythonw), in which case it has no reconfigure() to call.
if os.name == 'nt' and hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8')

# Kafka bootstrap address used by both the admin client and the lag consumers.
# Overridable through the environment; the default matches the previously
# hard-coded value.
BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")


class DashboardHandler(SimpleHTTPRequestHandler):
    """Custom HTTP handler for the dashboard.

    Routes:
        /                     -> serves dashboard.html from the working directory
        /api/dashboard-data   -> JSON snapshot of cluster, topics and group lag
        anything else         -> default static file handling

    NOTE: SimpleHTTPRequestHandler serves every file under the current
    working directory, so only run this from a directory whose contents are
    safe to expose.
    """

    def do_GET(self):
        """Handle GET requests, routing on the URL path without its query string."""
        # self.path includes any "?query"; compare only the path component so
        # cache-busting parameters do not bypass the API route.
        route = urlsplit(self.path).path

        if route == '/':
            # Serve the dashboard HTML
            self.path = '/dashboard.html'
            return super().do_GET()

        if route == '/api/dashboard-data':
            return self._send_dashboard_json()

        return super().do_GET()

    def _send_dashboard_json(self):
        """Write the dashboard data as a JSON response with status 200."""
        # Build the body first so a serialization failure cannot leave a
        # half-written response, and so Content-Length can be sent.
        body = json.dumps(self.get_dashboard_data()).encode('utf-8')

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(body)

    def get_dashboard_data(self):
        """Fetch Kafka metrics for the dashboard.

        Returns:
            A dict with keys 'cluster', 'topics' and 'consumer_groups'.
            When the cluster cannot be queried, the same shape is returned
            with placeholder values plus an 'error' key holding the message.
            This method does not raise; all failures are printed and folded
            into the returned dict.
        """
        admin_client = None
        try:
            admin_client = KafkaAdminClient(
                bootstrap_servers=BOOTSTRAP_SERVERS,
                client_id="dashboard-api",
                request_timeout_ms=10000,
            )

            # Get cluster details
            cluster_meta = admin_client.describe_cluster()
            cluster_id = cluster_meta.get('cluster_id', 'Unknown')
            broker_count = len(cluster_meta.get('brokers', []))

            # Get topics, hiding those whose names start with a double underscore
            topics_list = [
                topic for topic in admin_client.list_topics()
                if not topic.startswith('__')
            ]

            topics_info, total_partitions = self._collect_topics(
                admin_client, topics_list
            )
            consumer_groups_data = self._collect_consumer_groups(admin_client)

            return {
                'cluster': {
                    'cluster_id': cluster_id,
                    'broker_count': broker_count,
                    'total_partitions': total_partitions,
                },
                'topics': topics_info,
                'consumer_groups': consumer_groups_data,
            }
        except Exception as e:
            print(f"Error fetching dashboard data: {e}")
            return {
                'cluster': {
                    'cluster_id': 'Error',
                    'broker_count': 0,
                    'total_partitions': 0,
                },
                'topics': [],
                'consumer_groups': [],
                'error': str(e),
            }
        finally:
            # Release the admin connection on every path, including failures
            # that happen after the client was constructed.
            if admin_client is not None:
                try:
                    admin_client.close()
                except Exception as e:
                    print(f"Error closing admin client: {e}")

    def _collect_topics(self, admin_client, topics_list):
        """Return (topics_info, total_partitions) for the given topic names.

        Best-effort: on failure the error is printed and whatever was
        gathered so far is returned.
        """
        topics_info = []
        total_partitions = 0

        # Nothing to describe; skip the broker round-trip.
        if not topics_list:
            return topics_info, total_partitions

        try:
            for topic_info in admin_client.describe_topics(topics_list):
                partition_count = len(topic_info.get('partitions', []))
                total_partitions += partition_count
                topics_info.append({
                    'name': topic_info.get('topic', 'unknown'),
                    'partitions': partition_count,
                })
        except Exception as e:
            print(f"Error getting topic details: {e}")

        return topics_info, total_partitions

    def _collect_consumer_groups(self, admin_client):
        """Return a list of {'name', 'topics'} entries for groups that have offsets.

        Best-effort: a failure listing groups is printed and the entries
        gathered so far are returned.
        """
        consumer_groups_data = []
        try:
            for group_id, _protocol_type in admin_client.list_consumer_groups():
                group_topics = self._collect_group_lag(admin_client, group_id)
                if group_topics:
                    consumer_groups_data.append({
                        'name': group_id,
                        'topics': group_topics,
                    })
        except Exception as e:
            print(f"Error getting consumer groups: {e}")

        return consumer_groups_data

    def _collect_group_lag(self, admin_client, group_id):
        """Return [{'name': topic, 'lag': total_lag}, ...] for one consumer group.

        Lag per partition is the end offset minus the committed offset,
        floored at zero; a partition with no committed offset counts its
        whole end offset as lag. Errors are printed and the topics computed
        before the failure are still returned.
        """
        group_topics = []
        consumer = None
        try:
            offsets = admin_client.list_consumer_group_offsets(group_id)
            # Sorted so the response order is stable between refreshes.
            topics_in_group = sorted({tp.topic for tp in offsets})
            if not topics_in_group:
                return group_topics

            # One consumer per group (rather than per topic) is enough: it is
            # only used for metadata, end offsets and committed offsets.
            consumer = KafkaConsumer(
                bootstrap_servers=BOOTSTRAP_SERVERS,
                group_id=group_id,
                enable_auto_commit=False,
                consumer_timeout_ms=2000,
            )

            for topic in topics_in_group:
                partitions = consumer.partitions_for_topic(topic)
                if not partitions:
                    continue

                tps = [TopicPartition(topic, p) for p in partitions]
                end_offsets = consumer.end_offsets(tps)

                total_lag = 0
                for tp in tps:
                    end = end_offsets.get(tp, 0)
                    committed = consumer.committed(tp)
                    total_lag += end if committed is None else max(end - committed, 0)

                group_topics.append({'name': topic, 'lag': total_lag})
        except Exception as e:
            print(f"Error getting lag for group {group_id}: {e}")
        finally:
            # Always close the consumer, even when lag calculation failed.
            if consumer is not None:
                try:
                    consumer.close()
                except Exception as e:
                    print(f"Error getting lag for group {group_id}: {e}")

        return group_topics

    def log_message(self, format, *args):
        """Custom log message format"""
        print(f"[{self.log_date_time_string()}] {format % args}")


def run_server(port=9000):
    """Start the dashboard web server and block until interrupted.

    Args:
        port: TCP port to listen on.

    NOTE: the empty host in the server address binds to all interfaces, not
    just localhost, even though the banner prints localhost URLs.
    """
    server_address = ('', port)
    httpd = HTTPServer(server_address, DashboardHandler)

    print("=" * 70)
    print("KafkaIQ Web Dashboard Server")
    print("=" * 70)
    print(f"\n✓ Server running on http://localhost:{port}")
    print(f"✓ Dashboard URL: http://localhost:{port}/")
    print(f"✓ API Endpoint: http://localhost:{port}/api/dashboard-data")
    print("\nPress Ctrl+C to stop the server")
    print("=" * 70)
    print()

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\n\nShutting down server...")
    finally:
        # serve_forever() has already returned here, so there is no loop left
        # to shutdown(); what remains is releasing the listening socket.
        httpd.server_close()


if __name__ == "__main__":
    # Change to the script directory to serve files
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    run_server(port=9000)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ojhaayush03/kafka_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.