#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Topic creation script for KafkaIQ Dashboard
Creates all necessary topics for the demo
"""
import sys
import os
# Fix Windows console encoding so the status glyphs printed by this script
# (✓ / ⚠ / ✗) do not fail to encode on legacy code pages.
# stdout can be None or a replaced stream object that has no reconfigure()
# method, so only call it when it is actually available.
if os.name == 'nt' and hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8')
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError, KafkaError
def create_topics(bootstrap_servers="localhost:9092"):
    """Create all required topics for the KafkaIQ dashboard.

    Connects to the cluster, creates each demo topic, then prints the list
    of non-internal topics and basic cluster details as a verification step.
    Progress and errors are reported on stdout.

    Args:
        bootstrap_servers: Kafka bootstrap address passed to
            ``KafkaAdminClient``. Defaults to ``"localhost:9092"``.

    Raises:
        Exception: whatever ``KafkaAdminClient`` raises if the initial
            connection cannot be established. Errors while creating,
            listing or describing are printed and do not propagate.
    """
    print(f"Connecting to Kafka cluster at {bootstrap_servers}...")
    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers,
        client_id="topic-creator",
    )

    # Topics to create for the demo.
    topics_config = [
        {"name": "orders", "partitions": 3, "replication": 1},
        {"name": "users", "partitions": 3, "replication": 1},
        {"name": "events", "partitions": 3, "replication": 1},
        {"name": "payments", "partitions": 3, "replication": 1},
        {"name": "notifications", "partitions": 5, "replication": 1},
    ]

    failures = 0
    try:
        print("\nCreating topics...")
        print("=" * 60)
        # Create topics one request at a time: a batched request surfaces
        # only the first per-topic error, which hides which topics were
        # actually created, already present, or failed.
        for cfg in topics_config:
            name = cfg["name"]
            new_topic = NewTopic(
                name=name,
                num_partitions=cfg["partitions"],
                replication_factor=cfg["replication"],
            )
            try:
                admin_client.create_topics(
                    new_topics=[new_topic], validate_only=False
                )
            except TopicAlreadyExistsError:
                print(f"⚠ Already exists: {name}")
            except KafkaError as e:
                failures += 1
                print(f"✗ Error creating {name}: {e}")
            else:
                print(
                    f"✓ Created: {name} "
                    f"(P={cfg['partitions']}, RF={cfg['replication']})"
                )
        print("=" * 60)

        # List all topics to verify (best effort).
        print("\nVerifying topics...")
        try:
            visible_topics = sorted(
                topic
                for topic in admin_client.list_topics()
                if not topic.startswith('__')  # skip internal topics
            )
            # This counts every non-internal topic in the cluster, not only
            # the ones created by this run.
            print(f"Total topics in cluster: {len(visible_topics)}")
            print(f"Topics: {', '.join(visible_topics)}")
        except Exception as e:
            print(f"Error listing topics: {e}")

        # Get cluster details (best effort).
        print("\nCluster Details:")
        try:
            cluster_meta = admin_client.describe_cluster()
            print(f"  Cluster ID: {cluster_meta.get('cluster_id')}")
            print(f"  Brokers: {len(cluster_meta.get('brokers', []))}")
        except Exception as e:
            print(f"  Error getting cluster details: {e}")
    finally:
        # Always release the admin connection, even on unexpected errors.
        admin_client.close()

    if failures:
        print(f"\n⚠ Topic creation finished with {failures} error(s)")
    else:
        print("\n✓ Topic creation complete!")
if __name__ == "__main__":
    # Script entry point: run the topic setup and turn any uncaught error
    # into a printed message plus a non-zero exit status.
    exit_status = 0
    try:
        create_topics()
    except Exception as failure:
        print(f"\n✗ Error: {failure}")
        exit_status = 1
    if exit_status:
        sys.exit(exit_status)