Skip to main content
Glama

Codebase MCP Server

by Ravenight13
indexer.py32.7 kB
"""Repository indexer orchestration service. Coordinates the complete repository indexing workflow by orchestrating: - Scanner service (file discovery and change detection) - Chunker service (AST-based code chunking) - Embedder service (vector embedding generation) - Database persistence (repository, files, chunks with embeddings) Constitutional Compliance: - Principle IV: Performance (60s target for 10K files, batching, async operations) - Principle V: Production quality (error handling, transaction management, retry logic) - Principle VIII: Type safety (full mypy --strict compliance) - Principle II: Local-first (all processing local, no cloud dependencies) Key Features: - Scan → Chunk → Embed → Store pipeline - Incremental updates (only reindex changed files) - Batch processing for performance (100 files/batch, 50 embeddings/batch) - Comprehensive error tracking with partial success support - Performance metrics (files indexed, chunks created, duration) - Force reindex option (reindex all files) Note: Change event and embedding metadata tracking removed in database-per-project refactoring (non-essential analytics not included in simplified schema). """ from __future__ import annotations import asyncio import time from datetime import datetime from pathlib import Path from typing import Final, Iterator, Literal, Sequence, TypeVar from uuid import UUID from pydantic import BaseModel, Field from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from src.config.settings import get_settings from src.mcp.mcp_logging import get_logger from src.models import ( # ChangeEvent, # Removed - non-essential analytics not in database-per-project schema CodeChunk, CodeFile, # EmbeddingMetadata, # Removed - non-essential analytics not in database-per-project schema Repository, ) from src.services.chunker import chunk_files_batch, detect_language from src.services.embedder import generate_embeddings from src.services.scanner import ChangeSet, compute_file_hash, detect_changes, scan_repository # ============================================================================== # Constants # ============================================================================== logger = get_logger(__name__) # Batching configuration FILE_BATCH_SIZE: Final[int] = 100 # Files per chunking batch EMBEDDING_BATCH_SIZE: Final[int] = 5 # Texts per embedding batch - reduced to prevent Ollama overload # Performance targets TARGET_INDEXING_TIME_SECONDS: Final[int] = 60 # 60s for 10K files T = TypeVar("T") # ============================================================================== # Pydantic Models # ============================================================================== class IndexResult(BaseModel): """Result of repository indexing operation. Attributes: repository_id: UUID of indexed repository files_indexed: Number of files processed chunks_created: Number of code chunks created duration_seconds: Total indexing time status: Success status (success/partial/failed) errors: List of error messages encountered """ repository_id: UUID files_indexed: int chunks_created: int duration_seconds: float status: Literal["success", "partial", "failed"] errors: list[str] = Field(default_factory=list) model_config = {"frozen": True} # ============================================================================== # Helper Functions # ============================================================================== def _batch(items: Sequence[T], batch_size: int) -> Iterator[list[T]]: """Split sequence into batches of specified size. Args: items: Sequence to batch batch_size: Number of items per batch Yields: Lists of items (batches) Examples: >>> list(_batch([1, 2, 3, 4, 5], 2)) [[1, 2], [3, 4], [5]] """ for i in range(0, len(items), batch_size): yield list(items[i : i + batch_size]) async def _get_or_create_repository( db: AsyncSession, path: Path, name: str ) -> Repository: """Get existing repository or create new one. Args: db: Async database session path: Absolute path to repository name: Repository display name Returns: Repository instance (existing or newly created) Raises: ValueError: If path is not absolute """ if not path.is_absolute(): raise ValueError(f"Repository path must be absolute: {path}") path_str = str(path) # Try to find existing repository result = await db.execute(select(Repository).where(Repository.path == path_str)) repository = result.scalar_one_or_none() if repository is not None: logger.debug( f"Found existing repository: {repository.id}", extra={"context": {"repository_id": str(repository.id), "path": path_str}}, ) return repository # Create new repository repository = Repository(path=path_str, name=name, is_active=True) db.add(repository) await db.flush() # Get ID without committing logger.info( f"Created new repository: {repository.id}", extra={ "context": { "repository_id": str(repository.id), "path": path_str, "name": name, } }, ) return repository async def _read_file(file_path: Path) -> str: """Read file content as string. Args: file_path: Path to file Returns: File content as UTF-8 string Raises: FileNotFoundError: If file does not exist UnicodeDecodeError: If file is not valid UTF-8 OSError: If file cannot be read """ try: # Use asyncio for non-blocking I/O loop = asyncio.get_event_loop() content = await loop.run_in_executor( None, lambda: file_path.read_text(encoding="utf-8") ) return content except UnicodeDecodeError as e: logger.warning( f"File is not valid UTF-8, skipping: {file_path}", extra={"context": {"file_path": str(file_path), "error": str(e)}}, ) raise except Exception as e: logger.error( f"Failed to read file: {file_path}", extra={"context": {"file_path": str(file_path), "error": str(e)}}, ) raise async def _create_code_files( db: AsyncSession, repository_id: UUID, repo_path: Path, file_paths: list[Path], ) -> list[UUID]: """Create or update CodeFile records for given files. Args: db: Async database session repository_id: UUID of repository repo_path: Root path of repository file_paths: List of absolute file paths Returns: List of CodeFile UUIDs (in same order as file_paths) Raises: ValueError: If any file_path is not under repo_path """ file_ids: list[UUID] = [] for file_path in file_paths: if not file_path.is_relative_to(repo_path): raise ValueError(f"File {file_path} is not under repository {repo_path}") relative_path = str(file_path.relative_to(repo_path)) path_str = str(file_path) # Get file metadata stat = file_path.stat() size_bytes = stat.st_size modified_at = datetime.utcfromtimestamp(stat.st_mtime) # Compute content hash content_hash = await compute_file_hash(file_path) # Detect language language = detect_language(file_path) # Check if file already exists result = await db.execute( select(CodeFile).where( CodeFile.repository_id == repository_id, CodeFile.relative_path == relative_path, ) ) existing_file = result.scalar_one_or_none() if existing_file is not None: # Update existing file existing_file.path = path_str existing_file.content_hash = content_hash existing_file.size_bytes = size_bytes existing_file.language = language existing_file.modified_at = modified_at existing_file.indexed_at = datetime.utcnow() existing_file.is_deleted = False existing_file.deleted_at = None file_ids.append(existing_file.id) logger.debug( f"Updated existing file: {relative_path}", extra={ "context": { "file_id": str(existing_file.id), "relative_path": relative_path, } }, ) else: # Create new file code_file = CodeFile( repository_id=repository_id, path=path_str, relative_path=relative_path, content_hash=content_hash, size_bytes=size_bytes, language=language, modified_at=modified_at, indexed_at=datetime.utcnow(), is_deleted=False, ) db.add(code_file) await db.flush() # Get ID file_ids.append(code_file.id) logger.debug( f"Created new file: {relative_path}", extra={ "context": { "file_id": str(code_file.id), "relative_path": relative_path, } }, ) return file_ids async def _delete_chunks_for_file(db: AsyncSession, file_id: UUID) -> None: """Delete all chunks for a file (before re-chunking). Args: db: Async database session file_id: UUID of file """ result = await db.execute(select(CodeChunk).where(CodeChunk.code_file_id == file_id)) chunks = result.scalars().all() for chunk in chunks: await db.delete(chunk) logger.debug( f"Deleted {len(chunks)} chunks for file {file_id}", extra={"context": {"file_id": str(file_id), "chunk_count": len(chunks)}}, ) async def _mark_files_deleted( db: AsyncSession, repository_id: UUID, file_paths: list[Path] ) -> None: """Mark files as deleted (soft delete). Args: db: Async database session repository_id: UUID of repository file_paths: List of absolute paths to deleted files """ for file_path in file_paths: # Find file by path result = await db.execute( select(CodeFile).where( CodeFile.repository_id == repository_id, CodeFile.path == str(file_path), ) ) code_file = result.scalar_one_or_none() if code_file is not None: code_file.is_deleted = True code_file.deleted_at = datetime.utcnow() logger.debug( f"Marked file as deleted: {file_path}", extra={ "context": { "file_id": str(code_file.id), "path": str(file_path), } }, ) async def _create_change_events( db: AsyncSession, repository_id: UUID, changeset: ChangeSet, repo_path: Path ) -> None: """Create ChangeEvent records for detected changes. Args: db: Async database session repository_id: UUID of repository changeset: Detected file changes repo_path: Root path of repository """ events_created = 0 # Create events for added files for file_path in changeset.added: relative_path = str(file_path.relative_to(repo_path)) # Find file record result = await db.execute( select(CodeFile).where( CodeFile.repository_id == repository_id, CodeFile.relative_path == relative_path, ) ) code_file = result.scalar_one_or_none() if code_file is not None: event = ChangeEvent( repository_id=repository_id, file_path=relative_path, change_type="added", detected_at=datetime.utcnow(), ) db.add(event) events_created += 1 # Create events for modified files for file_path in changeset.modified: relative_path = str(file_path.relative_to(repo_path)) result = await db.execute( select(CodeFile).where( CodeFile.repository_id == repository_id, CodeFile.relative_path == relative_path, ) ) code_file = result.scalar_one_or_none() if code_file is not None: event = ChangeEvent( repository_id=repository_id, file_path=relative_path, change_type="modified", detected_at=datetime.utcnow(), ) db.add(event) events_created += 1 # Create events for deleted files for file_path in changeset.deleted: # Find file by absolute path result = await db.execute( select(CodeFile).where( CodeFile.repository_id == repository_id, CodeFile.path == str(file_path), ) ) code_file = result.scalar_one_or_none() if code_file is not None: event = ChangeEvent( repository_id=repository_id, file_path=code_file.relative_path, change_type="deleted", detected_at=datetime.utcnow(), ) db.add(event) events_created += 1 logger.info( f"Created {events_created} change events", extra={ "context": { "repository_id": str(repository_id), "events_created": events_created, } }, ) async def _create_embedding_metadata( db: AsyncSession, count: int, duration_ms: float ) -> None: """Create EmbeddingMetadata record for analytics. Args: db: Async database session count: Number of embeddings generated duration_ms: Total generation time in milliseconds """ settings = get_settings() # Calculate average time per embedding avg_time_ms = int(duration_ms / count) if count > 0 else 0 # NOTE: Embedding metadata tracking temporarily disabled due to model/schema mismatch # The Python model (analytics.py) doesn't match the actual database schema # This is non-essential analytics and doesn't affect core functionality # metadata = EmbeddingMetadata( # model_name=settings.ollama_embedding_model, # model_version=None, # dimensions=768, # generation_time_ms=avg_time_ms, # created_at=datetime.utcnow(), # ) # db.add(metadata) logger.debug( f"Created embedding metadata: {count} embeddings, {avg_time_ms}ms avg", extra={ "context": { "embedding_count": count, "avg_time_ms": avg_time_ms, "total_time_ms": duration_ms, } }, ) # ============================================================================== # Public API # ============================================================================== async def index_repository( repo_path: Path, name: str, db: AsyncSession, project_id: str, force_reindex: bool = False, ) -> IndexResult: """Index or re-index a repository. Orchestrates the complete indexing workflow: 1. Get or create Repository record 2. Scan repository for files 3. Detect changes (or force reindex all) 4. Chunk files in batches 5. Generate embeddings in batches 6. Store chunks with embeddings 7. Update repository metadata 8. Create change events Args: repo_path: Absolute path to repository name: Repository display name db: Async database session project_id: Project workspace identifier force_reindex: If True, reindex all files (default: False) Returns: IndexResult with summary statistics Raises: ValueError: If repo_path is not a directory or not absolute OSError: If repo_path is not accessible Performance: Target: <60 seconds for 10,000 files Uses batching for files (100/batch) and embeddings (50/batch) """ start_time = time.perf_counter() errors: list[str] = [] logger.info( f"Starting repository indexing: {repo_path}", extra={ "context": { "repository_path": str(repo_path), "name": name, "force_reindex": force_reindex, "operation": "index_repository", } }, ) try: # 1. Get or create Repository record repository = await _get_or_create_repository(db, repo_path, name) # 2. Scan repository for files all_files = await scan_repository(repo_path) logger.info( f"Scanned repository: {len(all_files)} files found", extra={ "context": { "repository_id": str(repository.id), "file_count": len(all_files), } }, ) # 3. Detect changes (or force reindex all) if force_reindex: files_to_index = all_files changeset = ChangeSet(added=all_files, modified=[], deleted=[]) logger.info( "Force reindex enabled: indexing all files", extra={"context": {"file_count": len(files_to_index)}}, ) else: changeset = await detect_changes(repo_path, db, repository.id) files_to_index = changeset.added + changeset.modified logger.info( f"Change detection: {len(changeset.added)} added, " f"{len(changeset.modified)} modified, {len(changeset.deleted)} deleted", extra={ "context": { "repository_id": str(repository.id), "added": len(changeset.added), "modified": len(changeset.modified), "deleted": len(changeset.deleted), } }, ) # Handle deleted files (mark as deleted) if changeset.deleted: await _mark_files_deleted(db, repository.id, changeset.deleted) # Create change events (disabled - non-essential analytics not in database-per-project schema) # if changeset.has_changes: # await _create_change_events(db, repository.id, changeset, repo_path) # If no files to index, return early if not files_to_index: # Determine appropriate status based on context # If force_reindex is True and we found files but none to index, something is wrong # If no files were found at all, this is an error condition if len(all_files) == 0: error_msg = ( f"No files found in repository at {repo_path}. " f"Check that the directory contains code files." ) errors.append(error_msg) logger.error( error_msg, extra={ "context": { "repository_id": str(repository.id), "repository_path": str(repo_path), } }, ) duration = time.perf_counter() - start_time return IndexResult( repository_id=repository.id, files_indexed=0, chunks_created=0, duration_seconds=duration, status="failed", errors=errors, ) # If we're in force_reindex mode but somehow nothing to index, that's suspicious if force_reindex and len(all_files) > 0: error_msg = ( f"Force reindex requested but no files to process. " f"Found {len(all_files)} files but all filtered out or failed validation." ) errors.append(error_msg) logger.warning( error_msg, extra={ "context": { "repository_id": str(repository.id), "file_count": len(all_files), } }, ) duration = time.perf_counter() - start_time return IndexResult( repository_id=repository.id, files_indexed=0, chunks_created=0, duration_seconds=duration, status="failed", errors=errors, ) # Normal incremental update case: no changes detected (this is OK) logger.info( "No files to index (no changes detected since last indexing)", extra={ "context": { "repository_id": str(repository.id), "total_files": len(all_files), } }, ) # Update repository metadata repository.last_indexed_at = datetime.utcnow() await db.commit() duration = time.perf_counter() - start_time return IndexResult( repository_id=repository.id, files_indexed=0, chunks_created=0, duration_seconds=duration, status="success", errors=[], ) # 4. Chunk files in batches all_chunks_to_create: list[tuple[CodeChunk, str]] = [] # (chunk, embedding_text) files_processed = 0 for file_batch in _batch(files_to_index, FILE_BATCH_SIZE): batch_start = time.perf_counter() # Read file contents file_contents: list[str] = [] for file_path in file_batch: try: content = await _read_file(file_path) file_contents.append(content) except Exception as e: error_msg = f"Failed to read {file_path}: {e}" errors.append(error_msg) logger.error( error_msg, extra={"context": {"file_path": str(file_path), "error": str(e)}}, ) # Use empty content for failed reads file_contents.append("") # Create CodeFile records try: file_ids = await _create_code_files( db, repository.id, repo_path, file_batch ) except Exception as e: error_msg = f"Failed to create CodeFile records: {e}" errors.append(error_msg) logger.error( error_msg, extra={"context": {"batch_size": len(file_batch), "error": str(e)}}, ) continue # Delete old chunks for modified files for file_id in file_ids: try: await _delete_chunks_for_file(db, file_id) except Exception as e: error_msg = f"Failed to delete chunks for file {file_id}: {e}" errors.append(error_msg) logger.warning(error_msg, extra={"context": {"file_id": str(file_id)}}) # Chunk files try: # Add project_id to each tuple for chunker chunk_files_input = list( zip(file_batch, file_contents, file_ids, [project_id] * len(file_ids)) ) chunk_lists = await chunk_files_batch(chunk_files_input) except Exception as e: error_msg = f"Failed to chunk files: {e}" errors.append(error_msg) logger.error( error_msg, extra={"context": {"batch_size": len(file_batch), "error": str(e)}}, ) continue # Convert CodeChunkCreate to CodeChunk (without embeddings yet) for chunk_list in chunk_lists: for chunk_create in chunk_list: chunk = CodeChunk( code_file_id=chunk_create.code_file_id, project_id=chunk_create.project_id, content=chunk_create.content, start_line=chunk_create.start_line, end_line=chunk_create.end_line, chunk_type=chunk_create.chunk_type, embedding=None, # Will be set after embedding generation ) all_chunks_to_create.append((chunk, chunk_create.content)) files_processed += len(file_batch) batch_duration = time.perf_counter() - batch_start logger.debug( f"Processed file batch: {len(file_batch)} files, " f"{sum(len(cl) for cl in chunk_lists)} chunks, " f"{batch_duration:.2f}s", extra={ "context": { "batch_size": len(file_batch), "chunk_count": sum(len(cl) for cl in chunk_lists), "duration_seconds": batch_duration, } }, ) logger.info( f"Chunking complete: {files_processed} files, {len(all_chunks_to_create)} chunks", extra={ "context": { "files_processed": files_processed, "chunk_count": len(all_chunks_to_create), } }, ) # 5. Generate embeddings in batches embeddings_generated = 0 if all_chunks_to_create: embedding_start = time.perf_counter() # Extract texts for embedding texts = [text for _, text in all_chunks_to_create] # Generate embeddings in batches all_embeddings: list[list[float]] = [] for text_batch in _batch(texts, EMBEDDING_BATCH_SIZE): try: batch_embeddings = await generate_embeddings(text_batch) all_embeddings.extend(batch_embeddings) embeddings_generated += len(batch_embeddings) except Exception as e: error_msg = f"Failed to generate embeddings: {e}" errors.append(error_msg) logger.error( error_msg, extra={ "context": { "batch_size": len(text_batch), "error": str(e), } }, ) # Add None for failed embeddings all_embeddings.extend([[] for _ in text_batch]) embedding_duration_ms = (time.perf_counter() - embedding_start) * 1000 logger.info( f"Generated {embeddings_generated} embeddings in {embedding_duration_ms:.0f}ms", extra={ "context": { "embedding_count": embeddings_generated, "duration_ms": embedding_duration_ms, "avg_ms_per_embedding": ( embedding_duration_ms / embeddings_generated if embeddings_generated > 0 else 0 ), } }, ) # Create embedding metadata for analytics if embeddings_generated > 0: await _create_embedding_metadata(db, embeddings_generated, embedding_duration_ms) # 6. Store chunks with embeddings for i, (chunk, _) in enumerate(all_chunks_to_create): if i < len(all_embeddings) and all_embeddings[i]: chunk.embedding = all_embeddings[i] db.add(chunk) logger.info( f"Stored {len(all_chunks_to_create)} chunks in database", extra={"context": {"chunk_count": len(all_chunks_to_create)}}, ) # 7. Update repository metadata repository.last_indexed_at = datetime.utcnow() # 8. Commit transaction await db.commit() duration = time.perf_counter() - start_time # Determine status if errors: status: Literal["success", "partial", "failed"] = "partial" else: status = "success" logger.info( f"Repository indexing complete: {status}", extra={ "context": { "repository_id": str(repository.id), "files_indexed": len(files_to_index), "chunks_created": len(all_chunks_to_create), "duration_seconds": duration, "status": status, "error_count": len(errors), } }, ) return IndexResult( repository_id=repository.id, files_indexed=len(files_to_index), chunks_created=len(all_chunks_to_create), duration_seconds=duration, status=status, errors=errors, ) except Exception as e: # Critical error - rollback transaction await db.rollback() error_msg = f"Critical error during indexing: {e}" errors.append(error_msg) logger.error( error_msg, extra={ "context": { "repository_path": str(repo_path), "error": str(e), "operation": "index_repository", } }, ) duration = time.perf_counter() - start_time # Return failed result (repository_id may not be available) return IndexResult( repository_id=UUID("00000000-0000-0000-0000-000000000000"), # Placeholder files_indexed=0, chunks_created=0, duration_seconds=duration, status="failed", errors=errors, ) async def incremental_update( repository_id: UUID, db: AsyncSession, project_id: str ) -> IndexResult: """Perform incremental update for modified files. Detects changes since last indexing and only reindexes modified files. Much faster than full reindex for large repositories. Args: repository_id: UUID of repository to update db: Async database session project_id: Project workspace identifier Returns: IndexResult with summary statistics Raises: ValueError: If repository not found Performance: Significantly faster than full reindex (only processes changed files) Target: <5 seconds for typical change sets (<100 files) """ logger.info( f"Starting incremental update for repository {repository_id}", extra={ "context": { "repository_id": str(repository_id), "operation": "incremental_update", } }, ) # Get repository result = await db.execute(select(Repository).where(Repository.id == repository_id)) repository = result.scalar_one_or_none() if repository is None: raise ValueError(f"Repository not found: {repository_id}") repo_path = Path(repository.path) # Perform indexing without force_reindex return await index_repository( repo_path=repo_path, name=repository.name, db=db, project_id=project_id, force_reindex=False, ) # ============================================================================== # Module Exports # ============================================================================== __all__ = [ "IndexResult", "index_repository", "incremental_update", ]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Ravenight13/codebase-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server