Skip to main content
Glama

Codebase MCP Server

by Ravenight13
scanner.py13.9 kB
"""File scanner service with ignore pattern support. Scans repository directories, respects .gitignore/.mcpignore patterns, and detects file changes by comparing filesystem timestamps with database state. Constitutional Compliance: - Principle IV: Performance (async operations, cached ignore patterns, <5s for 10K files) - Principle V: Production quality (comprehensive error handling, graceful degradation) - Principle VIII: Type safety (full mypy --strict compliance) Key Features: - Respects .gitignore patterns using pathspec library - Supports custom .mcpignore patterns - Change detection via mtime comparison - Async operations for I/O performance - Cached ignore pattern parsing """ from __future__ import annotations import asyncio import hashlib import uuid from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Final, Sequence from pathspec import PathSpec from pathspec.patterns.gitwildmatch import GitWildMatchPattern from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from src.mcp.mcp_logging import get_logger from src.models import CodeFile # ============================================================================== # Constants # ============================================================================== logger = get_logger(__name__) # Default ignore patterns (always applied) DEFAULT_IGNORE_PATTERNS: Final[list[str]] = [ # Python bytecode and cache "__pycache__/", "*.pyc", "*.pyo", "*.pyd", # Version control ".git/", ".svn/", ".hg/", # Dependencies "node_modules/", ".venv/", "venv/", ".env", "*.egg-info/", # Build artifacts "build/", "dist/", # Cache and coverage ".ruff_cache/", ".pytest_cache/", "htmlcov/", ".coverage", ".coverage.*", # Binary files - Images "*.png", "*.jpg", "*.jpeg", "*.gif", "*.webp", "*.ico", "*.bmp", "*.svg", # Binary files - Compiled/Native "*.so", "*.dylib", "*.dll", "*.exe", "*.bin", "*.obj", "*.o", # Binary files - Archives "*.zip", "*.tar", "*.tar.gz", "*.tgz", "*.rar", "*.7z", # Binary files - Media "*.mp4", "*.mp3", "*.wav", "*.avi", "*.mov", # System files ".DS_Store", "Thumbs.db", ] # Ignore file names to load IGNORE_FILE_NAMES: Final[list[str]] = [".gitignore", ".mcpignore"] # ============================================================================== # Data Structures # ============================================================================== @dataclass(frozen=True) class ChangeSet: """Represents file changes detected during repository scan. Attributes: added: Files that don't exist in database modified: Files with newer mtime than database record deleted: Files in database that no longer exist on filesystem """ added: list[Path] modified: list[Path] deleted: list[Path] def __post_init__(self) -> None: """Validate ChangeSet invariants.""" # Ensure no overlapping files between categories added_set = set(self.added) modified_set = set(self.modified) deleted_set = set(self.deleted) if added_set & modified_set: raise ValueError("Files cannot be both added and modified") if added_set & deleted_set: raise ValueError("Files cannot be both added and deleted") if modified_set & deleted_set: raise ValueError("Files cannot be both modified and deleted") @property def total_changes(self) -> int: """Total number of changed files.""" return len(self.added) + len(self.modified) + len(self.deleted) @property def has_changes(self) -> bool: """Whether any changes were detected.""" return self.total_changes > 0 # ============================================================================== # Ignore Pattern Cache # ============================================================================== class IgnorePatternCache: """Caches parsed ignore patterns for performance. Singleton pattern to avoid re-parsing .gitignore files on each scan. """ _instance: IgnorePatternCache | None = None _cache: dict[Path, PathSpec] def __new__(cls) -> IgnorePatternCache: """Ensure singleton instance.""" if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._cache = {} return cls._instance def get_or_create(self, repo_path: Path) -> PathSpec: """Get cached PathSpec or create new one. Args: repo_path: Root path of repository Returns: PathSpec for repository ignore patterns """ if repo_path in self._cache: logger.debug(f"Using cached ignore patterns for {repo_path}") return self._cache[repo_path] logger.debug(f"Building ignore patterns for {repo_path}") pathspec = self._build_pathspec(repo_path) self._cache[repo_path] = pathspec return pathspec def _build_pathspec(self, repo_path: Path) -> PathSpec: """Build PathSpec from ignore files and defaults. Args: repo_path: Root path of repository Returns: PathSpec combining default patterns, .gitignore, and .mcpignore """ patterns: list[str] = DEFAULT_IGNORE_PATTERNS.copy() # Load patterns from .gitignore and .mcpignore files for ignore_file_name in IGNORE_FILE_NAMES: ignore_file = repo_path / ignore_file_name if ignore_file.is_file(): try: content = ignore_file.read_text(encoding="utf-8") # Filter out comments and empty lines file_patterns = [ line.strip() for line in content.splitlines() if line.strip() and not line.strip().startswith("#") ] patterns.extend(file_patterns) logger.debug( f"Loaded {len(file_patterns)} patterns from {ignore_file_name}" ) except Exception as e: logger.warning( f"Failed to read {ignore_file_name}: {e}", extra={ "context": { "ignore_file": str(ignore_file), "error": str(e), } }, ) return PathSpec.from_lines(GitWildMatchPattern, patterns) def clear(self) -> None: """Clear the cache (useful for testing).""" self._cache.clear() # ============================================================================== # Public API # ============================================================================== def is_ignored(file_path: Path, repo_path: Path) -> bool: """Check if file should be ignored based on ignore patterns. Args: file_path: Absolute path to file repo_path: Root path of repository Returns: True if file should be ignored, False otherwise Raises: ValueError: If file_path is not under repo_path """ if not file_path.is_relative_to(repo_path): raise ValueError(f"File {file_path} is not under repository {repo_path}") # Get relative path for pattern matching relative_path = file_path.relative_to(repo_path) # Get cached PathSpec cache = IgnorePatternCache() pathspec = cache.get_or_create(repo_path) # Check if path matches any ignore pattern return pathspec.match_file(str(relative_path)) async def scan_repository(repo_path: Path) -> list[Path]: """Scan repository and return all non-ignored files. Args: repo_path: Root path of repository to scan Returns: List of absolute paths to all non-ignored files Raises: ValueError: If repo_path is not a directory OSError: If repo_path is not accessible Performance: Target: <5 seconds for 10,000 files Uses async I/O for file system operations """ if not repo_path.is_dir(): raise ValueError(f"Repository path is not a directory: {repo_path}") logger.info( "Starting repository scan", extra={ "context": { "repository_path": str(repo_path), "operation": "scan_repository", } }, ) start_time = asyncio.get_event_loop().time() # Scan all files (blocking I/O, but fast enough for our purposes) # For very large repos, could parallelize with concurrent.futures all_files = [f for f in repo_path.rglob("*") if f.is_file()] # Filter ignored files non_ignored_files = [f for f in all_files if not is_ignored(f, repo_path)] elapsed_ms = (asyncio.get_event_loop().time() - start_time) * 1000 logger.info( "Repository scan complete", extra={ "context": { "repository_path": str(repo_path), "total_files": len(all_files), "non_ignored_files": len(non_ignored_files), "ignored_files": len(all_files) - len(non_ignored_files), "duration_ms": elapsed_ms, "operation": "scan_repository", } }, ) return non_ignored_files async def detect_changes( repo_path: Path, db: AsyncSession, repository_id: uuid.UUID ) -> ChangeSet: """Detect file changes since last scan by comparing filesystem with database. Args: repo_path: Root path of repository db: Async database session repository_id: UUID of repository in database Returns: ChangeSet with added, modified, and deleted files Raises: ValueError: If repo_path is not a directory OSError: If repo_path is not accessible Performance: Target: <5 seconds for 10,000 files Uses async database queries for state comparison """ logger.info( "Starting change detection", extra={ "context": { "repository_path": str(repo_path), "repository_id": repository_id, "operation": "detect_changes", } }, ) start_time = asyncio.get_event_loop().time() # Get current filesystem state current_files = await scan_repository(repo_path) current_state: dict[Path, float] = {} for file_path in current_files: try: stat = file_path.stat() current_state[file_path] = stat.st_mtime except OSError as e: logger.warning( f"Failed to stat file: {file_path}", extra={"context": {"file_path": str(file_path), "error": str(e)}}, ) # Get database state (only non-deleted files) result = await db.execute( select(CodeFile.path, CodeFile.modified_at).where( CodeFile.repository_id == repository_id, CodeFile.is_deleted == False # noqa: E712 ) ) db_records = result.all() db_state: dict[Path, float] = { Path(path): modified_at.timestamp() for path, modified_at in db_records } # Detect changes added: list[Path] = [] modified: list[Path] = [] deleted: list[Path] = [] # Find added and modified files for file_path, mtime in current_state.items(): if file_path not in db_state: added.append(file_path) elif mtime > db_state[file_path]: # File modified if mtime is newer modified.append(file_path) # Find deleted files deleted = list(set(db_state.keys()) - set(current_state.keys())) changeset = ChangeSet(added=added, modified=modified, deleted=deleted) elapsed_ms = (asyncio.get_event_loop().time() - start_time) * 1000 logger.info( "Change detection complete", extra={ "context": { "repository_path": str(repo_path), "repository_id": repository_id, "added_files": len(added), "modified_files": len(modified), "deleted_files": len(deleted), "total_changes": changeset.total_changes, "duration_ms": elapsed_ms, "operation": "detect_changes", } }, ) return changeset async def compute_file_hash(file_path: Path) -> str: """Compute SHA-256 hash of file content. Args: file_path: Path to file Returns: Hex string of SHA-256 hash (64 characters) Raises: FileNotFoundError: If file does not exist OSError: If file is not accessible """ hasher = hashlib.sha256() # Read file in chunks for memory efficiency chunk_size = 65536 # 64KB chunks try: with open(file_path, "rb") as f: while chunk := f.read(chunk_size): hasher.update(chunk) except FileNotFoundError: logger.error( f"File not found: {file_path}", extra={"context": {"file_path": str(file_path)}}, ) raise except OSError as e: logger.error( f"Failed to read file: {file_path}", extra={"context": {"file_path": str(file_path), "error": str(e)}}, ) raise return hasher.hexdigest() # ============================================================================== # Module Exports # ============================================================================== __all__ = [ "ChangeSet", "scan_repository", "detect_changes", "is_ignored", "compute_file_hash", ]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Ravenight13/codebase-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server