# file_watcher_service.py (14.7 kB)
"""
File Watcher Service for automatic index rebuilds.
This module provides file system monitoring capabilities that automatically
trigger index rebuilds when relevant files are modified, created, or deleted.
It uses the watchdog library for cross-platform file system event monitoring.
"""
# pylint: disable=missing-function-docstring  # Fallback stub methods don't need docstrings
import logging
import os
import traceback
from threading import Timer
from typing import Optional, Callable, List
from pathlib import Path
# Probe for the optional watchdog dependency; the flag records the outcome so
# the rest of the module can branch on it.
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler, FileSystemEvent
except ImportError:
    WATCHDOG_AVAILABLE = False
else:
    WATCHDOG_AVAILABLE = True

if not WATCHDOG_AVAILABLE:
    # Inert stand-ins so the names used below (as a base class, in type
    # annotations and in constructor calls) still resolve without watchdog.
    class Observer:
        """No-op replacement for watchdog's Observer; it never runs a thread."""

        def __init__(self):
            """Create an observer that holds no state."""

        def schedule(self, *args, **kwargs):
            """Accept any arguments and ignore the watch request."""

        def start(self):
            """Do nothing; there is no thread to start."""

        def stop(self):
            """Do nothing; there is no thread to stop."""

        def join(self, *args, **kwargs):
            """Return immediately; there is no thread to wait for."""

        def is_alive(self):
            """Report that the observer is not running."""
            return False

    class FileSystemEventHandler:
        """No-op replacement for watchdog's FileSystemEventHandler base class."""

        def __init__(self):
            """Create a handler that holds no state."""

    class FileSystemEvent:
        """Placeholder event exposing the attributes this module reads."""

        def __init__(self):
            """Initialise the event with empty/false default attributes."""
            self.event_type = ""
            self.src_path = ""
            self.is_directory = False
