Skip to main content
Glama
admin.py8.43 kB
import logging from typing import Optional, List, Dict, Any from kafka.admin import AIOKafkaAdminClient, NewTopic, TopicPartition from kafka.errors import UnknownTopicOrPartitionError, TopicAlreadyExistsError logger = logging.getLogger(__name__) class RabbitMQAdmin: def __init__(self, host: str, port: int, username: str, password: str, use_tls: bool): self.protocol = "https" if use_tls else "http" self.base_url = f"{self.protocol}://{host}:{port}/api" self.auth = base64.b64encode(f"{username}:{password}".encode()).decode() self.headers = { "Authorization": f"Basic {self.auth}", "Content-Type": "application/json" } def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None) -> requests.Response: url = f"{self.base_url}/{endpoint}" response = requests.request(method, url, headers=self.headers, json=data, verify=True) response.raise_for_status() return response def list_queues(self) -> List[Dict]: """List all queues in the RabbitMQ server""" response = self._make_request("GET", "queues") return response.json() def list_exchanges(self) -> List[Dict]: """List all exchanges in the RabbitMQ server""" response = self._make_request("GET", "exchanges") return response.json() def get_queue_info(self, queue: str, vhost: str = "/") -> Dict: """Get detailed information about a specific queue""" vhost_encoded = requests.utils.quote(vhost, safe='') response = self._make_request("GET", f"queues/{vhost_encoded}/{queue}") return response.json() def delete_queue(self, queue: str, vhost: str = "/") -> None: """Delete a queue""" validate_rabbitmq_name(queue, "Queue name") vhost_encoded = requests.utils.quote(vhost, safe='') self._make_request("DELETE", f"queues/{vhost_encoded}/{queue}") def purge_queue(self, queue: str, vhost: str = "/") -> None: """Remove all messages from a queue""" validate_rabbitmq_name(queue, "Queue name") vhost_encoded = requests.utils.quote(vhost, safe='') self._make_request("DELETE", f"queues/{vhost_encoded}/{queue}/contents") def get_exchange_info(self, exchange: str, vhost: str = "/") -> Dict: """Get detailed information about a specific exchange""" vhost_encoded = requests.utils.quote(vhost, safe='') response = self._make_request("GET", f"exchanges/{vhost_encoded}/{exchange}") return response.json() def delete_exchange(self, exchange: str, vhost: str = "/") -> None: """Delete an exchange""" validate_rabbitmq_name(exchange, "Exchange name") vhost_encoded = requests.utils.quote(vhost, safe='') self._make_request("DELETE", f"exchanges/{vhost_encoded}/{exchange}") def get_bindings(self, queue: Optional[str] = None, exchange: Optional[str] = None, vhost: str = "/") -> List[Dict]: """Get bindings, optionally filtered by queue or exchange""" vhost_encoded = requests.utils.quote(vhost, safe='') if queue: validate_rabbitmq_name(queue, "Queue name") response = self._make_request("GET", f"queues/{vhost_encoded}/{queue}/bindings") elif exchange: validate_rabbitmq_name(exchange, "Exchange name") response = self._make_request("GET", f"exchanges/{vhost_encoded}/{exchange}/bindings/source") else: response = self._make_request("GET", f"bindings/{vhost_encoded}") return response.json() def get_overview(self) -> Dict: """Get overview of RabbitMQ server including version, stats, and listeners""" response = self._make_request("GET", "overview") return response.json() async def handle_get_topic_info(admin_client: AIOKafkaAdminClient, topic: str) -> Dict[str, Any]: """Gets detailed information about a specific Kafka topic.""" logger.info(f"Fetching information for topic '{topic}'...") try: # describe_topics returns a list, even for a single topic topic_metadata_list = await admin_client.describe_topics([topic]) if not topic_metadata_list: raise UnknownTopicOrPartitionError(f"Topic '{topic}' not found.") # Extract the metadata for the requested topic topic_metadata = topic_metadata_list[0] # Describe configs for the topic config_desc = await admin_client.describe_configs(resource_configs=[('topic', topic)]) topic_config = config_desc[0]['configs'] if config_desc else {} # Get consumer group information related to the topic (optional, can be complex) # groups = await admin_client.list_consumer_groups() # topic_groups = [] # for group_id, _ in groups: # try: # group_desc = await admin_client.describe_consumer_groups([group_id]) # if group_desc and group_desc[0]['state'] != 'Dead': # # Check if group consumes the topic (requires offset fetching) # # This part can be involved and might require listing/fetching offsets # pass # Simplified for now # except Exception as group_err: # logger.warning(f"Could not describe group '{group_id}': {group_err}") result = { "topic_name": topic_metadata['topic'], "is_internal": topic_metadata['is_internal'], "partitions": [ { "partition_id": p_meta['partition'], "leader": p_meta['leader'], "replicas": p_meta['replicas'], "isr": p_meta['isr'], # Add partition offset info if needed (requires consumer API) } for p_meta in topic_metadata['partitions'] ], "configuration": topic_config, # "consumer_groups": topic_groups # Add if implemented } logger.info(f"Successfully fetched information for topic '{topic}'.") return result except UnknownTopicOrPartitionError as e: logger.warning(f"Topic '{topic}' not found.") raise e # Re-raise to be handled by the server except Exception as e: logger.error(f"Failed to get information for topic '{topic}': {e}", exc_info=True) raise async def handle_delete_topic(admin_client: AIOKafkaAdminClient, topic: str) -> None: """Deletes a Kafka topic.""" logger.info(f"Attempting to delete topic '{topic}'...") try: await admin_client.delete_topics(topics=[topic], timeout_ms=30000) # 30 second timeout logger.info(f"Successfully submitted deletion request for topic '{topic}'. Note: Deletion might take time on the broker.") except UnknownTopicOrPartitionError: logger.warning(f"Cannot delete topic '{topic}': Topic not found.") raise # Re-raise to inform the user except Exception as e: # Add more specific error handling if needed (e.g., PolicyViolationError) logger.error(f"Failed to delete topic '{topic}': {e}", exc_info=True) raise async def handle_create_topic( admin_client: AIOKafkaAdminClient, topic: str, num_partitions: int = 1, replication_factor: int = 1, config: Optional[Dict[str, str]] = None ) -> None: """Creates a new Kafka topic.""" logger.info(f"Attempting to create topic '{topic}' with {num_partitions} partitions and replication factor {replication_factor}...") new_topic = NewTopic( name=topic, num_partitions=num_partitions, replication_factor=replication_factor, topic_configs=config or {} ) try: await admin_client.create_topics(new_topics=[new_topic], validate_only=False) logger.info(f"Successfully created topic '{topic}'.") except TopicAlreadyExistsError: logger.warning(f"Topic '{topic}' already exists.") # Decide if this should be an error or just a warning # raise # Uncomment to make it an error except Exception as e: # Add more specific error handling (e.g., PolicyViolationError, InvalidReplicationFactorError) logger.error(f"Failed to create topic '{topic}': {e}", exc_info=True) raise # Add other admin functions as needed (e.g., handle_alter_topic_config)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DevarshKumbhare/MCP-Kafka'

If you have feedback or need assistance with the MCP directory API, please join our Discord server