import logging import math import os import time from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from threading import Lock, Timer from typing import Literal import more_itertools from git import Repo from pydantic import ( BaseModel, ) from import ( DirCreatedEvent, DirDeletedEvent, DirModifiedEvent, FileCreatedEvent, FileDeletedEvent, FileModifiedEvent, FileSystemEventHandler, ) from watchdog.observers import Observer from mcpunk.file_chunk import Chunk, ChunkCategory from mcpunk.file_chunkers import ( BaseChunker, MarkdownChunker, PythonChunker, VueChunker, WholeFileChunker, ) ALL_CHUNKERS: list[type[BaseChunker]] = [ PythonChunker, MarkdownChunker, VueChunker, # Want the WholeFileChunker to be last as it's more of a "fallback" chunker WholeFileChunker, ] logger = logging.getLogger(__name__) class _ProjectFileHandler(FileSystemEventHandler): def __init__( self, project: "Project", file_watch_refresh_freq_seconds: float = 0.1, ) -> None: self.project = project self._project_lock = Lock() self._paths_pending_refresh: set[Path] = set() self._paths_pending_refresh_lock = Lock() self._refresh_freq_sec = file_watch_refresh_freq_seconds self._timer: Timer | None = None self._schedule_refresh() def _schedule_refresh(self) -> None: if self._timer is not None: self._timer.cancel() self._timer = Timer(self._refresh_freq_sec, self._refresh_paths) self._timer.daemon = True self._timer.start() def _refresh_paths(self) -> None: with self._project_lock: try: with self._paths_pending_refresh_lock: paths_pending_refresh = self._paths_pending_refresh.copy() self._paths_pending_refresh.clear() if paths_pending_refresh:"Refreshing {len(paths_pending_refresh)} paths") _paths_fmt = "\n\t".join(str(x) for x in paths_pending_refresh) logger.debug(f"Refreshing\n\t{_paths_fmt}") paths_to_delete = {x for x in paths_pending_refresh if not x.exists()} for p in paths_to_delete: if p.absolute() in self.project.file_map: del self.project.file_map[p.absolute()] dir_paths = {x for x in paths_pending_refresh if x.exists() and x.is_dir()} paths_to_really_refresh = paths_pending_refresh - dir_paths - paths_to_delete self.project.load_files(list(paths_to_really_refresh)) except Exception: logger.exception("Error refreshing paths") finally: self._schedule_refresh() def _path_event( self, path: Path, *, action: Literal["modified", "created", "deleted"], ) -> None: path = path.absolute() logger.debug(f"watchdog says {action}: {path}") if self._should_process(path): logger.debug(f"New path pending refresh: {path}") with self._paths_pending_refresh_lock: self._paths_pending_refresh.add(path) else: logger.debug(f"Ignoring {action} for {path}") def _should_process(self, path: Path) -> bool: if self.project.git_repo is None: return True assert path.is_absolute() # We don't want to exclude non-existent files, as they may have been deleted. if path.exists() and path.is_dir(): return False # Special case this, as we def don't want it and it seems that # `git check-ignore` doesn't consider it as ignored. if path.is_relative_to(self.project.root / ".git"): return False try: rel_path = str(path.relative_to(self.project.root)) t1 = time.monotonic() check_ignore_res: str = self.project.git_repo.git.execute( # type: ignore[call-overload] ["git", "check-ignore", str(rel_path)], with_exceptions=False, ) logger.debug(f"git check-ignore took {(time.monotonic() - t1) * 1000:.4f}ms") return check_ignore_res == "" except Exception: logger.exception(f"Error checking git ignore for {path}") return False def on_modified(self, event: FileModifiedEvent | DirModifiedEvent) -> None: self._path_event( Path(self._to_str(event.src_path)), action="modified", ) def on_created(self, event: FileCreatedEvent | DirCreatedEvent) -> None: self._path_event( Path(self._to_str(event.src_path)), action="created", ) def on_deleted(self, event: FileDeletedEvent | DirDeletedEvent) -> None: self._path_event( Path(self._to_str(event.src_path)), action="deleted", ) @staticmethod def _to_str(s: str | bytes) -> str: if isinstance(s, bytes): return s.decode("utf-8") return s class File(BaseModel): chunks: list[Chunk] abs_path: Path contents: str ext: str # File extension @classmethod def from_file_contents( cls, source_code: str, file_path: Path, max_chunk_size: int = 10_000, ) -> "File": """Extract all callables, calls and imports from the given source code file.""" chunks: list[Chunk] = [] # Try all eligible chunkers in order until one of them doesn't crash. for chunker in ALL_CHUNKERS: if chunker.can_chunk(source_code, file_path): try: chunks = chunker(source_code, file_path).chunk_file() chunks = list( more_itertools.flatten(x.split(max_size=max_chunk_size) for x in chunks), ) break except Exception: logger.exception(f"Error chunking file {file_path} with {chunker}") return File( chunks=chunks, abs_path=file_path.absolute(), contents=source_code, ext=file_path.suffix, ) def chunks_of_type(self, chunk_type: ChunkCategory) -> list[Chunk]: return [c for c in self.chunks if c.category == chunk_type] class Project: def __init__( self, *, root: Path, files_per_parallel_worker: int = 100, file_watch_refresh_freq_seconds: float = 0.1, max_chunk_size: int = 10_000, ) -> None: self.root = root.expanduser().absolute() self.files_per_parallel_worker = files_per_parallel_worker self.max_chunk_size = max_chunk_size self.file_map: dict[Path, File] = {} git_repo: Repo | None if (root / ".git").exists(): git_repo = Repo(root / ".git") else: git_repo = None self.git_repo = git_repo self._init_from_root_dir(root) # Note potential that if a file is modified here it won't be picked up. = Observer() event_handler=_ProjectFileHandler( self, file_watch_refresh_freq_seconds=file_watch_refresh_freq_seconds, ), path=str(self.root), recursive=True, ) @property def files(self) -> list[File]: return list(self.file_map.values()) def load_files(self, files: list[Path]) -> None: # How many workers to use? _cpu_count = os.cpu_count() or 1 n_workers = math.floor(len(files) / self.files_per_parallel_worker) n_workers = min(n_workers, _cpu_count // 2) # Avoid maxing out the system n_workers = max(n_workers, 1) files_analysed: list[File] if n_workers == 1: files_analysed_maybe_none = [ _analyze_file(file_path, max_chunk_size=self.max_chunk_size) for file_path in files ] files_analysed = [x for x in files_analysed_maybe_none if x is not None] else:"Using {n_workers} workers to process {len(files)} files") files_analysed = [] with ProcessPoolExecutor(max_workers=n_workers) as executor: future_to_file = { executor.submit( _analyze_file, file_path, max_chunk_size=self.max_chunk_size, ): file_path for file_path in files } for future in as_completed(future_to_file): file_path = future_to_file[future] try: result = future.result() if result is not None: files_analysed.append(result) except Exception: logger.exception(f"File {file_path} generated an exception") for file in files_analysed: self.file_map[file.abs_path] = file def _init_from_root_dir(self, root: Path) -> None: if not root.exists(): raise ValueError(f"Root directory {root} does not exist") files: list[Path] = [] if self.git_repo is not None: rel_paths = self.git_repo.git.ls_files().splitlines() files.extend(root / rel_path for rel_path in rel_paths) else: # Exclude specific top-level directories # TODO: make this configurable ignore_dirs = {".venv", "build", ".git", "__pycache__"} for path in root.iterdir(): if path.is_dir() and not in ignore_dirs: files.extend(path.glob("**/*")) # Don't forget files in the root directory itself files.extend(root.glob("*")) files = [file for file in files if file.is_file()] self.load_files(files) def _analyze_file(file_path: Path, max_chunk_size: int = 10_000) -> File | None: try: if not file_path.exists(): logger.warning(f"File {file_path} does not exist") return None if not file_path.is_file(): logger.warning(f"File {file_path} is not a file") return None return File.from_file_contents( file_path.read_text(), file_path, max_chunk_size=max_chunk_size, ) except Exception: logger.exception(f"Error processing file {file_path}") return None