RabbitMQ MCP Server

from .connection import RabbitMQConnection from .admin import RabbitMQAdmin from typing import List def handle_enqueue(rabbitmq: RabbitMQConnection, queue: str, message: str): connection, channel = rabbitmq.get_channel() channel.queue_declare(queue) channel.basic_publish(exchange="", routing_key=queue, body=message) connection.close() def handle_fanout(rabbitmq: RabbitMQConnection, exchange: str, message: str): connection, channel = rabbitmq.get_channel() channel.exchange_declare(exchange=exchange, exchange_type="fanout") channel.basic_publish(exchange=exchange, routing_key="", body=message) connection.close() def handle_list_queues(rabbitmq_admin: RabbitMQAdmin) -> List[str]: result = rabbitmq_admin.list_queues() return [queue['name'] for queue in result] def handle_list_exchanges(rabbitmq_admin: RabbitMQAdmin) -> List[str]: result = rabbitmq_admin.list_exchanges() return [exchange['name'] for exchange in result] def handle_get_queue_info(rabbitmq_admin: RabbitMQAdmin, queue: str, vhost: str = "/") -> dict: return rabbitmq_admin.get_queue_info(queue, vhost) def handle_delete_queue(rabbitmq_admin: RabbitMQAdmin, queue: str, vhost: str = "/") -> None: rabbitmq_admin.delete_queue(queue, vhost) def handle_purge_queue(rabbitmq_admin: RabbitMQAdmin, queue: str, vhost: str = "/") -> None: rabbitmq_admin.purge_queue(queue, vhost) def handle_delete_exchange(rabbitmq_admin: RabbitMQAdmin, exchange: str, vhost: str = "/") -> None: rabbitmq_admin.delete_exchange(exchange, vhost) def handle_get_exchange_info(rabbitmq_admin: RabbitMQAdmin, exchange: str, vhost: str = "/") -> dict: return rabbitmq_admin.get_exchange_info(exchange, vhost)