Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
session.rs (9.9 kB)
// ABOUTME: Watch session that connects FileSystemWatcher with ProjectIndexer // ABOUTME: Handles batch processing of file changes with incremental re-indexing use anyhow::{Context, Result}; use chrono::Utc; use codegraph_parser::{BatchedChanges, FileChangeEvent, FileSystemWatcher}; use std::path::PathBuf; use std::time::Duration; use tracing::{debug, error, info, warn}; use super::config::WatchConfig; use super::status::SessionMetrics; use crate::ProjectIndexer; /// Watch session - owns watcher and manages batch processing pub struct WatchSession { /// Project root being watched project_root: PathBuf, /// File system watcher (from codegraph-parser) watcher: FileSystemWatcher, /// Session metrics metrics: SessionMetrics, /// Configuration config: WatchConfig, /// Indexer for re-indexing changed files indexer: Option<ProjectIndexer>, } impl WatchSession { /// Create a new watch session pub async fn new(config: WatchConfig) -> Result<Self> { let mut watcher = FileSystemWatcher::new().context("Failed to create file system watcher")?; // Configure watcher based on config watcher.set_debounce_duration(Duration::from_millis(config.debounce_ms)); watcher.set_batch_timeout(Duration::from_millis(config.batch_timeout_ms)); // Set include patterns from indexer config if specified if !config.indexer.include_patterns.is_empty() { watcher .set_include_patterns(&config.indexer.include_patterns) .context("Failed to set include patterns")?; } // Add watch directory watcher .add_watch_directory(&config.project_root) .await .context("Failed to add watch directory")?; let tracked_files = watcher.get_tracked_files(); info!( "Watch session initialized: {} files tracked in {:?}", tracked_files.len(), config.project_root ); Ok(Self { project_root: config.project_root.clone(), watcher, metrics: SessionMetrics::new(), config, indexer: None, }) } /// Set the indexer for re-indexing (must be called before processing) pub fn set_indexer(&mut self, indexer: ProjectIndexer) { self.indexer = 
Some(indexer); } /// Get the number of tracked files pub fn files_tracked(&self) -> usize { self.watcher.get_tracked_files().len() } /// Get session metrics pub fn metrics(&self) -> &SessionMetrics { &self.metrics } /// Wait for and get the next batch of changes pub async fn next_batch(&self) -> Option<BatchedChanges> { self.watcher.next_batch().await } /// Process a batch of file changes pub async fn process_batch(&mut self, batch: BatchedChanges) -> Result<(u64, u64)> { let batch_id = &batch.batch_id; let change_count = batch.changes.len(); debug!( "Processing batch {}: {} changes at {}", batch_id, change_count, batch.timestamp ); let mut indexed = 0u64; let mut deleted = 0u64; for change in batch.changes { match change { FileChangeEvent::Created(_file_id, metadata) => { debug!("File created: {:?}", metadata.path); if self.should_index(&metadata.path) { match self.reindex_file(&metadata.path).await { Ok(_) => indexed += 1, Err(e) => { error!("Failed to index new file {:?}: {}", metadata.path, e); self.metrics.record_error(); } } } } FileChangeEvent::Modified(_file_id, new_metadata, old_metadata) => { debug!( "File modified: {:?} (hash changed: {} -> {})", new_metadata.path, old_metadata.content_hash, new_metadata.content_hash ); if self.should_index(&new_metadata.path) { match self.reindex_file(&new_metadata.path).await { Ok(_) => indexed += 1, Err(e) => { error!("Failed to reindex file {:?}: {}", new_metadata.path, e); self.metrics.record_error(); } } } } FileChangeEvent::Deleted(_file_id, metadata) => { debug!("File deleted: {:?}", metadata.path); match self.delete_file_data(&metadata.path).await { Ok(_) => deleted += 1, Err(e) => { error!("Failed to delete data for {:?}: {}", metadata.path, e); self.metrics.record_error(); } } } FileChangeEvent::Renamed(from_id, to_id, metadata) => { debug!("File renamed: {} -> {}", from_id.0, to_id.0); // Delete old data, reindex with new path let old_path = PathBuf::from(&from_id.0); let _ = 
self.delete_file_data(&old_path).await; if self.should_index(&metadata.path) { match self.reindex_file(&metadata.path).await { Ok(_) => { deleted += 1; indexed += 1; } Err(e) => { error!("Failed to reindex renamed file {:?}: {}", metadata.path, e); self.metrics.record_error(); } } } } } } self.metrics.record_batch(indexed, deleted); info!( "Batch {} complete: {} indexed, {} deleted ({} ms)", batch_id, indexed, deleted, Utc::now() .signed_duration_since(batch.timestamp) .num_milliseconds() ); Ok((indexed, deleted)) } /// Check if a file should be indexed based on configuration fn should_index(&self, path: &std::path::Path) -> bool { // Check language filter if !self.config.indexer.languages.is_empty() { if let Some(ext) = path.extension().and_then(|e| e.to_str()) { let ext_lower = ext.to_lowercase(); let matches_language = self.config.indexer.languages.iter().any(|lang: &String| { match lang.to_lowercase().as_str() { "rust" => ext_lower == "rs", "python" => ext_lower == "py", "typescript" => ext_lower == "ts" || ext_lower == "tsx", "javascript" => ext_lower == "js" || ext_lower == "jsx", "go" => ext_lower == "go", "java" => ext_lower == "java", "cpp" | "c++" => { ext_lower == "cpp" || ext_lower == "hpp" || ext_lower == "cc" } "c" => ext_lower == "c" || ext_lower == "h", _ => false, } }); if !matches_language { return false; } } } // Check exclude patterns for pattern in &self.config.indexer.exclude_patterns { if glob_match::glob_match(pattern, &path.to_string_lossy()) { return false; } } true } /// Re-index a single file async fn reindex_file(&self, path: &std::path::Path) -> Result<()> { // The indexer handles upsert semantics - no duplicates created if let Some(indexer) = &self.indexer { indexer .index_single_file(path) .await .with_context(|| format!("Failed to reindex {:?}", path))?; } else { warn!("No indexer set, skipping reindex of {:?}", path); } Ok(()) } /// Delete all data for a file async fn delete_file_data(&self, path: &std::path::Path) -> Result<()> { 
if let Some(indexer) = &self.indexer { indexer .delete_file_data(path) .await .with_context(|| format!("Failed to delete data for {:?}", path))?; } else { warn!("No indexer set, skipping delete for {:?}", path); } Ok(()) } /// Stop the watch session pub fn stop(&mut self) { // Watcher is dropped when session is dropped info!("Watch session stopped for {:?}", self.project_root); } } #[cfg(test)] mod tests { use super::*; #[test] fn test_should_index_rust_file() { let config = WatchConfig { project_root: PathBuf::from("/test"), debounce_ms: 30, batch_timeout_ms: 200, health_check_interval_secs: 30, reconnect_backoff: Default::default(), circuit_breaker: Default::default(), indexer: crate::IndexerConfig { languages: vec!["rust".to_string()], exclude_patterns: vec!["**/target/**".to_string()], ..Default::default() }, }; // Create a mock session to test should_index // Note: In real tests, we'd use a proper mock } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.