from typing import Dict, Any, Optional
import json
import asyncio
from datetime import datetime
from app.config import settings
class RabbitMQClient:
    """In-memory mock of a RabbitMQ client for testing purposes.

    Nothing is sent over the network. Queues are plain lists stored in
    ``self.queues`` (oldest message first) and exchanges are dictionaries
    stored in ``self.exchanges``; each exchange maps a routing key to the
    names of the queues bound under that key. Routing is always an exact
    match on the routing key; the declared exchange type is recorded but
    not interpreted.
    """

    def __init__(self, connection_url: Optional[str] = None):
        """Create a disconnected client.

        Args:
            connection_url: URL to remember for this client. When omitted
                (or empty), ``settings.rabbitmq_url`` is used instead.
        """
        self.connection_url = connection_url or settings.rabbitmq_url
        self.connected = False
        # queue name -> list of message envelopes, oldest first
        self.queues: Dict[str, list] = {}
        # exchange name -> {"type": str, "bindings": {routing_key: [queue names]}}
        self.exchanges: Dict[str, Dict] = {}

    def _require_connection(self) -> None:
        """Raise ``ConnectionError`` unless ``connect()`` has been called."""
        if not self.connected:
            raise ConnectionError("Not connected to RabbitMQ")

    async def connect(self) -> bool:
        """Simulate connecting to RabbitMQ.

        Returns:
            Always ``True``.
        """
        await asyncio.sleep(0.1)  # Simulate connection delay
        self.connected = True
        return True

    async def disconnect(self) -> None:
        """Simulate disconnecting; all queues and exchanges are discarded."""
        await asyncio.sleep(0.1)  # Simulate disconnection delay
        self.connected = False
        self.queues.clear()
        self.exchanges.clear()

    async def declare_queue(self, queue_name: str, durable: bool = True) -> bool:
        """Declare a queue, leaving an existing queue and its messages intact.

        Args:
            queue_name: Name of the queue to create if missing.
            durable: Accepted for interface compatibility; the mock ignores it.

        Returns:
            Always ``True``.

        Raises:
            ConnectionError: If the client is not connected.
        """
        self._require_connection()
        self.queues.setdefault(queue_name, [])
        return True

    async def declare_exchange(self, exchange_name: str, exchange_type: str = "direct") -> bool:
        """Declare an exchange without disturbing its existing bindings.

        Re-declaring an exchange that already exists keeps every binding and
        only records the most recently declared type. (Previously a
        re-declaration replaced the whole entry and silently dropped all
        bindings.)

        Args:
            exchange_name: Name of the exchange.
            exchange_type: Type label stored under the ``"type"`` key.

        Returns:
            Always ``True``.

        Raises:
            ConnectionError: If the client is not connected.
        """
        self._require_connection()
        existing = self.exchanges.get(exchange_name)
        if existing is None:
            self.exchanges[exchange_name] = {
                "type": exchange_type,
                "bindings": {},
            }
        else:
            existing["type"] = exchange_type
        return True

    async def bind_queue(self, queue_name: str, exchange_name: str, routing_key: str = "") -> bool:
        """Bind a queue to an exchange under a routing key.

        A missing exchange or queue is declared on the fly with default
        arguments. Binding the same queue twice under the same key has no
        additional effect.

        Returns:
            Always ``True``.

        Raises:
            ConnectionError: If the client is not connected.
        """
        self._require_connection()
        if exchange_name not in self.exchanges:
            await self.declare_exchange(exchange_name)
        if queue_name not in self.queues:
            await self.declare_queue(queue_name)

        bound_queues = self.exchanges[exchange_name]["bindings"].setdefault(routing_key, [])
        if queue_name not in bound_queues:
            bound_queues.append(queue_name)
        return True

    async def publish_message(
        self,
        message: Dict[str, Any],
        exchange_name: str = "",
        routing_key: str = "default",
        properties: Optional[Dict] = None
    ) -> bool:
        """Publish a message, wrapping it in an envelope with metadata.

        With an empty ``exchange_name`` the routing key is treated as a queue
        name and the message goes straight to that queue. Otherwise it is
        delivered to every queue bound to the exchange under exactly
        ``routing_key``. A message that matches no queue is dropped silently.

        Args:
            message: Payload stored under the envelope's ``"body"`` key.
            exchange_name: Target exchange, or ``""`` for direct-to-queue.
            routing_key: Queue name (no exchange) or binding key (exchange).
            properties: Optional properties stored under ``"properties"``.

        Returns:
            Always ``True``, including when the message was not routed.

        Raises:
            ConnectionError: If the client is not connected.
        """
        self._require_connection()

        # Add metadata to message
        envelope = {
            "body": message,
            "properties": properties or {},
            "timestamp": datetime.utcnow().isoformat(),
            "exchange": exchange_name,
            "routing_key": routing_key,
        }

        if not exchange_name:
            # No exchange specified: the routing key names the queue directly.
            targets = [routing_key] if routing_key in self.queues else []
        else:
            exchange = self.exchanges.get(exchange_name)
            targets = [] if exchange is None else exchange["bindings"].get(routing_key, [])

        for queue_name in targets:
            # Each queue gets its own envelope so that a consumer mutating the
            # dict it received cannot alter what another queue delivers.
            self.queues[queue_name].append(dict(envelope))
        return True

    async def consume_message(self, queue_name: str) -> Optional[Dict[str, Any]]:
        """Remove and return the oldest message of a queue.

        The queue is declared first if it does not exist yet.

        Returns:
            The message envelope, or ``None`` when the queue is empty.

        Raises:
            ConnectionError: If the client is not connected.
        """
        self._require_connection()
        if queue_name not in self.queues:
            await self.declare_queue(queue_name)

        pending = self.queues[queue_name]
        return pending.pop(0) if pending else None

    async def get_queue_info(self, queue_name: str) -> Dict[str, Any]:
        """Describe a queue without consuming from it.

        Returns:
            ``{"name", "exists": False}`` for an unknown queue; otherwise a
            dict with ``"name"``, ``"exists": True``, ``"message_count"`` and
            ``"messages"`` (a shallow copy of the queue's list).

        Raises:
            ConnectionError: If the client is not connected.
        """
        self._require_connection()
        if queue_name not in self.queues:
            return {"name": queue_name, "exists": False}

        pending = self.queues[queue_name]
        return {
            "name": queue_name,
            "exists": True,
            "message_count": len(pending),
            "messages": pending.copy(),
        }

    async def purge_queue(self, queue_name: str) -> int:
        """Remove every message from a queue.

        Returns:
            The number of messages removed; ``0`` for an unknown queue.

        Raises:
            ConnectionError: If the client is not connected.
        """
        self._require_connection()
        pending = self.queues.get(queue_name)
        if pending is None:
            return 0
        removed = len(pending)
        pending.clear()
        return removed

    def is_connected(self) -> bool:
        """Return whether the client is currently marked as connected."""
        return self.connected
# Global client instance shared by get_rabbitmq_client(). It is constructed at
# import time with no arguments, so its URL falls back to settings.rabbitmq_url;
# it starts out disconnected.
rabbitmq_client: RabbitMQClient = RabbitMQClient()
async def get_rabbitmq_client() -> RabbitMQClient:
    """Return the shared module-level client, connecting it first if needed."""
    client = rabbitmq_client
    if client.is_connected():
        return client
    # First use (or use after a disconnect): establish the connection lazily.
    await client.connect()
    return client