# message_broker.py (13.1 kB)
"""메시지 브로커 구현"""
import asyncio
import json
import time
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional, Callable, Set, Tuple
from enum import Enum
from collections import defaultdict, deque
import heapq
import aiofiles
class MessagePriority(Enum):
    """Delivery priority of a message; a larger value means higher priority."""

    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4
class MessageBroker:
    """In-process asynchronous publish/subscribe broker backed by a priority heap.

    Published messages are kept in a ``heapq`` heap ordered by priority
    (highest first), then by publish timestamp, then by arrival sequence.
    A background task pops one message at a time and passes it to every
    callback subscribed to the message's topic. When persistence is enabled,
    the pending queue is written to a JSON file periodically and on ``stop()``
    and is reloaded by ``start()``.
    """

    def __init__(self, max_queue_size: int = 10000, enable_persistence: bool = False,
                 persistence_file: str = "message_queue.json",
                 persistence_interval: float = 60.0):
        """
        Args:
            max_queue_size: Maximum number of pending messages; ``publish``
                drops new messages once this size is reached.
            enable_persistence: Whether the pending queue is saved to and
                loaded from ``persistence_file``.
            persistence_file: Path of the JSON persistence file.
            persistence_interval: Seconds between periodic saves.
        """
        self.max_queue_size = max_queue_size
        self.enable_persistence = enable_persistence
        self.persistence_file = persistence_file
        self.persistence_interval = persistence_interval
        # Heap of (-priority, timestamp, sequence, message). The sequence
        # number breaks ties so that two entries never fall through to
        # comparing the message dicts, which would raise TypeError.
        self.message_queue: List[Tuple[int, float, int, Dict[str, Any]]] = []
        self._sequence = 0
        # Number of messages the persistence file is known to contain
        # (0 until a load or save has succeeded).
        self._persisted_count = 0
        # Subscriber callbacks per topic.
        self.subscribers: Dict[str, Set[Callable]] = defaultdict(set)
        # Run state and background tasks.
        self.is_running = False
        self._processing_task: Optional[asyncio.Task] = None
        self._persistence_task: Optional[asyncio.Task] = None
        # Counters reported by get_stats().
        self.stats = {
            "messages_published": 0,
            "messages_processed": 0,
            "messages_dropped": 0,
            "active_subscribers": 0,
            "queue_size": 0,
            "processing_errors": 0
        }
        self.logger = logging.getLogger(__name__)
        # Predicates consulted by publish(); a falsy result rejects the message.
        self.message_filters: List[Callable] = []
        # Pause between processing iterations, in seconds (100 ms).
        self.processing_delay = 0.1

    async def start(self) -> bool:
        """Start the background tasks, loading persisted messages first.

        Returns:
            True if the broker is running when the call returns (including
            the case where it was already running), False if startup failed.
        """
        if self.is_running:
            return True
        self.is_running = True
        try:
            if self.enable_persistence:
                await self._load_from_persistence()
            self._processing_task = asyncio.create_task(self._process_messages())
            if self.enable_persistence:
                self._persistence_task = asyncio.create_task(self._persistence_loop())
        except Exception as e:
            # Do not report "running" after a failed start, and do not leave
            # a half-started processing task behind.
            self.is_running = False
            if self._processing_task is not None:
                self._processing_task.cancel()
                self._processing_task = None
            self.logger.error(f"Failed to start message broker: {e}")
            return False
        self.logger.info("Message broker started")
        return True

    async def stop(self) -> bool:
        """Stop the background tasks and persist the remaining queue.

        Returns:
            True on success, False if an error occurred while stopping.
        """
        try:
            self.is_running = False
            await self._cancel_task(self._processing_task)
            self._processing_task = None
            await self._cancel_task(self._persistence_task)
            self._persistence_task = None
            # _save_to_persistence itself skips the write when there is
            # nothing queued and nothing on disk to clear.
            if self.enable_persistence:
                await self._save_to_persistence()
            self.logger.info("Message broker stopped")
            return True
        except Exception as e:
            self.logger.error(f"Error stopping message broker: {e}")
            return False

    @staticmethod
    async def _cancel_task(task: Optional[asyncio.Task]) -> None:
        """Cancel *task* (if any) and wait until it has finished."""
        if not task:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    def _enqueue(self, priority: int, timestamp: float, message: Dict[str, Any]) -> None:
        """Push *message* onto the heap and refresh the queue-size statistic."""
        self._sequence += 1
        # Priority is negated because heapq is a min-heap.
        heapq.heappush(self.message_queue, (-priority, timestamp, self._sequence, message))
        self.stats["queue_size"] = len(self.message_queue)

    async def publish(self, topic: str, message: Dict[str, Any],
                      priority: MessagePriority = MessagePriority.NORMAL) -> bool:
        """Queue *message* for delivery to the subscribers of *topic*.

        Args:
            topic: Topic name used to select subscribers at delivery time.
            message: Payload, stored under the ``"payload"`` key of the
                envelope that subscribers receive.
            priority: Delivery priority; higher priorities are delivered first.

        Returns:
            True if the message was queued. False if the broker is not
            running, the queue is full, a filter rejected the message, or an
            error occurred.
        """
        try:
            if not self.is_running:
                return False
            if len(self.message_queue) >= self.max_queue_size:
                self.stats["messages_dropped"] += 1
                self.logger.warning("Message queue full, dropping message")
                return False
            # One timestamp for both the envelope and the heap key.
            timestamp = time.time()
            full_message = {
                "topic": topic,
                "payload": message,
                "timestamp": timestamp,
                "priority": priority.value,
                "id": self._generate_message_id()
            }
            if not await self._apply_filters(full_message):
                return False
            self._enqueue(priority.value, timestamp, full_message)
            self.stats["messages_published"] += 1
            return True
        except Exception as e:
            self.logger.error(f"Error publishing message: {e}")
            return False

    def _count_subscribers(self) -> int:
        """Return the total number of callbacks across all topics."""
        return sum(len(subs) for subs in self.subscribers.values())

    def subscribe(self, topic: str, callback: Callable) -> bool:
        """Register *callback* for *topic*; returns False only on error."""
        try:
            self.subscribers[topic].add(callback)
            self.stats["active_subscribers"] = self._count_subscribers()
            return True
        except Exception as e:
            self.logger.error(f"Error subscribing to topic: {e}")
            return False

    def unsubscribe(self, topic: str, callback: Callable) -> bool:
        """Remove *callback* from *topic*.

        Returns:
            True if the callback was subscribed and has been removed,
            False otherwise.
        """
        try:
            if topic in self.subscribers and callback in self.subscribers[topic]:
                self.subscribers[topic].remove(callback)
                # Drop topics that no longer have any subscriber.
                if not self.subscribers[topic]:
                    del self.subscribers[topic]
                self.stats["active_subscribers"] = self._count_subscribers()
                return True
            return False
        except Exception as e:
            self.logger.error(f"Error unsubscribing from topic: {e}")
            return False

    def get_topics(self) -> List[str]:
        """Return the topics that currently have an entry in the subscriber map."""
        return list(self.subscribers.keys())

    def get_subscriber_count(self, topic: str) -> int:
        """Return the number of callbacks subscribed to *topic*."""
        # .get() avoids creating an empty entry in the defaultdict.
        return len(self.subscribers.get(topic, set()))

    def get_queue_size(self) -> int:
        """Return the number of pending messages."""
        return len(self.message_queue)

    def get_stats(self) -> Dict[str, Any]:
        """Return the counters plus live queue size, topic count and run state."""
        return {
            **self.stats,
            "queue_size": len(self.message_queue),
            "topics_count": len(self.subscribers),
            "is_running": self.is_running
        }

    def add_message_filter(self, filter_func: Callable[[Dict[str, Any]], bool]):
        """Append a filter; publish() rejects messages for which it returns falsy."""
        self.message_filters.append(filter_func)

    def remove_message_filter(self, filter_func: Callable):
        """Remove a previously added filter; unknown filters are ignored."""
        if filter_func in self.message_filters:
            self.message_filters.remove(filter_func)

    def clear_queue(self):
        """Discard all pending messages."""
        self.message_queue.clear()
        self.stats["queue_size"] = 0

    async def get_pending_messages(self, topic: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return the pending message envelopes in delivery order.

        Args:
            topic: If given, only messages of this topic are returned.
        """
        # Sort on the key part only so message dicts are never compared.
        ordered = sorted(self.message_queue, key=lambda entry: entry[:3])
        return [
            message
            for *_, message in ordered
            if topic is None or message.get("topic") == topic
        ]

    async def _process_messages(self):
        """Background loop: pop the next message and deliver it."""
        while self.is_running:
            try:
                if not self.message_queue:
                    await asyncio.sleep(self.processing_delay)
                    continue
                # Smallest heap entry == highest priority, earliest arrival.
                message = heapq.heappop(self.message_queue)[-1]
                self.stats["queue_size"] = len(self.message_queue)
                await self._deliver_message(message)
                self.stats["messages_processed"] += 1
                if self.processing_delay > 0:
                    await asyncio.sleep(self.processing_delay)
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"Message processing error: {e}")
                self.stats["processing_errors"] += 1
                await asyncio.sleep(self.processing_delay)

    async def _deliver_message(self, message: Dict[str, Any]):
        """Call every subscriber of the message's topic with the envelope."""
        topic = message.get("topic")
        if not topic or topic not in self.subscribers:
            return
        # Iterate over a copy so callbacks may subscribe/unsubscribe safely.
        for callback in self.subscribers[topic].copy():
            try:
                if asyncio.iscoroutinefunction(callback):
                    await callback(message)
                else:
                    callback(message)
            except Exception as e:
                # One failing subscriber must not block the others.
                self.logger.error(f"Error delivering message to subscriber: {e}")
                self.stats["processing_errors"] += 1

    async def _apply_filters(self, message: Dict[str, Any]) -> bool:
        """Return False as soon as any filter rejects *message*, else True."""
        for filter_func in self.message_filters:
            try:
                if asyncio.iscoroutinefunction(filter_func):
                    accepted = await filter_func(message)
                else:
                    accepted = filter_func(message)
                if not accepted:
                    return False
            except Exception as e:
                # A filter that raises is logged and treated as passing.
                self.logger.error(f"Filter error: {e}")
        return True

    def _generate_message_id(self) -> str:
        """Return a new random UUID4 string used as the message id."""
        import uuid
        return str(uuid.uuid4())

    async def _save_to_persistence(self):
        """Write the pending queue to the persistence file as JSON.

        An empty queue is written too when the file is known to still hold
        messages; otherwise delivered messages would stay on disk and be
        delivered again after the next start. Errors are logged, not raised.
        """
        try:
            if not self.message_queue and not self._persisted_count:
                # Nothing queued and nothing on disk to clear.
                return
            messages_to_save = [
                {
                    "priority": -neg_priority,  # undo the min-heap negation
                    "timestamp": timestamp,
                    "message": message
                }
                for neg_priority, timestamp, _, message in self.message_queue
            ]
            async with aiofiles.open(self.persistence_file, 'w', encoding="utf-8") as f:
                await f.write(json.dumps(messages_to_save, default=str, indent=2))
            self._persisted_count = len(messages_to_save)
            self.logger.info(f"Saved {len(messages_to_save)} messages to persistence")
        except Exception as e:
            self.logger.error(f"Error saving to persistence: {e}")

    async def _load_from_persistence(self):
        """Load persisted messages into the queue.

        A missing file is not an error; any other failure is logged.
        """
        try:
            async with aiofiles.open(self.persistence_file, 'r', encoding="utf-8") as f:
                content = await f.read()
            messages_data = json.loads(content)
            for msg_data in messages_data:
                self._enqueue(msg_data["priority"], msg_data["timestamp"], msg_data["message"])
            self.stats["queue_size"] = len(self.message_queue)
            self._persisted_count = len(messages_data)
            self.logger.info(f"Loaded {len(messages_data)} messages from persistence")
        except FileNotFoundError:
            self.logger.info("No persistence file found, starting fresh")
        except Exception as e:
            self.logger.error(f"Error loading from persistence: {e}")

    async def _persistence_loop(self):
        """Background loop: save the queue every ``persistence_interval`` seconds."""
        while self.is_running:
            try:
                await self._save_to_persistence()
                await asyncio.sleep(self.persistence_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"Persistence loop error: {e}")
                await asyncio.sleep(self.persistence_interval)