"""Health monitoring service."""
import asyncio
import platform
import psutil
from datetime import datetime
from typing import Any, Dict
from mcp.server import FastMCP
from ..base_service import BaseService
from .settings import HealthSettings
class SelfHealth(BaseService):
"""Self health monitoring service."""
def __init__(self):
"""Initialize health service."""
super().__init__()
self.settings = HealthSettings()
self.start_time = datetime.now()
async def register(self, mcp_server: FastMCP) -> None:
"""Register health service with MCP server."""
self.log_registration("Health")
@mcp_server.tool()
async def get_system_health() -> Dict[str, Any]:
"""Get comprehensive system health information."""
return await self._get_system_health()
@mcp_server.tool()
async def get_uptime() -> Dict[str, Any]:
"""Get server uptime information."""
return await self._get_uptime()
@mcp_server.tool()
async def get_memory_usage() -> Dict[str, Any]:
"""Get memory usage statistics."""
return await self._get_memory_usage()
@mcp_server.tool()
async def get_cpu_usage() -> Dict[str, Any]:
"""Get CPU usage statistics."""
return await self._get_cpu_usage()
self.log_registration_complete("Health")
async def _get_system_health(self) -> Dict[str, Any]:
"""Get comprehensive system health."""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"uptime": await self._get_uptime(),
"memory": await self._get_memory_usage(),
"cpu": await self._get_cpu_usage(),
"platform": {
"system": platform.system(),
"release": platform.release(),
"version": platform.version(),
"machine": platform.machine(),
"processor": platform.processor(),
}
}
async def _get_uptime(self) -> Dict[str, Any]:
"""Get uptime information."""
uptime = datetime.now() - self.start_time
return {
"start_time": self.start_time.isoformat(),
"uptime_seconds": uptime.total_seconds(),
"uptime_human": str(uptime),
}
async def _get_memory_usage(self) -> Dict[str, Any]:
"""Get memory usage statistics."""
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
return {
"virtual": {
"total": memory.total,
"available": memory.available,
"percent": memory.percent,
"used": memory.used,
"free": memory.free,
},
"swap": {
"total": swap.total,
"used": swap.used,
"free": swap.free,
"percent": swap.percent,
}
}
async def _get_cpu_usage(self) -> Dict[str, Any]:
"""Get CPU usage statistics."""
# Get CPU usage over a short interval
cpu_percent = psutil.cpu_percent(interval=1)
cpu_count = psutil.cpu_count()
cpu_freq = psutil.cpu_freq()
load_avg = psutil.getloadavg() if hasattr(psutil, 'getloadavg') else (0, 0, 0)
return {
"percent": cpu_percent,
"count": cpu_count,
"count_logical": psutil.cpu_count(logical=True),
"frequency": {
"current": cpu_freq.current if cpu_freq else None,
"min": cpu_freq.min if cpu_freq else None,
"max": cpu_freq.max if cpu_freq else None,
},
"load_average": {
"1min": load_avg[0],
"5min": load_avg[1],
"15min": load_avg[2],
}
}