Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
mod.rs7.6 kB
// ABOUTME: Watch daemon module for automatic re-indexing of file changes // ABOUTME: Provides lifecycle management with SurrealDB health monitoring and circuit breaker pub mod config; pub mod health; pub mod manager; pub mod pid; pub mod session; pub mod status; pub use config::{BackoffConfig, CircuitBreakerConfig, WatchConfig}; pub use health::{CircuitState, HealthMonitor}; pub use manager::{DaemonManager, DaemonManagerState}; pub use pid::PidFile; pub use session::WatchSession; pub use status::{DaemonState, DaemonStatus, SessionMetrics}; use anyhow::{Context, Result}; use std::sync::Arc; use tokio::signal; use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; use crate::ProjectIndexer; /// Watch daemon - manages lifecycle for single project watching pub struct WatchDaemon { /// Configuration config: WatchConfig, /// Active watch session session: Option<WatchSession>, /// Health monitor for SurrealDB health_monitor: Arc<HealthMonitor>, /// Current daemon state state: Arc<RwLock<DaemonState>>, /// PID file manager pid_file: PidFile, /// Indexer for re-indexing indexer: Option<ProjectIndexer>, } impl WatchDaemon { /// Create a new daemon with configuration pub fn new(config: WatchConfig) -> Result<Self> { let pid_path = PidFile::default_path(&config.project_root); let pid_file = PidFile::new(pid_path); // Clean up stale PID file if present pid_file.cleanup_stale()?; let health_monitor = Arc::new(HealthMonitor::new( config.circuit_breaker.clone(), config.reconnect_backoff.clone(), )); Ok(Self { config, session: None, health_monitor, state: Arc::new(RwLock::new(DaemonState::Stopped)), pid_file, indexer: None, }) } /// Set the indexer for re-indexing operations pub fn set_indexer(&mut self, indexer: ProjectIndexer) { self.indexer = Some(indexer); } /// Start watching (blocking - runs event loop) pub async fn start(&mut self) -> Result<()> { // Check if already running if self.pid_file.is_process_running()? 
{ anyhow::bail!( "Daemon already running (PID file: {:?})", self.pid_file.path() ); } // Update state *self.state.write().await = DaemonState::Starting; // Write PID file self.pid_file.write()?; // Create watch session let mut session = WatchSession::new(self.config.clone()) .await .context("Failed to create watch session")?; // Set indexer on session if let Some(indexer) = self.indexer.take() { session.set_indexer(indexer); } self.session = Some(session); // Update state to running *self.state.write().await = DaemonState::Running; info!( "Daemon started: watching {:?} ({} files)", self.config.project_root, self.session .as_ref() .map(|s| s.files_tracked()) .unwrap_or(0) ); // Run event loop self.run_event_loop().await } /// Stop daemon gracefully pub async fn stop(&mut self) -> Result<()> { info!("Initiating graceful shutdown..."); *self.state.write().await = DaemonState::Stopping; // Stop the session if let Some(ref mut session) = self.session { session.stop(); } // Remove PID file self.pid_file.remove()?; *self.state.write().await = DaemonState::Stopped; info!("Graceful shutdown complete"); Ok(()) } /// Get current status pub async fn status(&self) -> DaemonStatus { let state = *self.state.read().await; let pid = self.pid_file.read().ok().flatten(); let circuit_state = self.health_monitor.circuit_state().await; let (files_watched, metrics) = match &self.session { Some(session) => (session.files_tracked(), session.metrics().clone()), None => (0, SessionMetrics::new()), }; DaemonStatus { state, pid, started_at: if state == DaemonState::Running { Some(chrono::Utc::now()) } else { None }, project_root: self.config.project_root.clone(), files_watched, metrics, surrealdb_connected: circuit_state == CircuitState::Closed, circuit_state, } } /// Main event loop async fn run_event_loop(&mut self) -> Result<()> { let mut ctrl_c = Box::pin(signal::ctrl_c()); loop { tokio::select! 
{ // Handle Ctrl+C _ = &mut ctrl_c => { info!("Received SIGINT, shutting down..."); break; } // Process file change batches batch = async { if let Some(session) = &self.session { session.next_batch().await } else { // No session, wait indefinitely std::future::pending::<Option<codegraph_parser::BatchedChanges>>().await } } => { if let Some(batch) = batch { // Check circuit breaker before processing if !self.health_monitor.should_allow_request().await { warn!("Circuit breaker open, skipping batch"); continue; } if let Some(ref mut session) = self.session { match session.process_batch(batch).await { Ok((indexed, deleted)) => { debug!("Batch processed: {} indexed, {} deleted", indexed, deleted); self.health_monitor.record_success().await; } Err(e) => { error!("Batch processing failed: {}", e); self.health_monitor.record_failure().await; } } } } } // Health check interval _ = tokio::time::sleep(std::time::Duration::from_secs( self.config.health_check_interval_secs )) => { debug!("Health check: circuit state = {:?}", self.health_monitor.circuit_state().await); } } } self.stop().await } } #[cfg(test)] mod tests { use super::*; use tempfile::TempDir; #[tokio::test] async fn test_daemon_creation() { let temp_dir = TempDir::new().unwrap(); let config = WatchConfig { project_root: temp_dir.path().to_path_buf(), debounce_ms: 30, batch_timeout_ms: 200, health_check_interval_secs: 30, reconnect_backoff: BackoffConfig::default(), circuit_breaker: CircuitBreakerConfig::default(), indexer: crate::IndexerConfig::default(), }; let daemon = WatchDaemon::new(config); assert!(daemon.is_ok()); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.