code-index-mcp

""" File Watcher Service for automatic index rebuilds. This module provides file system monitoring capabilities that automatically trigger index rebuilds when relevant files are modified, created, or deleted. It uses the watchdog library for cross-platform file system event monitoring. """ # pylint: disable=missing-function-docstring # Fallback stub methods don't need docstrings import logging import os import traceback from threading import Timer from typing import Optional, Callable, List from pathlib import Path try: from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler, FileSystemEvent WATCHDOG_AVAILABLE = True except ImportError: # Fallback classes for when watchdog is not available class Observer: """Fallback Observer class when watchdog library is not available.""" def __init__(self): pass def schedule(self, *args, **kwargs): pass def start(self): pass def stop(self): pass def join(self, *args, **kwargs): pass def is_alive(self): return False class FileSystemEventHandler: """Fallback FileSystemEventHandler class when watchdog library is not available.""" def __init__(self): pass class FileSystemEvent: """Fallback FileSystemEvent class when watchdog library is not available.""" def __init__(self): self.is_directory = False self.src_path = "" self.event_type = "" WATCHDOG_AVAILABLE = False from .base_service import BaseService from ..constants import SUPPORTED_EXTENSIONS class FileWatcherService(BaseService): """ Service for monitoring file system changes and triggering index rebuilds. This service uses the watchdog library to monitor file system events and automatically triggers background index rebuilds when relevant files change. It includes intelligent debouncing to batch rapid changes and filtering to only monitor relevant file types. """ MAX_RESTART_ATTEMPTS = 3 def __init__(self, ctx): """ Initialize the file watcher service. Args: ctx: The MCP Context object """ super().__init__(ctx) self.logger = logging.getLogger(__name__) self.observer: Optional[Observer] = None self.event_handler: Optional[DebounceEventHandler] = None self.is_monitoring = False self.restart_attempts = 0 self.rebuild_callback: Optional[Callable] = None # Check if watchdog is available if not WATCHDOG_AVAILABLE: self.logger.warning("Watchdog library not available - file watcher disabled") def start_monitoring(self, rebuild_callback: Callable) -> bool: """ Start file system monitoring. 
Args: rebuild_callback: Function to call when rebuild is needed Returns: True if monitoring started successfully, False otherwise """ if not WATCHDOG_AVAILABLE: self.logger.warning("Cannot start file watcher - watchdog library not available") return False if self.is_monitoring: self.logger.debug("File watcher already monitoring") return True # Validate project setup error = self._validate_project_setup() if error: self.logger.error("Cannot start file watcher: %s", error) return False self.rebuild_callback = rebuild_callback # Get debounce seconds from config config = self.settings.get_file_watcher_config() debounce_seconds = config.get('debounce_seconds', 6.0) try: self.observer = Observer() self.event_handler = DebounceEventHandler( debounce_seconds=debounce_seconds, rebuild_callback=self.rebuild_callback, base_path=Path(self.base_path), logger=self.logger ) # Log detailed Observer setup watch_path = str(self.base_path) self.logger.debug("Scheduling Observer for path: %s", watch_path) self.observer.schedule( self.event_handler, watch_path, recursive=True ) # Log Observer start self.logger.debug("Starting Observer...") self.observer.start() self.is_monitoring = True self.restart_attempts = 0 # Log Observer thread info if hasattr(self.observer, '_thread'): self.logger.debug("Observer thread: %s", self.observer._thread) # Verify observer is actually running if self.observer.is_alive(): self.logger.info( "File watcher started successfully", extra={ "debounce_seconds": debounce_seconds, "monitored_path": str(self.base_path), "supported_extensions": len(SUPPORTED_EXTENSIONS) } ) # Add diagnostic test - create a test event to verify Observer works self.logger.debug("Observer thread is alive: %s", self.observer.is_alive()) self.logger.debug("Monitored path exists: %s", os.path.exists(str(self.base_path))) self.logger.debug("Event handler is set: %s", self.event_handler is not None) # Log current directory for comparison current_dir = os.getcwd() self.logger.debug("Current working directory: %s", current_dir) self.logger.debug("Are paths same: %s", os.path.normpath(current_dir) == os.path.normpath(str(self.base_path))) return True else: self.logger.error("File watcher failed to start - Observer not alive") return False except Exception as e: self.logger.warning("Failed to start file watcher: %s", e) self.logger.info("Falling back to reactive index refresh") return False def stop_monitoring(self) -> None: """ Stop file system monitoring and cleanup all resources. 
This method ensures complete cleanup of: - Observer thread - Event handler - Debounce timers - Monitoring state """ if not self.observer and not self.is_monitoring: # Already stopped or never started return self.logger.info("Stopping file watcher monitoring...") try: # Step 1: Stop the observer first if self.observer: self.logger.debug("Stopping observer...") self.observer.stop() # Step 2: Cancel any active debounce timer if self.event_handler and self.event_handler.debounce_timer: self.logger.debug("Cancelling debounce timer...") self.event_handler.debounce_timer.cancel() # Step 3: Wait for observer thread to finish (with timeout) self.logger.debug("Waiting for observer thread to finish...") self.observer.join(timeout=5.0) # Step 4: Check if thread actually finished if self.observer.is_alive(): self.logger.warning("Observer thread did not stop within timeout") else: self.logger.debug("Observer thread stopped successfully") # Step 5: Clear all references self.observer = None self.event_handler = None self.rebuild_callback = None self.is_monitoring = False self.logger.info("File watcher stopped and cleaned up successfully") except Exception as e: self.logger.error("Error stopping file watcher: %s", e) # Force cleanup even if there were errors self.observer = None self.event_handler = None self.rebuild_callback = None self.is_monitoring = False def is_active(self) -> bool: """ Check if file watcher is actively monitoring. Returns: True if actively monitoring, False otherwise """ return (self.is_monitoring and self.observer and self.observer.is_alive()) def restart_observer(self) -> bool: """ Attempt to restart the file system observer. Returns: True if restart successful, False otherwise """ if self.restart_attempts >= self.MAX_RESTART_ATTEMPTS: self.logger.error("Max restart attempts reached, file watcher disabled") return False self.logger.info("Attempting to restart file watcher (attempt %d)", self.restart_attempts + 1) self.restart_attempts += 1 # Stop current observer if running if self.observer: try: self.observer.stop() self.observer.join(timeout=2.0) except Exception as e: self.logger.warning("Error stopping observer during restart: %s", e) # Start new observer try: self.observer = Observer() self.observer.schedule( self.event_handler, str(self.base_path), recursive=True ) self.observer.start() self.is_monitoring = True self.logger.info("File watcher restarted successfully") return True except Exception as e: self.logger.error("Failed to restart file watcher: %s", e) return False def get_status(self) -> dict: """ Get current file watcher status information. Returns: Dictionary containing status information """ # Get current debounce seconds from config config = self.settings.get_file_watcher_config() debounce_seconds = config.get('debounce_seconds', 6.0) return { "available": WATCHDOG_AVAILABLE, "active": self.is_active(), "monitoring": self.is_monitoring, "restart_attempts": self.restart_attempts, "debounce_seconds": debounce_seconds, "base_path": self.base_path if self.base_path else None, "observer_alive": self.observer.is_alive() if self.observer else False } class DebounceEventHandler(FileSystemEventHandler): """ File system event handler with debouncing capability. This handler filters file system events to only relevant files and implements a debounce mechanism to batch rapid changes into single rebuild operations. 
""" def __init__(self, debounce_seconds: float, rebuild_callback: Callable, base_path: Path, logger: logging.Logger, additional_excludes: Optional[List[str]] = None): """ Initialize the debounce event handler. Args: debounce_seconds: Number of seconds to wait before triggering rebuild rebuild_callback: Function to call when rebuild is needed base_path: Base project path for filtering logger: Logger instance for debug messages additional_excludes: Additional patterns to exclude """ from ..utils import FileFilter super().__init__() self.debounce_seconds = debounce_seconds self.rebuild_callback = rebuild_callback self.base_path = base_path self.debounce_timer: Optional[Timer] = None self.logger = logger # Use centralized file filtering self.file_filter = FileFilter(additional_excludes) def on_any_event(self, event: FileSystemEvent) -> None: """ Handle any file system event. Args: event: The file system event """ # Check if event should be processed should_process = self.should_process_event(event) if should_process: self.logger.info("File changed: %s - %s", event.event_type, event.src_path) self.reset_debounce_timer() else: # Only log at debug level for filtered events self.logger.debug("Filtered: %s - %s", event.event_type, event.src_path) def should_process_event(self, event: FileSystemEvent) -> bool: """ Determine if event should trigger index rebuild using centralized filtering. Args: event: The file system event to evaluate Returns: True if event should trigger rebuild, False otherwise """ # Skip directory events if event.is_directory: self.logger.debug("Skipping directory event: %s", event.src_path) return False # Select path to check: dest_path for moves, src_path for others if event.event_type == 'moved': if not hasattr(event, 'dest_path'): return False target_path = event.dest_path else: target_path = event.src_path # Use centralized filtering logic try: path = Path(target_path) should_process = self.file_filter.should_process_path(path, self.base_path) # Skip temporary files using centralized logic if not should_process or self.file_filter.is_temporary_file(path): return False return True except Exception: return False def reset_debounce_timer(self) -> None: """Reset the debounce timer, canceling any existing timer.""" if self.debounce_timer: self.debounce_timer.cancel() self.debounce_timer = Timer( self.debounce_seconds, self.trigger_rebuild ) self.debounce_timer.start() def trigger_rebuild(self) -> None: """Trigger index rebuild after debounce period.""" self.logger.info("File changes detected, triggering rebuild") if self.rebuild_callback: try: result = self.rebuild_callback() except Exception as e: self.logger.error("Rebuild callback failed: %s", e) traceback_msg = traceback.format_exc() self.logger.error("Traceback: %s", traceback_msg) else: self.logger.warning("No rebuild callback configured")

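For context, a hypothetical sketch of how a server might wire the watcher up and tear it down. Only FileWatcherService's public methods shown above are relied on; the ctx object, the rebuild_index callback, and the attach_watcher/detach_watcher helper names are assumptions for illustration, not part of code-index-mcp.

def attach_watcher(ctx, rebuild_index):
    """Hypothetical helper: start monitoring, or return None so the caller
    can fall back to reactive index refresh on demand."""
    watcher = FileWatcherService(ctx)            # ctx: whatever BaseService expects
    if not watcher.start_monitoring(rebuild_index):
        return None                              # watchdog missing or project not set up
    return watcher


def detach_watcher(watcher):
    """Hypothetical helper: stop the observer thread and cancel any pending
    debounce timer via stop_monitoring()."""
    if watcher is not None:
        watcher.stop_monitoring()

In between, get_status() can be polled to report whether the observer thread is still alive and how many restart attempts have been consumed.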