# heartbeat.py
"""
Heartbeat Monitor
Periodically checks health of all registered resources and records heartbeats
to ClickHouse for monitoring and alerting.
"""
import asyncio
import logging
import os
import sys
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))

from agents.registry import AgentRegistry  # noqa: E402
from core.clickhouse_client import ClickhouseClient  # noqa: E402
# Configure the root logger at import time (basicConfig is a no-op if the
# root logger already has handlers): INFO level, timestamped line format.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the monitor class and the entry point.
logger = logging.getLogger(__name__)
class HeartbeatMonitor:
    """Periodically check every registered resource and record its heartbeat.

    One cycle (``check_all_resources``) lists resources from ClickHouse and
    checks each one concurrently through the persistent agent registered for
    its provider. It then writes one heartbeat per resource and emits a
    ``heartbeat_cycle_complete`` summary event. ``run`` repeats cycles forever.
    """

    # Status values considered healthy; any other status is logged as a
    # warning. Kept as a tuple (not a set) so membership uses ``==`` and does
    # not fail on unhashable status values returned by an agent.
    HEALTHY_STATUSES = ("running", "active", "online", "up")

    def __init__(self, interval: int = 60, check_timeout: Optional[float] = None):
        """
        Args:
            interval: Target seconds between the starts of consecutive
                heartbeat cycles.
            check_timeout: Optional limit, in seconds, on each agent call
                (status and metrics) made for a single resource. ``None``
                (the default) waits indefinitely, in which case a single hung
                agent call delays the whole cycle.
        """
        self.interval = interval
        self.check_timeout = check_timeout
        self.ch = ClickhouseClient(
            host=os.getenv("CLICKHOUSE_HOST", "localhost"),
            port=int(os.getenv("CLICKHOUSE_PORT", 9000)),
            database=os.getenv("CLICKHOUSE_DB", "mcp"),
        )

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Matches the format of the deprecated ``datetime.utcnow().isoformat()``
        (no ``+00:00`` suffix), so stored timestamps keep their shape.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @classmethod
    def _build_heartbeat(cls, provider: str, resource_id: str, status: Any,
                         error: Optional[str] = None) -> Dict[str, Any]:
        """Build a heartbeat record; the ``error`` key is present only when given."""
        heartbeat: Dict[str, Any] = {
            "resource_id": resource_id,
            "provider": provider,
            "status": status,
        }
        if error is not None:
            heartbeat["error"] = error
        heartbeat["checked_at"] = cls._utc_now_iso()
        return heartbeat

    @classmethod
    def _is_healthy(cls, status: Any) -> bool:
        """Return True if ``status`` is one of ``HEALTHY_STATUSES``."""
        return status in cls.HEALTHY_STATUSES

    async def _with_timeout(self, awaitable):
        """Await ``awaitable``, bounded by ``self.check_timeout`` when it is set.

        Raises:
            asyncio.TimeoutError: if the timeout is set and expires; the
                awaited operation is cancelled by ``asyncio.wait_for``.
        """
        if self.check_timeout is None:
            return await awaitable
        return await asyncio.wait_for(awaitable, timeout=self.check_timeout)

    async def check_resource_health(self, provider: str, resource_id: str) -> Dict[str, Any]:
        """Check the health of a single resource.

        Does not raise for ``Exception`` subclasses. Returns a heartbeat dict
        whose status comes from the agent. The status is ``"unknown"`` (with
        ``error="no_agent"``) when no agent is registered for the provider.
        It is ``"error"`` (with the error text) when the check itself fails or
        times out. Metrics are attached under ``"metrics"`` when the agent
        exposes ``get_metrics`` and the call succeeds.
        """
        try:
            agent = AgentRegistry.get_persistent_agent(provider)
            if not agent:
                logger.warning(f"No agent for provider {provider}")
                return self._build_heartbeat(provider, resource_id, "unknown", error="no_agent")

            status = await self._with_timeout(agent.check_status(resource_id))
            heartbeat = self._build_heartbeat(provider, resource_id, status)

            # Metrics are optional and best-effort: a failure here must not
            # turn an otherwise successful status check into an error.
            if hasattr(agent, "get_metrics"):
                try:
                    heartbeat["metrics"] = await self._with_timeout(agent.get_metrics(resource_id))
                except Exception as e:
                    logger.debug(f"Failed to get metrics for {resource_id}: {e}")

            return heartbeat
        except Exception as e:
            # Some exceptions (e.g. TimeoutError) stringify to "", which would
            # leave an empty error field; fall back to the exception type name.
            error = str(e) or type(e).__name__
            logger.error(f"Health check failed for {provider}:{resource_id} - {error}")
            return self._build_heartbeat(provider, resource_id, "error", error=error)

    async def _record_heartbeats(self, heartbeats: List[Any]) -> int:
        """Write each heartbeat to ClickHouse and return how many were stored.

        Exceptions returned by ``asyncio.gather`` are logged and skipped.
        Unhealthy statuses are logged even if the write fails, so alerting
        does not depend on ClickHouse being reachable.
        """
        success_count = 0
        for hb in heartbeats:
            # gather(return_exceptions=True) may also return CancelledError,
            # which is a BaseException, not an Exception.
            if isinstance(hb, BaseException):
                logger.error(f"Heartbeat check failed: {hb}")
                continue

            try:
                await self.ch.record_heartbeat(hb["resource_id"], hb)
            except Exception as e:
                logger.error(
                    f"Failed to record heartbeat for {hb['provider']}:{hb['resource_id']}: {e}"
                )
            else:
                success_count += 1

            if not self._is_healthy(hb["status"]):
                logger.warning(
                    f"Unhealthy resource: {hb['provider']}:{hb['resource_id']} - status={hb['status']}"
                )
        return success_count

    async def check_all_resources(self):
        """Run one heartbeat cycle over every registered resource.

        ``Exception`` subclasses are logged rather than raised, so a bad cycle
        does not stop the monitor loop.
        """
        try:
            # NOTE(review): an empty provider string is passed to list
            # resources for all providers - confirm against ClickhouseClient.
            resources = await self.ch.list_resources_by_provider("") or []
            logger.info(f"Checking {len(resources)} registered resources")

            tasks = []
            for res in resources:
                resource_id = res.get("resource_id")
                provider = res.get("provider")
                if resource_id and provider:
                    tasks.append(self.check_resource_health(provider, resource_id))
                else:
                    logger.debug(f"Skipping resource with missing resource_id/provider: {res}")

            # Run checks concurrently; exceptions come back as results.
            heartbeats = await asyncio.gather(*tasks, return_exceptions=True)

            success_count = await self._record_heartbeats(heartbeats)
            logger.info(f"Recorded {success_count}/{len(heartbeats)} heartbeats")

            # Also emit a system health event summarising the cycle.
            await self.ch.insert_event(
                "heartbeat_cycle_complete",
                {
                    "total_resources": len(resources),
                    "checks_completed": len(heartbeats),
                    "checks_successful": success_count,
                    "timestamp": self._utc_now_iso(),
                },
                source="heartbeat_monitor",
            )
        except Exception as e:
            logger.exception(f"Error in check_all_resources: {e}")

    async def run(self):
        """Main monitoring loop: start a cycle every ``self.interval`` seconds.

        The sleep after each cycle is reduced by the time the cycle took, so
        cycles keep a fixed cadence instead of drifting by their own duration.
        If a cycle overruns the interval, a warning is logged and the next
        cycle starts immediately. Exceptions from ``AgentRegistry.init()``
        propagate to the caller. Per-cycle ``Exception`` errors are logged and
        the loop continues.
        """
        logger.info(f"Starting heartbeat monitor (interval={self.interval}s)")
        AgentRegistry.init()

        loop = asyncio.get_running_loop()
        while True:
            started = loop.time()  # monotonic clock, immune to wall-clock jumps
            try:
                await self.check_all_resources()
            except Exception as e:
                logger.exception(f"Heartbeat cycle failed: {e}")

            elapsed = loop.time() - started
            if elapsed > self.interval:
                logger.warning(
                    f"Heartbeat cycle took {elapsed:.1f}s, longer than the {self.interval}s interval"
                )
            await asyncio.sleep(max(0.0, self.interval - elapsed))
