# connection_manager.py (13.3 kB)
"""연결 관리자 구현"""
import asyncio
import inspect
import logging
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Set
@dataclass
class ConnectionInfo:
    """Bookkeeping record kept for one registered connection.

    Attributes:
        connection_id: Identifier the connection is registered under.
        remote_address: Remote address supplied when the connection was added.
        connect_time: ``time.time()`` timestamp taken at registration.
        last_activity: ``time.time()`` timestamp of the most recent activity.
        request_count: Number of requests accepted by the rate limiter.
        data_sent: Running total reported through ``update_data_stats``.
        data_received: Running total reported through ``update_data_stats``.
        is_active: Cleared by ``set_connection_inactive``; inactive records
            are removed by the next cleanup pass.
    """

    # Required at construction time.
    connection_id: str
    remote_address: str
    connect_time: float
    last_activity: float

    # Counters start at zero and a new record starts out active.
    request_count: int = 0
    data_sent: int = 0
    data_received: int = 0
    is_active: bool = True
class ConnectionManager:
    """Track connections, enforce limits, and evict idle connections.

    The manager keeps an in-memory registry of connections keyed by
    ``connection_id`` plus a reverse index from remote address to the ids
    registered from it.  It caps the number of registered connections,
    applies a per-connection sliding-window rate limit, and runs a background
    asyncio task that periodically removes inactive connections.

    Optional callbacks (plain callables or coroutine functions) may be
    assigned to these attributes:

    * ``on_connection_limit_exceeded(connection_id, remote_address)``
    * ``on_rate_limit_exceeded(connection_id, request_count)``
    * ``on_connection_cleanup(connection_id)``

    Exceptions raised by callbacks are logged and suppressed.
    """

    def __init__(self, max_connections: int = 1000, rate_limit_requests: int = 100,
                 rate_limit_window: int = 60, cleanup_interval: int = 300):
        """Create a manager; nothing runs until :meth:`start` is awaited.

        Args:
            max_connections: Maximum number of registered connections.
            rate_limit_requests: Requests allowed per connection per window.
            rate_limit_window: Length of the rate-limit window, in seconds.
            cleanup_interval: Seconds between background cleanup passes.
        """
        self.max_connections = max_connections
        self.rate_limit_requests = rate_limit_requests
        self.rate_limit_window = rate_limit_window
        self.cleanup_interval = cleanup_interval

        # Connection registry and reverse index (remote address -> ids).
        self.connections: Dict[str, ConnectionInfo] = {}
        self.connection_by_address: Dict[str, Set[str]] = defaultdict(set)

        # Per-connection timestamps of accepted requests.  The deques are
        # deliberately unbounded: capping them with maxlen=rate_limit_requests
        # would make it impossible for the count to ever exceed the limit.
        # check_rate_limit keeps each window bounded by recording only
        # accepted requests.
        self.rate_limit_data: Dict[str, deque] = defaultdict(deque)

        # Counters exposed through get_stats().
        self.stats = {
            "total_connections": 0,
            "active_connections": 0,
            "rejected_connections": 0,
            "rate_limited_requests": 0,
            "cleanup_runs": 0,
            "connections_cleaned": 0,
        }

        # Background cleanup task state.
        self._cleanup_task: Optional[asyncio.Task] = None
        self.is_running = False

        # Optional notification hooks.
        self.on_connection_limit_exceeded: Optional[Callable] = None
        self.on_rate_limit_exceeded: Optional[Callable] = None
        self.on_connection_cleanup: Optional[Callable] = None

        self.logger = logging.getLogger(__name__)

    async def start(self) -> bool:
        """Start the background cleanup task.

        Calling this while the cleanup task is already alive is a no-op, so
        repeated calls never leak additional tasks.

        Returns:
            True if the manager is running afterwards, False if startup failed.
        """
        task = self._cleanup_task
        if self.is_running and task is not None and not task.done():
            return True
        try:
            self.is_running = True
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())
        except Exception as e:
            # Do not report "running" when the task could not be created.
            self.is_running = False
            self.logger.error("Failed to start connection manager: %s", e)
            return False
        self.logger.info("Connection manager started")
        return True

    async def stop(self) -> bool:
        """Stop the background cleanup task and wait for it to finish.

        Returns:
            True on a clean stop, False if awaiting the task raised an
            unexpected error (which is logged).
        """
        self.is_running = False
        task, self._cleanup_task = self._cleanup_task, None
        try:
            if task:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass  # Expected result of cancelling the loop.
            self.logger.info("Connection manager stopped")
            return True
        except Exception as e:
            self.logger.error("Error stopping connection manager: %s", e)
            return False

    async def add_connection(self, connection_id: str, remote_address: str) -> bool:
        """Register a connection.

        Re-registering an existing ``connection_id`` replaces its record.

        Args:
            connection_id: Identifier to register the connection under.
            remote_address: Address the connection originates from.

        Returns:
            True if registered; False if the connection limit is reached
            (``on_connection_limit_exceeded`` is notified) or an error occurred.
        """
        try:
            if len(self.connections) >= self.max_connections:
                self.stats["rejected_connections"] += 1
                if self.on_connection_limit_exceeded:
                    await self._safe_callback(
                        self.on_connection_limit_exceeded, connection_id, remote_address
                    )
                return False

            # When an id is re-registered, drop its old index entry first so
            # connection_by_address never keeps a stale id under the previous
            # address.
            previous = self.connections.get(connection_id)
            if previous is not None:
                self._unindex_address(previous.remote_address, connection_id)

            now = time.time()
            self.connections[connection_id] = ConnectionInfo(
                connection_id=connection_id,
                remote_address=remote_address,
                connect_time=now,
                last_activity=now,
            )
            self.connection_by_address[remote_address].add(connection_id)

            self.stats["total_connections"] += 1
            self.stats["active_connections"] = len(self.connections)
            self.logger.debug("Added connection %s from %s", connection_id, remote_address)
            return True
        except Exception as e:
            self.logger.error("Error adding connection: %s", e)
            return False

    async def remove_connection(self, connection_id: str) -> bool:
        """Unregister a connection and discard its rate-limit history.

        Returns:
            True if the connection existed and was removed, otherwise False.
        """
        try:
            info = self.connections.pop(connection_id, None)
            if info is None:
                return False

            self._unindex_address(info.remote_address, connection_id)
            self.rate_limit_data.pop(connection_id, None)

            self.stats["active_connections"] = len(self.connections)
            self.logger.debug("Removed connection %s", connection_id)
            return True
        except Exception as e:
            self.logger.error("Error removing connection: %s", e)
            return False

    def _unindex_address(self, remote_address: str, connection_id: str) -> None:
        """Remove ``connection_id`` from the address index, pruning empty sets."""
        # .get() avoids creating an empty defaultdict entry as a side effect.
        ids = self.connection_by_address.get(remote_address)
        if ids is None:
            return
        ids.discard(connection_id)
        if not ids:
            del self.connection_by_address[remote_address]

    async def check_rate_limit(self, connection_id: str) -> bool:
        """Record a request and report whether it is within the rate limit.

        At most ``rate_limit_requests`` requests are accepted per connection
        within any ``rate_limit_window``-second sliding window.  Accepted
        requests update the connection's ``last_activity`` and
        ``request_count``; rejected requests are counted in
        ``stats["rate_limited_requests"]`` and reported to
        ``on_rate_limit_exceeded`` with the number of requests in the window
        including the rejected one.

        Returns:
            True if the request is allowed; False if the connection is
            unknown, the limit is exceeded, or an error occurred.
        """
        try:
            info = self.connections.get(connection_id)
            if info is None:
                return False

            now = time.time()
            window = self.rate_limit_data[connection_id]

            # Drop timestamps that have slid out of the window.
            cutoff = now - self.rate_limit_window
            while window and window[0] < cutoff:
                window.popleft()

            if len(window) >= self.rate_limit_requests:
                # Rejected requests are not recorded, which keeps the deque
                # bounded by rate_limit_requests entries.
                self.stats["rate_limited_requests"] += 1
                if self.on_rate_limit_exceeded:
                    await self._safe_callback(
                        self.on_rate_limit_exceeded, connection_id, len(window) + 1
                    )
                return False

            window.append(now)
            info.last_activity = now
            info.request_count += 1
            return True
        except Exception as e:
            self.logger.error("Error checking rate limit: %s", e)
            return False

    @staticmethod
    def _info_to_dict(info: Any) -> Dict[str, Any]:
        """Build the public dictionary view of one connection record."""
        return {
            "connection_id": info.connection_id,
            "remote_address": info.remote_address,
            "connect_time": info.connect_time,
            "last_activity": info.last_activity,
            "request_count": info.request_count,
            "data_sent": info.data_sent,
            "data_received": info.data_received,
            "is_active": info.is_active,
            "uptime": time.time() - info.connect_time,
        }

    def get_connection_info(self, connection_id: str) -> Optional[Dict[str, Any]]:
        """Return a dictionary describing one connection, or None if unknown."""
        info = self.connections.get(connection_id)
        if info is None:
            return None
        return self._info_to_dict(info)

    def get_connections(self) -> List[Dict[str, Any]]:
        """Return dictionary descriptions of every registered connection."""
        return [self._info_to_dict(info) for info in self.connections.values()]

    def get_connections_by_address(self, remote_address: str) -> List[str]:
        """Return the connection ids registered from ``remote_address``."""
        return list(self.connection_by_address.get(remote_address, ()))

    def get_stats(self) -> Dict[str, Any]:
        """Return the counters plus live sizes, running state, and limits."""
        return {
            **self.stats,
            "active_connections": len(self.connections),
            "unique_addresses": len(self.connection_by_address),
            "is_running": self.is_running,
            "limits": {
                "max_connections": self.max_connections,
                "rate_limit_requests": self.rate_limit_requests,
                "rate_limit_window": self.rate_limit_window,
            },
        }

    async def update_data_stats(self, connection_id: str, data_sent: int = 0, data_received: int = 0):
        """Add to a connection's transfer totals and refresh its activity time.

        Unknown connection ids are ignored.
        """
        info = self.connections.get(connection_id)
        if info is None:
            return
        info.data_sent += data_sent
        info.data_received += data_received
        info.last_activity = time.time()

    async def set_connection_inactive(self, connection_id: str):
        """Flag a connection as inactive so the next cleanup pass removes it.

        Unknown connection ids are ignored.
        """
        info = self.connections.get(connection_id)
        if info is not None:
            info.is_active = False

    async def cleanup_inactive_connections(self, max_idle_time: int = 1800) -> int:
        """Remove connections that are flagged inactive or idle for too long.

        Args:
            max_idle_time: Idle time, in seconds, after which a connection is
                removed.

        Returns:
            The number of connections actually removed (0 on error).
        """
        try:
            cutoff = time.time() - max_idle_time
            # Snapshot the ids first: the registry is mutated while removing.
            stale_ids = [
                connection_id
                for connection_id, info in self.connections.items()
                if not info.is_active or info.last_activity < cutoff
            ]

            cleaned_count = 0
            for connection_id in stale_ids:
                # A callback awaited earlier in this loop may already have
                # removed this connection; only count and report removals
                # that actually happened.
                if not await self.remove_connection(connection_id):
                    continue
                cleaned_count += 1
                if self.on_connection_cleanup:
                    await self._safe_callback(self.on_connection_cleanup, connection_id)

            self.stats["connections_cleaned"] += cleaned_count
            if cleaned_count > 0:
                self.logger.info("Cleaned up %s inactive connections", cleaned_count)
            return cleaned_count
        except Exception as e:
            self.logger.error("Error cleaning up connections: %s", e)
            return 0

    async def get_connection_metrics(self) -> Dict[str, Any]:
        """Return aggregate duration, idle-time, and traffic metrics.

        The same keys are returned whether or not any connection is
        registered; with no connections every value is 0.
        """
        now = time.time()
        infos = list(self.connections.values())
        if not infos:
            return {
                "average_connection_duration": 0,
                "average_idle_time": 0,
                "longest_connection": 0,
                "shortest_connection": 0,
                "total_requests": 0,
                "total_data_sent": 0,
                "total_data_received": 0,
            }

        durations = [now - info.connect_time for info in infos]
        idle_times = [now - info.last_activity for info in infos]
        return {
            "average_connection_duration": sum(durations) / len(durations),
            "average_idle_time": sum(idle_times) / len(idle_times),
            "longest_connection": max(durations),
            "shortest_connection": min(durations),
            "total_requests": sum(info.request_count for info in infos),
            "total_data_sent": sum(info.data_sent for info in infos),
            "total_data_received": sum(info.data_received for info in infos),
        }

    async def _cleanup_loop(self):
        """Run a cleanup pass every ``cleanup_interval`` seconds until stopped."""
        try:
            while self.is_running:
                try:
                    await self.cleanup_inactive_connections()
                    self.stats["cleanup_runs"] += 1
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    # Keep the loop alive; try again after the usual interval.
                    self.logger.error("Cleanup loop error: %s", e)
                await asyncio.sleep(self.cleanup_interval)
        except asyncio.CancelledError:
            # stop() cancels this task; finish quietly.
            return

    async def _safe_callback(self, callback: Callable, *args):
        """Invoke a callback, awaiting its result if needed, and log any error.

        The result is inspected rather than the callable, so lambdas, partials
        and callable objects that return a coroutine are awaited as well.
        """
        try:
            result = callback(*args)
            if inspect.isawaitable(result):
                await result
        except Exception as e:
            self.logger.error("Callback error: %s", e)