RabbitMQ MCP Server

import pika import ssl from typing import Optional class RabbitMQConnection: def __init__(self, host: str, port: int, username: str, password: str, use_tls: bool): self.protocol = "amqps" if use_tls else "amqp" self.url = f"{self.protocol}://{username}:{password}@{host}:{port}" self.parameters = pika.URLParameters(self.url) if use_tls: ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA') self.parameters.ssl_options = pika.SSLOptions(context=ssl_context) def get_channel(self) -> tuple[pika.BlockingConnection, pika.channel.Channel]: connection = pika.BlockingConnection(self.parameters) channel = connection.channel() return connection, channel def validate_rabbitmq_name(name: str, field_name: str) -> None: """Validate RabbitMQ queue/exchange names""" if not name or not name.strip(): raise ValueError(f"{field_name} cannot be empty") if not all(c.isalnum() or c in '-_.:' for c in name): raise ValueError(f"{field_name} can only contain letters, digits, hyphen, underscore, period, or colon") if len(name) > 255: raise ValueError(f"{field_name} must be less than 255 characters")