"""
Integration tests for performance validation (Scenario 7).
Tests verify (from quickstart.md):
- Performance requirements (FR-034, FR-035)
- Indexing: 10,000 files in <60 seconds
- Search: p95 latency <500ms
- Load testing with concurrent operations
- Performance regression detection
TDD Compliance: These tests MUST FAIL initially since services are not implemented yet.
"""
from __future__ import annotations
import asyncio  # referenced by the future implementations below
import time  # referenced by the future implementations below
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession
if TYPE_CHECKING:
    from src.services.indexer import IndexResult
    from src.services.searcher import SearchResult
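# A minimal nearest-rank percentile helper shared by the latency-measuring
# sketches below; assumes ``values`` is non-empty and ``pct`` lies in [0, 1].
def percentile(values: list[float], pct: float) -> float:
    """Return the nearest-rank percentile of a list of samples."""
    ordered = sorted(values)
    # Clamp the rank so pct=1.0 returns the maximum instead of overrunning.
    rank = min(int(len(ordered) * pct), len(ordered) - 1)
    return ordered[rank]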
@pytest.fixture
def large_test_repository(tmp_path: Path) -> Path:
    """
    Generate a large test repository with 10,000 Python files.
    Each file contains realistic Python code with functions, classes,
    and docstrings for comprehensive indexing and search testing.
    """
    repo_path = tmp_path / "large-repo"
    repo_path.mkdir()
    # Create directory structure
    for i in range(100):
        module_dir = repo_path / f"module_{i:03d}"
        module_dir.mkdir()
        # Create 100 files per module (100 * 100 = 10,000 files)
        for j in range(100):
            file_path = module_dir / f"file_{j:03d}.py"
            # Generate realistic Python code
            code = f'''"""Module {i} File {j} - Generated test code."""
import os
import sys
from typing import List, Dict, Optional
class DataProcessor:
    """Process data with various transformations."""
    def __init__(self, name: str = "processor_{i}_{j}") -> None:
        """Initialize processor with name."""
        self.name = name
        self.data: List[int] = []
    def add_data(self, value: int) -> None:
        """Add a value to the data list."""
        self.data.append(value)
    def process(self) -> int:
        """Process the data and return sum."""
        return sum(self.data)
    def transform(self, multiplier: int = 2) -> List[int]:
        """Transform data by multiplying each value."""
        return [x * multiplier for x in self.data]
def calculate_average(numbers: List[int]) -> float:
    """Calculate the average of a list of numbers."""
    if not numbers:
        return 0.0
    return sum(numbers) / len(numbers)
def find_maximum(numbers: List[int]) -> Optional[int]:
    """Find the maximum value in a list."""
    if not numbers:
        return None
    return max(numbers)
def filter_positive(numbers: List[int]) -> List[int]:
    """Filter out negative numbers from the list."""
    return [n for n in numbers if n > 0]
def main() -> None:
    """Main entry point for module {i} file {j}."""
    processor = DataProcessor()
    processor.add_data(10)
    processor.add_data(20)
    result = processor.process()
    print(f"Result: {{result}}")
if __name__ == "__main__":
    main()
'''
            file_path.write_text(code)
    return repo_path
@pytest_asyncio.fixture
async def db_session() -> AsyncSession:
    """Create async database session for tests."""
    pytest.skip("Database session fixture not implemented yet (requires T019-T027)")
@pytest.mark.integration
@pytest.mark.asyncio
@pytest.mark.slow  # Mark as slow test (can be skipped in CI)
async def test_indexing_10k_files_under_60_seconds_not_implemented(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test indexing 10,000 files in <60 seconds - NOT YET IMPLEMENTED.
    Performance requirement from spec:
    - 10,000 files indexed in <60 seconds
    - This is a critical performance target
    Expected workflow:
    1. Index repository with 10,000 files
    2. Measure total indexing time
    3. Verify time <60 seconds
    4. Verify all files indexed
    5. Verify all chunks created
    This test MUST FAIL until T031 (indexer optimization) is implemented.
    """
    pytest.skip("Indexer service not implemented yet (T031)")
    # Future implementation:
    # from src.services.indexer import index_repository
    #
    # start_time = time.perf_counter()
    #
    # result = await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # duration = time.perf_counter() - start_time
    #
    # # Verify performance target
    # assert duration < 60, f"Indexing took {duration:.2f}s, exceeds 60s target"
    #
    # # Verify indexing success
    # assert result.status == "success"
    # assert result.files_indexed == 10_000
    # assert result.chunks_created > 50_000  # ~5+ chunks per file
    # assert result.duration_seconds < 60
@pytest.mark.integration
@pytest.mark.asyncio
async def test_search_p95_latency_under_500ms_not_implemented(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test search p95 latency <500ms - NOT YET IMPLEMENTED.
    Performance requirement from spec:
    - Search p95 latency <500ms
    - This ensures responsive user experience
    Expected workflow:
    1. Index large repository (prerequisite)
    2. Run 100 diverse search queries
    3. Measure latency for each
    4. Calculate p50, p95, p99 percentiles
    5. Verify p95 <500ms
    This test MUST FAIL until T032 (searcher optimization) is implemented.
    """
    pytest.skip("Search service not implemented yet (T032)")
    # Future implementation:
    # from src.services.indexer import index_repository
    # from src.services.searcher import search_code
    #
    # # Index repository first
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # # Generate diverse search queries
    # queries = [
    #     "calculate average of numbers",
    #     "find maximum value in list",
    #     "filter positive numbers",
    #     "data processor class",
    #     "transform data with multiplier",
    #     "process data and return sum",
    #     "initialize processor with name",
    #     "add value to data list",
    #     "main entry point",
    #     "import statements",
    # ] * 10  # 100 total queries
    #
    # latencies: list[float] = []
    #
    # for query in queries:
    #     start_time = time.perf_counter()
    #     await search_code(query=query, limit=10)
    #     latency_ms = (time.perf_counter() - start_time) * 1000
    #     latencies.append(latency_ms)
    #
    # # Calculate percentiles (nearest-rank, via the percentile() helper above)
    # p50 = percentile(latencies, 0.50)
    # p95 = percentile(latencies, 0.95)
    # p99 = percentile(latencies, 0.99)
    #
    # # Verify performance targets
    # assert p95 < 500, f"P95 latency {p95:.0f}ms exceeds 500ms target"
    # assert p99 < 1000, f"P99 latency {p99:.0f}ms exceeds 1000ms target"
    #
    # # Log performance metrics
    # print(f"Search performance: P50={p50:.0f}ms, P95={p95:.0f}ms, P99={p99:.0f}ms")
@pytest.mark.integration
@pytest.mark.asyncio
async def test_concurrent_search_operations(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test concurrent search operations maintain performance.
    Expected behavior:
    - Run 10 concurrent search queries
    - Measure latency for each
    - Verify all complete successfully
    - Verify average latency acceptable
    This test MUST FAIL until T032 (searcher) is implemented.
    """
    pytest.skip("Search service not implemented yet (T032)")
    # Future implementation:
    # from src.services.indexer import index_repository
    # from src.services.searcher import search_code
    #
    # # Index repository
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # async def search_task(query: str) -> float:
    #     """Perform search and return latency in ms."""
    #     start_time = time.perf_counter()
    #     await search_code(query=query, limit=10)
    #     return (time.perf_counter() - start_time) * 1000
    #
    # # Run 10 concurrent searches
    # queries = [f"search query {i}" for i in range(10)]
    # tasks = [search_task(q) for q in queries]
    #
    # latencies = await asyncio.gather(*tasks)
    #
    # # Verify all completed
    # assert len(latencies) == 10
    #
    # # Verify average latency reasonable
    # avg_latency = sum(latencies) / len(latencies)
    # assert avg_latency < 1000, f"Average concurrent latency {avg_latency:.0f}ms too high"
@pytest.mark.integration
@pytest.mark.asyncio
async def test_incremental_update_performance(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test incremental update performance on large repository.
    Expected behavior:
    - Index 10,000 files
    - Modify 10 files
    - Incremental update should be much faster than full index
    - Should complete in <10 seconds
    This test MUST FAIL until T031 (indexer with incremental logic) is implemented.
    """
    pytest.skip("Incremental indexing not implemented yet (T031)")
    # Future implementation:
    # from src.services.indexer import index_repository
    #
    # # Initial full index
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # # Modify 10 files
    # for i in range(10):
    #     file_path = large_test_repository / "module_000" / f"file_{i:03d}.py"
    #     content = file_path.read_text()
    #     file_path.write_text(content + "\n# Modified\n")
    #
    # # Incremental update
    # start_time = time.perf_counter()
    # result = await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=False,
    # )
    # duration = time.perf_counter() - start_time
    #
    # # Verify performance
    # assert duration < 10, f"Incremental update took {duration:.2f}s, should be <10s"
    # assert result.files_indexed == 10  # Only modified files
@pytest.mark.integration
@pytest.mark.asyncio
async def test_memory_usage_during_large_indexing(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test memory usage stays reasonable during large repository indexing.
    Expected behavior:
    - Index 10,000 files
    - Memory usage should not grow unbounded
    - Verify batching and streaming work correctly
    This test MUST FAIL until T031 (indexer with batching) is implemented.
    """
    pytest.skip("Indexer batching not implemented yet (T031)")
    # Future implementation:
    # import psutil
    # import os
    # from src.services.indexer import index_repository
    #
    # process = psutil.Process(os.getpid())
    # initial_memory = process.memory_info().rss / 1024 / 1024  # MB
    #
    # # Index large repository
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # final_memory = process.memory_info().rss / 1024 / 1024  # MB
    # memory_increase = final_memory - initial_memory
    #
    # # Verify memory usage reasonable (< 1GB increase)
    # assert memory_increase < 1024, f"Memory increased by {memory_increase:.0f}MB, too high"
@pytest.mark.integration
@pytest.mark.asyncio
async def test_database_query_performance(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test database query performance with large dataset.
    Expected behavior:
    - Index 10,000 files (~50,000+ chunks)
    - Run complex database queries
    - Verify queries complete in reasonable time
    - Verify indexes are working
    This test MUST FAIL until T026 (migration with indexes) is implemented.
    """
    pytest.skip("Database indexes not implemented yet (T026)")
    # Future implementation:
    # from src.services.indexer import index_repository
    # from src.models.code_chunk import CodeChunk
    # from sqlalchemy import select, func
    #
    # # Index repository
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # # Test query: Count all chunks
    # start_time = time.perf_counter()
    # stmt = select(func.count()).select_from(CodeChunk)
    # result = await db_session.scalar(stmt)
    # duration_ms = (time.perf_counter() - start_time) * 1000
    #
    # assert result > 50_000  # Should have many chunks
    # assert duration_ms < 100, f"Count query took {duration_ms:.0f}ms, too slow"
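    #
    # # Rough index verification (a sketch; assumes PostgreSQL, and the table
    # # and column names here are placeholders for the T026 schema):
    # from sqlalchemy import text
    # plan = await db_session.execute(
    #     text("EXPLAIN SELECT * FROM code_chunks WHERE file_id = :fid"),
    #     {"fid": 1},
    # )
    # plan_text = "\n".join(row[0] for row in plan)
    # assert "Index" in plan_text, "expected an index scan in the query plan"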
@pytest.mark.integration
@pytest.mark.asyncio
async def test_embedding_generation_throughput(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test embedding generation throughput with batching.
    Expected behavior:
    - Generate embeddings for 50,000+ chunks
    - Batching should improve throughput
    - Verify reasonable generation rate
    This test MUST FAIL until T030 (embedder with batching) is implemented.
    """
    pytest.skip("Embedder batching not implemented yet (T030)")
    # Future implementation:
    # from src.services.indexer import index_repository
    #
    # start_time = time.perf_counter()
    #
    # result = await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # duration = time.perf_counter() - start_time
    #
    # # Calculate throughput
    # chunks_per_second = result.chunks_created / duration
    #
    # # Verify reasonable throughput: ~50,000 chunks within the 60 s indexing
    # # budget works out to ~833 chunks/sec, so >800 chunks/sec keeps pace.
    # assert chunks_per_second > 800, (
    #     f"Throughput {chunks_per_second:.0f} chunks/sec too low, "
    #     "batching may not be working"
    # )
@pytest.mark.integration
@pytest.mark.asyncio
async def test_search_result_limit_performance(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test that search limit parameter affects performance appropriately.
    Expected behavior:
    - Search with limit=10 should be fast
    - Search with limit=100 should be slower but still reasonable
    - Verify limit is applied at database level (not post-filtering)
    This test MUST FAIL until T032 (searcher) is implemented.
    """
    pytest.skip("Search service not implemented yet (T032)")
    # Future implementation:
    # from src.services.indexer import index_repository
    # from src.services.searcher import search_code
    #
    # # Index repository
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # # Search with small limit
    # start_time = time.perf_counter()
    # results_small = await search_code(query="function", limit=10)
    # latency_small = (time.perf_counter() - start_time) * 1000
    #
    # # Search with larger limit
    # start_time = time.perf_counter()
    # results_large = await search_code(query="function", limit=100)
    # latency_large = (time.perf_counter() - start_time) * 1000
    #
    # # Verify limits respected
    # assert len(results_small) <= 10
    # assert len(results_large) <= 100
    #
    # # Verify performance reasonable for both
    # assert latency_small < 500, f"Small limit search took {latency_small:.0f}ms"
    # assert latency_large < 1000, f"Large limit search took {latency_large:.0f}ms"
@pytest.mark.integration
@pytest.mark.asyncio
async def test_performance_regression_baseline(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test performance baseline for regression detection.
    This test establishes performance baselines:
    - Indexing time for 10K files
    - Search p95 latency
    - Memory usage
    - Database query performance
    Expected behavior:
    - All metrics within expected ranges
    - Can be used to detect regressions in future
    This test MUST FAIL until T031, T032 are implemented.
    """
    pytest.skip("Indexer and searcher not implemented yet (T031, T032)")
    # Future implementation:
    # from src.services.indexer import index_repository
    # from src.services.searcher import search_code
    #
    # # Baseline: Indexing
    # start_time = time.perf_counter()
    # result = await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    # indexing_time = time.perf_counter() - start_time
    #
    # # Baseline: Search
    # queries = ["function", "class", "data", "process", "calculate"] * 20
    # latencies = []
    # for query in queries:
    #     start_time = time.perf_counter()
    #     await search_code(query=query, limit=10)
    #     latencies.append((time.perf_counter() - start_time) * 1000)
    #
    # search_p95 = percentile(latencies, 0.95)
    #
    # # Print baselines for reference
    # print(f"Performance Baseline:")
    # print(f"  Indexing (10K files): {indexing_time:.2f}s")
    # print(f"  Search P95: {search_p95:.0f}ms")
    # print(f"  Files indexed: {result.files_indexed}")
    # print(f"  Chunks created: {result.chunks_created}")
    #
    # # Verify baselines
    # assert indexing_time < 60
    # assert search_p95 < 500
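    #
    # # Persisting the baseline for regression comparison (a sketch; the file
    # # name and JSON schema are placeholders, not an agreed project format):
    # import json
    # baseline = {
    #     "indexing_seconds": round(indexing_time, 2),
    #     "search_p95_ms": round(search_p95, 1),
    #     "files_indexed": result.files_indexed,
    #     "chunks_created": result.chunks_created,
    # }
    # Path("performance_baseline.json").write_text(json.dumps(baseline, indent=2))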