from .base_service import BaseService
from ..constants import SUPPORTED_EXTENSIONS
class FileWatcherService(BaseService):
    """
    Service for monitoring file system changes and triggering index rebuilds.

    The service schedules a watchdog ``Observer`` on the project base path with
    a ``DebounceEventHandler`` that batches rapid changes into a single call of
    the supplied rebuild callback. When the watchdog library is not installed
    the service stays disabled and ``start_monitoring`` returns False.

    NOTE(review): ``self.settings``, ``self.base_path`` and
    ``self._validate_project_setup()`` are not defined in this class; they are
    presumably provided by ``BaseService`` - confirm against that class.
    """

    # Upper bound on restart_observer() attempts; the counter is reset by a
    # successful start_monitoring().
    MAX_RESTART_ATTEMPTS = 3

    def __init__(self, ctx):
        """
        Initialize the file watcher service.

        Args:
            ctx: The MCP Context object, forwarded to the base service.
        """
        super().__init__(ctx)
        self.logger = logging.getLogger(__name__)
        self.observer: Optional[Observer] = None
        self.event_handler: Optional[DebounceEventHandler] = None
        self.is_monitoring = False
        self.restart_attempts = 0
        self.rebuild_callback: Optional[Callable] = None
        # Check if watchdog is available
        if not WATCHDOG_AVAILABLE:
            self.logger.warning("Watchdog library not available - file watcher disabled")

    def start_monitoring(self, rebuild_callback: Callable) -> bool:
        """
        Start file system monitoring.

        The service is only marked as monitoring once the observer has been
        started and reports itself alive; on any failure every partially
        created resource is released again.

        Args:
            rebuild_callback: Function to call when rebuild is needed

        Returns:
            True if monitoring started successfully (or was already running),
            False otherwise
        """
        if not WATCHDOG_AVAILABLE:
            self.logger.warning("Cannot start file watcher - watchdog library not available")
            return False
        if self.is_monitoring:
            self.logger.debug("File watcher already monitoring")
            return True

        # Validate project setup
        error = self._validate_project_setup()
        if error:
            self.logger.error("Cannot start file watcher: %s", error)
            return False

        self.rebuild_callback = rebuild_callback

        # Get debounce seconds from config
        config = self.settings.get_file_watcher_config()
        debounce_seconds = config.get('debounce_seconds', 6.0)
        watch_path = str(self.base_path)

        try:
            self.observer = Observer()
            self.event_handler = DebounceEventHandler(
                debounce_seconds=debounce_seconds,
                rebuild_callback=self.rebuild_callback,
                base_path=Path(self.base_path),
                logger=self.logger
            )
            self.logger.debug("Scheduling Observer for path: %s", watch_path)
            self.observer.schedule(
                self.event_handler,
                watch_path,
                recursive=True
            )
            self.logger.debug("Starting Observer...")
            self.observer.start()
        except Exception as e:
            self.logger.warning("Failed to start file watcher: %s", e)
            self.logger.info("Falling back to reactive index refresh")
            self._discard_failed_observer()
            return False

        # Verify observer is actually running before claiming to monitor
        if not self.observer.is_alive():
            self.logger.error("File watcher failed to start - Observer not alive")
            self._discard_failed_observer()
            return False

        self.is_monitoring = True
        self.restart_attempts = 0
        self.logger.info(
            "File watcher started successfully",
            extra={
                "debounce_seconds": debounce_seconds,
                "monitored_path": watch_path,
                "supported_extensions": len(SUPPORTED_EXTENSIONS)
            }
        )
        # Diagnostics: compare the monitored path with the working directory
        self.logger.debug("Monitored path exists: %s", os.path.exists(watch_path))
        current_dir = os.getcwd()
        self.logger.debug("Current working directory: %s", current_dir)
        self.logger.debug("Are paths same: %s",
                          os.path.normpath(current_dir) == os.path.normpath(watch_path))
        return True

    def stop_monitoring(self) -> None:
        """
        Stop file system monitoring and cleanup all resources.

        This method ensures complete cleanup of:
        - Observer thread
        - Event handler
        - Debounce timers
        - Monitoring state

        Cleanup runs even when stopping the observer raises, so a pending
        debounce timer cannot fire a rebuild after this method returns.
        """
        if not self.observer and not self.is_monitoring:
            # Already stopped or never started
            return

        self.logger.info("Stopping file watcher monitoring...")
        stopped_cleanly = True
        try:
            if self.observer:
                self.logger.debug("Stopping observer...")
                self.observer.stop()
                # Wait for the observer thread before cancelling the timer so
                # an event still being dispatched cannot re-arm it afterwards.
                self.logger.debug("Waiting for observer thread to finish...")
                self.observer.join(timeout=5.0)
                if self.observer.is_alive():
                    stopped_cleanly = False
                    self.logger.warning("Observer thread did not stop within timeout")
                else:
                    self.logger.debug("Observer thread stopped successfully")
        except Exception as e:
            stopped_cleanly = False
            self.logger.error("Error stopping file watcher: %s", e)
        finally:
            # Force cleanup even if there were errors
            self._cancel_pending_rebuild()
            self._clear_state()

        if stopped_cleanly:
            self.logger.info("File watcher stopped and cleaned up successfully")

    def is_active(self) -> bool:
        """
        Check if file watcher is actively monitoring.

        Returns:
            True if actively monitoring, False otherwise (always a real bool,
            never None or the observer object)
        """
        return bool(
            self.is_monitoring
            and self.observer is not None
            and self.observer.is_alive()
        )

    def restart_observer(self) -> bool:
        """
        Attempt to restart the file system observer.

        The existing event handler is reused, so a restart is only possible
        after a successful ``start_monitoring`` and before ``stop_monitoring``.

        Returns:
            True if restart successful, False otherwise
        """
        if self.restart_attempts >= self.MAX_RESTART_ATTEMPTS:
            self.logger.error("Max restart attempts reached, file watcher disabled")
            return False
        if self.event_handler is None:
            # Scheduling a None handler would only fail later, inside the
            # observer thread, when the first event is dispatched.
            self.logger.error("Cannot restart file watcher - no event handler configured")
            return False

        self.logger.info("Attempting to restart file watcher (attempt %d)",
                         self.restart_attempts + 1)
        self.restart_attempts += 1

        # Stop current observer if running
        if self.observer:
            try:
                self.observer.stop()
                self.observer.join(timeout=2.0)
            except Exception as e:
                self.logger.warning("Error stopping observer during restart: %s", e)

        # Start new observer; publish it on self only once it is running
        try:
            new_observer = Observer()
            new_observer.schedule(
                self.event_handler,
                str(self.base_path),
                recursive=True
            )
            new_observer.start()
        except Exception as e:
            self.logger.error("Failed to restart file watcher: %s", e)
            # The old observer was stopped above, so nothing is watching now.
            self.observer = None
            self.is_monitoring = False
            return False

        self.observer = new_observer
        self.is_monitoring = True
        self.logger.info("File watcher restarted successfully")
        return True

    def get_status(self) -> dict:
        """
        Get current file watcher status information.

        Returns:
            Dictionary with the keys ``available``, ``active``, ``monitoring``,
            ``restart_attempts``, ``debounce_seconds``, ``base_path`` and
            ``observer_alive``
        """
        # Get current debounce seconds from config
        config = self.settings.get_file_watcher_config()
        debounce_seconds = config.get('debounce_seconds', 6.0)
        return {
            "available": WATCHDOG_AVAILABLE,
            "active": self.is_active(),
            "monitoring": self.is_monitoring,
            "restart_attempts": self.restart_attempts,
            "debounce_seconds": debounce_seconds,
            "base_path": self.base_path if self.base_path else None,
            "observer_alive": self.observer.is_alive() if self.observer else False
        }

    def _cancel_pending_rebuild(self) -> None:
        """Cancel the event handler's debounce timer, if one exists."""
        handler = self.event_handler
        if handler and handler.debounce_timer:
            self.logger.debug("Cancelling debounce timer...")
            handler.debounce_timer.cancel()

    def _clear_state(self) -> None:
        """Drop all watcher references and mark the service as not monitoring."""
        self.observer = None
        self.event_handler = None
        self.rebuild_callback = None
        self.is_monitoring = False

    def _discard_failed_observer(self) -> None:
        """Best-effort teardown after a failed start; never raises."""
        if self.observer is not None:
            try:
                self.observer.stop()
            except Exception as e:
                self.logger.debug("Ignoring error while discarding observer: %s", e)
        self._cancel_pending_rebuild()
        self._clear_state()
