Skip to main content
Glama

Codebase MCP Server

by Ravenight13
test_database_existence_check.py18.4 kB
"""Integration tests for database existence check bug fix. Tests validate that get_or_create_project() now properly checks database existence before returning projects from the PostgreSQL registry. This prevents the bug where orphaned registry entries (project exists but database doesn't) caused indexing operations to fail with "database does not exist" errors. Bug Report: docs/bugs/registry-database-existence-check/BUG_REPORT.md Test Cases: 1. Registry entry exists, database missing -> auto-provision scenario 2. Registry entry correct, database exists -> no-op scenario 3. Multiple calls are idempotent -> no re-provisioning Constitutional Compliance: - Principle VII: TDD (comprehensive test coverage for bug fix) - Principle V: Production Quality (validates edge cases and recovery) - Principle VIII: Type Safety (type-annotated test functions) """ from __future__ import annotations import json from pathlib import Path from unittest.mock import MagicMock import pytest from fastmcp import Context from src.database.session import _initialize_registry_pool @pytest.mark.integration @pytest.mark.asyncio async def test_auto_provision_missing_database(tmp_path: Path) -> None: """Test automatic database provisioning when registry entry orphaned. Validates the bug fix where get_or_create_project() now: 1. Finds project in PostgreSQL registry 2. Checks if database actually exists 3. Auto-provisions database if missing 4. Initializes schema (repositories, code_files, code_chunks tables) Scenario: Manual registry corruption or failed database provisioning Args: tmp_path: Pytest temporary directory fixture """ from src.database.auto_create import get_or_create_project_from_config # Setup: Create config file with project metadata test_repo = tmp_path / "test-repo" test_repo.mkdir() config_dir = test_repo / ".codebase-mcp" config_dir.mkdir() config_file = config_dir / "config.json" # Create config WITHOUT project.id (will be auto-generated) config_data = { "version": "1.0", "project": { "name": "orphaned-registry-project" }, "auto_switch": True } config_file.write_text(json.dumps(config_data, indent=2)) # Step 1: Create project and get registry entry with database project_v1 = await get_or_create_project_from_config(config_file) project_id = project_v1.project_id database_name = project_v1.database_name # Verify database was created registry_pool = await _initialize_registry_pool() async with registry_pool.acquire() as conn: db_exists_before = await conn.fetchval( "SELECT 1 FROM pg_database WHERE datname = $1", database_name ) assert db_exists_before == 1, f"Database {database_name} should exist after first creation" # Step 2: Manually drop the database to simulate orphaned registry entry # This simulates the bug scenario: registry entry exists but database doesn't async with registry_pool.acquire() as conn: # Terminate active connections to the database await conn.execute(f""" SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = '{database_name}' AND pid <> pg_backend_pid() """) # Drop the database await conn.execute(f"DROP DATABASE IF EXISTS {database_name}") # Verify database is gone db_exists_after_drop = await conn.fetchval( "SELECT 1 FROM pg_database WHERE datname = $1", database_name ) assert db_exists_after_drop is None, f"Database {database_name} should be dropped" # Step 3: Verify registry entry still exists (orphaned) async with registry_pool.acquire() as conn: registry_row = await conn.fetchrow( "SELECT id, name, database_name FROM projects WHERE id = $1", project_id ) assert registry_row is not None, "Registry entry should still exist (orphaned)" assert registry_row['database_name'] == database_name # Step 4: Clear in-memory registry to force PostgreSQL lookup # This simulates server restart scenario where in-memory registry is empty from src.database.auto_create import get_registry registry = get_registry() registry._projects_by_id.clear() registry._projects_by_name.clear() # Step 5: Call get_or_create_project again - should auto-provision # This is the bug fix validation: should detect missing database and re-provision project_v2 = await get_or_create_project_from_config(config_file) # Verify same project returned (not a new one) assert project_v2.project_id == project_id, "Should reuse same project ID" assert project_v2.database_name == database_name, "Should reuse same database name" # Step 6: Verify database was auto-provisioned async with registry_pool.acquire() as conn: db_exists_after_recovery = await conn.fetchval( "SELECT 1 FROM pg_database WHERE datname = $1", database_name ) assert db_exists_after_recovery == 1, ( f"BUG FIX FAILED: Database {database_name} was not auto-provisioned. " f"Registry entry was orphaned but database existence check did not trigger recovery." ) # Step 7: Verify schema initialized (tables exist) from src.database.session import get_or_create_project_pool project_pool = await get_or_create_project_pool(database_name) async with project_pool.acquire() as conn: tables = await conn.fetch(""" SELECT tablename FROM pg_tables WHERE schemaname = 'public' AND tablename IN ('repositories', 'code_files', 'code_chunks') """) table_names = {row['tablename'] for row in tables} assert len(table_names) >= 3, ( f"Schema not initialized correctly. Expected repositories, code_files, code_chunks. " f"Found: {table_names}" ) assert 'repositories' in table_names assert 'code_files' in table_names assert 'code_chunks' in table_names @pytest.mark.integration @pytest.mark.asyncio async def test_no_provision_when_database_exists(tmp_path: Path) -> None: """Test no unnecessary provisioning when database already exists. Validates that get_or_create_project(): 1. Finds project in registry 2. Checks database exists 3. Returns project WITHOUT re-provisioning 4. No database creation overhead Scenario: Normal operation with healthy registry and database Args: tmp_path: Pytest temporary directory fixture """ from src.database.auto_create import get_or_create_project_from_config # Setup: Create config and initialize project test_repo = tmp_path / "test-repo" test_repo.mkdir() config_dir = test_repo / ".codebase-mcp" config_dir.mkdir() config_file = config_dir / "config.json" config_data = { "version": "1.0", "project": { "name": "healthy-project" }, "auto_switch": True } config_file.write_text(json.dumps(config_data, indent=2)) # Step 1: Create project (registry + database) project_v1 = await get_or_create_project_from_config(config_file) project_id_v1 = project_v1.project_id database_name_v1 = project_v1.database_name # Verify database exists registry_pool = await _initialize_registry_pool() async with registry_pool.acquire() as conn: db_exists_v1 = await conn.fetchval( "SELECT 1 FROM pg_database WHERE datname = $1", database_name_v1 ) assert db_exists_v1 == 1, "Database should exist after creation" # Step 2: Call get_or_create_project again (should be no-op) # Track database count before second call async with registry_pool.acquire() as conn: db_count_before = await conn.fetchval( "SELECT COUNT(*) FROM pg_database WHERE datname LIKE 'cb_proj_healthy_project_%'" ) project_v2 = await get_or_create_project_from_config(config_file) # Track database count after second call async with registry_pool.acquire() as conn: db_count_after = await conn.fetchval( "SELECT COUNT(*) FROM pg_database WHERE datname LIKE 'cb_proj_healthy_project_%'" ) # Verify no new database created assert db_count_after == db_count_before, ( f"BUG: Database was re-provisioned unnecessarily. " f"Before: {db_count_before}, After: {db_count_after}" ) # Verify same project returned assert project_v2.project_id == project_id_v1, "Should return same project ID" assert project_v2.database_name == database_name_v1, "Should return same database name" # Step 3: Verify registry entry unchanged async with registry_pool.acquire() as conn: registry_rows = await conn.fetch( "SELECT id FROM projects WHERE name = $1", "healthy-project" ) assert len(registry_rows) == 1, ( f"Expected exactly 1 registry entry, found {len(registry_rows)}. " f"Duplicate projects created!" ) @pytest.mark.integration @pytest.mark.asyncio async def test_idempotent_multiple_calls(tmp_path: Path) -> None: """Test multiple calls to get_or_create_project are idempotent. Validates that: 1. Create orphaned registry entry (registry exists, database missing) 2. First call auto-provisions database 3. Second call finds database and doesn't re-provision 4. Third call also no-ops 5. No errors on any call Scenario: Recovery followed by normal operation Args: tmp_path: Pytest temporary directory fixture """ from src.database.auto_create import get_or_create_project_from_config # Setup test_repo = tmp_path / "test-repo" test_repo.mkdir() config_dir = test_repo / ".codebase-mcp" config_dir.mkdir() config_file = config_dir / "config.json" config_data = { "version": "1.0", "project": { "name": "idempotent-test" }, "auto_switch": True } config_file.write_text(json.dumps(config_data, indent=2)) # Step 1: Create initial project project_v1 = await get_or_create_project_from_config(config_file) project_id = project_v1.project_id database_name = project_v1.database_name # Step 2: Drop database to create orphaned registry entry registry_pool = await _initialize_registry_pool() async with registry_pool.acquire() as conn: await conn.execute(f""" SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = '{database_name}' AND pid <> pg_backend_pid() """) await conn.execute(f"DROP DATABASE IF EXISTS {database_name}") # Step 3: Clear in-memory registry to force PostgreSQL lookup from src.database.auto_create import get_registry registry = get_registry() registry._projects_by_id.clear() registry._projects_by_name.clear() # Step 4: First recovery call - should auto-provision project_v2 = await get_or_create_project_from_config(config_file) assert project_v2.project_id == project_id assert project_v2.database_name == database_name # Verify database provisioned async with registry_pool.acquire() as conn: db_exists_v2 = await conn.fetchval( "SELECT 1 FROM pg_database WHERE datname = $1", database_name ) assert db_exists_v2 == 1, "Database should be provisioned after first recovery call" # Step 5: Second call - should be no-op (database already exists) project_v3 = await get_or_create_project_from_config(config_file) assert project_v3.project_id == project_id assert project_v3.database_name == database_name # Step 6: Third call - also no-op project_v4 = await get_or_create_project_from_config(config_file) assert project_v4.project_id == project_id assert project_v4.database_name == database_name # Step 7: Verify only ONE project in registry async with registry_pool.acquire() as conn: project_count = await conn.fetchval( "SELECT COUNT(*) FROM projects WHERE name = $1", "idempotent-test" ) assert project_count == 1, ( f"Expected exactly 1 project, found {project_count}. " f"Multiple calls created duplicate projects!" ) # Step 8: Verify only ONE database exists async with registry_pool.acquire() as conn: db_count = await conn.fetchval( "SELECT COUNT(*) FROM pg_database WHERE datname LIKE 'cb_proj_idempotent_test_%'" ) assert db_count == 1, ( f"Expected exactly 1 database, found {db_count}. " f"Database was re-provisioned on subsequent calls!" ) @pytest.mark.integration @pytest.mark.asyncio async def test_end_to_end_indexing_with_orphaned_registry(tmp_path: Path) -> None: """Test complete indexing workflow with orphaned registry entry. Validates end-to-end user scenario: 1. Project created and indexed successfully 2. Database manually deleted (simulating corruption) 3. User attempts to index again 4. System detects missing database and auto-recovers 5. Indexing proceeds successfully Scenario: Real-world recovery from database corruption Args: tmp_path: Pytest temporary directory fixture """ # Setup test repository with Python files test_repo = tmp_path / "test-repo" test_repo.mkdir() config_dir = test_repo / ".codebase-mcp" config_dir.mkdir() config_file = config_dir / "config.json" config_data = { "version": "1.0", "project": { "name": "e2e-recovery-test" }, "auto_switch": True } config_file.write_text(json.dumps(config_data, indent=2)) # Create test files to index (test_repo / "auth.py").write_text( "def authenticate(user, password):\n" " '''Authenticate user credentials'''\n" " return validate(user, password)\n" ) (test_repo / "utils.py").write_text( "def helper():\n" " '''Helper function'''\n" " pass\n" ) # Import MCP tools from src.mcp.tools.project import set_working_directory from src.mcp.tools.background_indexing import start_indexing_background, get_indexing_status from src.mcp.tools.search import search_code import asyncio # Create mock context mock_ctx = MagicMock(spec=Context) mock_ctx.session_id = "test-session-e2e-recovery" # Step 1: Initial indexing - should succeed await set_working_directory.fn(directory=str(test_repo), ctx=mock_ctx) # Start background indexing job job_result = await start_indexing_background.fn( repo_path=str(test_repo), ctx=mock_ctx ) job_id = job_result["job_id"] # Poll for completion for _ in range(30): # Max 30 seconds status_result = await get_indexing_status.fn(job_id=job_id, ctx=mock_ctx) if status_result["status"] in ["completed", "failed"]: break await asyncio.sleep(1) assert status_result["status"] == "completed", f"Indexing failed: {status_result.get('error_message')}" assert status_result["files_indexed"] >= 2 project_id = job_result["project_id"] database_name = job_result["database_name"] # Verify search works search_result_v1 = await search_code.fn( query="authentication", ctx=mock_ctx ) assert len(search_result_v1["results"]) > 0 # Step 2: Simulate database corruption (drop database) registry_pool = await _initialize_registry_pool() async with registry_pool.acquire() as conn: await conn.execute(f""" SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = '{database_name}' AND pid <> pg_backend_pid() """) await conn.execute(f"DROP DATABASE IF EXISTS {database_name}") # Verify database is gone db_exists = await conn.fetchval( "SELECT 1 FROM pg_database WHERE datname = $1", database_name ) assert db_exists is None, "Database should be dropped" # Step 3: Clear in-memory registry to force PostgreSQL lookup from src.database.auto_create import get_registry registry = get_registry() registry._projects_by_id.clear() registry._projects_by_name.clear() # Step 4: Attempt to index again - should auto-recover # Reset working directory to force config re-load await set_working_directory.fn(directory=str(test_repo), ctx=mock_ctx) # Start background indexing job again job_result_v2 = await start_indexing_background.fn( repo_path=str(test_repo), ctx=mock_ctx ) job_id_v2 = job_result_v2["job_id"] # Poll for completion for _ in range(30): status_result_v2 = await get_indexing_status.fn(job_id=job_id_v2, ctx=mock_ctx) if status_result_v2["status"] in ["completed", "failed"]: break await asyncio.sleep(1) # Verify indexing succeeded after auto-recovery assert status_result_v2["status"] == "completed", ( f"BUG: Indexing failed after database recovery. " f"Status: {status_result_v2.get('status')}, " f"Error: {status_result_v2.get('error_message')}" ) assert job_result_v2["project_id"] == project_id assert job_result_v2["database_name"] == database_name assert status_result_v2["files_indexed"] >= 2 # Step 5: Verify search works after recovery search_result_v2 = await search_code.fn( query="authentication", ctx=mock_ctx ) assert len(search_result_v2["results"]) > 0, ( "Search should work after database recovery" ) # Step 6: Verify database was re-provisioned async with registry_pool.acquire() as conn: db_exists_after_recovery = await conn.fetchval( "SELECT 1 FROM pg_database WHERE datname = $1", database_name ) assert db_exists_after_recovery == 1, ( "Database should exist after auto-recovery" )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Ravenight13/codebase-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server