Codebase MCP Server

by Ravenight13
background_worker.py (14.9 kB)
"""Background indexing worker for non-blocking repository indexing. Simple state machine: pending → running → completed/failed No progress callbacks - binary state only. Constitutional Compliance: - Principle I: Simplicity (reuses existing indexer, no callbacks) - Principle V: Production Quality (error handling, logging) - Principle VIII: Type Safety (full type hints) Worker Responsibilities: - Update job status through state machine - Call existing index_repository service (NO modifications) - Capture errors and update job status - Use structured logging with context - Handle all exception types gracefully State Machine: 1. pending → running: Job starts processing 2. running → completed: Indexing succeeds 3. running → failed: Indexing encounters error Error Handling: - Catch ALL exceptions (never crash) - Log errors with structured context - Always update job status to failed - Nested try/catch for status update failures """ from __future__ import annotations import re from datetime import datetime from pathlib import Path from typing import Any from uuid import UUID from fastmcp import Context from src.database.session import get_session, engine from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from src.mcp.mcp_logging import get_logger from src.models.indexing_job import IndexingJob from src.models.code_file import CodeFile from src.models.repository import Repository from src.services.indexer import index_repository logger = get_logger(__name__) async def update_job( job_id: UUID, **updates: Any, ) -> None: """Update job fields atomically in main database. Args: job_id: UUID of indexing_jobs row **updates: Field names and values to update (e.g., status="in_progress", files_indexed=100) Example: >>> await update_job( ... job_id=job_id, ... status="completed", ... files_indexed=1000, ... completed_at=datetime.now() ... ) """ async with AsyncSession(engine) as session: job = await session.get(IndexingJob, job_id) if job is None: logger.warning(f"Job {job_id} not found for update") return # Apply updates for key, value in updates.items(): if hasattr(job, key): setattr(job, key, value) else: logger.warning(f"Invalid field for job update: {key}") await session.commit() async def get_available_databases(prefix: str = "cb_proj_") -> list[str]: """Query PostgreSQL for available databases matching prefix. Args: prefix: Database name prefix to search for (default: cb_proj_) Returns: List of database names matching the prefix, sorted alphabetically Example: >>> databases = await get_available_databases() >>> print(databases) ['cb_proj_client_a_abc123de', 'cb_proj_default_00000000'] """ from src.database.session import _initialize_registry_pool try: registry_pool = await _initialize_registry_pool() async with registry_pool.acquire() as conn: databases = await conn.fetch( "SELECT datname FROM pg_database WHERE datname LIKE $1 ORDER BY datname", f"{prefix}%" ) return [db['datname'] for db in databases] except Exception as e: logger.debug(f"Could not query available databases: {e}") return [] async def generate_database_suggestion(error_message: str, project_id: str | None) -> str: """Generate helpful error message with database suggestions. Parses database connection errors to extract the missing database name and provides actionable suggestions including available alternatives. 


async def generate_database_suggestion(error_message: str, project_id: str | None) -> str:
    """Generate helpful error message with database suggestions.

    Parses database connection errors to extract the missing database name
    and provides actionable suggestions including available alternatives.

    Args:
        error_message: Original database error message
        project_id: Project ID that was attempted (if any)

    Returns:
        Enhanced error message with actionable suggestions

    Example:
        >>> error = 'database "cb_proj_test" does not exist'
        >>> suggestion = await generate_database_suggestion(error, "test-project")
        >>> print(suggestion)
        ❌ Database 'cb_proj_test' does not exist.
        📋 Available databases:
          • cb_proj_default_00000000
        🔧 Recommended actions:
        1. Update .codebase-mcp/config.json with a valid database name
        ...
    """
    # Extract database name from error (e.g., 'database "cb_proj_xxx" does not exist')
    db_match = re.search(r'database "([^"]+)" does not exist', error_message)
    if not db_match:
        return error_message  # Can't enhance, return original

    missing_db = db_match.group(1)

    # Get available databases
    available_dbs = await get_available_databases()

    # Build enhanced error message
    msg = f"❌ Database '{missing_db}' does not exist.\n\n"

    if available_dbs:
        msg += "📋 Available databases:\n"
        for db in available_dbs:
            msg += f"  • {db}\n"
        msg += "\n"
    else:
        msg += "⚠️ No codebase-mcp databases found. This may be the first project.\n\n"

    msg += "🔧 Recommended actions:\n"
    msg += "1. Update .codebase-mcp/config.json with a valid database name\n"
    msg += "   Add: \"database_name\": \"cb_proj_your_project_xxxxx\" to project config\n"
    if available_dbs:
        msg += f"   Example: \"database_name\": \"{available_dbs[0]}\"\n"
    msg += "2. Or, remove the 'id' field from config to auto-create a new database\n"
    msg += "3. Or, create the database manually: createdb " + missing_db + "\n"

    return msg
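

# Illustrative only (not executed): _background_indexing_worker below drives each
# job through the module's state machine via update_job(). A successful run is,
# in order:
#
#     await update_job(job_id, status="running", started_at=datetime.now())
#     result = await index_repository(...)  # existing indexer, unmodified
#     await update_job(job_id, status="completed",
#                      files_indexed=result.files_indexed,
#                      chunks_created=result.chunks_created,
#                      completed_at=datetime.now())
#
# Every failure path instead ends with status="failed" plus an error_message.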


async def _background_indexing_worker(
    job_id: UUID,
    repo_path: str,
    project_id: str,
    config_path: Path | None = None,
    force_reindex: bool = False,
) -> None:
    """Background worker that executes indexing and updates PostgreSQL.

    Simple state machine: pending → running → completed/failed
    No progress callbacks - binary state only.

    Args:
        job_id: UUID of indexing_jobs row
        repo_path: Absolute path to repository
        project_id: Resolved project identifier
        config_path: Optional path to .codebase-mcp/config.json for auto-creation.
            If provided, worker will attempt to auto-create project database.
            If None, worker uses existing database or default database.
        force_reindex: If True, re-index all files regardless of changes
            (default: False)

    Bug Fix:
        Resolves Bug 2 - Background indexing auto-creation failure.
        Previously passed FastMCP Context, which becomes stale in a background
        task. Now captures the config path in the foreground (while ctx is
        valid) and passes the path instead.

    State Transitions:
        1. Update status: pending → running (with started_at timestamp)
        2. Execute: Call index_repository service
        3. Update status: running → completed/failed (with completed_at timestamp)

    Error Handling:
        - All exceptions caught and logged
        - Job status always updated to failed on error
        - Nested try/catch prevents status update failures

    Performance:
        Worker execution time depends on repository size
        (60s target for 10K files)
    """
    logger.info(
        f"Background worker started for job {job_id}",
        extra={
            "context": {
                "job_id": str(job_id),
                "project_id": project_id,
                "force_reindex": force_reindex,
            }
        },
    )

    try:
        # 1. Update status to running
        await update_job(
            job_id=job_id,
            status="running",
            started_at=datetime.now(),
        )

        # 1.5. Auto-create project database if config provided (Bug 2 fix)
        if config_path:
            try:
                from src.database.auto_create import get_or_create_project_from_config

                await get_or_create_project_from_config(config_path)
                logger.debug(f"Auto-created/verified database from {config_path}")
            except Exception as e:
                logger.warning(f"Auto-creation failed: {e}, attempting indexing anyway")
                # Continue - database might exist, or get_session will fail below

        # 2. Run existing indexer (NO MODIFICATIONS to indexer.py!)
        async with get_session(project_id=project_id, ctx=None) as session:
            result = await index_repository(
                repo_path=Path(repo_path),
                name=Path(repo_path).name,
                db=session,
                project_id=project_id,
                force_reindex=force_reindex,
            )

        # 3. Check if indexing succeeded by inspecting result.status
        if result.status == "failed":
            # Indexer reported failure - extract error message
            error_message = result.errors[0] if result.errors else "Unknown indexing error"
            logger.error(
                f"Indexing failed for job {job_id}",
                extra={
                    "context": {
                        "job_id": str(job_id),
                        "error": error_message,
                        "errors_count": len(result.errors),
                    }
                },
            )
            await update_job(
                job_id=job_id,
                status="failed",
                error_message=error_message,
                completed_at=datetime.now(),
            )
            return  # Exit early - do not proceed to "completed" update

        # 4. Path validation (defensive checks - should never fail after successful indexing)
        repo_path_obj = Path(repo_path)
        if not repo_path_obj.exists():
            # Path doesn't exist - mark as failed
            raise FileNotFoundError(f"Repository path does not exist: {repo_path}")
        if not repo_path_obj.is_dir():
            # Path is not a directory - mark as failed
            raise NotADirectoryError(f"Repository path is not a directory: {repo_path}")

        # 5. Determine indexing scenario and set appropriate status message
        status_message = ""

        # Query database to determine if this was incremental or full index
        async with get_session(project_id=project_id, ctx=None) as session:
            # Count total non-deleted files in database for this repository
            count_result = await session.execute(
                select(func.count(CodeFile.id)).where(
                    CodeFile.repository_id == result.repository_id,
                    CodeFile.is_deleted == False,  # noqa: E712
                )
            )
            total_files_in_db = count_result.scalar() or 0

            # Determine scenario based on force_reindex flag, files_indexed count,
            # and total files
            if force_reindex:
                # Force reindex mode - all files re-indexed regardless of changes
                status_message = (
                    f"Force reindex completed: {result.files_indexed} files, "
                    f"{result.chunks_created} chunks"
                )
            elif result.files_indexed == 0:
                # No changes detected (incremental with no modifications)
                status_message = (
                    f"Repository up to date - no file changes detected since last "
                    f"index ({total_files_in_db} files already indexed)"
                )
            elif total_files_in_db == result.files_indexed:
                # Full index of new repository (all files in DB equal files just indexed)
                status_message = (
                    f"Full repository index completed: {result.files_indexed} files, "
                    f"{result.chunks_created} chunks"
                )
            else:
                # Incremental update - some files changed
                status_message = f"Incremental update completed: {result.files_indexed} files updated"
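
        # Scenario summary (mirrors the branches above):
        #   force_reindex=True                 -> "Force reindex completed ..."
        #   result.files_indexed == 0          -> "Repository up to date ..."
        #   total_files_in_db == files_indexed -> "Full repository index completed ..."
        #   anything else                      -> "Incremental update completed ..."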

        # 6. Update to completed with results and status message
        await update_job(
            job_id=job_id,
            status="completed",
            files_indexed=result.files_indexed,
            chunks_created=result.chunks_created,
            status_message=status_message,
            completed_at=datetime.now(),
        )
        logger.info(
            f"Job {job_id} completed successfully",
            extra={
                "context": {
                    "job_id": str(job_id),
                    "files_indexed": result.files_indexed,
                    "chunks_created": result.chunks_created,
                    "status_message": status_message,
                    "force_reindex": force_reindex,
                }
            },
        )

    except FileNotFoundError:
        # Repository not found
        logger.error(
            f"Repository not found for job {job_id}",
            extra={"context": {"job_id": str(job_id), "repo_path": repo_path}},
        )
        await update_job(
            job_id=job_id,
            status="failed",
            error_message=f"Repository not found: {repo_path}",
            completed_at=datetime.now(),
        )
    except NotADirectoryError:
        # Path exists but is not a directory
        logger.error(
            f"Path is not a directory for job {job_id}",
            extra={"context": {"job_id": str(job_id), "repo_path": repo_path}},
        )
        await update_job(
            job_id=job_id,
            status="failed",
            error_message=f"Path is not a directory: {repo_path}",
            completed_at=datetime.now(),
        )
    except Exception as e:
        # Catch-all for unexpected errors
        error_str = str(e)

        # Check if this is a database existence error and enhance the message
        if "database" in error_str.lower() and "does not exist" in error_str.lower():
            error_message = await generate_database_suggestion(error_str, project_id)
        else:
            error_message = error_str

        logger.error(
            f"Unexpected error in job {job_id}",
            extra={
                "context": {
                    "job_id": str(job_id),
                    "repo_path": repo_path,
                    "project_id": project_id,
                    "error": error_message,
                    "error_type": type(e).__name__,
                }
            },
            exc_info=True,
        )

        # Nested try/catch for status update failures
        try:
            await update_job(
                job_id=job_id,
                status="failed",
                error_message=error_message,  # Use enhanced message
                completed_at=datetime.now(),
            )
        except Exception as update_error:
            logger.error(
                f"Failed to update job {job_id} status to failed",
                extra={"context": {"error": str(update_error)}},
            )


__all__ = [
    "_background_indexing_worker",
    "update_job",
    "get_available_databases",
    "generate_database_suggestion",
]
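
The page does not show how a caller schedules this worker. The following is a minimal sketch, assuming a tool handler that has already inserted the indexing_jobs row (its id is faked here with uuid4()) and resolved the project id; the handler name start_indexing_background and the worker's import path are hypothetical.

import asyncio
from pathlib import Path
from uuid import UUID, uuid4

# Assumed import path for the module shown above; adjust to the real package layout.
from src.services.background_worker import _background_indexing_worker


async def start_indexing_background(repo_path: str, project_id: str) -> UUID:
    # Hypothetical handler: in the real server, the indexing_jobs row is
    # created first and its primary key is used as job_id.
    job_id = uuid4()
    # Capture the config path in the foreground, per the Bug 2 fix notes above.
    config_path = Path(repo_path) / ".codebase-mcp" / "config.json"
    asyncio.create_task(
        _background_indexing_worker(
            job_id=job_id,
            repo_path=repo_path,
            project_id=project_id,
            config_path=config_path if config_path.exists() else None,
        )
    )
    return job_id  # caller polls the indexing_jobs row for status

asyncio.create_task returns immediately, which is what keeps indexing non-blocking for the MCP client; the worker then reports progress only through the job row's binary status field.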
