Skip to main content
Glama

Multi-Cloud Infrastructure MCP Server

by nomadslayer
heartbeat.py (5.93 kB)
"""
Heartbeat Monitor

Periodically checks health of all registered resources and records
heartbeats to ClickHouse for monitoring and alerting.

Environment variables read by this script:
    CLICKHOUSE_HOST     ClickHouse host (default "localhost")
    CLICKHOUSE_PORT     ClickHouse port (default 9000)
    CLICKHOUSE_DB       ClickHouse database (default "mcp")
    HEARTBEAT_INTERVAL  seconds to sleep between cycles (default 60)
"""

import asyncio
import logging
import os
import sys
from datetime import datetime, timezone
from typing import List, Dict, Any

# Add src to path so the project packages imported below resolve when this
# file is executed directly as a script.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))

from agents.registry import AgentRegistry
from core.clickhouse_client import ClickhouseClient

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    ``datetime.utcnow()`` is deprecated since Python 3.12. This produces the
    exact same string it did (no ``+00:00`` offset suffix), so timestamps
    written to ClickHouse keep a consistent format.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


class HeartbeatMonitor:
    """Periodically health-check every registered resource and record the results."""

    # Status values that are NOT reported as unhealthy. Membership is tested
    # by exact, case-sensitive equality. A tuple (not a set) is used so that
    # unhashable status values compare instead of raising TypeError.
    HEALTHY_STATUSES = ("running", "active", "online", "up")

    def __init__(self, interval: int = 60):
        """
        Args:
            interval: Seconds to sleep between the end of one heartbeat cycle
                and the start of the next.
        """
        self.interval = interval
        self.ch = ClickhouseClient(
            host=os.getenv("CLICKHOUSE_HOST", "localhost"),
            port=int(os.getenv("CLICKHOUSE_PORT", 9000)),
            database=os.getenv("CLICKHOUSE_DB", "mcp")
        )

    async def check_resource_health(self, provider: str, resource_id: str) -> Dict[str, Any]:
        """Check health of a single resource.

        Any ``Exception`` is folded into the returned record rather than
        raised. ``status`` is ``"unknown"`` when no agent exists for the
        provider, and ``"error"`` when the agent call raised; in both cases
        an ``error`` field describes the failure.

        Args:
            provider: Provider name used to look up the persistent agent.
            resource_id: Identifier passed to the agent's ``check_status``.

        Returns:
            A heartbeat dict with ``resource_id``, ``provider``, ``status`` and
            ``checked_at``. It also carries ``metrics`` when the agent has a
            ``get_metrics`` method and that call succeeds.
        """
        try:
            agent = AgentRegistry.get_persistent_agent(provider)
            if not agent:
                logger.warning("No agent for provider %s", provider)
                return {
                    "resource_id": resource_id,
                    "provider": provider,
                    "status": "unknown",
                    "error": "no_agent",
                    "checked_at": _utc_now_iso(),
                }

            status = await agent.check_status(resource_id)
            heartbeat = {
                "resource_id": resource_id,
                "provider": provider,
                "status": status,
                "checked_at": _utc_now_iso(),
            }

            # Metrics are optional and best-effort: failing to fetch them must
            # not turn an otherwise successful status check into an error.
            if hasattr(agent, "get_metrics"):
                try:
                    heartbeat["metrics"] = await agent.get_metrics(resource_id)
                except Exception as e:
                    logger.debug("Failed to get metrics for %s: %s", resource_id, e)

            return heartbeat

        except Exception as e:
            logger.error("Health check failed for %s:%s - %s", provider, resource_id, e)
            return {
                "resource_id": resource_id,
                "provider": provider,
                "status": "error",
                "error": str(e),
                "checked_at": _utc_now_iso(),
            }

    async def check_all_resources(self):
        """Run one heartbeat cycle over every registered resource.

        The cycle has four steps:
          1. Fetch the resource list from ClickHouse.
          2. Health-check all resources concurrently.
          3. Record each heartbeat, logging a warning for any resource whose
             status is not in ``HEALTHY_STATUSES``.
          4. Emit a ``heartbeat_cycle_complete`` event summarising the cycle.

        Errors are logged, not raised.
        """
        try:
            # NOTE(review): "" appears to mean "all providers" — confirm
            # against ClickhouseClient.list_resources_by_provider.
            # `or []` tolerates a None result instead of failing on len().
            resources = await self.ch.list_resources_by_provider("") or []
            logger.info("Checking %d registered resources", len(resources))

            # Rows missing either key cannot be checked and are skipped.
            tasks = [
                self.check_resource_health(res.get("provider"), res.get("resource_id"))
                for res in resources
                if res.get("resource_id") and res.get("provider")
            ]

            # check_resource_health converts every Exception into an "error"
            # heartbeat, so anything gather() hands back as an exception is a
            # BaseException such as asyncio.CancelledError. That is why the
            # filter below tests BaseException, not Exception.
            heartbeats = await asyncio.gather(*tasks, return_exceptions=True)

            success_count = 0
            for hb in heartbeats:
                if isinstance(hb, BaseException):
                    logger.error("Heartbeat check failed: %s", hb)
                    continue

                try:
                    await self.ch.record_heartbeat(hb["resource_id"], hb)
                except Exception as e:
                    logger.error("Failed to record heartbeat: %s", e)
                    continue
                success_count += 1

                if hb["status"] not in self.HEALTHY_STATUSES:
                    logger.warning(
                        "Unhealthy resource: %s:%s - status=%s",
                        hb["provider"], hb["resource_id"], hb["status"],
                    )

            logger.info("Recorded %d/%d heartbeats", success_count, len(heartbeats))

            # Also emit a system health event summarising this cycle.
            await self.ch.insert_event(
                "heartbeat_cycle_complete",
                {
                    "total_resources": len(resources),
                    "checks_completed": len(heartbeats),
                    "checks_successful": success_count,
                    "timestamp": _utc_now_iso(),
                },
                source="heartbeat_monitor",
            )
        except Exception as e:
            logger.exception("Error in check_all_resources: %s", e)

    async def run(self):
        """Run heartbeat cycles forever, sleeping ``self.interval`` seconds between them."""
        logger.info("Starting heartbeat monitor (interval=%ss)", self.interval)

        AgentRegistry.init()

        while True:
            try:
                await self.check_all_resources()
            except Exception as e:
                logger.exception("Heartbeat cycle failed: %s", e)

            await asyncio.sleep(self.interval)


async def main():
    """Build a monitor from HEARTBEAT_INTERVAL and run it until stopped.

    Calls ``sys.exit(1)`` if ``monitor.run()`` raises an ``Exception``.
    """
    interval = int(os.getenv("HEARTBEAT_INTERVAL", 60))
    monitor = HeartbeatMonitor(interval=interval)
    try:
        await monitor.run()
    except KeyboardInterrupt:
        # Rarely reached: asyncio.run() normally surfaces Ctrl-C outside this
        # coroutine. The common case is handled in the __main__ guard below.
        logger.info("Heartbeat monitor stopped by user")
    except Exception as e:
        logger.exception("Fatal error in heartbeat monitor: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    # On Python 3.11+, asyncio.run() turns Ctrl-C into cancellation of main()
    # and re-raises KeyboardInterrupt only after main() has finished. On
    # older versions it typically surfaces from the event loop itself. Either
    # way, it must be caught here to log a clean shutdown instead of a
    # traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Heartbeat monitor stopped by user")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nomadslayer/infra-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.