RabbitMQ MCP Server

from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import ( TextContent, Tool, ) import ssl from .models import ( Enqueue, Fanout, ListQueues, ListExchanges, GetQueueInfo, DeleteQueue, PurgeQueue, DeleteExchange, GetExchangeInfo ) from .logger import Logger, LOG_LEVEL from .connection import RabbitMQConnection, validate_rabbitmq_name from .handlers import ( handle_enqueue, handle_fanout, handle_list_queues, handle_list_exchanges, handle_get_queue_info, handle_delete_queue, handle_purge_queue, handle_delete_exchange, handle_get_exchange_info ) from .admin import RabbitMQAdmin async def serve(rabbitmq_host: str, port: int, username: str, password: str, use_tls: bool, log_level: str = LOG_LEVEL.DEBUG.name, api_port: int = 15671) -> None: # Setup server server = Server("mcp-rabbitmq") # Setup logger is_log_level_exception = False try: log_level = LOG_LEVEL[log_level] except Exception: is_log_level_exception = True log_level = LOG_LEVEL.WARNING logger = Logger("server.log", log_level) if is_log_level_exception: logger.warning("Wrong log_level received. Default to WARNING") @server.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="enqueue", description="""Enqueue a message to a queue hosted on RabbitMQ""", inputSchema=Enqueue.model_json_schema(), ), Tool( name="fanout", description="""Publish a message to an exchange with fanout type""", inputSchema=Fanout.model_json_schema(), ), Tool( name="list_queues", description="""List all the queues in the broker""", inputSchema=ListQueues.model_json_schema(), ), Tool( name="list_exchanges", description="""List all the exchanges in the broker""", inputSchema=ListExchanges.model_json_schema(), ), Tool( name="get_queue_info", description="""Get detailed information about a specific queue""", inputSchema=GetQueueInfo.model_json_schema(), ), Tool( name="delete_queue", description="""Delete a specific queue""", inputSchema=DeleteQueue.model_json_schema(), ), Tool( name="purge_queue", description="""Remove all messages from a specific queue""", inputSchema=PurgeQueue.model_json_schema(), ), Tool( name="delete_exchange", description="""Delete a specific exchange""", inputSchema=DeleteExchange.model_json_schema(), ), Tool( name="get_exchange_info", description="""Get detailed information about a specific exchange""", inputSchema=GetExchangeInfo.model_json_schema(), ) ] @server.call_tool() async def call_tool( name: str, arguments: dict ) -> list[TextContent]: if name == "enqueue": logger.debug("Executing enqueue tool") message = arguments["message"] queue = arguments["queue"] validate_rabbitmq_name(queue, "Queue name") try: # Setup RabbitMQ connection rabbitmq = RabbitMQConnection(rabbitmq_host, port, username, password, use_tls) handle_enqueue(rabbitmq, queue, message) return [TextContent(type="text", text=str("suceeded"))] except Exception as e: logger.error(f"{e}") return [TextContent(type="text", text=str("failed"))] elif name == "fanout": logger.debug("Executing fanout tool") message = arguments["message"] exchange = arguments["exchange"] validate_rabbitmq_name(exchange, "Exchange name") try: # Setup RabbitMQ connection rabbitmq = RabbitMQConnection(rabbitmq_host, port, username, password, use_tls) handle_fanout(rabbitmq, exchange, message) return [TextContent(type="text", text=str("suceeded"))] except Exception as e: logger.error(f"{e}") return [TextContent(type="text", text=str("failed"))] elif name == "list_queues": try: admin = RabbitMQAdmin(rabbitmq_host, api_port, username, password, use_tls) result = handle_list_queues(admin) return [TextContent(type="text", text=str(result))] except Exception as e: logger.error(f"{e}") return [TextContent(type="text", text=str("failed"))] return [TextContent(type="text", text=str("succeeded"))] elif name == "list_exchanges": try: admin = RabbitMQAdmin(rabbitmq_host, api_port, username, password, use_tls) result = handle_list_exchanges(admin) return [TextContent(type="text", text=str(result))] except Exception as e: logger.error(f"{e}") return [TextContent(type="text", text=str("failed"))] elif name == "get_queue_info": try: admin = RabbitMQAdmin(rabbitmq_host, api_port, username, password, use_tls) queue = arguments["queue"] vhost = arguments.get("vhost", "/") validate_rabbitmq_name(queue, "Queue name") result = handle_get_queue_info(admin, queue, vhost) return [TextContent(type="text", text=str(result))] except Exception as e: logger.error(f"{e}") return [TextContent(type="text", text=str("failed"))] elif name == "delete_queue": try: admin = RabbitMQAdmin(rabbitmq_host, api_port, username, password, use_tls) queue = arguments["queue"] vhost = arguments.get("vhost", "/") validate_rabbitmq_name(queue, "Queue name") handle_delete_queue(admin, queue, vhost) return [TextContent(type="text", text=str("succeeded"))] except Exception as e: logger.error(f"{e}") return [TextContent(type="text", text=str("failed"))] elif name == "purge_queue": try: admin = RabbitMQAdmin(rabbitmq_host, api_port, username, password, use_tls) queue = arguments["queue"] vhost = arguments.get("vhost", "/") validate_rabbitmq_name(queue, "Queue name") handle_purge_queue(admin, queue, vhost) return [TextContent(type="text", text=str("succeeded"))] except Exception as e: logger.error(f"{e}") return [TextContent(type="text", text=str("failed"))] elif name == "delete_exchange": try: admin = RabbitMQAdmin(rabbitmq_host, api_port, username, password, use_tls) exchange = arguments["exchange"] vhost = arguments.get("vhost", "/") validate_rabbitmq_name(exchange, "Exchange name") handle_delete_exchange(admin, exchange, vhost) return [TextContent(type="text", text=str("succeeded"))] except Exception as e: logger.error(f"{e}") return [TextContent(type="text", text=str("failed"))] elif name == "get_exchange_info": try: admin = RabbitMQAdmin(rabbitmq_host, api_port, username, password, use_tls) exchange = arguments["exchange"] vhost = arguments.get("vhost", "/") validate_rabbitmq_name(exchange, "Exchange name") result = handle_get_exchange_info(admin, exchange, vhost) return [TextContent(type="text", text=str(result))] except Exception as e: logger.error(f"{e}") return [TextContent(type="text", text=str("failed"))] raise ValueError(f"Tool not found: {name}") options = server.create_initialization_options() async with stdio_server() as (read_stream, write_stream): await server.run(read_stream, write_stream, options, raise_exceptions=True)