file_monitor.py (64.5 kB)
"""Advanced file system monitoring for document directory changes with metadata persistence.""" import asyncio import hashlib import json import logging import os import threading import time from concurrent.futures import ThreadPoolExecutor from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple from .background_queue import BackgroundProcessingQueue, Job, JobType, Priority from .config import ServerConfig from .exceptions import FileMonitorError, FileSystemError logger = logging.getLogger(__name__) class FileEventDebouncer: """Debounces rapid file system events to avoid processing the same file multiple times.""" def __init__(self, delay: float = 0.5): """Initialize the debouncer. Args: delay: Delay in seconds before processing events. """ self.delay = delay self.pending_events: Dict[str, Tuple[str, float]] = {} self.lock = asyncio.Lock() async def add_event(self, file_path: str, event_type: str) -> bool: """Add an event to the debouncer. Args: file_path: Path to the file. event_type: Type of event (created, modified, deleted). Returns: True if event should be processed immediately, False if debounced. """ async with self.lock: current_time = time.time() # If we have a pending event for this file, update it if file_path in self.pending_events: self.pending_events[file_path] = (event_type, current_time) return False # Add new event self.pending_events[file_path] = (event_type, current_time) return True async def get_ready_events(self) -> List[Tuple[str, str]]: """Get events that are ready to be processed. Returns: List of (file_path, event_type) tuples ready for processing. """ async with self.lock: current_time = time.time() ready_events = [] expired_files = [] for file_path, (event_type, event_time) in self.pending_events.items(): if current_time - event_time >= self.delay: ready_events.append((file_path, event_type)) expired_files.append(file_path) # Remove processed events for file_path in expired_files: del self.pending_events[file_path] return ready_events class FileMetadata: """Represents metadata for a tracked file.""" def __init__( self, path: str, checksum: str, last_modified: float, file_size: int, processed_time: Optional[float] = None, document_id: Optional[str] = None, ): """Initialize file metadata. Args: path: File path. checksum: SHA-256 checksum. last_modified: Last modified timestamp. file_size: File size in bytes. processed_time: When the file was last processed. document_id: Associated document ID if processed. 
""" self.path = path self.checksum = checksum self.last_modified = last_modified self.file_size = file_size self.processed_time = processed_time self.document_id = document_id self.created_time = time.time() def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for JSON serialization.""" return { "path": self.path, "checksum": self.checksum, "last_modified": self.last_modified, "file_size": self.file_size, "processed_time": self.processed_time, "document_id": self.document_id, "created_time": self.created_time, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "FileMetadata": """Create from dictionary.""" metadata = cls( path=data["path"], checksum=data["checksum"], last_modified=data["last_modified"], file_size=data["file_size"], processed_time=data.get("processed_time"), document_id=data.get("document_id"), ) metadata.created_time = data.get("created_time", time.time()) return metadata class FileMonitor: """Advanced file system monitoring with watchdog integration and metadata persistence.""" def __init__( self, config: ServerConfig, document_processor=None, vector_store=None, document_cache_callback=None, background_queue: Optional[BackgroundProcessingQueue] = None, web_document_service=None, ): """Initialize the file monitor. Args: config: Server configuration. document_processor: Document processing service. vector_store: Vector storage service. document_cache_callback: Callback function to update main server's document cache. background_queue: Optional background processing queue for non-blocking file processing. web_document_service: Optional WebDocumentService for creating in-progress document placeholders. """ self.config = config self.document_processor = document_processor self.vector_store = vector_store self.document_cache_callback = document_cache_callback self.background_queue = background_queue self.web_document_service = web_document_service # Store the web document service if self.background_queue: logger.info("🎯 FILE MONITOR: Background queue is AVAILABLE - will use background processing") else: logger.warning( "⚠️ FILE MONITOR: Background queue is NOT AVAILABLE - will use synchronous processing (blocks server!)" ) # File system monitoring self.observer = None self.event_handler = None self.is_running = False # Metadata persistence self.file_index: Dict[str, FileMetadata] = {} self.file_index_path = self.config.metadata_path / "file_index.json" self.index_lock = threading.Lock() # Use threading.Lock instead of asyncio.Lock for thread pool compatibility # Event processing self.debouncer = FileEventDebouncer(delay=1.0) self.processing_queue = asyncio.Queue() self.batch_processor_task: Optional[asyncio.Task] = None self.event_processor_task: Optional[asyncio.Task] = None self.periodic_scanner_task: Optional[asyncio.Task] = None # Processing state self.processing_files: Set[str] = set() self.failed_files: Dict[str, str] = {} # file_path -> error_message self.processing_stats = {"processed": 0, "failed": 0, "skipped": 0} # Thread pool for I/O operations self.executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="FileMonitor") # Directory exclusion settings self.excluded_directories = {"uploads", ".cache"} async def start_monitoring(self) -> None: """Start file system monitoring.""" try: if self.is_running: logger.warning("File monitor is already running") return logger.info("Starting file monitor...") # Load existing file index await self.load_file_index() # Perform startup synchronization await self.startup_synchronization() # Start watchdog 
observer await self._start_watchdog() # Start background processors self.batch_processor_task = asyncio.create_task(self._batch_processor()) self.event_processor_task = asyncio.create_task(self._event_processor()) # Start periodic scanner if enabled if self.config.enable_periodic_scan and self.config.file_scan_interval > 0: self.periodic_scanner_task = asyncio.create_task(self._periodic_scanner()) logger.info(f"📅 Periodic directory scanner started (interval: {self.config.file_scan_interval}s)") elif not self.config.enable_periodic_scan: logger.info("📅 Periodic directory scanning disabled by configuration") else: logger.info("📅 Periodic directory scanning disabled (interval = 0)") self.is_running = True logger.info(f"File monitor started, watching: {self.config.knowledgebase_path}") except Exception as e: raise FileMonitorError(f"Failed to start file monitor: {e}", "start", e) async def stop_monitoring(self) -> None: """Stop file system monitoring.""" try: if not self.is_running: return logger.info("Stopping file monitor...") self.is_running = False # Stop background tasks if self.batch_processor_task: self.batch_processor_task.cancel() try: await self.batch_processor_task except asyncio.CancelledError: pass if self.event_processor_task: self.event_processor_task.cancel() try: await self.event_processor_task except asyncio.CancelledError: pass if self.periodic_scanner_task: self.periodic_scanner_task.cancel() try: await self.periodic_scanner_task except asyncio.CancelledError: pass # Stop watchdog observer await self._stop_watchdog() # Save file index logger.info("Saving file index during shutdown...") try: logger.info("About to call save_file_index during shutdown") await self.save_file_index() logger.info("File index saved during shutdown") # Verify file exists after saving if self.file_index_path.exists(): file_size = self.file_index_path.stat().st_size logger.info(f"Verified file index exists after shutdown save (size: {file_size} bytes)") else: logger.error(f"File index not found after shutdown save at {self.file_index_path}") except Exception as e: logger.error(f"Error saving file index during shutdown: {e}") # Shutdown executor self.executor.shutdown(wait=True) logger.info("File monitor stopped") except Exception as e: logger.error(f"Error stopping file monitor: {e}") async def startup_synchronization(self) -> None: """Synchronize file system state with cached metadata on startup. This method performs a non-blocking startup synchronization by queuing files for background processing and returning immediately to avoid blocking server initialization. """ try: logger.info("Performing startup synchronization...") # Run synchronization in a separate task to avoid blocking asyncio.create_task(self._perform_startup_sync()) # Return immediately - don't wait for file processing logger.info("Startup synchronization initiated - files will be processed in background") except Exception as e: logger.error(f"Startup synchronization failed: {e}") # Don't raise here to avoid blocking server startup logger.warning("Continuing server startup despite sync error") async def _perform_startup_sync(self) -> None: """Internal method that performs the actual startup synchronization work. This runs in a separate task to avoid blocking the main initialization flow. 
""" try: logger.info("Background startup synchronization starting...") current_files = await self.scan_directory() cached_files = set(self.file_index.keys()) # Normalize paths for comparison - resolve to absolute paths current_file_strs = {str(f.resolve()) for f in current_files} cached_file_resolved = {str(Path(f).resolve()): f for f in cached_files} # Find new files (exist on disk but not in cache) potentially_new_files = [] for file_path in current_files: resolved_path = str(file_path.resolve()) if resolved_path not in cached_file_resolved: potentially_new_files.append(file_path) # Filter out files that already exist as completed documents in the document cache # This prevents treating already-processed documents as "new" during startup new_files = [] if self.web_document_service and hasattr(self.web_document_service, "document_cache"): for file_path in potentially_new_files: # Check if a document with this path already exists in the document cache document_exists = False for doc_id, document in self.web_document_service.document_cache.items(): if hasattr(document, "path") and str(Path(document.path).resolve()) == str(file_path.resolve()): document_exists = True # Update our file index with the existing document info try: stat = file_path.stat() # Calculate checksum in executor to avoid blocking checksum = await self.get_file_checksum(file_path) metadata = FileMetadata( path=str(file_path.resolve()), checksum=checksum, last_modified=stat.st_mtime, file_size=stat.st_size, processed_time=time.time(), document_id=doc_id, ) await self.update_file_metadata(file_path, metadata, save_immediately=False) logger.debug(f"Updated file index for existing document {file_path}") except Exception as e: logger.warning(f"Failed to update file index for existing document {file_path}: {e}") break if not document_exists: new_files.append(file_path) else: # No document cache available, treat all as potentially new new_files = potentially_new_files # Find deleted files (in cache but not on disk) deleted_files = [] for cached_file in cached_files: resolved_cached_path = str(Path(cached_file).resolve()) if resolved_cached_path not in current_file_strs: deleted_files.append(Path(cached_file)) # Find modified files modified_files = [] for file_path in current_files: resolved_path = str(file_path.resolve()) if resolved_path in cached_file_resolved: original_cached_path = cached_file_resolved[resolved_path] if await self._file_changed(file_path, self.file_index[original_cached_path]): modified_files.append(file_path) logger.info( f"Background startup sync: {len(new_files)} new, " f"{len(modified_files)} modified, {len(deleted_files)} deleted" ) # Queue changes for background processing (non-blocking) files_queued = 0 for file_path in new_files: if self.background_queue: logger.info(f"🚀 BACKGROUND STARTUP SYNC: Queuing NEW file {file_path} for processing") await self._queue_file_for_processing(file_path, JobType.FILE_WATCHER) files_queued += 1 else: logger.warning( f"⚠️ BACKGROUND STARTUP SYNC: No background queue, " f"queuing NEW file {file_path} for SYNCHRONOUS processing" ) await self._queue_file_processing(file_path, "created") for file_path in modified_files: if self.background_queue: logger.info(f"🚀 BACKGROUND STARTUP SYNC: Queuing MODIFIED file {file_path} for processing") await self._queue_file_for_processing(file_path, JobType.FILE_WATCHER) files_queued += 1 else: logger.warning( f"⚠️ BACKGROUND STARTUP SYNC: No background queue, queuing MODIFIED file {file_path} " f"for SYNCHRONOUS processing" ) await 
self._queue_file_processing(file_path, "modified") # Process deleted files immediately (fast operation) for file_path in deleted_files: await self.remove_file(file_path) if files_queued > 0: logger.info(f"Background sync queued {files_queued} files for processing - server remains responsive") # Save the file index once after all metadata updates during startup try: await self.save_file_index() logger.debug("Saved file index after background startup synchronization") except Exception as e: logger.warning(f"Failed to save file index after startup sync: {e}") logger.info("Background startup synchronization completed - files continue processing independently") except Exception as e: logger.error(f"Background startup synchronization failed: {e}") # Don't raise - just log the error and continue def _is_excluded_directory(self, file_path: Path) -> bool: """Check if a file path is in an excluded directory. Args: file_path: Path to check. Returns: True if the file is in an excluded directory. """ try: # Convert to relative path from knowledgebase directory for comparison try: relative_path = file_path.relative_to(self.config.knowledgebase_path) except ValueError: # Path is not within knowledgebase directory, allow it return False # Check if any part of the path matches excluded directories for part in relative_path.parts: if part in self.excluded_directories: return True return False except Exception as e: logger.warning(f"Error checking directory exclusion for {file_path}: {e}") return False # Default to not excluded on error async def scan_directory(self) -> List[Path]: """Scan the knowledgebase directory for supported files. Returns: List of file paths found. """ try: if not self.config.knowledgebase_path.exists(): logger.warning(f"Knowledgebase directory does not exist: {self.config.knowledgebase_path}") return [] files = [] for ext in self.config.supported_extensions: pattern = f"**/*{ext}" for file_path in self.config.knowledgebase_path.rglob(pattern): if file_path.is_file() and not self._is_excluded_directory(file_path): files.append(file_path) logger.debug( f"Scanned directory: found {len(files)} files (excluded directories: {self.excluded_directories})" ) return files except Exception as e: logger.error(f"Directory scan failed: {e}") return [] async def process_new_file(self, file_path: Path) -> None: """Process a newly detected file. Args: file_path: Path to the new file. """ try: if str(file_path) in self.processing_files: logger.debug(f"File already being processed: {file_path}") return # If background queue is available, queue the file for processing if self.background_queue: logger.info(f"🚀 FILE MONITOR: Queuing file {file_path} for BACKGROUND processing") await self._queue_file_for_processing(file_path, JobType.FILE_WATCHER) return # Fallback: Process synchronously logger.warning( f"⚠️ FILE MONITOR: No background queue available, processing {file_path} " f"SYNCHRONOUSLY (this will block the server!)" ) await self._process_file_synchronously(file_path) except Exception as e: self.failed_files[str(file_path)] = str(e) self.processing_stats["failed"] += 1 logger.error(f"Error processing new file {file_path}: {e}") async def _queue_file_for_processing(self, file_path: Path, job_type: JobType) -> None: """Queue a file for background processing. Args: file_path: Path to the file to process. job_type: Type of job for prioritization. 
""" try: # Calculate file metadata before queuing checksum = await self.get_file_checksum(file_path) stat = file_path.stat() # Create job metadata job_metadata = { "file_path": str(file_path), "checksum": checksum, "file_size": stat.st_size, "last_modified": stat.st_mtime, "queued_time": time.time(), } # Queue the job with appropriate priority priority = Priority.NORMAL if job_type == JobType.FILE_WATCHER else Priority.HIGH job_id = await self.background_queue.add_job( job_type=job_type, metadata=job_metadata, priority=priority, processor=self._process_document_job, ) # Create in-progress document placeholder if web service is available # But only if document doesn't already exist (to avoid showing existing docs as "Processing") if self.web_document_service: try: # Check if document already exists in document cache normalized_path = str(file_path.resolve()) existing_metadata = self.file_index.get(normalized_path) # Skip creating in-progress document if: # 1. File metadata shows it's already processed, OR # 2. A document with same path exists in document cache should_create_placeholder = True if existing_metadata and existing_metadata.document_id: # Check if document exists in cache if ( hasattr(self.web_document_service, "document_cache") and existing_metadata.document_id in self.web_document_service.document_cache ): should_create_placeholder = False logger.debug(f"Skipping in-progress document for {file_path} - already exists in cache") if should_create_placeholder: in_progress_doc = self.web_document_service._create_in_progress_document( job_id=job_id, filename=file_path.name, file_size=stat.st_size, temp_path=str(file_path) ) in_progress_doc.path = str(file_path) # Use actual path self.web_document_service.in_progress_documents[in_progress_doc.id] = in_progress_doc logger.debug(f"Created in-progress document {in_progress_doc.id} for file {file_path}") except Exception as e: logger.error(f"Failed to create in-progress document for {file_path}: {e}") logger.info(f"Queued file for processing: {file_path} (job_id: {job_id})") except Exception as e: logger.error(f"Failed to queue file for processing {file_path}: {e}") # Fallback to synchronous processing on queue failure await self._process_file_synchronously(file_path) async def _process_document_job(self, job: Job) -> None: """Background job processor for PDF files. Args: job: Job instance containing file processing metadata. 
""" file_path_str = job.metadata["file_path"] file_path = Path(file_path_str) job_id = job.job_id try: self.processing_files.add(file_path_str) logger.info(f"Starting background processing for: {file_path} (job_id: {job_id})") # Process the file if not self.document_processor: # Just track the file without processing checksum = job.metadata["checksum"] metadata = FileMetadata( path=file_path_str, checksum=checksum, last_modified=job.metadata["last_modified"], file_size=job.metadata["file_size"], ) await self.update_file_metadata(file_path, metadata) self.processing_stats["skipped"] += 1 logger.info(f"Tracked file without processing: {file_path}") # Remove in-progress document on skipped processing if web service is available if self.web_document_service: self._remove_in_progress_document(job_id) return result = await self.document_processor.process_document(file_path) if result.success and result.document: # Add to vector store if self.vector_store: await self.vector_store.add_document(result.document) # Update metadata metadata = FileMetadata( path=file_path_str, checksum=job.metadata["checksum"], last_modified=job.metadata["last_modified"], file_size=job.metadata["file_size"], processed_time=time.time(), document_id=result.document.id, ) await self.update_file_metadata(file_path, metadata) # Update main server's document cache via callback if self.document_cache_callback: await self.document_cache_callback(result.document) # Remove in-progress document on success if web service is available if self.web_document_service: self._remove_in_progress_document(job_id) self.processing_stats["processed"] += 1 logger.info(f"Successfully processed file in background: {file_path}") logger.debug(f"Document ID for processed file: {result.document.id}") else: error_msg = result.error or "Unknown processing error" self.failed_files[file_path_str] = error_msg self.processing_stats["failed"] += 1 # Update in-progress document status on failure if web service is available if self.web_document_service: self._update_in_progress_document_failed(job_id, error_msg) logger.error(f"Failed to process file {file_path}: {error_msg}") raise Exception(error_msg) except Exception as e: self.failed_files[file_path_str] = str(e) self.processing_stats["failed"] += 1 logger.error(f"Error in background processing for {file_path}: {e}") raise finally: self.processing_files.discard(file_path_str) async def _process_file_synchronously(self, file_path: Path) -> None: """Process a file synchronously (fallback when no background queue). Args: file_path: Path to the file to process. 
""" try: self.processing_files.add(str(file_path)) try: # Calculate file metadata checksum = await self.get_file_checksum(file_path) stat = file_path.stat() # Process the file if self.document_processor: result = await self.document_processor.process_document(file_path) if result.success and result.document: # Add to vector store if self.vector_store: await self.vector_store.add_document(result.document) # Update metadata metadata = FileMetadata( path=str(file_path), checksum=checksum, last_modified=stat.st_mtime, file_size=stat.st_size, processed_time=time.time(), document_id=result.document.id, ) await self.update_file_metadata(file_path, metadata) # Update main server's document cache via callback if self.document_cache_callback: await self.document_cache_callback(result.document) self.processing_stats["processed"] += 1 logger.info(f"Successfully processed new file: {file_path}") logger.debug(f"Document ID for processed file: {result.document.id}") else: error_msg = result.error or "Unknown processing error" self.failed_files[str(file_path)] = error_msg self.processing_stats["failed"] += 1 logger.error(f"Failed to process file {file_path}: {error_msg}") else: # No processor available, just track the file metadata = FileMetadata( path=str(file_path), checksum=checksum, last_modified=stat.st_mtime, file_size=stat.st_size, ) await self.update_file_metadata(file_path, metadata) self.processing_stats["skipped"] += 1 logger.info(f"Tracked file without processing: {file_path}") finally: self.processing_files.discard(str(file_path)) except Exception as e: self.failed_files[str(file_path)] = str(e) self.processing_stats["failed"] += 1 logger.error(f"Error processing file synchronously {file_path}: {e}") async def remove_file(self, file_path: Path) -> None: """Remove a deleted file from tracking and vector store. Args: file_path: Path to the removed file. 
""" try: # Use normalized path for lookup normalized_path = str(file_path.resolve()) # Get metadata before removal metadata = self.file_index.get(normalized_path) document_id = None if metadata and metadata.document_id: document_id = metadata.document_id # Remove from vector store if self.vector_store: removed_count = await self.vector_store.delete_document(document_id) logger.info(f"Removed {removed_count} chunks from vector store for deleted file: {file_path}") # Remove from main document cache via callback if available if self.web_document_service and hasattr(self.web_document_service, "document_cache"): if document_id in self.web_document_service.document_cache: del self.web_document_service.document_cache[document_id] logger.info(f"Removed document {document_id} from document cache") # Save the updated cache if callback available if self.web_document_service.save_cache_callback: try: await self.web_document_service.save_cache_callback() logger.debug(f"Saved document cache after removing {document_id}") except Exception as e: logger.warning(f"Failed to save document cache after removal: {e}") # Remove from file index with self.index_lock: if normalized_path in self.file_index: del self.file_index[normalized_path] # Save the file index synchronously to ensure persistence # This fixes the race condition where async tasks might not complete await self.save_file_index() # Remove from failed files if present self.failed_files.pop(normalized_path, None) if document_id: logger.info(f"Removed file {file_path} (document {document_id}) from all tracking systems") else: logger.info(f"Removed file {file_path} from tracking (no associated document)") except Exception as e: logger.error(f"Error removing file {file_path}: {e}") async def get_processed_files(self) -> List[dict]: """Get list of all processed files. Returns: List of file information dictionaries. """ try: with self.index_lock: files = [] for metadata in self.file_index.values(): file_info = { "path": metadata.path, "checksum": metadata.checksum, "last_modified": datetime.fromtimestamp(metadata.last_modified).isoformat(), "file_size": metadata.file_size, "processed": metadata.processed_time is not None, "document_id": metadata.document_id, } if metadata.processed_time: file_info["processed_time"] = datetime.fromtimestamp(metadata.processed_time).isoformat() files.append(file_info) return files except Exception as e: logger.error(f"Error getting processed files: {e}") return [] async def is_file_processed(self, file_path: Path) -> bool: """Check if a file has been processed. Args: file_path: Path to check. Returns: True if file has been processed. """ normalized_path = str(file_path.resolve()) metadata = self.file_index.get(normalized_path) return metadata is not None and metadata.processed_time is not None async def get_file_checksum(self, file_path: Path) -> str: """Calculate SHA-256 checksum of a file. Args: file_path: Path to the file. Returns: SHA-256 checksum as hex string. 
""" try: def _calculate_checksum() -> str: hash_sha256 = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): hash_sha256.update(chunk) return hash_sha256.hexdigest() # Run checksum calculation in thread pool loop = asyncio.get_event_loop() return await loop.run_in_executor(self.executor, _calculate_checksum) except Exception as e: raise FileSystemError(f"Failed to calculate checksum: {e}", str(file_path), e) async def load_file_index(self) -> None: """Load file index from persistent storage.""" try: logger.info(f"Looking for file index at: {self.file_index_path}") # Check if directory exists and is accessible directory = self.file_index_path.parent readable_status = os.access(directory, os.R_OK) if directory.exists() else "N/A" logger.info(f"Checking directory: {directory}, exists: {directory.exists()}, readable: {readable_status}") if self.file_index_path.exists(): logger.info(f"Found file index, loading from: {self.file_index_path}") def _load_index() -> Dict[str, Any]: with open(self.file_index_path, "r", encoding="utf-8") as f: return json.load(f) loop = asyncio.get_event_loop() data = await loop.run_in_executor(self.executor, _load_index) with self.index_lock: self.file_index = {} for file_path, metadata_dict in data.items(): try: self.file_index[file_path] = FileMetadata.from_dict(metadata_dict) except Exception as e: logger.warning(f"Skipping corrupted metadata for {file_path}: {e}") logger.info(f"Loaded file index with {len(self.file_index)} entries") else: self.file_index = {} logger.info(f"No existing file index found at {self.file_index_path}, starting fresh") except Exception as e: logger.error(f"Failed to load file index: {e}") self.file_index = {} def _save_file_index_sync(self, data: Dict[str, Dict]) -> None: """Save file index to persistent storage (synchronous, no locking).""" logger.info(f"Saving file index with {len(data)} entries to {self.file_index_path}") try: # Ensure metadata directory exists self.file_index_path.parent.mkdir(parents=True, exist_ok=True) logger.info(f"Directory created/exists: {self.file_index_path.parent}") # Check if directory is writable if not os.access(self.file_index_path.parent, os.W_OK): logger.error(f"Directory is not writable: {self.file_index_path.parent}") return # Write to temporary file first temp_path = self.file_index_path.with_suffix(".tmp") logger.info(f"Writing to temp file: {temp_path}") with open(temp_path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) logger.info(f"Successfully wrote temp file: {temp_path}") # Atomic rename logger.info(f"Renaming {temp_path} to {self.file_index_path}") temp_path.replace(self.file_index_path) logger.info(f"Successfully saved file index to {self.file_index_path}") # Verify file was saved if self.file_index_path.exists(): file_size = self.file_index_path.stat().st_size logger.info(f"File index saved successfully (size: {file_size} bytes)") else: logger.error("File index not found after save operation") except Exception as e: logger.error(f"Error in save operation: {e}") raise async def save_file_index(self) -> None: """Save file index to persistent storage.""" try: with self.index_lock: data = {path: metadata.to_dict() for path, metadata in self.file_index.items()} # Run save in executor to avoid blocking loop = asyncio.get_event_loop() await loop.run_in_executor(self.executor, self._save_file_index_sync, data) except Exception as e: logger.error(f"Failed to save file index: {e}") async def update_file_metadata( self, file_path: Path, 
metadata: FileMetadata, save_immediately: bool = True ) -> None: """Update metadata for a file. Args: file_path: File path. metadata: New metadata. save_immediately: Whether to save the file index immediately (default: True). Set to False during bulk operations to avoid blocking. """ try: with self.index_lock: # Store normalized path (resolved to absolute path) normalized_path = str(file_path.resolve()) self.file_index[normalized_path] = metadata logger.debug( f"Updated metadata for {file_path} (normalized: {normalized_path}), " f"total files: {len(self.file_index)}" ) if save_immediately: # Save immediately for normal operations logger.debug(f"Saving file index after metadata update for {file_path}") data = {path: metadata.to_dict() for path, metadata in self.file_index.items()} # Run save in executor to avoid blocking loop = asyncio.get_event_loop() await loop.run_in_executor(self.executor, self._save_file_index_sync, data) logger.debug(f"Completed saving file index after metadata update for {file_path}") else: logger.debug(f"Deferred file index save for {file_path} (bulk operation)") except Exception as e: logger.error(f"Failed to update file metadata for {file_path}: {e}") async def _start_watchdog(self) -> None: """Start the watchdog file system observer.""" try: from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.observers import Observer class DocumentEventHandler(FileSystemEventHandler): def __init__(self, monitor: "FileMonitor"): self.monitor = monitor self.loop = asyncio.get_event_loop() def on_created(self, event: FileSystemEvent): if not event.is_directory and self._is_supported_file(event.src_path): if self.monitor.background_queue: asyncio.run_coroutine_threadsafe( self.monitor._queue_file_for_processing(Path(event.src_path), JobType.FILE_WATCHER), self.loop, ) else: asyncio.run_coroutine_threadsafe( self.monitor._queue_file_processing(Path(event.src_path), "created"), self.loop, ) def on_modified(self, event: FileSystemEvent): if not event.is_directory and self._is_supported_file(event.src_path): if self.monitor.background_queue: asyncio.run_coroutine_threadsafe( self.monitor._queue_file_for_processing(Path(event.src_path), JobType.FILE_WATCHER), self.loop, ) else: asyncio.run_coroutine_threadsafe( self.monitor._queue_file_processing(Path(event.src_path), "modified"), self.loop, ) def on_deleted(self, event: FileSystemEvent): if not event.is_directory and self._is_supported_file(event.src_path): asyncio.run_coroutine_threadsafe(self.monitor.remove_file(Path(event.src_path)), self.loop) def on_moved(self, event): # Handle file moves as delete + create if hasattr(event, "src_path") and self._is_supported_file(event.src_path): asyncio.run_coroutine_threadsafe(self.monitor.remove_file(Path(event.src_path)), self.loop) if hasattr(event, "dest_path") and self._is_supported_file(event.dest_path): if self.monitor.background_queue: asyncio.run_coroutine_threadsafe( self.monitor._queue_file_for_processing(Path(event.dest_path), JobType.FILE_WATCHER), self.loop, ) else: asyncio.run_coroutine_threadsafe( self.monitor._queue_file_processing(Path(event.dest_path), "created"), self.loop, ) def _is_supported_file(self, file_path: str) -> bool: """Check if file has supported extension and is not in an excluded directory.""" path = Path(file_path) # Check if file has supported extension if path.suffix.lower() not in [ext.lower() for ext in self.monitor.config.supported_extensions]: return False # Check if file is in an excluded directory if 
self.monitor._is_excluded_directory(path): logger.debug(f"Excluding file in restricted directory: {path}") return False return True self.event_handler = DocumentEventHandler(self) self.observer = Observer() self.observer.schedule(self.event_handler, str(self.config.knowledgebase_path), recursive=True) # Start observer in thread pool to avoid blocking loop = asyncio.get_event_loop() await loop.run_in_executor(self.executor, self.observer.start) logger.info("Watchdog file system observer started") except ImportError: raise FileMonitorError( "Watchdog package not installed. Install with: pip install watchdog", "start_watchdog", ) except Exception as e: raise FileMonitorError(f"Failed to start watchdog observer: {e}", "start_watchdog", e) async def _stop_watchdog(self) -> None: """Stop the watchdog file system observer.""" try: if self.observer: loop = asyncio.get_event_loop() await loop.run_in_executor(self.executor, self.observer.stop) await loop.run_in_executor(self.executor, self.observer.join) self.observer = None self.event_handler = None logger.info("Watchdog file system observer stopped") except Exception as e: logger.error(f"Error stopping watchdog observer: {e}") async def _queue_file_processing(self, file_path: Path, event_type: str) -> None: """Queue a file for processing with debouncing. Args: file_path: Path to the file. event_type: Type of event (created, modified, deleted). """ try: # Add to debouncer should_process = await self.debouncer.add_event(str(file_path), event_type) if should_process: await self.processing_queue.put((file_path, event_type)) except Exception as e: logger.error(f"Error queuing file processing for {file_path}: {e}") async def _batch_processor(self) -> None: """Background task that processes queued files in batches.""" try: while self.is_running: try: # Collect ready events from debouncer ready_events = await self.debouncer.get_ready_events() for file_path_str, event_type in ready_events: await self.processing_queue.put((Path(file_path_str), event_type)) # Process events in small batches batch = [] batch_size = 5 try: # Try to get up to batch_size items with a timeout for _ in range(batch_size): try: item = await asyncio.wait_for(self.processing_queue.get(), timeout=1.0) batch.append(item) except asyncio.TimeoutError: break except Exception: pass # Process the batch if batch: await self._process_batch(batch) # Brief pause to avoid busy waiting await asyncio.sleep(0.5) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in batch processor: {e}") await asyncio.sleep(1.0) except asyncio.CancelledError: pass async def _event_processor(self) -> None: """Background task that handles debounced events.""" try: while self.is_running: try: # Check for ready events ready_events = await self.debouncer.get_ready_events() for file_path_str, event_type in ready_events: file_path = Path(file_path_str) if event_type in ["created", "modified"]: if file_path.exists(): await self.process_new_file(file_path) elif event_type == "deleted": await self.remove_file(file_path) await asyncio.sleep(0.5) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in event processor: {e}") await asyncio.sleep(1.0) except asyncio.CancelledError: pass async def _process_batch(self, batch: List[Tuple[Path, str]]) -> None: """Process a batch of file events. Args: batch: List of (file_path, event_type) tuples. 
""" try: for file_path, event_type in batch: try: if event_type in ["created", "modified"]: if file_path.exists(): await self.process_new_file(file_path) elif event_type == "deleted": await self.remove_file(file_path) except Exception as e: logger.error(f"Error processing file {file_path} with event {event_type}: {e}") except Exception as e: logger.error(f"Error processing batch: {e}") async def _file_changed(self, file_path: Path, cached_metadata: FileMetadata) -> bool: """Check if a file has changed compared to cached metadata. Args: file_path: Path to the file. cached_metadata: Cached file metadata. Returns: True if file has changed. """ try: if not file_path.exists(): return True # File was deleted stat = file_path.stat() # Check file size and modification time first (fast) if stat.st_size != cached_metadata.file_size or abs(stat.st_mtime - cached_metadata.last_modified) > 1.0: return True # If size and mtime are same, check checksum (slower but accurate) current_checksum = await self.get_file_checksum(file_path) return current_checksum != cached_metadata.checksum except Exception as e: logger.error(f"Error checking file changes for {file_path}: {e}") return True # Assume changed on error async def _periodic_scanner(self) -> None: """Periodically scan the directory for changes as a fallback to watchdog. This provides reliable file monitoring in environments where filesystem events don't propagate properly (like Docker on macOS). """ try: logger.info(f"📅 Periodic scanner started, scanning every {self.config.file_scan_interval} seconds") while self.is_running: try: await asyncio.sleep(self.config.file_scan_interval) if not self.is_running: break logger.debug("📅 Periodic scanner: Scanning directory for changes...") # Perform the same logic as startup synchronization current_files = await self.scan_directory() cached_files = set(self.file_index.keys()) # Normalize paths for comparison current_file_strs = {str(f.resolve()) for f in current_files} cached_file_resolved = {str(Path(f).resolve()): f for f in cached_files} # Find new files new_files = [] for file_path in current_files: resolved_path = str(file_path.resolve()) if resolved_path not in cached_file_resolved: new_files.append(file_path) # Find deleted files deleted_files = [] for cached_file in cached_files: resolved_cached_path = str(Path(cached_file).resolve()) if resolved_cached_path not in current_file_strs: deleted_files.append(Path(cached_file)) # Find modified files modified_files = [] for file_path in current_files: resolved_path = str(file_path.resolve()) if resolved_path in cached_file_resolved: original_cached_path = cached_file_resolved[resolved_path] if await self._file_changed(file_path, self.file_index[original_cached_path]): modified_files.append(file_path) # Log changes if any detected if new_files or modified_files or deleted_files: logger.info( f"📅 Periodic scanner detected changes: {len(new_files)} new, " f"{len(modified_files)} modified, {len(deleted_files)} deleted" ) # Process new files for file_path in new_files: logger.info(f"📅 Periodic scanner: Processing NEW file {file_path}") await self.process_new_file(file_path) # Process modified files for file_path in modified_files: logger.info(f"📅 Periodic scanner: Processing MODIFIED file {file_path}") await self.process_new_file(file_path) # Process deleted files for file_path in deleted_files: logger.info(f"📅 Periodic scanner: Processing DELETED file {file_path}") await self.remove_file(file_path) except asyncio.CancelledError: break except Exception as e: 
logger.error(f"📅 Periodic scanner error: {e}") # Continue running despite errors await asyncio.sleep(5) # Brief pause before retrying except asyncio.CancelledError: pass except Exception as e: logger.error(f"📅 Periodic scanner failed: {e}") finally: logger.info("📅 Periodic scanner stopped") def get_processing_stats(self) -> Dict[str, Any]: """Get processing statistics. Returns: Dictionary with processing statistics. """ return { **self.processing_stats, "currently_processing": len(self.processing_files), "failed_files": len(self.failed_files), "tracked_files": len(self.file_index), "queue_size": self.processing_queue.qsize(), "is_running": self.is_running, } async def force_rescan(self) -> None: """Force a complete rescan of the directory.""" try: logger.info("Starting forced rescan...") # Clear processing state self.processing_files.clear() self.failed_files.clear() # Perform startup synchronization (which handles the comparison) await self.startup_synchronization() logger.info("Forced rescan completed") except Exception as e: logger.error(f"Forced rescan failed: {e}") raise FileMonitorError(f"Forced rescan failed: {e}", "force_rescan", e) async def manual_rescan(self) -> Dict[str, Any]: """Perform a manual directory rescan and return results. This is designed to be called from MCP commands and web UI. Returns: Dictionary with scan results and statistics. """ if not self.config.enable_manual_rescan: raise FileMonitorError("Manual rescan is disabled by configuration", "manual_rescan") try: logger.info("🔄 Manual rescan initiated...") start_time = time.time() # Perform directory scan current_files = await self.scan_directory() cached_files = set(self.file_index.keys()) # Normalize paths for comparison current_file_strs = {str(f.resolve()) for f in current_files} cached_file_resolved = {str(Path(f).resolve()): f for f in cached_files} # Find changes new_files = [] for file_path in current_files: resolved_path = str(file_path.resolve()) if resolved_path not in cached_file_resolved: new_files.append(file_path) deleted_files = [] for cached_file in cached_files: resolved_cached_path = str(Path(cached_file).resolve()) if resolved_cached_path not in current_file_strs: deleted_files.append(Path(cached_file)) modified_files = [] for file_path in current_files: resolved_path = str(file_path.resolve()) if resolved_path in cached_file_resolved: original_cached_path = cached_file_resolved[resolved_path] if await self._file_changed(file_path, self.file_index[original_cached_path]): modified_files.append(file_path) logger.info( f"🔄 Manual rescan detected: {len(new_files)} new, " f"{len(modified_files)} modified, {len(deleted_files)} deleted" ) # Process changes processed_new = 0 processed_modified = 0 processed_deleted = 0 errors = [] # Process new files for file_path in new_files: try: logger.info(f"🔄 Manual rescan: Processing NEW file {file_path}") await self.process_new_file(file_path) processed_new += 1 except Exception as e: error_msg = f"Failed to process new file {file_path}: {e}" logger.error(error_msg) errors.append(error_msg) # Process modified files for file_path in modified_files: try: logger.info(f"🔄 Manual rescan: Processing MODIFIED file {file_path}") await self.process_new_file(file_path) processed_modified += 1 except Exception as e: error_msg = f"Failed to process modified file {file_path}: {e}" logger.error(error_msg) errors.append(error_msg) # Process deleted files for file_path in deleted_files: try: logger.info(f"🔄 Manual rescan: Processing DELETED file {file_path}") await 
self.remove_file(file_path) processed_deleted += 1 except Exception as e: error_msg = f"Failed to process deleted file {file_path}: {e}" logger.error(error_msg) errors.append(error_msg) elapsed_time = time.time() - start_time result = { "scan_completed": True, "scan_time_seconds": round(elapsed_time, 2), "total_files_scanned": len(current_files), "changes_detected": { "new_files": len(new_files), "modified_files": len(modified_files), "deleted_files": len(deleted_files), }, "changes_processed": { "new_files_processed": processed_new, "modified_files_processed": processed_modified, "deleted_files_processed": processed_deleted, }, "errors": errors, "current_index_size": len(self.file_index), "timestamp": time.time(), } logger.info( f"🔄 Manual rescan completed in {elapsed_time:.2f}s: " f"{processed_new + processed_modified + processed_deleted} files processed, " f"{len(errors)} errors" ) return result except Exception as e: logger.error(f"🔄 Manual rescan failed: {e}") raise FileMonitorError(f"Manual rescan failed: {e}", "manual_rescan", e) def _remove_in_progress_document(self, job_id: str) -> None: """Remove in-progress document by job ID.""" if not self.web_document_service: return try: # Find in-progress document by job_id for doc_id, doc in list(self.web_document_service.in_progress_documents.items()): if doc.job_id == job_id: del self.web_document_service.in_progress_documents[doc_id] logger.info(f"📋 FILE MONITOR: Removed in-progress document {doc_id} after processing") break except Exception as e: logger.error(f"Failed to remove in-progress document for job {job_id}: {e}") def _update_in_progress_document_failed(self, job_id: str, error_msg: str) -> None: """Update in-progress document status to failed.""" if not self.web_document_service: return try: from .web.models.web_models import ProcessingStatus # Find and update in-progress document by job_id for doc_id, doc in self.web_document_service.in_progress_documents.items(): if doc.job_id == job_id: doc.processing_status = ProcessingStatus.FAILED doc.processing_error = error_msg logger.info(f"📋 FILE MONITOR: Updated in-progress document {doc_id} status to FAILED") break except Exception as e: logger.error(f"Failed to update in-progress document status for job {job_id}: {e}")
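
The debouncer above is the piece that keeps a burst of watchdog events from re-processing the same file over and over. A minimal, self-contained sketch of that behaviour follows; the import path and timings are illustrative assumptions, not taken from the repository.

# Sketch: debouncing a burst of events for a single file.
# Assumes FileEventDebouncer is importable from this module (import path is hypothetical).
import asyncio

from pdfkb.file_monitor import FileEventDebouncer  # hypothetical package/module path


async def demo_debounce() -> None:
    debouncer = FileEventDebouncer(delay=0.5)

    # The first event for a path is accepted for immediate processing (True);
    # a repeat inside the delay window only refreshes the pending entry (False).
    print(await debouncer.add_event("docs/report.pdf", "created"))   # True
    print(await debouncer.add_event("docs/report.pdf", "modified"))  # False

    await asyncio.sleep(0.6)  # wait longer than the 0.5 s debounce delay

    # Pending entries older than `delay` drain as (path, last_event_type) tuples.
    print(await debouncer.get_ready_events())  # [("docs/report.pdf", "modified")]


asyncio.run(demo_debounce())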
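
The persisted file index is simply a JSON mapping of resolved paths to FileMetadata dictionaries (to_dict / from_dict). A hedged sketch of that round-trip, with made-up field values and a hypothetical import path:

# Sketch of the to_dict()/from_dict() round-trip behind file_index.json.
# Field values are illustrative; the import path is an assumption.
import json
import time

from pdfkb.file_monitor import FileMetadata  # hypothetical package/module path

meta = FileMetadata(
    path="/data/knowledgebase/report.pdf",
    checksum="0" * 64,          # placeholder SHA-256 hex digest
    last_modified=time.time(),
    file_size=1_048_576,
    processed_time=time.time(),
    document_id="doc-123",
)

# Mirrors the shape that _save_file_index_sync() writes to disk.
payload = json.dumps({meta.path: meta.to_dict()}, indent=2)

# load_file_index() rebuilds FileMetadata objects the same way.
restored = FileMetadata.from_dict(json.loads(payload)[meta.path])
assert restored.checksum == meta.checksum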
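
Finally, a minimal sketch of driving FileMonitor from an asyncio application. Assumptions: the real code expects a ServerConfig from the package's .config module; here a SimpleNamespace stands in for it, exposing only the attributes the monitor reads (knowledgebase_path, metadata_path, supported_extensions, enable_periodic_scan, file_scan_interval, enable_manual_rescan). The watchdog package must be installed, and with no processor, vector store, or background queue the monitor only tracks files (the "skipped" path).

# Sketch: starting and stopping the monitor without the rest of the server.
# The SimpleNamespace config and the import path are assumptions for illustration.
import asyncio
from pathlib import Path
from types import SimpleNamespace

from pdfkb.file_monitor import FileMonitor  # hypothetical package/module path


async def main() -> None:
    config = SimpleNamespace(
        knowledgebase_path=Path("./knowledgebase"),
        metadata_path=Path("./metadata"),
        supported_extensions=[".pdf"],
        enable_periodic_scan=True,
        file_scan_interval=300,        # seconds between fallback scans
        enable_manual_rescan=True,
    )

    monitor = FileMonitor(config=config)  # no processor/queue: files are tracked only

    await monitor.start_monitoring()
    try:
        await asyncio.sleep(60)                   # let watchdog / periodic scan run
        print(monitor.get_processing_stats())     # processed/failed/skipped counters
        result = await monitor.manual_rescan()    # on-demand scan, returns a summary dict
        print(result["changes_detected"])
    finally:
        await monitor.stop_monitoring()           # persists file_index.json on shutdown


if __name__ == "__main__":
    asyncio.run(main())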
