Skip to main content
Glama

MCP Market Statistics Server

by whdghk1907
connection_manager.py13.3 kB
"""연결 관리자 구현""" import asyncio import time import logging from datetime import datetime, timedelta from typing import Dict, List, Set, Any, Optional, Callable from collections import defaultdict, deque from dataclasses import dataclass @dataclass class ConnectionInfo: """연결 정보""" connection_id: str remote_address: str connect_time: float last_activity: float request_count: int = 0 data_sent: int = 0 data_received: int = 0 is_active: bool = True class ConnectionManager: """연결 관리자 클래스""" def __init__(self, max_connections: int = 1000, rate_limit_requests: int = 100, rate_limit_window: int = 60, cleanup_interval: int = 300): """ Args: max_connections: 최대 연결 수 rate_limit_requests: 속도 제한 요청 수 rate_limit_window: 속도 제한 시간 창 (초) cleanup_interval: 정리 간격 (초) """ self.max_connections = max_connections self.rate_limit_requests = rate_limit_requests self.rate_limit_window = rate_limit_window self.cleanup_interval = cleanup_interval # 연결 관리 self.connections: Dict[str, ConnectionInfo] = {} self.connection_by_address: Dict[str, Set[str]] = defaultdict(set) # 속도 제한 관리 self.rate_limit_data: Dict[str, deque] = defaultdict(lambda: deque(maxlen=rate_limit_requests)) # 통계 self.stats = { "total_connections": 0, "active_connections": 0, "rejected_connections": 0, "rate_limited_requests": 0, "cleanup_runs": 0, "connections_cleaned": 0 } # 백그라운드 태스크 self._cleanup_task: Optional[asyncio.Task] = None self.is_running = False # 콜백 self.on_connection_limit_exceeded: Optional[Callable] = None self.on_rate_limit_exceeded: Optional[Callable] = None self.on_connection_cleanup: Optional[Callable] = None # 로깅 self.logger = logging.getLogger(__name__) async def start(self) -> bool: """관리자 시작""" try: self.is_running = True # 정리 태스크 시작 self._cleanup_task = asyncio.create_task(self._cleanup_loop()) self.logger.info("Connection manager started") return True except Exception as e: self.logger.error(f"Failed to start connection manager: {e}") return False async def stop(self) -> bool: """관리자 중지""" try: self.is_running = False # 정리 태스크 중지 if self._cleanup_task: self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass self._cleanup_task = None self.logger.info("Connection manager stopped") return True except Exception as e: self.logger.error(f"Error stopping connection manager: {e}") return False async def add_connection(self, connection_id: str, remote_address: str) -> bool: """연결 추가""" try: # 연결 수 제한 확인 if len(self.connections) >= self.max_connections: self.stats["rejected_connections"] += 1 if self.on_connection_limit_exceeded: await self._safe_callback(self.on_connection_limit_exceeded, connection_id, remote_address) return False # 연결 정보 생성 current_time = time.time() connection_info = ConnectionInfo( connection_id=connection_id, remote_address=remote_address, connect_time=current_time, last_activity=current_time ) # 연결 등록 self.connections[connection_id] = connection_info self.connection_by_address[remote_address].add(connection_id) # 통계 업데이트 self.stats["total_connections"] += 1 self.stats["active_connections"] = len(self.connections) self.logger.debug(f"Added connection {connection_id} from {remote_address}") return True except Exception as e: self.logger.error(f"Error adding connection: {e}") return False async def remove_connection(self, connection_id: str) -> bool: """연결 제거""" try: if connection_id not in self.connections: return False connection_info = self.connections[connection_id] remote_address = connection_info.remote_address # 연결 제거 del self.connections[connection_id] self.connection_by_address[remote_address].discard(connection_id) # 빈 주소 정리 if not self.connection_by_address[remote_address]: del self.connection_by_address[remote_address] # 속도 제한 데이터 정리 if connection_id in self.rate_limit_data: del self.rate_limit_data[connection_id] # 통계 업데이트 self.stats["active_connections"] = len(self.connections) self.logger.debug(f"Removed connection {connection_id}") return True except Exception as e: self.logger.error(f"Error removing connection: {e}") return False async def check_rate_limit(self, connection_id: str) -> bool: """속도 제한 확인""" try: if connection_id not in self.connections: return False current_time = time.time() # 요청 시간 추가 self.rate_limit_data[connection_id].append(current_time) # 시간 창 밖의 요청 제거 cutoff_time = current_time - self.rate_limit_window while (self.rate_limit_data[connection_id] and self.rate_limit_data[connection_id][0] < cutoff_time): self.rate_limit_data[connection_id].popleft() # 속도 제한 확인 request_count = len(self.rate_limit_data[connection_id]) if request_count > self.rate_limit_requests: self.stats["rate_limited_requests"] += 1 if self.on_rate_limit_exceeded: await self._safe_callback(self.on_rate_limit_exceeded, connection_id, request_count) return False # 연결 활동 업데이트 self.connections[connection_id].last_activity = current_time self.connections[connection_id].request_count += 1 return True except Exception as e: self.logger.error(f"Error checking rate limit: {e}") return False def get_connection_info(self, connection_id: str) -> Optional[Dict[str, Any]]: """연결 정보 조회""" if connection_id not in self.connections: return None info = self.connections[connection_id] return { "connection_id": info.connection_id, "remote_address": info.remote_address, "connect_time": info.connect_time, "last_activity": info.last_activity, "request_count": info.request_count, "data_sent": info.data_sent, "data_received": info.data_received, "is_active": info.is_active, "uptime": time.time() - info.connect_time } def get_connections(self) -> List[Dict[str, Any]]: """모든 연결 정보 조회""" result = [] for connection_id in self.connections: info = self.get_connection_info(connection_id) if info: result.append(info) return result def get_connections_by_address(self, remote_address: str) -> List[str]: """주소별 연결 목록 조회""" return list(self.connection_by_address.get(remote_address, set())) def get_stats(self) -> Dict[str, Any]: """관리자 통계 조회""" return { **self.stats, "active_connections": len(self.connections), "unique_addresses": len(self.connection_by_address), "is_running": self.is_running, "limits": { "max_connections": self.max_connections, "rate_limit_requests": self.rate_limit_requests, "rate_limit_window": self.rate_limit_window } } async def update_data_stats(self, connection_id: str, data_sent: int = 0, data_received: int = 0): """데이터 통계 업데이트""" if connection_id in self.connections: self.connections[connection_id].data_sent += data_sent self.connections[connection_id].data_received += data_received self.connections[connection_id].last_activity = time.time() async def set_connection_inactive(self, connection_id: str): """연결을 비활성 상태로 설정""" if connection_id in self.connections: self.connections[connection_id].is_active = False async def cleanup_inactive_connections(self, max_idle_time: int = 1800) -> int: """비활성 연결 정리""" try: current_time = time.time() cutoff_time = current_time - max_idle_time inactive_connections = [] for connection_id, info in self.connections.items(): if (not info.is_active or info.last_activity < cutoff_time): inactive_connections.append(connection_id) # 비활성 연결 제거 for connection_id in inactive_connections: await self.remove_connection(connection_id) if self.on_connection_cleanup: await self._safe_callback(self.on_connection_cleanup, connection_id) cleaned_count = len(inactive_connections) self.stats["connections_cleaned"] += cleaned_count if cleaned_count > 0: self.logger.info(f"Cleaned up {cleaned_count} inactive connections") return cleaned_count except Exception as e: self.logger.error(f"Error cleaning up connections: {e}") return 0 async def get_connection_metrics(self) -> Dict[str, Any]: """연결 메트릭 조회""" current_time = time.time() # 연결 지속 시간 분석 connection_durations = [] activity_times = [] for info in self.connections.values(): duration = current_time - info.connect_time connection_durations.append(duration) activity_times.append(current_time - info.last_activity) if not connection_durations: return { "average_connection_duration": 0, "average_idle_time": 0, "longest_connection": 0, "shortest_connection": 0 } return { "average_connection_duration": sum(connection_durations) / len(connection_durations), "average_idle_time": sum(activity_times) / len(activity_times), "longest_connection": max(connection_durations), "shortest_connection": min(connection_durations), "total_requests": sum(info.request_count for info in self.connections.values()), "total_data_sent": sum(info.data_sent for info in self.connections.values()), "total_data_received": sum(info.data_received for info in self.connections.values()) } async def _cleanup_loop(self): """정리 루프""" while self.is_running: try: await self.cleanup_inactive_connections() self.stats["cleanup_runs"] += 1 await asyncio.sleep(self.cleanup_interval) except asyncio.CancelledError: break except Exception as e: self.logger.error(f"Cleanup loop error: {e}") await asyncio.sleep(self.cleanup_interval) async def _safe_callback(self, callback: Callable, *args): """안전한 콜백 호출""" try: if asyncio.iscoroutinefunction(callback): await callback(*args) else: callback(*args) except Exception as e: self.logger.error(f"Callback error: {e}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server