# Implementation Step 5: Health Monitoring

## Overview

This step implements health monitoring using the MCP ping/pong protocol. The hub sends periodic pings to connected instances and tracks their responsiveness, marking unresponsive instances for retry or removal.

## Goals

1. Implement MCP ping/pong protocol
2. Send pings every 5 seconds to active connections
3. Mark instances as retrying after 3 missed pings (20 seconds)
4. Retry connections with exponential backoff
5. Clean up dead connections after max retries

## Technical Design

### Health Check Flow

```
Hub                            Instance
 │                                  │
 ├──ping (every 5s)────────────────>│
 │<──────pong───────────────────────┤
 │                                  │
 ├──ping───────────────────────────>│
 │          (no response)           │
 │            (wait 5s)             │
 ├──ping───────────────────────────>│
 │          (no response)           │
 │            (wait 5s)             │
 ├──ping───────────────────────────>│
 │          (no response)           │
 │                                  │
 └─> Mark as RETRYING               │
     Start reconnection             │
```

### Timing Configuration

```go
const (
    // Ping interval - how often to send pings
    PingInterval = 5 * time.Second

    // Ping timeout - how long to wait for a pong
    PingTimeout = 2 * time.Second

    // Max missed pings before marking an instance unhealthy
    MaxMissedPings = 3
)

// Retry backoff intervals (a slice cannot be declared in a const block)
var RetryBackoff = []time.Duration{
    200 * time.Millisecond,
    400 * time.Millisecond,
    800 * time.Millisecond,
}
```

## Implementation

### 1. Health Monitor Component

```go
// internal/mcp/health_monitor.go
package mcp

import (
    "context"
    "log"
    "sync"
    "time"
)

type HealthMonitor struct {
    connMgr *ConnectionManager

    // Ping tracking
    pingTrackers map[string]*PingTracker // instanceID -> tracker
    mu           sync.RWMutex

    // Control
    stopCh chan struct{}
    wg     sync.WaitGroup
}

type PingTracker struct {
    instanceID   string
    lastPing     time.Time
    lastPong     time.Time
    missedPings  int
    pingInFlight bool
}

func NewHealthMonitor(connMgr *ConnectionManager) *HealthMonitor {
    return &HealthMonitor{
        connMgr:      connMgr,
        pingTrackers: make(map[string]*PingTracker),
        stopCh:       make(chan struct{}),
    }
}

func (hm *HealthMonitor) Start() {
    hm.wg.Add(1)
    go hm.monitorLoop()
}

func (hm *HealthMonitor) Stop() {
    close(hm.stopCh)
    hm.wg.Wait()
}

func (hm *HealthMonitor) monitorLoop() {
    defer hm.wg.Done()

    ticker := time.NewTicker(PingInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            hm.sendPings()
        case <-hm.stopCh:
            return
        }
    }
}

func (hm *HealthMonitor) sendPings() {
    // Get active connections
    instances := hm.connMgr.ListInstances()

    for _, info := range instances {
        if info.State != StateActive {
            continue
        }

        // Get or create tracker
        hm.mu.Lock()
        tracker, exists := hm.pingTrackers[info.InstanceID]
        if !exists {
            tracker = &PingTracker{
                instanceID: info.InstanceID,
                lastPong:   time.Now(), // Assume healthy initially
            }
            hm.pingTrackers[info.InstanceID] = tracker
        }

        // Check (still holding the lock) whether the previous ping is unanswered
        if tracker.pingInFlight {
            tracker.missedPings++
            log.Printf("Instance %s: ping timeout (missed: %d)",
                info.InstanceID, tracker.missedPings)

            if tracker.missedPings >= MaxMissedPings {
                log.Printf("Instance %s: marking as unhealthy after %d missed pings",
                    info.InstanceID, tracker.missedPings)

                hm.connMgr.UpdateState(info.InstanceID, StateRetrying)
                tracker.missedPings = 0
                tracker.pingInFlight = false
            }
            hm.mu.Unlock()
            continue
        }

        // Mark the ping in flight before releasing the lock
        tracker.pingInFlight = true
        tracker.lastPing = time.Now()
        hm.mu.Unlock()

        // Send ping asynchronously
        hm.wg.Add(1)
        go hm.sendPing(info, tracker)
    }

    // Clean up trackers for dead instances
    hm.cleanupTrackers(instances)
}

func (hm *HealthMonitor) sendPing(info *ConnectionInfo, tracker *PingTracker) {
    defer hm.wg.Done()

    // Get client for this session
    // Note: We need to find a session for this instance
    client := hm.getClientForInstance(info.InstanceID)
    if client == nil {
        // No client available - clear the in-flight flag so the next tick retries
        hm.mu.Lock()
        tracker.pingInFlight = false
        hm.mu.Unlock()
        return
    }

    // Send ping with timeout
    ctx, cancel := context.WithTimeout(context.Background(), PingTimeout)
    defer cancel()

    err := client.Ping(ctx)

    hm.mu.Lock()
    defer hm.mu.Unlock()

    tracker.pingInFlight = false

    if err == nil {
        // Successful pong
        tracker.lastPong = time.Now()
        tracker.missedPings = 0

        // Update activity in connection manager
        hm.connMgr.UpdateActivity(info.InstanceID)
    } else {
        // Failed ping
        tracker.missedPings++
        log.Printf("Instance %s: ping failed: %v (missed: %d)",
            info.InstanceID, err, tracker.missedPings)
    }
}

func (hm *HealthMonitor) getClientForInstance(instanceID string) *HubClient {
    // Find any session connected to this instance
    instances := hm.connMgr.ListInstances()
    for _, info := range instances {
        if info.InstanceID == instanceID && info.Client != nil {
            return info.Client
        }
    }
    return nil
}

func (hm *HealthMonitor) cleanupTrackers(activeInstances []*ConnectionInfo) {
    hm.mu.Lock()
    defer hm.mu.Unlock()

    // Build set of active instance IDs
    activeIDs := make(map[string]bool)
    for _, info := range activeInstances {
        activeIDs[info.InstanceID] = true
    }

    // Remove trackers for non-existent instances
    for id := range hm.pingTrackers {
        if !activeIDs[id] {
            delete(hm.pingTrackers, id)
        }
    }
}

// GetHealthStatus returns current health information
func (hm *HealthMonitor) GetHealthStatus(instanceID string) (lastPong time.Time, missedPings int, ok bool) {
    hm.mu.RLock()
    defer hm.mu.RUnlock()

    tracker, exists := hm.pingTrackers[instanceID]
    if !exists {
        return time.Time{}, 0, false
    }

    return tracker.lastPong, tracker.missedPings, true
}
```
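
`sendPings` above starts one goroutine per active instance on every tick. On a hub tracking many instances this can cause load spikes (see the "Many Instances" edge case later), so the fan-out may need a bound. A minimal sketch using a buffered-channel semaphore — `pingAllBounded`, `maxConcurrent`, and `pingFn` are illustrative names, not part of the health monitor above:

```go
// pingAllBounded pings every active instance but never runs more than
// maxConcurrent pings at once; pingFn stands in for the per-instance
// logic in sendPing.
func pingAllBounded(instances []*ConnectionInfo, maxConcurrent int, pingFn func(*ConnectionInfo)) {
    sem := make(chan struct{}, maxConcurrent)
    var wg sync.WaitGroup

    for _, info := range instances {
        if info.State != StateActive {
            continue
        }

        sem <- struct{}{} // blocks once maxConcurrent pings are in flight
        wg.Add(1)

        go func(info *ConnectionInfo) {
            defer wg.Done()
            defer func() { <-sem }()
            pingFn(info)
        }(info)
    }

    wg.Wait()
}
```

The monitor's ticker loop could call a helper like this with a closure around `sendPing` if the goroutine-per-instance approach proves too spiky.
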
### 2. Update Connection Manager

```go
// internal/mcp/connection_manager.go

// Add health monitor to ConnectionManager
type ConnectionManager struct {
    // ... existing fields ...
    healthMon *HealthMonitor
}

func NewConnectionManager() *ConnectionManager {
    cm := &ConnectionManager{
        // ... existing initialization ...
    }

    // Create health monitor
    cm.healthMon = NewHealthMonitor(cm)

    go cm.run()
    go cm.retryLoop() // New retry loop

    // Start health monitoring
    cm.healthMon.Start()

    return cm
}

// Add retry loop for disconnected instances
func (cm *ConnectionManager) retryLoop() {
    // Stagger retries to avoid thundering herd
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    retrySchedule := make(map[string]time.Time) // instanceID -> next retry time

    for {
        select {
        case <-ticker.C:
            now := time.Now()
            instances := cm.ListInstances()

            for _, info := range instances {
                if info.State != StateRetrying {
                    continue
                }

                // Check if it's time to retry
                nextRetry, exists := retrySchedule[info.InstanceID]
                if exists && now.Before(nextRetry) {
                    continue
                }

                // Calculate next retry time
                backoffIndex := info.RetryCount
                if backoffIndex >= len(RetryBackoff) {
                    // Max retries reached
                    cm.updateState(info.InstanceID, StateDead)
                    delete(retrySchedule, info.InstanceID)
                    continue
                }

                // Schedule retry
                retrySchedule[info.InstanceID] = now.Add(RetryBackoff[backoffIndex])

                // Attempt reconnection
                go cm.attemptReconnection(info.InstanceID)
            }

        case <-cm.stopCh:
            return
        }
    }
}

func (cm *ConnectionManager) attemptReconnection(instanceID string) {
    log.Printf("Attempting to reconnect to instance %s", instanceID)

    // Get instance info
    instances := cm.ListInstances()
    var info *ConnectionInfo
    for _, i := range instances {
        if i.InstanceID == instanceID {
            info = i
            break
        }
    }

    if info == nil || info.State != StateRetrying {
        return
    }

    // Increment retry count
    cm.incrementRetryCount(instanceID)

    // Try to connect
    client, err := NewHubClient(info.Port)
    if err != nil {
        log.Printf("Failed to create client for reconnection: %v", err)
        return
    }

    // Test connection
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := client.Initialize(ctx); err != nil {
        log.Printf("Reconnection failed for %s: %v", instanceID, err)
        return
    }

    // Success! Update state
    cm.setClient(instanceID, client)
    cm.updateState(instanceID, StateActive)

    // Reset retry count
    if i, exists := cm.connections[instanceID]; exists {
        i.RetryCount = 0
    }

    log.Printf("Successfully reconnected to instance %s", instanceID)
}

// Update Stop to clean up health monitor
func (cm *ConnectionManager) Stop() {
    cm.healthMon.Stop()
    close(cm.stopCh)
    <-cm.doneCh
}
```
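
The `retryLoop` above wakes up on a one-second ticker, so many instances that entered `StateRetrying` at the same moment will still retry in lockstep. A minimal sketch of layering random jitter on top of `RetryBackoff` — `retryDelay` is a hypothetical helper, not part of the connection manager above, and assumes `math/rand` is imported:

```go
// retryDelay returns how long to wait before the next reconnection attempt,
// or false when the backoff schedule is exhausted and the instance should be
// marked dead. (Hypothetical helper for illustration only.)
func retryDelay(retryCount int) (time.Duration, bool) {
    if retryCount >= len(RetryBackoff) {
        return 0, false
    }
    base := RetryBackoff[retryCount]
    // Up to 25% random jitter keeps simultaneous failures from retrying together.
    jitter := time.Duration(rand.Int63n(int64(base / 4)))
    return base + jitter, true
}
```

`retryLoop` could call this helper instead of indexing `RetryBackoff` directly when it fills in `retrySchedule`.
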
### 3. MCP Ping Implementation

```go
// internal/mcp/ping.go
package mcp

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
)

// Ensure hub client supports ping
func (c *HubClient) Ping(ctx context.Context) error {
    request := map[string]interface{}{
        "jsonrpc": "2.0",
        "id":      time.Now().UnixNano(),
        "method":  "ping",
    }

    result, err := c.sendRequest(ctx, request)
    if err != nil {
        return err
    }

    // Verify we got a pong response
    var response string
    if err := json.Unmarshal(result, &response); err != nil {
        return err
    }

    if response != "pong" {
        return fmt.Errorf("unexpected ping response: %s", response)
    }

    return nil
}

// Instance server must handle ping
func (s *Server) handlePing(ctx context.Context) (interface{}, error) {
    return "pong", nil
}
```

### 4. Health Status Tool

```go
// Add to hub tools
{
    Name:        "instances/health",
    Description: "Get health status of all instances",
    InputSchema: mcp.ToolInputSchema{
        Type:       "object",
        Properties: map[string]interface{}{},
    },
}

// Handler
func (h *HubServer) handleInstancesHealth(ctx context.Context) (*mcp.CallToolResult, error) {
    instances := h.connMgr.ListInstances()

    var output []map[string]interface{}

    for _, info := range instances {
        lastPong, missedPings, ok := h.healthMon.GetHealthStatus(info.InstanceID)

        healthInfo := map[string]interface{}{
            "id":            info.InstanceID,
            "name":          info.Name,
            "state":         info.State.String(),
            "last_activity": info.LastActivity,
        }

        if ok {
            healthInfo["last_pong"] = lastPong
            healthInfo["missed_pings"] = missedPings
            healthInfo["healthy"] = missedPings < MaxMissedPings
        }

        output = append(output, healthInfo)
    }

    data, _ := json.MarshalIndent(output, "", "  ")

    return &mcp.CallToolResult{
        Content: []mcp.Content{{
            Type: "text",
            Text: string(data),
        }},
    }, nil
}
```

## Testing Plan

### 1. Unit Tests

```go
// internal/mcp/health_monitor_test.go
func TestHealthMonitor(t *testing.T) {
    // Create mock connection manager
    connMgr := NewMockConnectionManager()
    monitor := NewHealthMonitor(connMgr)

    // Add test instance
    instance := &ConnectionInfo{
        InstanceID: "test-123",
        State:      StateActive,
    }
    connMgr.AddInstance(instance)

    // Start monitoring
    monitor.Start()
    defer monitor.Stop()

    // Wait for first ping
    time.Sleep(6 * time.Second)

    // Check health status
    lastPong, missedPings, ok := monitor.GetHealthStatus("test-123")
    assert.True(t, ok)
    assert.Equal(t, 0, missedPings)
    assert.WithinDuration(t, time.Now(), lastPong, 10*time.Second)
}

func TestMissedPings(t *testing.T) {
    // Test that 3 missed pings triggers state change
    // Mock client that doesn't respond to pings
    // Verify state changes to retrying
}

func TestReconnection(t *testing.T) {
    // Test reconnection with exponential backoff
    // Verify retry intervals are correct
    // Test max retries leads to dead state
}
```
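
One way to flesh out `TestMissedPings` is to pre-seed a tracker that looks permanently in flight and drive `sendPings` directly, so no real client is needed. This is a minimal sketch that reuses the `MockConnectionManager` assumed above and additionally assumes it records state changes behind a `LastState` accessor (an assumption, not an existing API):

```go
func TestMissedPingsSketch(t *testing.T) {
    connMgr := NewMockConnectionManager()
    connMgr.AddInstance(&ConnectionInfo{InstanceID: "test-123", State: StateActive})

    hm := NewHealthMonitor(connMgr)

    // Simulate an instance that never answers: the previous ping stays in flight.
    hm.pingTrackers["test-123"] = &PingTracker{
        instanceID:   "test-123",
        pingInFlight: true,
    }

    // Each sendPings pass counts one missed ping while a ping is in flight.
    for i := 0; i < MaxMissedPings; i++ {
        hm.sendPings()
    }

    // After MaxMissedPings misses the monitor should have asked the
    // connection manager to move the instance to StateRetrying.
    if connMgr.LastState("test-123") != StateRetrying {
        t.Fatal("expected instance to be marked retrying after missed pings")
    }
}
```
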
### 2. Integration Test

```bash
#!/bin/bash
# test_health_monitoring.sh

# Start instance
cd test-project
brum --no-tui &
INSTANCE_PID=$!
sleep 2

# Start hub with debug logging
BRUMMER_MCP_DEBUG=true brum --mcp &
HUB_PID=$!
sleep 2

# Monitor health for 30 seconds
for i in {1..6}; do
    echo "Health check $i:"
    echo '{"jsonrpc":"2.0","id":'$i',"method":"tools/call","params":{"name":"instances/health"}}' | \
        nc -q 1 localhost 7777 | jq '.result.content[0].text' | jq '.'
    sleep 5
done

# Kill instance to test failure detection
kill $INSTANCE_PID

# Check health after instance death
sleep 25
echo "Health after instance death:"
echo '{"jsonrpc":"2.0","id":7,"method":"tools/call","params":{"name":"instances/health"}}' | \
    nc -q 1 localhost 7777 | jq '.result.content[0].text' | jq '.'

# Cleanup
kill $HUB_PID
```

### 3. Stress Test

```go
func TestHealthMonitorStress(t *testing.T) {
    // Add 100 instances
    // Verify all get pinged within reasonable time
    // Kill random instances and verify detection
    // Check no goroutine leaks
}
```

## Success Criteria

1. ✅ Ping sent every 5 seconds to active instances
2. ✅ Pong response updates last activity
3. ✅ 3 missed pings triggers state change
4. ✅ Exponential backoff for reconnection
5. ✅ Dead instances cleaned up properly
6. ✅ Health status available via tool
7. ✅ No goroutine leaks
8. ✅ Graceful shutdown

## Edge Cases

### 1. Slow Pong Response

- Use 2-second timeout
- Don't wait for previous ping

### 2. Instance Restarts

- Quick reconnection on same port
- Reset health tracking

### 3. Network Hiccup

- Don't immediately mark as dead
- Allow for transient failures

### 4. Many Instances

- Stagger pings to avoid load spikes
- Use goroutine pool if needed

### 5. Ping During Shutdown

- Cancel all in-flight pings (see the sketch below)
- Clean shutdown
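
For the shutdown case above, one option is to derive every per-ping timeout from a monitor-level context that `Stop` cancels, instead of `context.Background()`. A minimal, self-contained sketch of that pattern; `shutdownPinger` and its fields are illustrative, not part of the `HealthMonitor` shown earlier:

```go
type shutdownPinger struct {
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
}

func newShutdownPinger() *shutdownPinger {
    ctx, cancel := context.WithCancel(context.Background())
    return &shutdownPinger{ctx: ctx, cancel: cancel}
}

// ping derives the per-ping timeout from the monitor-level context, so a
// Stop call aborts any ping still waiting for a pong.
func (p *shutdownPinger) ping(client *HubClient) {
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        ctx, cancel := context.WithTimeout(p.ctx, PingTimeout)
        defer cancel()
        _ = client.Ping(ctx) // result handling elided in this sketch
    }()
}

func (p *shutdownPinger) Stop() {
    p.cancel()  // cancel in-flight pings immediately
    p.wg.Wait() // then wait for their goroutines to exit
}
```

The same `ctx`/`cancel` pair could be folded into `HealthMonitor` itself so that `Stop` cancels outstanding pings before waiting on its WaitGroup.
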
## Performance Considerations

1. **Goroutine Management**
   - Pool goroutines for pings
   - Limit concurrent pings

2. **Memory Usage**
   - Clean up old trackers
   - Bounded data structures

3. **CPU Usage**
   - Efficient timer usage
   - Avoid busy loops

## Next Steps

1. Step 6: End-to-end testing and verification

## Code Checklist

- [ ] Implement health monitor component
- [ ] Add ping/pong to hub client
- [ ] Update connection manager with retry logic
- [ ] Add health status tool
- [ ] Create comprehensive tests
- [ ] Test failure detection timing
- [ ] Verify reconnection works
- [ ] Check for goroutine leaks (one option is sketched below)
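
For the goroutine-leak item in the checklist, one option is Uber's `goleak` package. A minimal sketch, assuming the project is willing to add `go.uber.org/goleak` as a test dependency:

```go
package mcp

import (
    "testing"

    "go.uber.org/goleak"
)

// TestMain fails the package's tests if any test leaves goroutines running,
// e.g. a HealthMonitor that was started but never stopped.
func TestMain(m *testing.M) {
    goleak.VerifyTestMain(m)
}
```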
