CodeGraph CLI MCP Server

by Jakedismo
health.rs (8.42 kB)
// ABOUTME: Health monitoring with circuit breaker pattern for SurrealDB resilience
// ABOUTME: Provides automatic reconnection with exponential backoff

use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};

use super::config::{BackoffConfig, CircuitBreakerConfig};

/// Circuit breaker state
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CircuitState {
    /// Normal operation - requests allowed
    Closed,
    /// Testing if service recovered - single request allowed
    HalfOpen,
    /// Failing - requests blocked for timeout period
    Open,
}

impl std::fmt::Display for CircuitState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CircuitState::Closed => write!(f, "Closed"),
            CircuitState::HalfOpen => write!(f, "Half-Open"),
            CircuitState::Open => write!(f, "Open"),
        }
    }
}

/// Health monitor with circuit breaker pattern
pub struct HealthMonitor {
    /// Circuit breaker state
    circuit_state: Arc<RwLock<CircuitState>>,
    /// Configuration
    config: CircuitBreakerConfig,
    /// Backoff configuration
    backoff_config: BackoffConfig,
    /// Consecutive failures
    consecutive_failures: Arc<RwLock<u32>>,
    /// Consecutive successes (for half-open recovery)
    consecutive_successes: Arc<RwLock<u32>>,
    /// Time when circuit opened (for timeout)
    circuit_opened_at: Arc<RwLock<Option<Instant>>>,
    /// Current backoff duration
    current_backoff: Arc<RwLock<Duration>>,
}

impl HealthMonitor {
    pub fn new(config: CircuitBreakerConfig, backoff_config: BackoffConfig) -> Self {
        Self {
            circuit_state: Arc::new(RwLock::new(CircuitState::Closed)),
            config,
            backoff_config: backoff_config.clone(),
            consecutive_failures: Arc::new(RwLock::new(0)),
            consecutive_successes: Arc::new(RwLock::new(0)),
            circuit_opened_at: Arc::new(RwLock::new(None)),
            current_backoff: Arc::new(RwLock::new(Duration::from_secs(
                backoff_config.initial_secs,
            ))),
        }
    }

    /// Get current circuit state
    pub async fn circuit_state(&self) -> CircuitState {
        *self.circuit_state.read().await
    }

    /// Check if requests should be allowed
    pub async fn should_allow_request(&self) -> bool {
        let state = self.circuit_state.read().await;
        match *state {
            CircuitState::Closed => true,
            CircuitState::HalfOpen => true,
            CircuitState::Open => {
                // Check if timeout has elapsed
                if let Some(opened_at) = *self.circuit_opened_at.read().await {
                    if opened_at.elapsed() >= Duration::from_secs(self.config.timeout_secs) {
                        drop(state);
                        self.transition_to_half_open().await;
                        return true;
                    }
                }
                debug!("Circuit open, blocking request");
                false
            }
        }
    }

    /// Record a successful operation
    pub async fn record_success(&self) {
        let mut state = self.circuit_state.write().await;
        let mut successes = self.consecutive_successes.write().await;
        let mut failures = self.consecutive_failures.write().await;

        *failures = 0;
        *successes += 1;

        match *state {
            CircuitState::HalfOpen => {
                if *successes >= self.config.success_threshold {
                    info!("Circuit breaker: Half-Open -> Closed (recovered)");
                    *state = CircuitState::Closed;
                    *successes = 0;
                    // Reset backoff
                    *self.current_backoff.write().await =
                        Duration::from_secs(self.backoff_config.initial_secs);
                }
            }
            CircuitState::Closed => {
                debug!("Health check passed");
            }
            CircuitState::Open => {
                // Shouldn't happen, but handle gracefully
                *state = CircuitState::Closed;
            }
        }
    }

    /// Record a failed operation
    pub async fn record_failure(&self) {
        let mut state = self.circuit_state.write().await;
        let mut failures = self.consecutive_failures.write().await;
        let mut successes = self.consecutive_successes.write().await;

        *successes = 0;
        *failures += 1;

        match *state {
            CircuitState::Closed => {
                if *failures >= self.config.failure_threshold {
                    warn!(
                        "Circuit breaker: Closed -> Open ({} consecutive failures)",
                        failures
                    );
                    *state = CircuitState::Open;
                    *self.circuit_opened_at.write().await = Some(Instant::now());
                    self.increase_backoff().await;
                }
            }
            CircuitState::HalfOpen => {
                warn!("Circuit breaker: Half-Open -> Open (test failed)");
                *state = CircuitState::Open;
                *self.circuit_opened_at.write().await = Some(Instant::now());
                self.increase_backoff().await;
            }
            CircuitState::Open => {
                debug!("Circuit already open, failure recorded");
            }
        }
    }

    /// Get current backoff duration for retry
    pub async fn current_backoff(&self) -> Duration {
        *self.current_backoff.read().await
    }

    async fn transition_to_half_open(&self) {
        let mut state = self.circuit_state.write().await;
        if *state == CircuitState::Open {
            info!("Circuit breaker: Open -> Half-Open (testing recovery)");
            *state = CircuitState::HalfOpen;
            *self.consecutive_successes.write().await = 0;
        }
    }

    async fn increase_backoff(&self) {
        let mut backoff = self.current_backoff.write().await;
        let new_backoff = (*backoff).mul_f64(self.backoff_config.multiplier);
        let max_backoff = Duration::from_secs(self.backoff_config.max_secs);
        *backoff = new_backoff.min(max_backoff);
        debug!("Backoff increased to {:?}", *backoff);
    }

    /// Reset the circuit breaker to initial state
    pub async fn reset(&self) {
        *self.circuit_state.write().await = CircuitState::Closed;
        *self.consecutive_failures.write().await = 0;
        *self.consecutive_successes.write().await = 0;
        *self.circuit_opened_at.write().await = None;
        *self.current_backoff.write().await =
            Duration::from_secs(self.backoff_config.initial_secs);
        info!("Circuit breaker reset to initial state");
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_circuit_breaker_opens_after_failures() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            success_threshold: 2,
            timeout_secs: 10,
        };
        let backoff = BackoffConfig::default();
        let monitor = HealthMonitor::new(config, backoff);

        assert_eq!(monitor.circuit_state().await, CircuitState::Closed);

        for _ in 0..3 {
            monitor.record_failure().await;
        }

        assert_eq!(monitor.circuit_state().await, CircuitState::Open);
    }

    #[tokio::test]
    async fn test_circuit_breaker_recovers() {
        let config = CircuitBreakerConfig {
            failure_threshold: 2,
            success_threshold: 2,
            timeout_secs: 0, // Immediate timeout for test
        };
        let backoff = BackoffConfig::default();
        let monitor = HealthMonitor::new(config, backoff);

        // Open the circuit
        monitor.record_failure().await;
        monitor.record_failure().await;
        assert_eq!(monitor.circuit_state().await, CircuitState::Open);

        // Wait for timeout and check - should transition to half-open
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(monitor.should_allow_request().await);

        // Record successes to close
        monitor.record_success().await;
        monitor.record_success().await;
        assert_eq!(monitor.circuit_state().await, CircuitState::Closed);
    }
}
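The `use super::config::{BackoffConfig, CircuitBreakerConfig};` import refers to a sibling config module that is not shown on this page. Below is a minimal sketch of what those types would need to look like, inferred purely from how their fields are used in health.rs; the derives and the default values here are assumptions, not the project's actual definitions.

// Hypothetical reconstruction of super::config, inferred from usage in
// health.rs above. Derives and Default values are assumptions.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures before the circuit opens
    pub failure_threshold: u32,
    /// Consecutive successes in Half-Open before the circuit closes
    pub success_threshold: u32,
    /// Seconds the circuit stays Open before a Half-Open probe is allowed
    pub timeout_secs: u64,
}

#[derive(Debug, Clone)]
pub struct BackoffConfig {
    /// Initial retry delay in seconds
    pub initial_secs: u64,
    /// Multiplier applied to the delay after each failure
    pub multiplier: f64,
    /// Upper bound on the delay in seconds
    pub max_secs: u64,
}

impl Default for BackoffConfig {
    fn default() -> Self {
        Self {
            initial_secs: 1,
            multiplier: 2.0,
            max_secs: 60,
        }
    }
}

An equally hedged sketch of the intended call pattern follows: a caller asks the breaker for permission, runs the operation, and reports the outcome back. `ping_db` and `checked_ping` are illustrative placeholders, not functions from the repository.

// Illustrative only: the call pattern around HealthMonitor.
// `ping_db` is a hypothetical stand-in for a real SurrealDB health probe.
async fn ping_db() -> Result<(), String> {
    Ok(())
}

async fn checked_ping(monitor: &HealthMonitor) -> Result<(), String> {
    if !monitor.should_allow_request().await {
        // Circuit is open: fail fast; the caller should wait out the backoff.
        return Err(format!(
            "circuit open; retry in {:?}",
            monitor.current_backoff().await
        ));
    }
    match ping_db().await {
        Ok(()) => {
            monitor.record_success().await;
            Ok(())
        }
        Err(e) => {
            monitor.record_failure().await;
            Err(e)
        }
    }
}

Failing fast while the circuit is open, and exposing `current_backoff()` to the caller, is what keeps a flapping database from being hammered with reconnect attempts.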

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.