Skip to main content
Glama

MCP Market Statistics Server

by whdghk1907
message_broker.py (13.1 kB)
"""메시지 브로커 구현""" import asyncio import json import time import logging from datetime import datetime from typing import Dict, List, Any, Optional, Callable, Set, Tuple from enum import Enum from collections import defaultdict, deque import heapq import aiofiles class MessagePriority(Enum): """메시지 우선순위""" LOW = 1 NORMAL = 2 HIGH = 3 CRITICAL = 4 class MessageBroker: """메시지 브로커 클래스""" def __init__(self, max_queue_size: int = 10000, enable_persistence: bool = False, persistence_file: str = "message_queue.json"): """ Args: max_queue_size: 최대 큐 크기 enable_persistence: 메시지 지속성 여부 persistence_file: 지속성 파일 경로 """ self.max_queue_size = max_queue_size self.enable_persistence = enable_persistence self.persistence_file = persistence_file # 우선순위 큐 (우선순위, 타임스탬프, 메시지) self.message_queue: List[Tuple[int, float, Dict[str, Any]]] = [] # 토픽별 구독자 self.subscribers: Dict[str, Set[Callable]] = defaultdict(set) # 메시지 처리 상태 self.is_running = False self._processing_task: Optional[asyncio.Task] = None self._persistence_task: Optional[asyncio.Task] = None # 통계 self.stats = { "messages_published": 0, "messages_processed": 0, "messages_dropped": 0, "active_subscribers": 0, "queue_size": 0, "processing_errors": 0 } # 로깅 self.logger = logging.getLogger(__name__) # 메시지 필터 self.message_filters: List[Callable] = [] # 처리 지연 시간 self.processing_delay = 0.1 # 100ms async def start(self) -> bool: """브로커 시작""" try: if self.is_running: return True self.is_running = True # 지속성 파일에서 로드 if self.enable_persistence: await self._load_from_persistence() # 메시지 처리 태스크 시작 self._processing_task = asyncio.create_task(self._process_messages()) # 지속성 태스크 시작 if self.enable_persistence: self._persistence_task = asyncio.create_task(self._persistence_loop()) self.logger.info("Message broker started") return True except Exception as e: self.logger.error(f"Failed to start message broker: {e}") return False async def stop(self) -> bool: """브로커 중지""" try: self.is_running = False # 처리 태스크 중지 if self._processing_task: 
self._processing_task.cancel() try: await self._processing_task except asyncio.CancelledError: pass self._processing_task = None # 지속성 태스크 중지 if self._persistence_task: self._persistence_task.cancel() try: await self._persistence_task except asyncio.CancelledError: pass self._persistence_task = None # 남은 메시지 지속성 저장 if self.enable_persistence and self.message_queue: await self._save_to_persistence() self.logger.info("Message broker stopped") return True except Exception as e: self.logger.error(f"Error stopping message broker: {e}") return False async def publish(self, topic: str, message: Dict[str, Any], priority: MessagePriority = MessagePriority.NORMAL) -> bool: """메시지 발행""" try: if not self.is_running: return False # 큐 크기 확인 if len(self.message_queue) >= self.max_queue_size: self.stats["messages_dropped"] += 1 self.logger.warning("Message queue full, dropping message") return False # 메시지 준비 full_message = { "topic": topic, "payload": message, "timestamp": time.time(), "priority": priority.value, "id": self._generate_message_id() } # 필터 적용 if not await self._apply_filters(full_message): return False # 우선순위 큐에 추가 (음수로 최대 힙을 최소 힙처럼 사용) heapq.heappush( self.message_queue, (-priority.value, time.time(), full_message) ) self.stats["messages_published"] += 1 self.stats["queue_size"] = len(self.message_queue) return True except Exception as e: self.logger.error(f"Error publishing message: {e}") return False def subscribe(self, topic: str, callback: Callable) -> bool: """토픽 구독""" try: self.subscribers[topic].add(callback) self.stats["active_subscribers"] = sum(len(subs) for subs in self.subscribers.values()) return True except Exception as e: self.logger.error(f"Error subscribing to topic: {e}") return False def unsubscribe(self, topic: str, callback: Callable) -> bool: """구독 해제""" try: if topic in self.subscribers and callback in self.subscribers[topic]: self.subscribers[topic].remove(callback) # 빈 토픽 정리 if not self.subscribers[topic]: del self.subscribers[topic] 
self.stats["active_subscribers"] = sum(len(subs) for subs in self.subscribers.values()) return True return False except Exception as e: self.logger.error(f"Error unsubscribing from topic: {e}") return False def get_topics(self) -> List[str]: """토픽 목록 조회""" return list(self.subscribers.keys()) def get_subscriber_count(self, topic: str) -> int: """토픽 구독자 수 조회""" return len(self.subscribers.get(topic, set())) def get_queue_size(self) -> int: """큐 크기 조회""" return len(self.message_queue) def get_stats(self) -> Dict[str, Any]: """브로커 통계 조회""" return { **self.stats, "queue_size": len(self.message_queue), "topics_count": len(self.subscribers), "is_running": self.is_running } def add_message_filter(self, filter_func: Callable[[Dict[str, Any]], bool]): """메시지 필터 추가""" self.message_filters.append(filter_func) def remove_message_filter(self, filter_func: Callable): """메시지 필터 제거""" if filter_func in self.message_filters: self.message_filters.remove(filter_func) def clear_queue(self): """큐 비우기""" self.message_queue.clear() self.stats["queue_size"] = 0 async def get_pending_messages(self, topic: str = None) -> List[Dict[str, Any]]: """대기 중인 메시지 조회""" result = [] for _, _, message in self.message_queue: if topic is None or message.get("topic") == topic: result.append(message) return result async def _process_messages(self): """메시지 처리 루프""" while self.is_running: try: if not self.message_queue: await asyncio.sleep(self.processing_delay) continue # 가장 높은 우선순위 메시지 가져오기 _, _, message = heapq.heappop(self.message_queue) self.stats["queue_size"] = len(self.message_queue) # 메시지 처리 await self._deliver_message(message) self.stats["messages_processed"] += 1 # 처리 지연 if self.processing_delay > 0: await asyncio.sleep(self.processing_delay) except asyncio.CancelledError: break except Exception as e: self.logger.error(f"Message processing error: {e}") self.stats["processing_errors"] += 1 await asyncio.sleep(self.processing_delay) async def _deliver_message(self, message: Dict[str, Any]): """메시지 
전달""" topic = message.get("topic") if not topic or topic not in self.subscribers: return # 모든 구독자에게 전달 callbacks = self.subscribers[topic].copy() for callback in callbacks: try: if asyncio.iscoroutinefunction(callback): await callback(message) else: callback(message) except Exception as e: self.logger.error(f"Error delivering message to subscriber: {e}") self.stats["processing_errors"] += 1 async def _apply_filters(self, message: Dict[str, Any]) -> bool: """메시지 필터 적용""" for filter_func in self.message_filters: try: if asyncio.iscoroutinefunction(filter_func): if not await filter_func(message): return False else: if not filter_func(message): return False except Exception as e: self.logger.error(f"Filter error: {e}") # 필터 오류 시 메시지를 통과시킴 return True def _generate_message_id(self) -> str: """메시지 ID 생성""" import uuid return str(uuid.uuid4()) async def _save_to_persistence(self): """지속성 파일에 저장""" try: if not self.message_queue: return # 메시지 직렬화 messages_to_save = [] for priority, timestamp, message in self.message_queue: messages_to_save.append({ "priority": -priority, # 원래 우선순위로 복원 "timestamp": timestamp, "message": message }) async with aiofiles.open(self.persistence_file, 'w') as f: await f.write(json.dumps(messages_to_save, default=str, indent=2)) self.logger.info(f"Saved {len(messages_to_save)} messages to persistence") except Exception as e: self.logger.error(f"Error saving to persistence: {e}") async def _load_from_persistence(self): """지속성 파일에서 로드""" try: async with aiofiles.open(self.persistence_file, 'r') as f: content = await f.read() messages_data = json.loads(content) # 메시지 복원 for msg_data in messages_data: priority = msg_data["priority"] timestamp = msg_data["timestamp"] message = msg_data["message"] heapq.heappush( self.message_queue, (-priority, timestamp, message) ) self.stats["queue_size"] = len(self.message_queue) self.logger.info(f"Loaded {len(messages_data)} messages from persistence") except FileNotFoundError: self.logger.info("No persistence file 
found, starting fresh") except Exception as e: self.logger.error(f"Error loading from persistence: {e}") async def _persistence_loop(self): """지속성 저장 루프""" while self.is_running: try: await self._save_to_persistence() await asyncio.sleep(60) # 1분마다 저장 except asyncio.CancelledError: break except Exception as e: self.logger.error(f"Persistence loop error: {e}") await asyncio.sleep(60)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.