Skip to main content
Glama

Scribe MCP Server

by paxocial
file_watcher.py13.7 kB
"""File system watcher for detecting manual document edits.""" from __future__ import annotations import asyncio import hashlib import logging import time from dataclasses import dataclass, field from pathlib import Path from typing import Callable, Dict, List, Optional, Set try: from watchdog.observers import Observer from watchdog.events import FileSystemEvent, FileSystemEventHandler WATCHDOG_AVAILABLE = True except ImportError: WATCHDOG_AVAILABLE = False Observer = None FileSystemEventHandler = None from scribe_mcp.utils.time import utcnow @dataclass class FileChangeEvent: """Represents a file system change event.""" file_path: Path event_type: str # 'created', 'modified', 'deleted', 'moved' timestamp: float = field(default_factory=time.time) size_bytes: Optional[int] = None checksum: Optional[str] = None metadata: Dict[str, any] = field(default_factory=dict) class DocumentFileHandler(FileSystemEventHandler): """Handles file system events for document files.""" def __init__( self, callback: Callable[[FileChangeEvent], None], watched_extensions: Set[str] = None, debounce_seconds: float = 1.0 ): super().__init__() self.callback = callback self.watched_extensions = watched_extensions or {'.md', '.txt', '.json'} self.debounce_seconds = debounce_seconds self._pending_events: Dict[str, float] = {} self._logger = logging.getLogger(__name__) def _should_process_file(self, file_path: str) -> bool: """Check if file should be processed based on extension.""" return Path(file_path).suffix.lower() in self.watched_extensions def _debounce_event(self, file_path: str) -> bool: """Check if event should be debounced (ignored due to recent activity).""" now = time.time() last_time = self._pending_events.get(file_path, 0) if now - last_time < self.debounce_seconds: return True # Debounce this event self._pending_events[file_path] = now return False def _create_change_event(self, event: FileSystemEvent) -> Optional[FileChangeEvent]: """Create a FileChangeEvent from a watchdog event.""" file_path = Path(event.src_path) if not self._should_process_file(str(file_path)): return None # Debounce rapid successive events if self._debounce_event(str(file_path)): return None try: # Calculate file size and checksum for file content changes size_bytes = None checksum = None if event.event_type in ['created', 'modified'] and file_path.exists(): size_bytes = file_path.stat().st_size # Calculate simple checksum for change detection checksum = self._calculate_checksum(file_path) return FileChangeEvent( file_path=file_path, event_type=event.event_type, size_bytes=size_bytes, checksum=checksum, metadata={ 'is_directory': event.is_directory, 'dest_path': getattr(event, 'dest_path', None) } ) except (OSError, IOError) as e: self._logger.warning(f"Failed to process file event for {file_path}: {e}") return None def _calculate_checksum(self, file_path: Path) -> Optional[str]: """Calculate SHA-256 checksum of file content (first 64KB for consistency).""" try: h = hashlib.sha256() with open(file_path, 'rb') as f: content = f.read(65536) # 64KB for consistency with integrity verifier h.update(content) return h.hexdigest() except (OSError, IOError): return None def on_any_event(self, event: FileSystemEvent): """Handle any file system event.""" if event.is_directory: return change_event = self._create_change_event(event) if change_event: try: self.callback(change_event) except Exception as e: self._logger.error(f"Error in file change callback: {e}") class FileSystemWatcher: """File system watcher for monitoring document changes.""" def __init__( self, project_root: Path, change_callback: Callable[[FileChangeEvent], None], watched_patterns: List[str] = None, debounce_seconds: float = 1.0, enable_watchdog: bool = True ): self.project_root = Path(project_root) self.change_callback = change_callback self.watched_patterns = watched_patterns or ['**/*.md', '**/*.txt', '**/*.json'] self.debounce_seconds = debounce_seconds self.enable_watchdog = enable_watchdog and WATCHDOG_AVAILABLE self._observer = None self._polling_task = None self._is_running = False self._logger = logging.getLogger(__name__) # Track file states for change detection self._file_states: Dict[str, Dict[str, any]] = {} self._watched_paths: Set[Path] = set() async def start(self) -> bool: """Start the file system watcher.""" if self._is_running: self._logger.warning("File watcher is already running") return True try: if self.enable_watchdog: success = await self._start_watchdog() else: success = await self._start_polling() if success: self._is_running = True self._logger.info(f"File watcher started for {self.project_root}") # Log which method we're using method = "watchdog" if self.enable_watchdog else "polling" self._logger.info(f"Using {method} for file monitoring") else: self._logger.error("Failed to start file watcher") return success except Exception as e: self._logger.error(f"Error starting file watcher: {e}") return False async def stop(self): """Stop the file system watcher.""" if not self._is_running: return self._is_running = False if self._observer: # Stop the thread, then join off-loop to avoid blocking self._observer.stop() await asyncio.to_thread(self._observer.join, 2.0) self._observer = None if self._polling_task: self._polling_task.cancel() try: await self._polling_task except asyncio.CancelledError: pass self._polling_task = None self._logger.info("File watcher stopped") async def _start_watchdog(self) -> bool: """Start watchdog-based file monitoring.""" if not WATCHDOG_AVAILABLE: self._logger.warning("Watchdog not available, falling back to polling") return await self._start_polling() try: self._observer = Observer() # Create event handler handler = DocumentFileHandler( callback=self._handle_file_change, debounce_seconds=self.debounce_seconds ) # Add recursive watch for project root self._observer.schedule(handler, str(self.project_root), recursive=True) # Start the observer self._observer.start() # Initialize file states await self._initialize_file_states() return True except Exception as e: self._logger.error(f"Failed to start watchdog observer: {e}") return False async def _start_polling(self) -> bool: """Start polling-based file monitoring.""" try: # Initialize file states await self._initialize_file_states() # Start polling task self._polling_task = asyncio.create_task(self._polling_loop()) return True except Exception as e: self._logger.error(f"Failed to start polling watcher: {e}") return False async def _initialize_file_states(self): """Initialize tracking of current file states.""" for pattern in self.watched_patterns: # glob off the loop paths = await asyncio.to_thread(lambda: list(self.project_root.glob(pattern))) for file_path in paths: if file_path.is_file(): await self._update_file_state(file_path) async def _update_file_state(self, file_path: Path): """Update the tracked state of a file.""" try: stat = file_path.stat() self._file_states[str(file_path)] = { 'mtime': stat.st_mtime, 'size': stat.st_size, 'checksum': self._calculate_checksum(file_path) } except (OSError, IOError): # File might be inaccessible pass def _calculate_checksum(self, file_path: Path) -> Optional[str]: """Calculate SHA-256 checksum of file content (first 64KB for consistency).""" try: h = hashlib.sha256() with open(file_path, 'rb') as f: content = f.read(65536) # 64KB for consistency with integrity verifier h.update(content) return h.hexdigest() except (OSError, ImportError): return None def _handle_file_change(self, change_event: FileChangeEvent): """Handle a detected file change.""" try: # Add project-specific metadata change_event.metadata.update({ 'project_root': str(self.project_root), 'relative_path': str(change_event.file_path.relative_to(self.project_root)) }) # Call the user-provided callback self.change_callback(change_event) except Exception as e: self._logger.error(f"Error handling file change: {e}") async def _polling_loop(self): """Main polling loop for file change detection.""" poll_interval = max(1.0, self.debounce_seconds) while self._is_running: try: await self._check_for_changes() await asyncio.sleep(poll_interval) except asyncio.CancelledError: break except Exception as e: self._logger.error(f"Error in polling loop: {e}") await asyncio.sleep(poll_interval) async def _check_for_changes(self): """Check for file changes via polling.""" current_files = set() # Discover current files for pattern in self.watched_patterns: for file_path in self.project_root.glob(pattern): if file_path.is_file(): current_files.add(str(file_path)) await self._check_file_changes(file_path) # Check for deleted files for file_path_str in list(self._file_states.keys()): if file_path_str not in current_files: await self._handle_file_deletion(file_path_str) async def _check_file_changes(self, file_path: Path): """Check if a specific file has changed.""" file_path_str = str(file_path) previous_state = self._file_states.get(file_path_str) try: current_stat = file_path.stat() current_state = { 'mtime': current_stat.st_mtime, 'size': current_stat.st_size, 'checksum': self._calculate_checksum(file_path) } if previous_state: # Check for changes if (current_state['mtime'] != previous_state['mtime'] or current_state['size'] != previous_state['size'] or current_state['checksum'] != previous_state['checksum']): # File has changed change_event = FileChangeEvent( file_path=file_path, event_type='modified', size_bytes=current_state['size'], checksum=current_state['checksum'], metadata={ 'previous_state': previous_state, 'detection_method': 'polling' } ) self._handle_file_change(change_event) # Update tracked state self._file_states[file_path_str] = current_state except (OSError, IOError): # File might be inaccessible or deleted pass async def _handle_file_deletion(self, file_path_str: str): """Handle file deletion detected via polling.""" if file_path_str in self._file_states: change_event = FileChangeEvent( file_path=Path(file_path_str), event_type='deleted', metadata={ 'previous_state': self._file_states[file_path_str], 'detection_method': 'polling' } ) self._handle_file_change(change_event) del self._file_states[file_path_str] def is_running(self) -> bool: """Check if the watcher is currently running.""" return self._is_running def get_method(self) -> str: """Get the monitoring method being used.""" return "watchdog" if self.enable_watchdog else "polling"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/paxocial/scribe_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server