RabbitMQ MCP Server
by kenliao94
- mcp-server-rabbitmq
- src
- mcp_server_rabbitmq
import requests
from typing import Optional, List, Dict
import base64
class RabbitMQAdmin:
def __init__(self, host: str, port: int, username: str, password: str, use_tls: bool):
self.protocol = "https" if use_tls else "http"
self.base_url = f"{self.protocol}://{host}:{port}/api"
self.auth = base64.b64encode(f"{username}:{password}".encode()).decode()
self.headers = {
"Authorization": f"Basic {self.auth}",
"Content-Type": "application/json"
}
def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None) -> requests.Response:
url = f"{self.base_url}/{endpoint}"
response = requests.request(method, url, headers=self.headers, json=data, verify=True)
response.raise_for_status()
return response
def list_queues(self) -> List[Dict]:
"""List all queues in the RabbitMQ server"""
response = self._make_request("GET", "queues")
return response.json()
def list_exchanges(self) -> List[Dict]:
"""List all exchanges in the RabbitMQ server"""
response = self._make_request("GET", "exchanges")
return response.json()
def get_queue_info(self, queue: str, vhost: str = "/") -> Dict:
"""Get detailed information about a specific queue"""
vhost_encoded = requests.utils.quote(vhost, safe='')
response = self._make_request("GET", f"queues/{vhost_encoded}/{queue}")
return response.json()
def delete_queue(self, queue: str, vhost: str = "/") -> None:
"""Delete a queue"""
validate_rabbitmq_name(queue, "Queue name")
vhost_encoded = requests.utils.quote(vhost, safe='')
self._make_request("DELETE", f"queues/{vhost_encoded}/{queue}")
def purge_queue(self, queue: str, vhost: str = "/") -> None:
"""Remove all messages from a queue"""
validate_rabbitmq_name(queue, "Queue name")
vhost_encoded = requests.utils.quote(vhost, safe='')
self._make_request("DELETE", f"queues/{vhost_encoded}/{queue}/contents")
def get_exchange_info(self, exchange: str, vhost: str = "/") -> Dict:
"""Get detailed information about a specific exchange"""
vhost_encoded = requests.utils.quote(vhost, safe='')
response = self._make_request("GET", f"exchanges/{vhost_encoded}/{exchange}")
return response.json()
def delete_exchange(self, exchange: str, vhost: str = "/") -> None:
"""Delete an exchange"""
validate_rabbitmq_name(exchange, "Exchange name")
vhost_encoded = requests.utils.quote(vhost, safe='')
self._make_request("DELETE", f"exchanges/{vhost_encoded}/{exchange}")
def get_bindings(self, queue: Optional[str] = None, exchange: Optional[str] = None, vhost: str = "/") -> List[Dict]:
"""Get bindings, optionally filtered by queue or exchange"""
vhost_encoded = requests.utils.quote(vhost, safe='')
if queue:
validate_rabbitmq_name(queue, "Queue name")
response = self._make_request("GET", f"queues/{vhost_encoded}/{queue}/bindings")
elif exchange:
validate_rabbitmq_name(exchange, "Exchange name")
response = self._make_request("GET", f"exchanges/{vhost_encoded}/{exchange}/bindings/source")
else:
response = self._make_request("GET", f"bindings/{vhost_encoded}")
return response.json()
def get_overview(self) -> Dict:
"""Get overview of RabbitMQ server including version, stats, and listeners"""
response = self._make_request("GET", "overview")
return response.json()