Skip to main content
Glama
debouncer.py5.78 kB
""" Debouncing logic for file change events. """ import logging import threading from typing import Callable, Set, Optional logger = logging.getLogger(__name__) class FileChangeDebouncer: """ Accumulates file changes and triggers callback after inactivity period. Thread-safe debouncing mechanism: - Accumulates file paths in a set (deduplicates) - Resets timer on each new event - Calls callback after delay_seconds of inactivity - Ensures callback is only executed once per batch Example: def process_files(files: Set[str]): print(f"Processing {len(files)} files: {files}") debouncer = FileChangeDebouncer( callback=process_files, delay_seconds=5.0 ) # Add changes as they occur debouncer.add_change('/path/to/file1.py') debouncer.add_change('/path/to/file2.py') debouncer.add_change('/path/to/file1.py') # Deduplicated # After 5 seconds of no new changes: # -> process_files({'/path/to/file1.py', '/path/to/file2.py'}) """ def __init__( self, callback: Callable[[Set[str]], None], delay_seconds: float ): """ Initialize debouncer. Args: callback: Function to call with accumulated file paths delay_seconds: Seconds to wait after last change before firing """ self.callback = callback self.delay_seconds = delay_seconds self.pending_files: Set[str] = set() self.timer: Optional[threading.Timer] = None self.lock = threading.Lock() self._stopped = False logger.debug(f"FileChangeDebouncer initialized with {delay_seconds}s delay") def add_change(self, file_path: str) -> None: """ Add file to pending changes and reset timer. Thread-safe. Can be called from multiple threads. Args: file_path: Absolute path to changed file """ if self._stopped: logger.debug(f"Debouncer stopped, ignoring change: {file_path}") return with self.lock: self.pending_files.add(file_path) self._reset_timer() logger.debug( f"Added change: {file_path} " f"(total pending: {len(self.pending_files)})" ) def _reset_timer(self) -> None: """ Cancel existing timer and start new one. Must be called with lock held. 
""" # Cancel existing timer if self.timer is not None: self.timer.cancel() # Start new timer self.timer = threading.Timer( self.delay_seconds, self._fire_callback ) self.timer.daemon = True # Don't block program exit self.timer.start() logger.debug(f"Timer reset: will fire in {self.delay_seconds}s") def _fire_callback(self) -> None: """ Execute callback with accumulated changes. Runs in timer thread. Acquires lock only briefly to copy files. """ # Get copy of pending files while holding lock with self.lock: if not self.pending_files or self._stopped: return files_to_process = self.pending_files.copy() self.pending_files.clear() self.timer = None # Execute callback outside lock to avoid blocking add_change logger.info( f"Debounce timer fired: processing {len(files_to_process)} file(s)" ) try: self.callback(files_to_process) logger.info(f"Callback completed successfully") except Exception as e: logger.error( f"Error in debounce callback: {e}", exc_info=True ) def flush(self) -> None: """ Immediately process pending changes without waiting for timer. Useful for graceful shutdown or testing. """ with self.lock: # Cancel timer if running if self.timer is not None: self.timer.cancel() self.timer = None # Process pending files if any if self.pending_files: files_to_process = self.pending_files.copy() self.pending_files.clear() if files_to_process: logger.info(f"Flushing {len(files_to_process)} pending file(s)") try: self.callback(files_to_process) except Exception as e: logger.error(f"Error in flush callback: {e}", exc_info=True) def stop(self) -> None: """ Stop debouncer and cancel pending timer. Does not flush pending changes. Call flush() first if needed. 
""" with self.lock: self._stopped = True if self.timer is not None: self.timer.cancel() self.timer = None pending_count = len(self.pending_files) self.pending_files.clear() if pending_count > 0: logger.warning( f"Debouncer stopped with {pending_count} pending file(s) " "(not processed)" ) else: logger.debug("Debouncer stopped") def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit - ensures cleanup.""" self.stop() return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.