watch_service.py (21.1 kB)
"""Watch service for Basic Memory.""" import asyncio import os from collections import defaultdict from datetime import datetime from pathlib import Path from typing import List, Optional, Set, Sequence, Callable, Awaitable, TYPE_CHECKING if TYPE_CHECKING: from basic_memory.sync.sync_service import SyncService from basic_memory.config import BasicMemoryConfig, WATCH_STATUS_JSON from basic_memory.ignore_utils import load_gitignore_patterns, should_ignore_path from basic_memory.models import Project from basic_memory.repository import ProjectRepository from loguru import logger from pydantic import BaseModel from rich.console import Console from watchfiles import awatch from watchfiles.main import FileChange, Change import time class WatchEvent(BaseModel): timestamp: datetime path: str action: str # new, delete, etc status: str # success, error checksum: Optional[str] error: Optional[str] = None class WatchServiceState(BaseModel): # Service status running: bool = False start_time: datetime = datetime.now() # Use directly with Pydantic model pid: int = os.getpid() # Use directly with Pydantic model # Stats error_count: int = 0 last_error: Optional[datetime] = None last_scan: Optional[datetime] = None # File counts synced_files: int = 0 # Recent activity recent_events: List[WatchEvent] = [] # Use directly with Pydantic model def add_event( self, path: str, action: str, status: str, checksum: Optional[str] = None, error: Optional[str] = None, ) -> WatchEvent: event = WatchEvent( timestamp=datetime.now(), path=path, action=action, status=status, checksum=checksum, error=error, ) self.recent_events.insert(0, event) self.recent_events = self.recent_events[:100] # Keep last 100 return event def record_error(self, error: str): self.error_count += 1 self.add_event(path="", action="sync", status="error", error=error) self.last_error = datetime.now() # Type alias for sync service factory function SyncServiceFactory = Callable[[Project], Awaitable["SyncService"]] class WatchService: def __init__( self, app_config: BasicMemoryConfig, project_repository: ProjectRepository, quiet: bool = False, sync_service_factory: Optional[SyncServiceFactory] = None, ): self.app_config = app_config self.project_repository = project_repository self.state = WatchServiceState() self.status_path = Path.home() / ".basic-memory" / WATCH_STATUS_JSON self.status_path.parent.mkdir(parents=True, exist_ok=True) self._ignore_patterns_cache: dict[Path, Set[str]] = {} self._sync_service_factory = sync_service_factory # quiet mode for mcp so it doesn't mess up stdout self.console = Console(quiet=quiet) async def _get_sync_service(self, project: Project) -> "SyncService": """Get sync service for a project, using factory if provided.""" if self._sync_service_factory: return await self._sync_service_factory(project) # Fall back to default factory from basic_memory.sync.sync_service import get_sync_service return await get_sync_service(project) async def _schedule_restart(self, stop_event: asyncio.Event): """Schedule a restart of the watch service after the configured interval.""" await asyncio.sleep(self.app_config.watch_project_reload_interval) stop_event.set() def _get_ignore_patterns(self, project_path: Path) -> Set[str]: """Get or load ignore patterns for a project path.""" if project_path not in self._ignore_patterns_cache: self._ignore_patterns_cache[project_path] = load_gitignore_patterns(project_path) return self._ignore_patterns_cache[project_path] async def _watch_projects_cycle(self, projects: Sequence[Project], stop_event: 
asyncio.Event): """Run one cycle of watching the given projects until stop_event is set.""" project_paths = [project.path for project in projects] async for changes in awatch( *project_paths, debounce=self.app_config.sync_delay, watch_filter=self.filter_changes, recursive=True, stop_event=stop_event, ): # group changes by project and filter using ignore patterns project_changes = defaultdict(list) for change, path in changes: for project in projects: if self.is_project_path(project, path): # Check if the file should be ignored based on gitignore patterns project_path = Path(project.path) file_path = Path(path) ignore_patterns = self._get_ignore_patterns(project_path) if should_ignore_path(file_path, project_path, ignore_patterns): logger.trace( f"Ignoring watched file change: {file_path.relative_to(project_path)}" ) continue project_changes[project].append((change, path)) break # create coroutines to handle changes change_handlers = [ self.handle_changes(project, changes) # pyright: ignore for project, changes in project_changes.items() ] # process changes await asyncio.gather(*change_handlers) async def run(self): # pragma: no cover """Watch for file changes and sync them""" self.state.running = True self.state.start_time = datetime.now() await self.write_status() logger.info( "Watch service started", f"debounce_ms={self.app_config.sync_delay}", f"pid={os.getpid()}", ) try: while self.state.running: # Clear ignore patterns cache to pick up any .gitignore changes self._ignore_patterns_cache.clear() # Reload projects to catch any new/removed projects projects = await self.project_repository.get_active_projects() project_paths = [project.path for project in projects] logger.debug(f"Starting watch cycle for directories: {project_paths}") # Create stop event for this watch cycle stop_event = asyncio.Event() # Schedule restart after configured interval to reload projects timer_task = asyncio.create_task(self._schedule_restart(stop_event)) try: await self._watch_projects_cycle(projects, stop_event) except Exception as e: logger.exception("Watch service error during cycle", error=str(e)) self.state.record_error(str(e)) await self.write_status() # Continue to next cycle instead of exiting await asyncio.sleep(5) # Brief pause before retry finally: # Cancel timer task if it's still running if not timer_task.done(): timer_task.cancel() try: await timer_task except asyncio.CancelledError: pass except Exception as e: logger.exception("Watch service error", error=str(e)) self.state.record_error(str(e)) await self.write_status() raise finally: logger.info( "Watch service stopped", f"runtime_seconds={int((datetime.now() - self.state.start_time).total_seconds())}", ) self.state.running = False await self.write_status() def filter_changes(self, change: Change, path: str) -> bool: # pragma: no cover """Filter to only watch non-hidden files and directories. 
Returns: True if the file should be watched, False if it should be ignored """ # Skip hidden directories and files path_parts = Path(path).parts for part in path_parts: if part.startswith("."): return False # Skip temp files used in atomic operations if path.endswith(".tmp"): return False return True async def write_status(self): """Write current state to status file""" self.status_path.write_text(WatchServiceState.model_dump_json(self.state, indent=2)) def is_project_path(self, project: Project, path): """ Checks if path is a subdirectory or file within a project """ project_path = Path(project.path).resolve() sub_path = Path(path).resolve() return project_path in sub_path.parents async def handle_changes(self, project: Project, changes: Set[FileChange]) -> None: """Process a batch of file changes""" # Check if project still exists in configuration before processing # This prevents deleted projects from being recreated by background sync from basic_memory.config import ConfigManager config_manager = ConfigManager() if ( project.name not in config_manager.projects and project.permalink not in config_manager.projects ): logger.info( f"Skipping sync for deleted project: {project.name}, change_count={len(changes)}" ) return sync_service = await self._get_sync_service(project) file_service = sync_service.file_service start_time = time.time() directory = Path(project.path).resolve() logger.info( f"Processing project: {project.name} changes, change_count={len(changes)}, directory={directory}" ) # Group changes by type adds: List[str] = [] deletes: List[str] = [] modifies: List[str] = [] for change, path in changes: # convert to relative path relative_path = Path(path).relative_to(directory).as_posix() # Skip .tmp files - they're temporary and shouldn't be synced if relative_path.endswith(".tmp"): continue if change == Change.added: adds.append(relative_path) elif change == Change.deleted: deletes.append(relative_path) elif change == Change.modified: modifies.append(relative_path) logger.debug( f"Grouped file changes, added={len(adds)}, deleted={len(deletes)}, modified={len(modifies)}" ) # because of our atomic writes on updates, an add may be an existing file for added_path in adds: # pragma: no cover TODO add test entity = await sync_service.entity_repository.get_by_file_path(added_path) if entity is not None: logger.debug(f"Existing file will be processed as modified, path={added_path}") adds.remove(added_path) modifies.append(added_path) # Track processed files to avoid duplicates processed: Set[str] = set() # First handle potential moves for added_path in adds: if added_path in processed: continue # pragma: no cover # Skip directories for added paths # We don't need to process directories, only the files inside them # This prevents errors when trying to compute checksums or read directories as files added_full_path = directory / added_path if not added_full_path.exists() or added_full_path.is_dir(): logger.debug("Skipping non-existent or directory path", path=added_path) processed.add(added_path) continue for deleted_path in deletes: if deleted_path in processed: continue # pragma: no cover # Skip directories for deleted paths (based on entity type in db) deleted_entity = await sync_service.entity_repository.get_by_file_path(deleted_path) if deleted_entity is None: # If this was a directory, it wouldn't have an entity logger.debug("Skipping unknown path for move detection", path=deleted_path) continue if added_path != deleted_path: # Compare checksums to detect moves try: added_checksum = 
await file_service.compute_checksum(added_path) if deleted_entity and deleted_entity.checksum == added_checksum: await sync_service.handle_move(deleted_path, added_path) self.state.add_event( path=f"{deleted_path} -> {added_path}", action="moved", status="success", ) self.console.print(f"[blue]→[/blue] {deleted_path} → {added_path}") logger.info(f"move: {deleted_path} -> {added_path}") processed.add(added_path) processed.add(deleted_path) break except Exception as e: # pragma: no cover logger.warning( "Error checking for move", f"old_path={deleted_path}", f"new_path={added_path}", f"error={str(e)}", ) # Handle remaining changes - group them by type for concise output moved_count = len([p for p in processed if p in deletes or p in adds]) delete_count = 0 add_count = 0 modify_count = 0 # Process deletes for path in deletes: if path not in processed: # Check if file still exists on disk (vim atomic write edge case) full_path = directory / path if full_path.exists() and full_path.is_file(): # File still exists despite DELETE event - treat as modification logger.debug( "File exists despite DELETE event, treating as modification", path=path ) entity, checksum = await sync_service.sync_file(path, new=False) self.state.add_event( path=path, action="modified", status="success", checksum=checksum ) self.console.print(f"[yellow]✎[/yellow] {path} (atomic write)") logger.info(f"atomic write detected: {path}") processed.add(path) modify_count += 1 else: # Check if this was a directory - skip if so # (we can't tell if the deleted path was a directory since it no longer exists, # so we check if there's an entity in the database for it) entity = await sync_service.entity_repository.get_by_file_path(path) if entity is None: # No entity means this was likely a directory - skip it logger.debug( f"Skipping deleted path with no entity (likely directory), path={path}" ) processed.add(path) continue # File truly deleted logger.debug("Processing deleted file", path=path) await sync_service.handle_delete(path) self.state.add_event(path=path, action="deleted", status="success") self.console.print(f"[red]✕[/red] {path}") logger.info(f"deleted: {path}") processed.add(path) delete_count += 1 # Process adds for path in adds: if path not in processed: # Skip directories - only process files full_path = directory / path if not full_path.exists() or full_path.is_dir(): logger.debug( f"Skipping non-existent or directory path, path={path}" ) # pragma: no cover processed.add(path) # pragma: no cover continue # pragma: no cover logger.debug(f"Processing new file, path={path}") entity, checksum = await sync_service.sync_file(path, new=True) if checksum: self.state.add_event( path=path, action="new", status="success", checksum=checksum ) self.console.print(f"[green]✓[/green] {path}") logger.info( "new file processed", f"path={path}", f"checksum={checksum}", ) processed.add(path) add_count += 1 else: # pragma: no cover logger.warning(f"Error syncing new file, path={path}") # pragma: no cover self.console.print( f"[orange]?[/orange] Error syncing: {path}" ) # pragma: no cover # Process modifies - detect repeats last_modified_path = None repeat_count = 0 for path in modifies: if path not in processed: # Skip directories - only process files full_path = directory / path if not full_path.exists() or full_path.is_dir(): logger.debug("Skipping non-existent or directory path", path=path) processed.add(path) continue logger.debug(f"Processing modified file: path={path}") entity, checksum = await sync_service.sync_file(path, new=False) 
self.state.add_event( path=path, action="modified", status="success", checksum=checksum ) # Check if this is a repeat of the last modified file if path == last_modified_path: # pragma: no cover repeat_count += 1 # pragma: no cover # Only show a message for the first repeat if repeat_count == 1: # pragma: no cover self.console.print( f"[yellow]...[/yellow] Repeated changes to {path}" ) # pragma: no cover else: # haven't processed this file self.console.print(f"[yellow]✎[/yellow] {path}") logger.info(f"modified: {path}") last_modified_path = path repeat_count = 0 modify_count += 1 logger.debug( # pragma: no cover "Modified file processed, " f"path={path} " f"entity_id={entity.id if entity else None} " f"checksum={checksum}", ) processed.add(path) # Add a concise summary instead of a divider if processed: changes = [] # pyright: ignore if add_count > 0: changes.append(f"[green]{add_count} added[/green]") # pyright: ignore if modify_count > 0: changes.append(f"[yellow]{modify_count} modified[/yellow]") # pyright: ignore if moved_count > 0: changes.append(f"[blue]{moved_count} moved[/blue]") # pyright: ignore if delete_count > 0: changes.append(f"[red]{delete_count} deleted[/red]") # pyright: ignore if changes: self.console.print(f"{', '.join(changes)}", style="dim") # pyright: ignore logger.info(f"changes: {len(changes)}") duration_ms = int((time.time() - start_time) * 1000) self.state.last_scan = datetime.now() self.state.synced_files += len(processed) logger.info( "File change processing completed, " f"processed_files={len(processed)}, " f"total_synced_files={self.state.synced_files}, " f"duration_ms={duration_ms}" ) await self.write_status()
