#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
KafkaIQ Web Dashboard Server
Serves the HTML dashboard and provides REST API for Kafka metrics
"""
import os
import sys
import json
from http.server import HTTPServer, SimpleHTTPRequestHandler
from kafka.admin import KafkaAdminClient
from kafka import KafkaConsumer, TopicPartition
# Fix Windows console encoding: the startup banner prints non-ASCII characters
# (check marks), which legacy Windows code pages cannot encode.
# The guard covers stdout being absent (None) or replaced by a stream object
# that does not provide reconfigure().
if os.name == 'nt' and hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8')
class DashboardHandler(SimpleHTTPRequestHandler):
    """HTTP handler serving the dashboard page and its JSON metrics API.

    Routes (the query string and fragment are ignored when matching):
        ``/``                    -> ``/dashboard.html``
        ``/api/dashboard-data``  -> JSON snapshot built by get_dashboard_data()
        anything else            -> default SimpleHTTPRequestHandler behaviour

    NOTE(review): the static-file fallback exposes every file under the
    directory being served; confirm that is acceptable for the deployment.
    """

    # Kafka bootstrap address used by every client this handler creates.
    # Subclasses may override it to point at a different cluster.
    bootstrap_servers = "localhost:9092"

    def do_GET(self):
        """Handle GET requests for the dashboard page, the API and static files."""
        # Route on the path component only so that cache-busting parameters
        # such as "?t=123" still reach the intended endpoint.
        route = self.path.split('?', 1)[0].split('#', 1)[0]

        if route == '/api/dashboard-data':
            # Build the body first so Content-Length is known before any
            # header is written to the socket.
            payload = json.dumps(self.get_dashboard_data()).encode('utf-8')
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.send_header('Content-Length', str(len(payload)))
            self.send_header('Access-Control-Allow-Origin', '*')
            self.end_headers()
            self.wfile.write(payload)
            return None

        if route == '/':
            # Serve the dashboard HTML
            self.path = '/dashboard.html'
        return SimpleHTTPRequestHandler.do_GET(self)

    def get_dashboard_data(self):
        """Fetch Kafka metrics for the dashboard.

        Returns:
            A JSON-serialisable dict with keys ``cluster`` (``cluster_id``,
            ``broker_count``, ``total_partitions``), ``topics`` (list of
            ``{'name', 'partitions'}``) and ``consumer_groups`` (list of
            ``{'name', 'topics': [{'name', 'lag'}]}``). If the cluster cannot
            be queried at all, the same shape is returned with placeholder
            values plus an ``error`` key holding the exception text.

        This method never raises: every failure is printed and reported
        through the returned dict.
        """
        admin_client = None
        try:
            admin_client = KafkaAdminClient(
                bootstrap_servers=self.bootstrap_servers,
                client_id="dashboard-api",
                request_timeout_ms=10000
            )

            # Get cluster details
            cluster_meta = admin_client.describe_cluster()
            cluster_id = cluster_meta.get('cluster_id', 'Unknown')
            broker_count = len(cluster_meta.get('brokers', []))

            # Get topics, hiding those whose name starts with a double
            # underscore (e.g. __consumer_offsets)
            topics_list = [
                topic for topic in admin_client.list_topics()
                if not topic.startswith('__')
            ]

            topics_info, total_partitions = self._collect_topics(
                admin_client, topics_list
            )
            consumer_groups_data = self._collect_consumer_groups(admin_client)

            return {
                'cluster': {
                    'cluster_id': cluster_id,
                    'broker_count': broker_count,
                    'total_partitions': total_partitions
                },
                'topics': topics_info,
                'consumer_groups': consumer_groups_data
            }
        except Exception as e:
            print(f"Error fetching dashboard data: {e}")
            return {
                'cluster': {
                    'cluster_id': 'Error',
                    'broker_count': 0,
                    'total_partitions': 0
                },
                'topics': [],
                'consumer_groups': [],
                'error': str(e)
            }
        finally:
            # Release the admin connection on every path, including failures.
            # A close() error is only logged so it cannot replace the result
            # that is already being returned.
            if admin_client is not None:
                try:
                    admin_client.close()
                except Exception as e:
                    print(f"Error closing admin client: {e}")

    def _collect_topics(self, admin_client, topic_names):
        """Return (topics_info, total_partitions) for the given topic names.

        Best effort: on failure the error is printed and whatever was
        gathered before the failure is returned.
        """
        topics_info = []
        total_partitions = 0
        try:
            for topic_info in admin_client.describe_topics(topic_names):
                partition_count = len(topic_info.get('partitions', []))
                total_partitions += partition_count
                topics_info.append({
                    'name': topic_info.get('topic', 'unknown'),
                    'partitions': partition_count
                })
        except Exception as e:
            print(f"Error getting topic details: {e}")
        return topics_info, total_partitions

    def _collect_consumer_groups(self, admin_client):
        """Return per-group lag summaries; groups with no topic data are omitted.

        Best effort: a failure inside one group is printed and that group
        keeps the topics gathered so far; a failure listing the groups is
        printed and the groups gathered so far are returned.
        """
        consumer_groups_data = []
        try:
            for group_id, _protocol_type in admin_client.list_consumer_groups():
                group_topics = []
                try:
                    offsets = admin_client.list_consumer_group_offsets(group_id)
                    # Sorted so the dashboard lists topics in a stable order.
                    topics_in_group = sorted({tp.topic for tp in offsets})
                    if topics_in_group:
                        # One consumer per group (not per topic) avoids
                        # re-bootstrapping a connection for every topic.
                        consumer = KafkaConsumer(
                            bootstrap_servers=self.bootstrap_servers,
                            group_id=group_id,
                            enable_auto_commit=False,
                            consumer_timeout_ms=2000
                        )
                        try:
                            for topic in topics_in_group:
                                total_lag = self._topic_lag(consumer, topic)
                                if total_lag is not None:
                                    group_topics.append({
                                        'name': topic,
                                        'lag': total_lag
                                    })
                        finally:
                            # Always release the consumer, even when a lag
                            # query raised part-way through.
                            consumer.close()
                except Exception as e:
                    print(f"Error getting lag for group {group_id}: {e}")

                if group_topics:
                    consumer_groups_data.append({
                        'name': group_id,
                        'topics': group_topics
                    })
        except Exception as e:
            print(f"Error getting consumer groups: {e}")
        return consumer_groups_data

    @staticmethod
    def _topic_lag(consumer, topic):
        """Return the summed lag of the consumer's group on one topic.

        Returns None when the consumer reports no partitions for the topic.
        """
        partitions = consumer.partitions_for_topic(topic)
        if not partitions:
            return None

        tps = [TopicPartition(topic, p) for p in partitions]
        end_offsets = consumer.end_offsets(tps)

        total_lag = 0
        for tp in tps:
            end = end_offsets.get(tp, 0)
            committed = consumer.committed(tp)
            # Without a committed offset the whole end offset counts as lag;
            # otherwise the difference is clamped so it is never negative.
            total_lag += end if committed is None else max(end - committed, 0)
        return total_lag

    def log_message(self, format, *args):
        """Print each log line to stdout prefixed with a bracketed timestamp."""
        print(f"[{self.log_date_time_string()}] {format % args}")
def run_server(port=9000):
    """Start the dashboard web server and block until interrupted.

    Args:
        port: TCP port to listen on. The host part of the bind address is
            the empty string, while the banner advertises localhost URLs.

    The call returns after Ctrl+C (KeyboardInterrupt); the listening socket
    is released on every exit path.
    """
    server_address = ('', port)
    httpd = HTTPServer(server_address, DashboardHandler)

    print("=" * 70)
    print("KafkaIQ Web Dashboard Server")
    print("=" * 70)
    print(f"\n✓ Server running on http://localhost:{port}")
    print(f"✓ Dashboard URL: http://localhost:{port}/")
    print(f"✓ API Endpoint: http://localhost:{port}/api/dashboard-data")
    print("\nPress Ctrl+C to stop the server")
    print("=" * 70)
    print()

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\n\nShutting down server...")
    finally:
        # serve_forever() has already returned by the time we get here, so
        # there is no running loop left for shutdown() to stop. What still
        # needs doing is closing the listening socket.
        httpd.server_close()
if __name__ == "__main__":
    # Make the script's own directory the working directory before starting
    # the server, so files are served from next to this script.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    os.chdir(script_dir)
    run_server(port=9000)