async def main():
    """Build a HeartbeatMonitor from the environment and run it until stopped.

    Environment:
        HEARTBEAT_INTERVAL: seconds between heartbeat cycles (default 60);
            must be a positive integer.

    Any ``Exception`` - including an invalid interval or a failure while
    constructing the monitor - is logged and the process exits with status 1.
    """
    try:
        # Parsing and construction happen inside the try so configuration
        # errors are reported through the fatal-error path, not a bare traceback.
        interval = int(os.getenv("HEARTBEAT_INTERVAL", 60))
        if interval <= 0:
            # A non-positive interval would make the monitor loop spin
            # without pausing between cycles.
            raise ValueError(f"HEARTBEAT_INTERVAL must be a positive integer, got {interval}")
        monitor = HeartbeatMonitor(interval=interval)
        await monitor.run()
    except KeyboardInterrupt:
        logger.info("Heartbeat monitor stopped by user")
    except Exception as e:
        logger.exception(f"Fatal error in heartbeat monitor: {e}")
        sys.exit(1)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() re-raises Ctrl+C from outside the coroutine (the main
        # task itself is cancelled), so main()'s own KeyboardInterrupt handler
        # rarely sees it; report a clean shutdown here instead of a traceback.
        logger.info("Heartbeat monitor stopped by user")