class DebounceEventHandler(FileSystemEventHandler):
    """
    File system event handler with debouncing capability.

    This handler filters file system events to only relevant files and
    implements a debounce mechanism to batch rapid changes into single
    rebuild operations: every relevant event restarts a timer, and the
    rebuild callback runs once the timer expires without further events.
    """

    def __init__(self, debounce_seconds: float, rebuild_callback: Callable,
                 base_path: Path, logger: logging.Logger, additional_excludes: Optional[List[str]] = None):
        """
        Initialize the debounce event handler.

        Args:
            debounce_seconds: Number of seconds to wait before triggering rebuild
            rebuild_callback: Function to call when rebuild is needed
            base_path: Base project path for filtering
            logger: Logger instance for debug messages
            additional_excludes: Additional patterns to exclude
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid an import cycle - confirm before moving it).
        from ..utils import FileFilter

        super().__init__()
        self.debounce_seconds = debounce_seconds
        self.rebuild_callback = rebuild_callback
        self.base_path = base_path
        self.debounce_timer: Optional[Timer] = None
        self.logger = logger
        # Use centralized file filtering
        self.file_filter = FileFilter(additional_excludes)

    def on_any_event(self, event: FileSystemEvent) -> None:
        """
        Handle any file system event.

        Relevant events (re)start the debounce timer; all others are only
        logged at debug level.

        Args:
            event: The file system event
        """
        if self.should_process_event(event):
            self.logger.info("File changed: %s - %s", event.event_type, event.src_path)
            self.reset_debounce_timer()
        else:
            # Only log at debug level for filtered events
            self.logger.debug("Filtered: %s - %s", event.event_type, event.src_path)

    def should_process_event(self, event: FileSystemEvent) -> bool:
        """
        Determine if event should trigger index rebuild using centralized filtering.

        For move events both ends are considered: moving a relevant file to an
        excluded or temporary location removes it from the project just like a
        delete, so the source path must be able to trigger a rebuild too.

        Args:
            event: The file system event to evaluate

        Returns:
            True if event should trigger rebuild, False otherwise
        """
        # Skip directory events
        if event.is_directory:
            self.logger.debug("Skipping directory event: %s", event.src_path)
            return False

        candidate_paths = [event.src_path]
        if event.event_type == 'moved':
            dest_path = getattr(event, 'dest_path', None)
            if dest_path:
                candidate_paths.append(dest_path)

        return any(self._is_relevant_path(raw_path) for raw_path in candidate_paths)

    def _is_relevant_path(self, raw_path) -> bool:
        """Return True if the file filter accepts raw_path and it is not temporary."""
        try:
            path = Path(raw_path)
            if not self.file_filter.should_process_path(path, self.base_path):
                return False
            # Skip temporary files using centralized logic
            return not self.file_filter.is_temporary_file(path)
        except Exception as e:
            # Filtering is best-effort: an unusable path is treated as
            # irrelevant, but the reason is recorded instead of being lost.
            self.logger.debug("Could not evaluate path %r: %s", raw_path, e)
            return False

    def reset_debounce_timer(self) -> None:
        """Reset the debounce timer, canceling any existing timer."""
        if self.debounce_timer:
            self.debounce_timer.cancel()
        self.debounce_timer = Timer(
            self.debounce_seconds,
            self.trigger_rebuild
        )
        self.debounce_timer.start()

    def trigger_rebuild(self) -> None:
        """
        Trigger index rebuild after debounce period.

        Exceptions raised by the callback are logged with their traceback and
        not propagated.
        """
        self.logger.info("File changes detected, triggering rebuild")
        if not self.rebuild_callback:
            self.logger.warning("No rebuild callback configured")
            return
        try:
            self.rebuild_callback()
        except Exception as e:
            # logger.exception attaches the active traceback to the record
            self.logger.exception("Rebuild callback failed: %s", e)