Skip to main content
Glama
handlers.py (4.19 kB)
"""Handler functions for RabbitMQ and Kafka operations.

The RabbitMQ handlers are synchronous thin wrappers around
``RabbitMQConnection`` / ``RabbitMQAdmin``; the Kafka handlers are
coroutines that wrap a producer and an admin client.
"""

import logging
from typing import List, Optional, Any

# NOTE(review): AIOKafkaProducer / AIOKafkaAdminClient are normally shipped by
# the `aiokafka` package rather than `kafka` -- confirm these import paths
# resolve in the deployed environment.
from kafka.producer import AIOKafkaProducer
from kafka.admin import AIOKafkaAdminClient

from .connection import RabbitMQConnection
from .admin import RabbitMQAdmin

logger = logging.getLogger(__name__)


def handle_enqueue(rabbitmq: RabbitMQConnection, queue: str, message: str):
    """Declare ``queue`` and publish ``message`` to it.

    The message is published with an empty exchange name and ``queue`` as the
    routing key.

    Args:
        rabbitmq: Connection wrapper; ``get_channel()`` must return a
            ``(connection, channel)`` pair.
        queue: Name of the queue to declare and publish to.
        message: Message body.

    Raises:
        Any exception raised by the underlying channel calls is propagated.
        The connection is closed in every case.
    """
    connection, channel = rabbitmq.get_channel()
    try:
        channel.queue_declare(queue)
        channel.basic_publish(exchange="", routing_key=queue, body=message)
    finally:
        # Close even when declare/publish fails so the connection never leaks.
        connection.close()


def handle_fanout(rabbitmq: RabbitMQConnection, exchange: str, message: str):
    """Declare a fanout exchange and publish ``message`` to it.

    Args:
        rabbitmq: Connection wrapper; ``get_channel()`` must return a
            ``(connection, channel)`` pair.
        exchange: Name of the exchange, declared with type ``"fanout"``.
        message: Message body; published with an empty routing key.

    Raises:
        Any exception raised by the underlying channel calls is propagated.
        The connection is closed in every case.
    """
    connection, channel = rabbitmq.get_channel()
    try:
        channel.exchange_declare(exchange=exchange, exchange_type="fanout")
        channel.basic_publish(exchange=exchange, routing_key="", body=message)
    finally:
        # Close even when declare/publish fails so the connection never leaks.
        connection.close()


def handle_list_queues(rabbitmq_admin: RabbitMQAdmin) -> List[str]:
    """Return the ``'name'`` field of every entry from ``list_queues()``."""
    result = rabbitmq_admin.list_queues()
    return [queue['name'] for queue in result]


def handle_list_exchanges(rabbitmq_admin: RabbitMQAdmin) -> List[str]:
    """Return the ``'name'`` field of every entry from ``list_exchanges()``."""
    result = rabbitmq_admin.list_exchanges()
    return [exchange['name'] for exchange in result]


def handle_get_queue_info(rabbitmq_admin: RabbitMQAdmin, queue: str, vhost: str = "/") -> dict:
    """Return whatever ``rabbitmq_admin.get_queue_info(queue, vhost)`` returns."""
    return rabbitmq_admin.get_queue_info(queue, vhost)


def handle_delete_queue(rabbitmq_admin: RabbitMQAdmin, queue: str, vhost: str = "/") -> None:
    """Delete ``queue`` in ``vhost`` through the admin client."""
    rabbitmq_admin.delete_queue(queue, vhost)


def handle_purge_queue(rabbitmq_admin: RabbitMQAdmin, queue: str, vhost: str = "/") -> None:
    """Purge ``queue`` in ``vhost`` through the admin client."""
    rabbitmq_admin.purge_queue(queue, vhost)


def handle_delete_exchange(rabbitmq_admin: RabbitMQAdmin, exchange: str, vhost: str = "/") -> None:
    """Delete ``exchange`` in ``vhost`` through the admin client."""
    rabbitmq_admin.delete_exchange(exchange, vhost)


def handle_get_exchange_info(rabbitmq_admin: RabbitMQAdmin, exchange: str, vhost: str = "/") -> dict:
    """Return whatever ``rabbitmq_admin.get_exchange_info(exchange, vhost)`` returns."""
    return rabbitmq_admin.get_exchange_info(exchange, vhost)


async def handle_publish_message(producer: AIOKafkaProducer, topic: str, message: str, key: Optional[str] = None):
    """Publishes a message to a Kafka topic.

    Args:
        producer: Producer used to send the record.
        topic: Destination topic.
        message: Message text; encoded as UTF-8 before sending.
        key: Optional record key; encoded as UTF-8. A falsy key (``None`` or
            an empty string) is sent as ``None``.

    Raises:
        Any exception from encoding or sending is logged and re-raised.
    """
    logger.info(f"Publishing message to topic '{topic}'...")
    try:
        # Encode message and key (if provided) to bytes
        message_bytes = message.encode('utf-8')
        key_bytes = key.encode('utf-8') if key else None

        await producer.send_and_wait(topic, value=message_bytes, key=key_bytes)
        logger.info(f"Message successfully published to topic '{topic}'.")
    except Exception as e:
        logger.error(f"Failed to publish message to topic '{topic}': {e}", exc_info=True)
        raise  # Re-raise the exception to be caught in server.py


async def handle_list_topics(admin_client: AIOKafkaAdminClient) -> List[str]:
    """Lists all available topics in the Kafka cluster.

    Args:
        admin_client: Admin client whose ``list_topics()`` coroutine is awaited.

    Returns:
        The value returned by ``admin_client.list_topics()``.

    Raises:
        Any exception from the admin client is logged and re-raised.
    """
    logger.info("Fetching list of Kafka topics...")
    try:
        topics = await admin_client.list_topics()
        logger.info(f"Successfully fetched {len(topics)} topics.")
        return topics
    except Exception as e:
        logger.error(f"Failed to list Kafka topics: {e}", exc_info=True)
        raise


# Placeholder for other handlers (like consume, describe topic, etc.)
# We will need corresponding admin functions in admin.py for some of these
# Example:
# async def handle_consume_messages(consumer: AIOKafkaConsumer, max_messages: int = 10, timeout_ms: int = 1000):
#     """Consumes messages from the subscribed topics."""
#     logger.info(f"Consuming messages...")
#     messages = []
#     try:
#         async for msg in consumer:
#             messages.append({
#                 "topic": msg.topic,
#                 "partition": msg.partition,
#                 "offset": msg.offset,
#                 "key": msg.key.decode('utf-8') if msg.key else None,
#                 "value": msg.value.decode('utf-8')  # Assuming utf-8 encoded messages
#             })
#             if len(messages) >= max_messages:
#                 break
#             # Potentially add a timeout mechanism if no messages arrive
#         logger.info(f"Consumed {len(messages)} messages.")
#         return messages
#     except Exception as e:
#         logger.error(f"Failed to consume messages: {e}", exc_info=True)
#         raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DevarshKumbhare/MCP-Kafka'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.