Skip to main content
Glama
watcher.py•18.5 kB
""" Event-Driven File Watcher for CLAUDE.md Optimization Monitors project configuration files and automatically triggers CLAUDE.md updates when changes are detected. Watched files: - .editorconfig - pyproject.toml - package.json - tsconfig.json - .prettierrc* - .eslintrc* - CLAUDE.md (for manual edit detection) Events: - file_created: New config file added - file_modified: Config file changed - file_deleted: Config file removed - manual_edit: CLAUDE.md manually edited """ import asyncio import logging import hashlib from pathlib import Path from typing import Dict, Set, Optional, Callable, Any, List from dataclasses import dataclass, asdict from datetime import datetime, timedelta import json logger = logging.getLogger(__name__) @dataclass class FileChangeEvent: """Represents a file change event.""" event_type: str # created, modified, deleted file_path: Path timestamp: datetime file_hash: Optional[str] = None previous_hash: Optional[str] = None metadata: Dict[str, Any] = None def to_dict(self) -> Dict[str, Any]: result = asdict(self) result['file_path'] = str(self.file_path) result['timestamp'] = self.timestamp.isoformat() return result @dataclass class WatcherConfig: """Configuration for file watcher.""" watch_patterns: List[str] debounce_seconds: float auto_optimize: bool backup_on_change: bool notification_callback: Optional[Callable] = None class ConfigFileWatcher: """ Event-driven file watcher for project configuration files. 
Features: - Efficient polling with configurable intervals - File content hashing to detect real changes - Debouncing to avoid excessive triggers - Event callbacks for extensibility - Change history tracking """ # Default files to watch DEFAULT_WATCH_PATTERNS = [ '.editorconfig', 'pyproject.toml', 'package.json', 'tsconfig.json', 'package-lock.json', 'yarn.lock', 'pnpm-lock.yaml', '.prettierrc*', '.eslintrc*', 'jest.config.*', 'vitest.config.*', 'vite.config.*', 'CLAUDE.md', '.claude/**/*.md', 'Makefile', 'docker-compose.yml', 'Dockerfile' ] def __init__( self, project_path: Path, config: Optional[WatcherConfig] = None, memory_system=None, optimizer=None ): """ Initialize file watcher. Args: project_path: Root path of project to watch config: Watcher configuration memory_system: PersistentMemory for event storage optimizer: ContextOptimizer for automatic optimization """ self.project_path = project_path self.memory = memory_system self.optimizer = optimizer # Use provided config or defaults self.config = config or WatcherConfig( watch_patterns=self.DEFAULT_WATCH_PATTERNS, debounce_seconds=2.0, auto_optimize=True, backup_on_change=True ) # Track file states self._file_hashes: Dict[Path, str] = {} self._pending_events: Dict[Path, FileChangeEvent] = {} self._last_process_time: Dict[Path, datetime] = {} # Event handlers self._event_handlers: Dict[str, List[Callable]] = { 'file_created': [], 'file_modified': [], 'file_deleted': [], 'manual_edit': [] } # Watcher state self._running = False self._watch_task: Optional[asyncio.Task] = None # Statistics self._stats = { 'events_detected': 0, 'events_processed': 0, 'optimizations_triggered': 0, 'errors': 0 } async def start(self): """Start watching for file changes.""" if self._running: logger.warning("Watcher already running") return self._running = True # Initialize file hashes await self._initialize_hashes() # Start watch loop self._watch_task = asyncio.create_task(self._watch_loop()) logger.info(f"Started watching 
{self.project_path} for config changes") async def stop(self): """Stop watching for file changes.""" if not self._running: return self._running = False if self._watch_task: self._watch_task.cancel() try: await self._watch_task except asyncio.CancelledError: pass logger.info("Stopped file watcher") def register_handler(self, event_type: str, handler: Callable): """ Register an event handler. Args: event_type: Type of event (file_created, file_modified, etc.) handler: Async callable to handle event """ if event_type not in self._event_handlers: raise ValueError(f"Unknown event type: {event_type}") self._event_handlers[event_type].append(handler) logger.debug(f"Registered handler for {event_type}") async def _initialize_hashes(self): """Initialize file content hashes.""" watched_files = self._get_watched_files() for file_path in watched_files: if file_path.exists(): file_hash = self._calculate_hash(file_path) self._file_hashes[file_path] = file_hash logger.info(f"Initialized {len(self._file_hashes)} file hashes") def _get_watched_files(self) -> Set[Path]: """Get all files matching watch patterns.""" watched_files = set() for pattern in self.config.watch_patterns: if '*' in pattern: # Glob pattern matches = self.project_path.glob(pattern) watched_files.update(matches) else: # Direct file file_path = self.project_path / pattern if file_path.exists(): watched_files.add(file_path) return watched_files def _calculate_hash(self, file_path: Path) -> str: """Calculate SHA256 hash of file content.""" try: hasher = hashlib.sha256() with open(file_path, 'rb') as f: # Read in chunks for memory efficiency for chunk in iter(lambda: f.read(4096), b''): hasher.update(chunk) return hasher.hexdigest() except Exception as e: logger.error(f"Failed to hash {file_path}: {e}") return "" async def _watch_loop(self): """Main watch loop - polls for file changes.""" poll_interval = 1.0 # Check every second while self._running: try: await self._check_for_changes() await 
self._process_pending_events() await asyncio.sleep(poll_interval) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in watch loop: {e}") self._stats['errors'] += 1 await asyncio.sleep(poll_interval) async def _check_for_changes(self): """Check all watched files for changes.""" current_files = self._get_watched_files() previous_files = set(self._file_hashes.keys()) # Check for new files new_files = current_files - previous_files for file_path in new_files: await self._handle_file_created(file_path) # Check for deleted files deleted_files = previous_files - current_files for file_path in deleted_files: await self._handle_file_deleted(file_path) # Check for modified files for file_path in current_files & previous_files: current_hash = self._calculate_hash(file_path) previous_hash = self._file_hashes.get(file_path) if current_hash != previous_hash: await self._handle_file_modified(file_path, previous_hash, current_hash) async def _handle_file_created(self, file_path: Path): """Handle file creation event.""" file_hash = self._calculate_hash(file_path) self._file_hashes[file_path] = file_hash event = FileChangeEvent( event_type='file_created', file_path=file_path, timestamp=datetime.now(), file_hash=file_hash, metadata={'size': file_path.stat().st_size} ) self._pending_events[file_path] = event self._stats['events_detected'] += 1 logger.info(f"Detected new file: {file_path.name}") async def _handle_file_modified( self, file_path: Path, previous_hash: str, current_hash: str ): """Handle file modification event.""" self._file_hashes[file_path] = current_hash # Check if this is a manual edit to CLAUDE.md event_type = 'manual_edit' if file_path.name == 'CLAUDE.md' else 'file_modified' event = FileChangeEvent( event_type=event_type, file_path=file_path, timestamp=datetime.now(), file_hash=current_hash, previous_hash=previous_hash, metadata={ 'size': file_path.stat().st_size, 'mtime': datetime.fromtimestamp(file_path.stat().st_mtime) } ) 
self._pending_events[file_path] = event self._stats['events_detected'] += 1 logger.info(f"Detected change in: {file_path.name}") async def _handle_file_deleted(self, file_path: Path): """Handle file deletion event.""" previous_hash = self._file_hashes.pop(file_path, None) event = FileChangeEvent( event_type='file_deleted', file_path=file_path, timestamp=datetime.now(), previous_hash=previous_hash ) self._pending_events[file_path] = event self._stats['events_detected'] += 1 logger.info(f"Detected deletion: {file_path.name}") async def _process_pending_events(self): """Process pending events after debounce period.""" now = datetime.now() debounce_delta = timedelta(seconds=self.config.debounce_seconds) events_to_process = [] for file_path, event in list(self._pending_events.items()): # Check if event is old enough to process if now - event.timestamp >= debounce_delta: events_to_process.append(event) del self._pending_events[file_path] # Process events for event in events_to_process: await self._process_event(event) async def _process_event(self, event: FileChangeEvent): """Process a file change event.""" try: # Store event in memory if self.memory: await self.memory.store( key=f"file_event_{event.timestamp.timestamp()}", value=event.to_dict(), namespace="file_events", ttl_seconds=86400 * 30 # Keep for 30 days ) # Call registered handlers handlers = self._event_handlers.get(event.event_type, []) for handler in handlers: try: await handler(event) except Exception as e: logger.error(f"Handler error for {event.event_type}: {e}") # Auto-optimize if configured if self.config.auto_optimize and self._should_trigger_optimization(event): await self._trigger_optimization(event) self._stats['events_processed'] += 1 # Notification callback if self.config.notification_callback: await self.config.notification_callback(event) except Exception as e: logger.error(f"Failed to process event: {e}") self._stats['errors'] += 1 def _should_trigger_optimization(self, event: FileChangeEvent) 
-> bool: """Determine if event should trigger CLAUDE.md optimization.""" # Don't optimize on CLAUDE.md manual edits (would be circular) if event.file_path.name == 'CLAUDE.md': return False # Optimize on config file changes config_files = { '.editorconfig', 'pyproject.toml', 'package.json', 'tsconfig.json', '.prettierrc', '.eslintrc' } file_name = event.file_path.name # Check exact match or pattern match if file_name in config_files: return True # Check patterns if any(file_name.startswith(cf) for cf in config_files): return True return False async def _trigger_optimization(self, event: FileChangeEvent): """Trigger CLAUDE.md optimization based on config change.""" if not self.optimizer: logger.warning("No optimizer configured - skipping optimization") return try: # Check rate limiting last_optimize = self._last_process_time.get(self.project_path) if last_optimize: time_since_last = datetime.now() - last_optimize if time_since_last < timedelta(minutes=5): logger.debug("Skipping optimization - too soon since last") return logger.info(f"Triggering optimization due to {event.file_path.name} change") # Find CLAUDE.md claudemd_path = self.project_path / 'CLAUDE.md' if not claudemd_path.exists(): logger.info("No CLAUDE.md found - creating new one") # Could trigger initial generation here return # Read current content current_content = claudemd_path.read_text() # Backup if configured if self.config.backup_on_change: backup_path = claudemd_path.with_suffix( f'.backup.{datetime.now().strftime("%Y%m%d_%H%M%S")}.md' ) backup_path.write_text(current_content) logger.info(f"Created backup: {backup_path.name}") # Optimize content optimized_content, metrics = self.optimizer.optimize_content( current_content, target_tokens=self.optimizer.TOKEN_BUDGET['project'] ) # Write optimized content claudemd_path.write_text(optimized_content) # Update timestamp self._last_process_time[self.project_path] = datetime.now() self._stats['optimizations_triggered'] += 1 logger.info( f"Optimized 
CLAUDE.md: {metrics.token_count} tokens " f"({metrics.compression_ratio:.2f}x compression)" ) # Store optimization metrics if self.memory: await self.memory.store( key=f"optimization_{datetime.now().timestamp()}", value={ 'trigger_event': event.to_dict(), 'metrics': metrics.to_dict(), 'project_path': str(self.project_path) }, namespace="optimizations", ttl_seconds=86400 * 90 # Keep for 90 days ) except Exception as e: logger.error(f"Optimization failed: {e}") self._stats['errors'] += 1 async def detect_manual_edits( self, current_content: str, previous_content: str ) -> Dict[str, Any]: """ Detect and analyze manual edits to CLAUDE.md. This enables learning from user corrections. Returns: Dictionary with edit analysis """ import difflib # Calculate diff diff = list(difflib.unified_diff( previous_content.splitlines(keepends=True), current_content.splitlines(keepends=True), lineterm='' )) # Analyze changes additions = [] deletions = [] for line in diff: if line.startswith('+') and not line.startswith('+++'): additions.append(line[1:].strip()) elif line.startswith('-') and not line.startswith('---'): deletions.append(line[1:].strip()) # Detect patterns in edits patterns = { 'preference_changes': [], 'new_sections': [], 'removed_sections': [], 'style_changes': [] } # Check for preference changes (e.g., "use X not Y") for addition in additions: if any(kw in addition.lower() for kw in ['use', 'prefer', 'always', 'never']): patterns['preference_changes'].append(addition) # Check for new sections for addition in additions: if addition.startswith('##'): patterns['new_sections'].append(addition[2:].strip()) # Check for removed sections for deletion in deletions: if deletion.startswith('##'): patterns['removed_sections'].append(deletion[2:].strip()) return { 'total_additions': len(additions), 'total_deletions': len(deletions), 'patterns': patterns, 'diff_lines': len(diff), 'significant': len(additions) > 3 or len(deletions) > 3 } def get_statistics(self) -> Dict[str, Any]: 
"""Get watcher statistics.""" return { **self._stats, 'watching': self._running, 'files_tracked': len(self._file_hashes), 'pending_events': len(self._pending_events), 'project_path': str(self.project_path) } async def force_optimization(self): """Force immediate optimization regardless of debounce.""" logger.info("Forcing CLAUDE.md optimization") # Create synthetic event claudemd_path = self.project_path / 'CLAUDE.md' event = FileChangeEvent( event_type='file_modified', file_path=claudemd_path, timestamp=datetime.now() ) await self._trigger_optimization(event)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/airmcp-com/mcp-standards'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.