Skip to main content
Glama
watcher.py11.4 kB
""" Main file watcher implementation using watchdog library. """ import logging from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import Callable, Any, Optional, List, Set from nabu.file_watcher.debouncer import FileChangeDebouncer from nabu.file_watcher.filters import FileFilter from nabu.file_watcher.events import FileChangeEvent, FileChangeType logger = logging.getLogger(__name__) class FileWatcher: """ Elegant file system watcher with debouncing and filtering. Features: - Monitors codebase directory for file changes - Debounces events (accumulates changes, fires after inactivity) - Filters files using gitignore-style patterns - Thread-safe callback execution via thread pool - Graceful start/stop with cleanup - Decoupled design - no hard dependencies on nabu internals Example: # Simple usage def handle_change(file_path: str): print(f"File changed: {file_path}") watcher = FileWatcher( codebase_path="/path/to/code", on_file_changed=handle_change, debounce_seconds=5.0 ) watcher.start() # ... watcher runs in background ... watcher.stop() Example with incremental updater: from nabu.incremental import IncrementalUpdater updater = IncrementalUpdater("nabu.kuzu") watcher = FileWatcher( codebase_path="/path/to/code", on_file_changed=updater.update_file, debounce_seconds=5.0, watch_extensions=['.py', '.java', '.cpp'] ) watcher.start() """ def __init__( self, codebase_path: str, on_file_changed: Callable[[str], Any], debounce_seconds: float = 5.0, ignore_patterns: Optional[List[str]] = None, watch_extensions: Optional[List[str]] = None, executor: Optional[ThreadPoolExecutor] = None ): """ Initialize file watcher. Args: codebase_path: Root directory to watch on_file_changed: Callback function(file_path: str) -> Any Called for each changed file (in thread pool) debounce_seconds: Seconds to wait after last change before processing ignore_patterns: Gitignore-style patterns to exclude (None = use defaults) watch_extensions: File extensions to watch (None = watch all) executor: Optional ThreadPoolExecutor for callbacks (None = create with max_workers=1 for serial execution) """ self.codebase_path = Path(codebase_path).resolve() self.on_file_changed = on_file_changed self.debounce_seconds = debounce_seconds # Validate codebase path if not self.codebase_path.exists(): raise ValueError(f"Codebase path does not exist: {self.codebase_path}") if not self.codebase_path.is_dir(): raise ValueError(f"Codebase path is not a directory: {self.codebase_path}") # Setup file filter if ignore_patterns is None: ignore_patterns = FileFilter.default_ignores() self.file_filter = FileFilter( ignore_patterns=ignore_patterns, watch_extensions=watch_extensions, codebase_path=self.codebase_path ) # Setup debouncer self.debouncer = FileChangeDebouncer( callback=self._process_batch, delay_seconds=debounce_seconds ) # Thread pool for callback execution (serial by default for thread safety) self.executor = executor or ThreadPoolExecutor( max_workers=1, thread_name_prefix="nabu_watcher" ) self._owns_executor = (executor is None) # Watchdog components (initialized on start) self.observer = None self._event_handler = None self._running = False logger.info( f"FileWatcher initialized: path={self.codebase_path}, " f"debounce={debounce_seconds}s, " f"extensions={watch_extensions or 'all'}" ) def start(self) -> None: """ Start watching for file changes. Spawns watchdog observer in background thread. Raises: RuntimeError: If already started ImportError: If watchdog library not available """ if self._running: raise RuntimeError("FileWatcher is already running") try: from watchdog.observers import Observer from watchdog.events import ( FileSystemEventHandler, FileModifiedEvent, FileCreatedEvent, FileDeletedEvent, FileMovedEvent ) except ImportError: raise ImportError( "watchdog library not available. Install with: pip install watchdog" ) # Create custom event handler class NabuFileEventHandler(FileSystemEventHandler): """Handles file system events and forwards to debouncer.""" def __init__(self, watcher: 'FileWatcher'): super().__init__() self.watcher = watcher def on_modified(self, event): if not event.is_directory: self.watcher._handle_event(event.src_path, FileChangeType.MODIFIED) def on_created(self, event): if not event.is_directory: self.watcher._handle_event(event.src_path, FileChangeType.CREATED) def on_deleted(self, event): if not event.is_directory: self.watcher._handle_event(event.src_path, FileChangeType.DELETED) def on_moved(self, event): if not event.is_directory: # Treat move destination as creation self.watcher._handle_event(event.dest_path, FileChangeType.MOVED) # Create observer and event handler self._event_handler = NabuFileEventHandler(self) self.observer = Observer() self.observer.schedule( self._event_handler, str(self.codebase_path), recursive=True ) # Start observer thread self.observer.start() self._running = True logger.info(f"FileWatcher started: watching {self.codebase_path}") def stop(self, timeout: float = 10.0) -> None: """ Stop watching and cleanup resources. Args: timeout: Seconds to wait for observer thread to stop """ if not self._running: logger.debug("FileWatcher already stopped") return logger.info("Stopping FileWatcher...") # Stop observer if self.observer: try: self.observer.stop() self.observer.join(timeout=timeout) if self.observer.is_alive(): logger.warning( f"Observer thread did not stop within {timeout}s" ) except Exception as e: logger.error(f"Error stopping observer: {e}") finally: self.observer = None # Flush pending changes try: self.debouncer.flush() except Exception as e: logger.error(f"Error flushing debouncer: {e}") # Stop debouncer self.debouncer.stop() # Shutdown executor if we own it if self._owns_executor and self.executor: try: self.executor.shutdown(wait=True, cancel_futures=False) except Exception as e: logger.error(f"Error shutting down executor: {e}") self._running = False logger.info("FileWatcher stopped") def _handle_event(self, file_path: str, change_type: FileChangeType) -> None: """ Handle file system event. Filters file and adds to debouncer if it should be watched. Runs in watchdog observer thread. Args: file_path: Path to changed file change_type: Type of change """ # Apply filter if not self.file_filter.should_watch(file_path): logger.debug(f"Filtered out: {file_path}") return # Normalize to absolute path abs_path = str(Path(file_path).resolve()) logger.debug(f"Event: {change_type.value} - {abs_path}") # Add to debouncer self.debouncer.add_change(abs_path) def _process_batch(self, file_paths: Set[str]) -> None: """ Process batch of changed files. Runs in debouncer timer thread. Submits callbacks to executor. Args: file_paths: Set of absolute file paths to process """ logger.info(f"Processing batch of {len(file_paths)} file(s)") # Submit each file to executor futures = [] for file_path in sorted(file_paths): future = self.executor.submit( self._execute_callback, file_path ) futures.append(future) # Wait for all callbacks to complete (for logging purposes) success_count = 0 error_count = 0 for future in futures: try: future.result() success_count += 1 except Exception as e: error_count += 1 # Error already logged in _execute_callback logger.info( f"Batch complete: {success_count} succeeded, {error_count} failed" ) def _execute_callback(self, file_path: str) -> None: """ Execute user callback for single file. Runs in executor thread pool. Args: file_path: Absolute path to changed file """ try: logger.debug(f"Executing callback for: {file_path}") result = self.on_file_changed(file_path) logger.debug(f"Callback completed for: {file_path}") return result except Exception as e: logger.error( f"Error in callback for {file_path}: {e}", exc_info=True ) raise def is_running(self) -> bool: """Check if watcher is currently running.""" return self._running def __enter__(self): """Context manager entry - starts watcher.""" self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit - stops watcher.""" self.stop() return False def __repr__(self) -> str: status = "running" if self._running else "stopped" return ( f"FileWatcher(path={self.codebase_path}, " f"debounce={self.debounce_seconds}s, " f"status={status})" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'

If you have feedback or need assistance with the MCP directory API, please join our Discord server