Skip to main content
Glama

Discord MCP Server

health.py (6.63 kB)
""" 헬스체크 및 메트릭 """ import time from typing import Dict, Any, Optional from dataclasses import dataclass, field from loguru import logger from .cache import cache_manager from .schema import ErrorCode, MCPError @dataclass class HealthStatus: """헬스 상태""" status: str timestamp: float uptime: float services: Dict[str, Dict[str, Any]] = field(default_factory=dict) metrics: Dict[str, Any] = field(default_factory=dict) @dataclass class ServiceMetrics: """서비스 메트릭""" total_requests: int = 0 successful_requests: int = 0 failed_requests: int = 0 rate_limited_requests: int = 0 average_latency: float = 0.0 last_request_time: Optional[float] = None class HealthChecker: """헬스체크 관리자""" def __init__(self): self.start_time = time.time() self.metrics = ServiceMetrics() self._discord_connected = False self._redis_connected = False def update_discord_status(self, connected: bool) -> None: """Discord 연결 상태 업데이트""" self._discord_connected = connected def update_redis_status(self, connected: bool) -> None: """Redis 연결 상태 업데이트""" self._redis_connected = connected def record_request( self, success: bool, latency: float, rate_limited: bool = False ) -> None: """요청 메트릭 기록""" self.metrics.total_requests += 1 self.metrics.last_request_time = time.time() if success: self.metrics.successful_requests += 1 else: self.metrics.failed_requests += 1 if rate_limited: self.metrics.rate_limited_requests += 1 # 평균 지연시간 업데이트 (이동 평균) if self.metrics.average_latency == 0: self.metrics.average_latency = latency else: alpha = 0.1 # 가중치 self.metrics.average_latency = ( alpha * latency + (1 - alpha) * self.metrics.average_latency ) async def check_discord_health(self) -> Dict[str, Any]: """Discord API 헬스체크""" try: # 실제로는 Discord API ping을 보내야 하지만, # 여기서는 연결 상태만 확인 return { "status": "healthy" if self._discord_connected else "unhealthy", "connected": self._discord_connected, "last_check": time.time() } except Exception as e: logger.error(f"Discord health check failed: {e}") return { "status": "unhealthy", 
"connected": False, "error": str(e), "last_check": time.time() } async def check_redis_health(self) -> Dict[str, Any]: """Redis 헬스체크""" try: if cache_manager._connected and cache_manager._redis: await cache_manager._redis.ping() return { "status": "healthy", "connected": True, "last_check": time.time() } else: return { "status": "unhealthy", "connected": False, "error": "Not connected to Redis", "last_check": time.time() } except Exception as e: logger.error(f"Redis health check failed: {e}") return { "status": "unhealthy", "connected": False, "error": str(e), "last_check": time.time() } async def get_health_status(self) -> HealthStatus: """전체 헬스 상태 반환""" current_time = time.time() uptime = current_time - self.start_time # 서비스별 헬스체크 discord_health = await self.check_discord_health() redis_health = await self.check_redis_health() services = { "discord": discord_health, "redis": redis_health, } # 전체 상태 결정 all_healthy = all( service["status"] == "healthy" for service in services.values() ) status = "healthy" if all_healthy else "unhealthy" # 메트릭 정보 metrics = { "total_requests": self.metrics.total_requests, "successful_requests": self.metrics.successful_requests, "failed_requests": self.metrics.failed_requests, "rate_limited_requests": self.metrics.rate_limited_requests, "success_rate": ( self.metrics.successful_requests / max(self.metrics.total_requests, 1) ), "average_latency_ms": self.metrics.average_latency * 1000, "uptime_seconds": uptime, } return HealthStatus( status=status, timestamp=current_time, uptime=uptime, services=services, metrics=metrics ) def get_metrics(self) -> Dict[str, Any]: """Prometheus 형식 메트릭 반환""" current_time = time.time() uptime = current_time - self.start_time return { "discord_mcp_requests_total": self.metrics.total_requests, "discord_mcp_requests_successful": self.metrics.successful_requests, "discord_mcp_requests_failed": self.metrics.failed_requests, "discord_mcp_requests_rate_limited": self.metrics.rate_limited_requests, 
"discord_mcp_request_duration_seconds": self.metrics.average_latency, "discord_mcp_uptime_seconds": uptime, "discord_mcp_discord_connected": 1 if self._discord_connected else 0, "discord_mcp_redis_connected": 1 if self._redis_connected else 0, } # 전역 헬스체커 인스턴스 health_checker = HealthChecker() async def get_health() -> Dict[str, Any]: """헬스체크 엔드포인트용""" health_status = await health_checker.get_health_status() return { "status": health_status.status, "timestamp": health_status.timestamp, "uptime": health_status.uptime, "services": health_status.services, "metrics": health_status.metrics } async def get_metrics() -> Dict[str, Any]: """메트릭 엔드포인트용""" return health_checker.get_metrics()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tristan-kkim/discord-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.