Skip to main content
Glama
create_topics.py2.97 kB
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Topic creation script for KafkaIQ Dashboard Creates all necessary topics for the demo """ import sys import os # Fix Windows console encoding if os.name == 'nt': sys.stdout.reconfigure(encoding='utf-8') from kafka.admin import KafkaAdminClient, NewTopic from kafka.errors import TopicAlreadyExistsError, KafkaError def create_topics(): """Create all required topics for the KafkaIQ dashboard""" # Initialize Kafka Admin Client print("Connecting to Kafka cluster at localhost:9092...") admin_client = KafkaAdminClient( bootstrap_servers="localhost:9092", client_id="topic-creator" ) # Define topics to create topics_config = [ {"name": "orders", "partitions": 3, "replication": 1}, {"name": "users", "partitions": 3, "replication": 1}, {"name": "events", "partitions": 3, "replication": 1}, {"name": "payments", "partitions": 3, "replication": 1}, {"name": "notifications", "partitions": 5, "replication": 1}, ] print("\nCreating topics...") print("=" * 60) topic_list = [] for topic_config in topics_config: topic_list.append(NewTopic( name=topic_config["name"], num_partitions=topic_config["partitions"], replication_factor=topic_config["replication"] )) try: admin_client.create_topics(new_topics=topic_list, validate_only=False) for topic_config in topics_config: print(f"✓ Created: {topic_config['name']} (P={topic_config['partitions']}, RF={topic_config['replication']})") except TopicAlreadyExistsError: print("⚠ Some topics already exist") for topic_config in topics_config: print(f" {topic_config['name']} (P={topic_config['partitions']}, RF={topic_config['replication']})") except KafkaError as e: print(f"✗ Error creating topics: {e}") print("=" * 60) # List all topics to verify print("\nVerifying topics...") try: topics_metadata = admin_client.list_topics() topics_list = [topic for topic in topics_metadata if not topic.startswith('__')] print(f"Total topics created: {len(topics_list)}") print(f"Topics: {', '.join(sorted(topics_list))}") except Exception as e: print(f"Error listing topics: {e}") # Get cluster details print("\nCluster Details:") try: cluster_meta = admin_client.describe_cluster() print(f" Cluster ID: {cluster_meta.get('cluster_id')}") print(f" Brokers: {len(cluster_meta.get('brokers', []))}") except Exception as e: print(f" Error getting cluster details: {e}") admin_client.close() print("\n✓ Topic creation complete!") if __name__ == "__main__": try: create_topics() except Exception as e: print(f"\n✗ Error: {e}") sys.exit(1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ojhaayush03/kafka_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server