We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""
Debouncing logic for file change events.
"""
import logging
import threading
from typing import Callable, Set, Optional
logger = logging.getLogger(__name__)
class FileChangeDebouncer:
"""
Accumulates file changes and triggers callback after inactivity period.
Thread-safe debouncing mechanism:
- Accumulates file paths in a set (deduplicates)
- Resets timer on each new event
- Calls callback after delay_seconds of inactivity
- Ensures callback is only executed once per batch
Example:
def process_files(files: Set[str]):
print(f"Processing {len(files)} files: {files}")
debouncer = FileChangeDebouncer(
callback=process_files,
delay_seconds=5.0
)
# Add changes as they occur
debouncer.add_change('/path/to/file1.py')
debouncer.add_change('/path/to/file2.py')
debouncer.add_change('/path/to/file1.py') # Deduplicated
# After 5 seconds of no new changes:
# -> process_files({'/path/to/file1.py', '/path/to/file2.py'})
"""
def __init__(
self,
callback: Callable[[Set[str]], None],
delay_seconds: float
):
"""
Initialize debouncer.
Args:
callback: Function to call with accumulated file paths
delay_seconds: Seconds to wait after last change before firing
"""
self.callback = callback
self.delay_seconds = delay_seconds
self.pending_files: Set[str] = set()
self.timer: Optional[threading.Timer] = None
self.lock = threading.Lock()
self._stopped = False
logger.debug(f"FileChangeDebouncer initialized with {delay_seconds}s delay")
def add_change(self, file_path: str) -> None:
"""
Add file to pending changes and reset timer.
Thread-safe. Can be called from multiple threads.
Args:
file_path: Absolute path to changed file
"""
if self._stopped:
logger.debug(f"Debouncer stopped, ignoring change: {file_path}")
return
with self.lock:
self.pending_files.add(file_path)
self._reset_timer()
logger.debug(
f"Added change: {file_path} "
f"(total pending: {len(self.pending_files)})"
)
def _reset_timer(self) -> None:
"""
Cancel existing timer and start new one.
Must be called with lock held.
"""
# Cancel existing timer
if self.timer is not None:
self.timer.cancel()
# Start new timer
self.timer = threading.Timer(
self.delay_seconds,
self._fire_callback
)
self.timer.daemon = True # Don't block program exit
self.timer.start()
logger.debug(f"Timer reset: will fire in {self.delay_seconds}s")
def _fire_callback(self) -> None:
"""
Execute callback with accumulated changes.
Runs in timer thread. Acquires lock only briefly to copy files.
"""
# Get copy of pending files while holding lock
with self.lock:
if not self.pending_files or self._stopped:
return
files_to_process = self.pending_files.copy()
self.pending_files.clear()
self.timer = None
# Execute callback outside lock to avoid blocking add_change
logger.info(
f"Debounce timer fired: processing {len(files_to_process)} file(s)"
)
try:
self.callback(files_to_process)
logger.info(f"Callback completed successfully")
except Exception as e:
logger.error(
f"Error in debounce callback: {e}",
exc_info=True
)
def flush(self) -> None:
"""
Immediately process pending changes without waiting for timer.
Useful for graceful shutdown or testing.
"""
with self.lock:
# Cancel timer if running
if self.timer is not None:
self.timer.cancel()
self.timer = None
# Process pending files if any
if self.pending_files:
files_to_process = self.pending_files.copy()
self.pending_files.clear()
if files_to_process:
logger.info(f"Flushing {len(files_to_process)} pending file(s)")
try:
self.callback(files_to_process)
except Exception as e:
logger.error(f"Error in flush callback: {e}", exc_info=True)
def stop(self) -> None:
"""
Stop debouncer and cancel pending timer.
Does not flush pending changes. Call flush() first if needed.
"""
with self.lock:
self._stopped = True
if self.timer is not None:
self.timer.cancel()
self.timer = None
pending_count = len(self.pending_files)
self.pending_files.clear()
if pending_count > 0:
logger.warning(
f"Debouncer stopped with {pending_count} pending file(s) "
"(not processed)"
)
else:
logger.debug("Debouncer stopped")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - ensures cleanup."""
self.stop()
return False