Skip to main content
Glama

Codebase MCP Server

by Ravenight13
test_performance_validation.py17.4 kB
""" Integration tests for performance validation (Scenario 7). Tests verify (from quickstart.md): - Performance requirements (FR-034, FR-035) - Indexing: 10,000 files in <60 seconds - Search: p95 latency <500ms - Load testing with concurrent operations - Performance regression detection TDD Compliance: These tests MUST FAIL initially since services are not implemented yet. """ from __future__ import annotations import asyncio import random import time from pathlib import Path from typing import TYPE_CHECKING import pytest from sqlalchemy.ext.asyncio import AsyncSession if TYPE_CHECKING: from src.services.indexer import IndexResult from src.services.searcher import SearchResult @pytest.fixture async def large_test_repository(tmp_path: Path) -> Path: """ Generate a large test repository with 10,000 Python files. Each file contains realistic Python code with functions, classes, and docstrings for comprehensive indexing and search testing. """ repo_path = tmp_path / "large-repo" repo_path.mkdir() # Create directory structure for i in range(100): module_dir = repo_path / f"module_{i:03d}" module_dir.mkdir() # Create 100 files per module (100 * 100 = 10,000 files) for j in range(100): file_path = module_dir / f"file_{j:03d}.py" # Generate realistic Python code code = f'''"""Module {i} File {j} - Generated test code.""" import os import sys from typing import List, Dict, Optional class DataProcessor: """Process data with various transformations.""" def __init__(self, name: str = "processor_{i}_{j}") -> None: """Initialize processor with name.""" self.name = name self.data: List[int] = [] def add_data(self, value: int) -> None: """Add a value to the data list.""" self.data.append(value) def process(self) -> int: """Process the data and return sum.""" return sum(self.data) def transform(self, multiplier: int = 2) -> List[int]: """Transform data by multiplying each value.""" return [x * multiplier for x in self.data] def calculate_average(numbers: List[int]) -> float: """Calculate the average of a list of numbers.""" if not numbers: return 0.0 return sum(numbers) / len(numbers) def find_maximum(numbers: List[int]) -> Optional[int]: """Find the maximum value in a list.""" if not numbers: return None return max(numbers) def filter_positive(numbers: List[int]) -> List[int]: """Filter out negative numbers from the list.""" return [n for n in numbers if n > 0] def main() -> None: """Main entry point for module {i} file {j}.""" processor = DataProcessor() processor.add_data(10) processor.add_data(20) result = processor.process() print(f"Result: {{result}}") if __name__ == "__main__": main() ''' file_path.write_text(code) return repo_path @pytest.fixture async def db_session() -> AsyncSession: """Create async database session for tests.""" pytest.skip("Database session fixture not implemented yet (requires T019-T027)") @pytest.mark.integration @pytest.mark.asyncio @pytest.mark.slow # Mark as slow test (can be skipped in CI) async def test_indexing_10k_files_under_60_seconds_not_implemented( large_test_repository: Path, db_session: AsyncSession, ) -> None: """ Test indexing 10,000 files in <60 seconds - NOT YET IMPLEMENTED. Performance requirement from spec: - 10,000 files indexed in <60 seconds - This is a critical performance target Expected workflow: 1. Index repository with 10,000 files 2. Measure total indexing time 3. Verify time <60 seconds 4. Verify all files indexed 5. Verify all chunks created This test MUST FAIL until T031 (indexer optimization) is implemented. """ pytest.skip("Indexer service not implemented yet (T031)") # Future implementation: # from src.services.indexer import index_repository # # start_time = time.perf_counter() # # result = await index_repository( # path=large_test_repository, # name="Large Test Repository", # force_reindex=True, # ) # # duration = time.perf_counter() - start_time # # # Verify performance target # assert duration < 60, f"Indexing took {duration:.2f}s, exceeds 60s target" # # # Verify indexing success # assert result.status == "success" # assert result.files_indexed == 10_000 # assert result.chunks_created > 50_000 # ~5+ chunks per file # assert result.duration_seconds < 60 @pytest.mark.integration @pytest.mark.asyncio async def test_search_p95_latency_under_500ms_not_implemented( large_test_repository: Path, db_session: AsyncSession, ) -> None: """ Test search p95 latency <500ms - NOT YET IMPLEMENTED. Performance requirement from spec: - Search p95 latency <500ms - This ensures responsive user experience Expected workflow: 1. Index large repository (prerequisite) 2. Run 100 diverse search queries 3. Measure latency for each 4. Calculate p50, p95, p99 percentiles 5. Verify p95 <500ms This test MUST FAIL until T032 (searcher optimization) is implemented. """ pytest.skip("Search service not implemented yet (T032)") # Future implementation: # from src.services.indexer import index_repository # from src.services.searcher import search_code # # # Index repository first # await index_repository( # path=large_test_repository, # name="Large Test Repository", # force_reindex=True, # ) # # # Generate diverse search queries # queries = [ # "calculate average of numbers", # "find maximum value in list", # "filter positive numbers", # "data processor class", # "transform data with multiplier", # "process data and return sum", # "initialize processor with name", # "add value to data list", # "main entry point", # "import statements", # ] * 10 # 100 total queries # # latencies: list[float] = [] # # for query in queries: # start_time = time.perf_counter() # await search_code(query=query, limit=10) # latency_ms = (time.perf_counter() - start_time) * 1000 # latencies.append(latency_ms) # # # Calculate percentiles # latencies.sort() # p50 = latencies[len(latencies) // 2] # p95 = latencies[int(len(latencies) * 0.95)] # p99 = latencies[int(len(latencies) * 0.99)] # # # Verify performance targets # assert p95 < 500, f"P95 latency {p95:.0f}ms exceeds 500ms target" # assert p99 < 1000, f"P99 latency {p99:.0f}ms exceeds 1000ms target" # # # Log performance metrics # print(f"Search performance: P50={p50:.0f}ms, P95={p95:.0f}ms, P99={p99:.0f}ms") @pytest.mark.integration @pytest.mark.asyncio async def test_concurrent_search_operations( large_test_repository: Path, db_session: AsyncSession, ) -> None: """ Test concurrent search operations maintain performance. Expected behavior: - Run 10 concurrent search queries - Measure latency for each - Verify all complete successfully - Verify average latency acceptable This test MUST FAIL until T032 (searcher) is implemented. """ pytest.skip("Search service not implemented yet (T032)") # Future implementation: # from src.services.indexer import index_repository # from src.services.searcher import search_code # # # Index repository # await index_repository( # path=large_test_repository, # name="Large Test Repository", # force_reindex=True, # ) # # async def search_task(query: str) -> float: # """Perform search and return latency in ms.""" # start_time = time.perf_counter() # await search_code(query=query, limit=10) # return (time.perf_counter() - start_time) * 1000 # # # Run 10 concurrent searches # queries = [f"search query {i}" for i in range(10)] # tasks = [search_task(q) for q in queries] # # latencies = await asyncio.gather(*tasks) # # # Verify all completed # assert len(latencies) == 10 # # # Verify average latency reasonable # avg_latency = sum(latencies) / len(latencies) # assert avg_latency < 1000, f"Average concurrent latency {avg_latency:.0f}ms too high" @pytest.mark.integration @pytest.mark.asyncio async def test_incremental_update_performance( large_test_repository: Path, db_session: AsyncSession, ) -> None: """ Test incremental update performance on large repository. Expected behavior: - Index 10,000 files - Modify 10 files - Incremental update should be much faster than full index - Should complete in <10 seconds This test MUST FAIL until T031 (indexer with incremental logic) is implemented. """ pytest.skip("Incremental indexing not implemented yet (T031)") # Future implementation: # from src.services.indexer import index_repository # # # Initial full index # await index_repository( # path=large_test_repository, # name="Large Test Repository", # force_reindex=True, # ) # # # Modify 10 files # for i in range(10): # file_path = large_test_repository / f"module_000" / f"file_{i:03d}.py" # content = file_path.read_text() # file_path.write_text(content + "\n# Modified\n") # # # Incremental update # start_time = time.perf_counter() # result = await index_repository( # path=large_test_repository, # name="Large Test Repository", # force_reindex=False, # ) # duration = time.perf_counter() - start_time # # # Verify performance # assert duration < 10, f"Incremental update took {duration:.2f}s, should be <10s" # assert result.files_indexed == 10 # Only modified files @pytest.mark.integration @pytest.mark.asyncio async def test_memory_usage_during_large_indexing( large_test_repository: Path, db_session: AsyncSession, ) -> None: """ Test memory usage stays reasonable during large repository indexing. Expected behavior: - Index 10,000 files - Memory usage should not grow unbounded - Verify batching and streaming work correctly This test MUST FAIL until T031 (indexer with batching) is implemented. """ pytest.skip("Indexer batching not implemented yet (T031)") # Future implementation: # import psutil # import os # from src.services.indexer import index_repository # # process = psutil.Process(os.getpid()) # initial_memory = process.memory_info().rss / 1024 / 1024 # MB # # # Index large repository # await index_repository( # path=large_test_repository, # name="Large Test Repository", # force_reindex=True, # ) # # final_memory = process.memory_info().rss / 1024 / 1024 # MB # memory_increase = final_memory - initial_memory # # # Verify memory usage reasonable (< 1GB increase) # assert memory_increase < 1024, f"Memory increased by {memory_increase:.0f}MB, too high" @pytest.mark.integration @pytest.mark.asyncio async def test_database_query_performance( large_test_repository: Path, db_session: AsyncSession, ) -> None: """ Test database query performance with large dataset. Expected behavior: - Index 10,000 files (~50,000+ chunks) - Run complex database queries - Verify queries complete in reasonable time - Verify indexes are working This test MUST FAIL until T026 (migration with indexes) is implemented. """ pytest.skip("Database indexes not implemented yet (T026)") # Future implementation: # from src.services.indexer import index_repository # from src.models.code_chunk import CodeChunk # from sqlalchemy import select, func # # # Index repository # await index_repository( # path=large_test_repository, # name="Large Test Repository", # force_reindex=True, # ) # # # Test query: Count all chunks # start_time = time.perf_counter() # stmt = select(func.count()).select_from(CodeChunk) # result = await db_session.scalar(stmt) # duration_ms = (time.perf_counter() - start_time) * 1000 # # assert result > 50_000 # Should have many chunks # assert duration_ms < 100, f"Count query took {duration_ms:.0f}ms, too slow" @pytest.mark.integration @pytest.mark.asyncio async def test_embedding_generation_throughput( large_test_repository: Path, db_session: AsyncSession, ) -> None: """ Test embedding generation throughput with batching. Expected behavior: - Generate embeddings for 50,000+ chunks - Batching should improve throughput - Verify reasonable generation rate This test MUST FAIL until T030 (embedder with batching) is implemented. """ pytest.skip("Embedder batching not implemented yet (T030)") # Future implementation: # from src.services.indexer import index_repository # # start_time = time.perf_counter() # # result = await index_repository( # path=large_test_repository, # name="Large Test Repository", # force_reindex=True, # ) # # duration = time.perf_counter() - start_time # # # Calculate throughput # chunks_per_second = result.chunks_created / duration # # # Verify reasonable throughput (>800 chunks/sec with batching) # assert chunks_per_second > 800, ( # f"Throughput {chunks_per_second:.0f} chunks/sec too low, " # "batching may not be working" # ) @pytest.mark.integration @pytest.mark.asyncio async def test_search_result_limit_performance( large_test_repository: Path, db_session: AsyncSession, ) -> None: """ Test that search limit parameter affects performance appropriately. Expected behavior: - Search with limit=10 should be fast - Search with limit=100 should be slower but still reasonable - Verify limit is applied at database level (not post-filtering) This test MUST FAIL until T032 (searcher) is implemented. """ pytest.skip("Search service not implemented yet (T032)") # Future implementation: # from src.services.indexer import index_repository # from src.services.searcher import search_code # # # Index repository # await index_repository( # path=large_test_repository, # name="Large Test Repository", # force_reindex=True, # ) # # # Search with small limit # start_time = time.perf_counter() # results_small = await search_code(query="function", limit=10) # latency_small = (time.perf_counter() - start_time) * 1000 # # # Search with larger limit # start_time = time.perf_counter() # results_large = await search_code(query="function", limit=100) # latency_large = (time.perf_counter() - start_time) * 1000 # # # Verify limits respected # assert len(results_small) <= 10 # assert len(results_large) <= 100 # # # Verify performance reasonable for both # assert latency_small < 500, f"Small limit search took {latency_small:.0f}ms" # assert latency_large < 1000, f"Large limit search took {latency_large:.0f}ms" @pytest.mark.integration @pytest.mark.asyncio async def test_performance_regression_baseline( large_test_repository: Path, db_session: AsyncSession, ) -> None: """ Test performance baseline for regression detection. This test establishes performance baselines: - Indexing time for 10K files - Search p95 latency - Memory usage - Database query performance Expected behavior: - All metrics within expected ranges - Can be used to detect regressions in future This test MUST FAIL until T031, T032 are implemented. """ pytest.skip("Indexer and searcher not implemented yet (T031, T032)") # Future implementation: # from src.services.indexer import index_repository # from src.services.searcher import search_code # # # Baseline: Indexing # start_time = time.perf_counter() # result = await index_repository( # path=large_test_repository, # name="Large Test Repository", # force_reindex=True, # ) # indexing_time = time.perf_counter() - start_time # # # Baseline: Search # queries = ["function", "class", "data", "process", "calculate"] * 20 # latencies = [] # for query in queries: # start_time = time.perf_counter() # await search_code(query=query, limit=10) # latencies.append((time.perf_counter() - start_time) * 1000) # # latencies.sort() # search_p95 = latencies[int(len(latencies) * 0.95)] # # # Print baselines for reference # print(f"Performance Baseline:") # print(f" Indexing (10K files): {indexing_time:.2f}s") # print(f" Search P95: {search_p95:.0f}ms") # print(f" Files indexed: {result.files_indexed}") # print(f" Chunks created: {result.chunks_created}") # # # Verify baselines # assert indexing_time < 60 # assert search_p95 < 500

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Ravenight13/codebase-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server