"""
Integration tests for performance validation (Scenario 7).
Tests verify (from quickstart.md):
- Performance requirements (FR-034, FR-035)
- Indexing: 10,000 files in <60 seconds
- Search: p95 latency <500ms
- Load testing with concurrent operations
- Performance regression detection
TDD Compliance: These tests MUST FAIL initially since services are not implemented yet.
"""
from __future__ import annotations

import asyncio  # used by the commented-out future implementations
import time  # used by the commented-out future implementations
from pathlib import Path
from typing import TYPE_CHECKING

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

if TYPE_CHECKING:
    # Result types returned by the future service implementations.
    from src.services.indexer import IndexResult
    from src.services.searcher import SearchResult


@pytest.fixture
async def large_test_repository(tmp_path: Path) -> Path:
    """
    Generate a large test repository with 10,000 Python files.

    Each file contains realistic Python code with functions, classes,
    and docstrings for comprehensive indexing and search testing.
    """
    repo_path = tmp_path / "large-repo"
    repo_path.mkdir()

    # Create directory structure
    for i in range(100):
        module_dir = repo_path / f"module_{i:03d}"
        module_dir.mkdir()

        # Create 100 files per module (100 * 100 = 10,000 files)
        for j in range(100):
            file_path = module_dir / f"file_{j:03d}.py"

            # Generate realistic Python code
            code = f'''"""Module {i} File {j} - Generated test code."""
import os
import sys
from typing import List, Dict, Optional


class DataProcessor:
    """Process data with various transformations."""

    def __init__(self, name: str = "processor_{i}_{j}") -> None:
        """Initialize processor with name."""
        self.name = name
        self.data: List[int] = []

    def add_data(self, value: int) -> None:
        """Add a value to the data list."""
        self.data.append(value)

    def process(self) -> int:
        """Process the data and return sum."""
        return sum(self.data)

    def transform(self, multiplier: int = 2) -> List[int]:
        """Transform data by multiplying each value."""
        return [x * multiplier for x in self.data]


def calculate_average(numbers: List[int]) -> float:
    """Calculate the average of a list of numbers."""
    if not numbers:
        return 0.0
    return sum(numbers) / len(numbers)


def find_maximum(numbers: List[int]) -> Optional[int]:
    """Find the maximum value in a list."""
    if not numbers:
        return None
    return max(numbers)


def filter_positive(numbers: List[int]) -> List[int]:
    """Return only the positive numbers from the list."""
    return [n for n in numbers if n > 0]


def main() -> None:
    """Main entry point for module {i} file {j}."""
    processor = DataProcessor()
    processor.add_data(10)
    processor.add_data(20)
    result = processor.process()
    print(f"Result: {{result}}")


if __name__ == "__main__":
    main()
'''
            file_path.write_text(code)

    return repo_path
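
# Note: writing 10,000 files is itself slow, so this fixture dominates test
# setup time. If the suite allows sharing, a session-scoped variant built on
# pytest's tmp_path_factory could amortize the cost across tests (a sketch;
# the shared-fixture name and scope are assumptions, not part of the plan):
#
# @pytest.fixture(scope="session")
# def large_test_repository_shared(tmp_path_factory: pytest.TempPathFactory) -> Path:
#     repo_path = tmp_path_factory.mktemp("large-repo")
#     ...  # same generation loop as above
#     return repo_path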


@pytest.fixture
async def db_session() -> AsyncSession:
    """Create async database session for tests."""
    pytest.skip("Database session fixture not implemented yet (requires T019-T027)")


@pytest.mark.integration
@pytest.mark.asyncio
@pytest.mark.slow  # Mark as slow test (can be skipped in CI)
async def test_indexing_10k_files_under_60_seconds_not_implemented(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test indexing 10,000 files in <60 seconds - NOT YET IMPLEMENTED.

    Performance requirement from spec:
    - 10,000 files indexed in <60 seconds
    - This is a critical performance target

    Expected workflow:
    1. Index repository with 10,000 files
    2. Measure total indexing time
    3. Verify time <60 seconds
    4. Verify all files indexed
    5. Verify all chunks created

    This test MUST FAIL until T031 (indexer optimization) is implemented.
    """
    pytest.skip("Indexer service not implemented yet (T031)")

    # Future implementation:
    # from src.services.indexer import index_repository
    #
    # start_time = time.perf_counter()
    #
    # result = await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # duration = time.perf_counter() - start_time
    #
    # # Verify performance target
    # assert duration < 60, f"Indexing took {duration:.2f}s, exceeds 60s target"
    #
    # # Verify indexing success
    # assert result.status == "success"
    # assert result.files_indexed == 10_000
    # assert result.chunks_created > 50_000  # ~5+ chunks per file
    # assert result.duration_seconds < 60
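    #
    # # Sanity math: 10,000 files in 60 s is a sustained rate of roughly
    # # 167 files/s across parsing, chunking, embedding, and storage.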


@pytest.mark.integration
@pytest.mark.asyncio
async def test_search_p95_latency_under_500ms_not_implemented(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test search p95 latency <500ms - NOT YET IMPLEMENTED.

    Performance requirement from spec:
    - Search p95 latency <500ms
    - This ensures responsive user experience

    Expected workflow:
    1. Index large repository (prerequisite)
    2. Run 100 diverse search queries
    3. Measure latency for each
    4. Calculate p50, p95, p99 percentiles
    5. Verify p95 <500ms

    This test MUST FAIL until T032 (searcher optimization) is implemented.
    """
    pytest.skip("Search service not implemented yet (T032)")

    # Future implementation:
    # from src.services.indexer import index_repository
    # from src.services.searcher import search_code
    #
    # # Index repository first
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # # Generate diverse search queries
    # queries = [
    #     "calculate average of numbers",
    #     "find maximum value in list",
    #     "filter positive numbers",
    #     "data processor class",
    #     "transform data with multiplier",
    #     "process data and return sum",
    #     "initialize processor with name",
    #     "add value to data list",
    #     "main entry point",
    #     "import statements",
    # ] * 10  # 100 total queries
    #
    # latencies: list[float] = []
    #
    # for query in queries:
    #     start_time = time.perf_counter()
    #     await search_code(query=query, limit=10)
    #     latency_ms = (time.perf_counter() - start_time) * 1000
    #     latencies.append(latency_ms)
    #
    # # Calculate percentiles
    # latencies.sort()
    # p50 = latencies[len(latencies) // 2]
    # p95 = latencies[int(len(latencies) * 0.95)]
    # p99 = latencies[int(len(latencies) * 0.99)]
    #
    # # Verify performance targets
    # assert p95 < 500, f"P95 latency {p95:.0f}ms exceeds 500ms target"
    # assert p99 < 1000, f"P99 latency {p99:.0f}ms exceeds 1000ms target"
    #
    # # Log performance metrics
    # print(f"Search performance: P50={p50:.0f}ms, P95={p95:.0f}ms, P99={p99:.0f}ms")


@pytest.mark.integration
@pytest.mark.asyncio
async def test_concurrent_search_operations(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test concurrent search operations maintain performance.

    Expected behavior:
    - Run 10 concurrent search queries
    - Measure latency for each
    - Verify all complete successfully
    - Verify average latency acceptable

    This test MUST FAIL until T032 (searcher) is implemented.
    """
    pytest.skip("Search service not implemented yet (T032)")

    # Future implementation:
    # from src.services.indexer import index_repository
    # from src.services.searcher import search_code
    #
    # # Index repository
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # async def search_task(query: str) -> float:
    #     """Perform search and return latency in ms."""
    #     start_time = time.perf_counter()
    #     await search_code(query=query, limit=10)
    #     return (time.perf_counter() - start_time) * 1000
    #
    # # Run 10 concurrent searches
    # queries = [f"search query {i}" for i in range(10)]
    # tasks = [search_task(q) for q in queries]
    #
    # latencies = await asyncio.gather(*tasks)
    #
    # # Verify all completed
    # assert len(latencies) == 10
    #
    # # Verify average latency reasonable
    # avg_latency = sum(latencies) / len(latencies)
    # assert avg_latency < 1000, f"Average concurrent latency {avg_latency:.0f}ms too high"
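    #
    # # For heavier load tests, in-flight queries could be bounded with a
    # # semaphore (a sketch; the concurrency limit of 10 is an assumption):
    # semaphore = asyncio.Semaphore(10)
    #
    # async def bounded_search(query: str) -> float:
    #     async with semaphore:
    #         return await search_task(query)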


@pytest.mark.integration
@pytest.mark.asyncio
async def test_incremental_update_performance(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test incremental update performance on large repository.

    Expected behavior:
    - Index 10,000 files
    - Modify 10 files
    - Incremental update should be much faster than full index
    - Should complete in <10 seconds

    This test MUST FAIL until T031 (indexer with incremental logic) is implemented.
    """
    pytest.skip("Incremental indexing not implemented yet (T031)")

    # Future implementation:
    # from src.services.indexer import index_repository
    #
    # # Initial full index
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # # Modify 10 files
    # for i in range(10):
    #     file_path = large_test_repository / "module_000" / f"file_{i:03d}.py"
    #     content = file_path.read_text()
    #     file_path.write_text(content + "\n# Modified\n")
    #
    # # Incremental update
    # start_time = time.perf_counter()
    # result = await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=False,
    # )
    # duration = time.perf_counter() - start_time
    #
    # # Verify performance
    # assert duration < 10, f"Incremental update took {duration:.2f}s, should be <10s"
    # assert result.files_indexed == 10  # Only modified files


@pytest.mark.integration
@pytest.mark.asyncio
async def test_memory_usage_during_large_indexing(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test memory usage stays reasonable during large repository indexing.

    Expected behavior:
    - Index 10,000 files
    - Memory usage should not grow unbounded
    - Verify batching and streaming work correctly

    This test MUST FAIL until T031 (indexer with batching) is implemented.
    """
    pytest.skip("Indexer batching not implemented yet (T031)")

    # Future implementation:
    # import os
    # import psutil
    # from src.services.indexer import index_repository
    #
    # process = psutil.Process(os.getpid())
    # initial_memory = process.memory_info().rss / 1024 / 1024  # MB
    #
    # # Index large repository
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # final_memory = process.memory_info().rss / 1024 / 1024  # MB
    # memory_increase = final_memory - initial_memory
    #
    # # Verify memory usage reasonable (< 1GB increase)
    # assert memory_increase < 1024, f"Memory increased by {memory_increase:.0f}MB, too high"
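    #
    # # A dependency-free alternative (sketch): the stdlib tracemalloc module
    # # tracks Python-level allocations, though not process RSS:
    # # tracemalloc.start(); run the indexing; then
    # # current, peak = tracemalloc.get_traced_memory()  # bytes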


@pytest.mark.integration
@pytest.mark.asyncio
async def test_database_query_performance(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test database query performance with large dataset.

    Expected behavior:
    - Index 10,000 files (~50,000+ chunks)
    - Run complex database queries
    - Verify queries complete in reasonable time
    - Verify indexes are working

    This test MUST FAIL until T026 (migration with indexes) is implemented.
    """
    pytest.skip("Database indexes not implemented yet (T026)")

    # Future implementation:
    # from sqlalchemy import func, select
    # from src.models.code_chunk import CodeChunk
    # from src.services.indexer import index_repository
    #
    # # Index repository
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # # Test query: Count all chunks
    # start_time = time.perf_counter()
    # stmt = select(func.count()).select_from(CodeChunk)
    # result = await db_session.scalar(stmt)
    # duration_ms = (time.perf_counter() - start_time) * 1000
    #
    # assert result > 50_000  # Should have many chunks
    # assert duration_ms < 100, f"Count query took {duration_ms:.0f}ms, too slow"
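    #
    # # To confirm the planner actually uses the indexes (assumptions: a
    # # PostgreSQL backend and a "code_chunks" table name), an EXPLAIN could
    # # be issued through the same session:
    # # from sqlalchemy import text
    # # plan = await db_session.execute(text("EXPLAIN SELECT count(*) FROM code_chunks"))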


@pytest.mark.integration
@pytest.mark.asyncio
async def test_embedding_generation_throughput(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test embedding generation throughput with batching.

    Expected behavior:
    - Generate embeddings for 50,000+ chunks
    - Batching should improve throughput
    - Verify reasonable generation rate

    This test MUST FAIL until T030 (embedder with batching) is implemented.
    """
    pytest.skip("Embedder batching not implemented yet (T030)")

    # Future implementation:
    # from src.services.indexer import index_repository
    #
    # start_time = time.perf_counter()
    #
    # result = await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # duration = time.perf_counter() - start_time
    #
    # # Calculate throughput
    # chunks_per_second = result.chunks_created / duration
    #
    # # Verify reasonable throughput (>800 chunks/sec with batching)
    # assert chunks_per_second > 800, (
    #     f"Throughput {chunks_per_second:.0f} chunks/sec too low, "
    #     "batching may not be working"
    # )
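    #
    # # Cross-check with the indexing budget: 50,000 chunks within the 60 s
    # # target implies at least ~834 chunks/s, so the 800 chunks/s floor here
    # # is slightly looser than the end-to-end requirement.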


@pytest.mark.integration
@pytest.mark.asyncio
async def test_search_result_limit_performance(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test that search limit parameter affects performance appropriately.

    Expected behavior:
    - Search with limit=10 should be fast
    - Search with limit=100 should be slower but still reasonable
    - Verify limit is applied at database level (not post-filtering)

    This test MUST FAIL until T032 (searcher) is implemented.
    """
    pytest.skip("Search service not implemented yet (T032)")

    # Future implementation:
    # from src.services.indexer import index_repository
    # from src.services.searcher import search_code
    #
    # # Index repository
    # await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    #
    # # Search with small limit
    # start_time = time.perf_counter()
    # results_small = await search_code(query="function", limit=10)
    # latency_small = (time.perf_counter() - start_time) * 1000
    #
    # # Search with larger limit
    # start_time = time.perf_counter()
    # results_large = await search_code(query="function", limit=100)
    # latency_large = (time.perf_counter() - start_time) * 1000
    #
    # # Verify limits respected
    # assert len(results_small) <= 10
    # assert len(results_large) <= 100
    #
    # # Verify performance reasonable for both
    # assert latency_small < 500, f"Small limit search took {latency_small:.0f}ms"
    # assert latency_large < 1000, f"Large limit search took {latency_large:.0f}ms"
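    #
    # # Note: the latency gap is only an indirect signal that LIMIT is applied
    # # in SQL; a direct check would inspect the emitted statement (for
    # # example via engine echo logging) rather than timing alone.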


@pytest.mark.integration
@pytest.mark.asyncio
async def test_performance_regression_baseline(
    large_test_repository: Path,
    db_session: AsyncSession,
) -> None:
    """
    Test performance baseline for regression detection.

    This test establishes performance baselines:
    - Indexing time for 10K files
    - Search p95 latency
    - Memory usage
    - Database query performance

    Expected behavior:
    - All metrics within expected ranges
    - Can be used to detect regressions in future

    This test MUST FAIL until T031, T032 are implemented.
    """
    pytest.skip("Indexer and searcher not implemented yet (T031, T032)")

    # Future implementation:
    # from src.services.indexer import index_repository
    # from src.services.searcher import search_code
    #
    # # Baseline: Indexing
    # start_time = time.perf_counter()
    # result = await index_repository(
    #     path=large_test_repository,
    #     name="Large Test Repository",
    #     force_reindex=True,
    # )
    # indexing_time = time.perf_counter() - start_time
    #
    # # Baseline: Search
    # queries = ["function", "class", "data", "process", "calculate"] * 20
    # latencies = []
    # for query in queries:
    #     start_time = time.perf_counter()
    #     await search_code(query=query, limit=10)
    #     latencies.append((time.perf_counter() - start_time) * 1000)
    #
    # latencies.sort()
    # search_p95 = latencies[int(len(latencies) * 0.95)]
    #
    # # Print baselines for reference
    # print("Performance Baseline:")
    # print(f"  Indexing (10K files): {indexing_time:.2f}s")
    # print(f"  Search P95: {search_p95:.0f}ms")
    # print(f"  Files indexed: {result.files_indexed}")
    # print(f"  Chunks created: {result.chunks_created}")
    #
    # # Verify baselines
    # assert indexing_time < 60
    # assert search_p95 < 500
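    #
    # # To make regression detection actionable, the baseline could be
    # # persisted and compared in CI (a sketch; the file name and schema are
    # # assumptions):
    # import json
    # baseline = {
    #     "indexing_seconds": indexing_time,
    #     "search_p95_ms": search_p95,
    #     "files_indexed": result.files_indexed,
    #     "chunks_created": result.chunks_created,
    # }
    # Path("performance_baseline.json").write_text(json.dumps(baseline, indent=2))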