"""Integration tests for the complete embedding generation pipeline.
These tests verify the end-to-end flow from file processing through chunk creation
to embedding generation and storage. They should be added to catch pipeline issues.
"""
import pytest
import asyncio
from pathlib import Path
from chunkhound.database_factory import create_services
from chunkhound.core.config.config import Config
from chunkhound.services.embedding_service import EmbeddingService
from .test_utils import get_api_key_for_tests
@pytest.fixture
async def pipeline_services(tmp_path):
"""Create database services for pipeline testing."""
db_path = tmp_path / "pipeline_test.duckdb"
# Standard API key discovery
api_key, provider = get_api_key_for_tests()
if not api_key:
pytest.skip("No embedding API key available for pipeline integration test")
# Standard embedding config
model = "text-embedding-3-small" if provider == "openai" else "voyage-3.5"
embedding_config = {
"provider": provider,
"api_key": api_key,
"model": model
}
# Standard config creation
config = Config(
database={"path": str(db_path), "provider": "duckdb"},
embedding=embedding_config
)
# Set target_dir after initialization since it's an excluded field
config.target_dir = tmp_path
# Standard service creation
services = create_services(db_path, config)
yield services
@pytest.mark.asyncio
async def test_complete_pipeline_file_to_embeddings(pipeline_services, tmp_path):
"""Test complete pipeline: file → chunks → embeddings."""
services = pipeline_services
# Create test file with various code structures
test_file = tmp_path / "pipeline_test.py"
test_file.write_text("""
'''Module docstring for pipeline testing.'''
def pipeline_function():
'''A function to test the complete pipeline.'''
return "pipeline test"
class PipelineClass:
'''A class to test embedding generation.'''
def __init__(self):
'''Constructor for pipeline testing.'''
self.value = "pipeline"
def pipeline_method(self):
'''Method for pipeline verification.'''
return self.value + " method"
@staticmethod
def static_pipeline_method():
'''Static method for pipeline testing.'''
return "static pipeline"
async def async_pipeline_function():
'''Async function for pipeline testing.'''
await asyncio.sleep(0.1)
return "async pipeline"
""")
# Get initial state
initial_stats = await services.indexing_coordinator.get_stats()
# Process file through complete pipeline
result = await services.indexing_coordinator.process_file(test_file, skip_embeddings=False)
# Verify file processing succeeded
assert result['status'] == 'success', f"Pipeline processing failed: {result.get('error')}"
assert result['chunks'] > 0, "Should create chunks"
assert result.get('embeddings_skipped', True) == False, "Should not skip embeddings"
# Wait for any async embedding processing
await asyncio.sleep(3.0)
# Verify pipeline results
final_stats = await services.indexing_coordinator.get_stats()
chunks_created = final_stats['chunks'] - initial_stats['chunks']
embeddings_created = final_stats.get('embeddings', 0) - initial_stats.get('embeddings', 0)
# Critical pipeline verification
assert chunks_created > 0, f"Expected chunks to be created, got {chunks_created}"
assert embeddings_created > 0, f"Expected embeddings to be created, got {embeddings_created}"
assert embeddings_created == chunks_created, \
f"Embeddings ({embeddings_created}) should match chunks ({chunks_created})"
@pytest.mark.asyncio
async def test_pipeline_with_skip_then_generate(pipeline_services, tmp_path):
"""Test two-phase pipeline: skip embeddings, then generate them."""
services = pipeline_services
# Create test file
test_file = tmp_path / "two_phase_test.py"
test_file.write_text("""
def two_phase_function():
'''Function for two-phase pipeline testing.'''
return "two phase test"
class TwoPhaseClass:
'''Class for two-phase testing.'''
pass
""")
# Phase 1: Process with skip_embeddings=True
result1 = await services.indexing_coordinator.process_file(test_file, skip_embeddings=True)
assert result1['status'] == 'success'
assert result1['embeddings_skipped'] == True
assert result1['chunks'] > 0
# Verify only chunks exist, no embeddings yet
stats_phase1 = await services.indexing_coordinator.get_stats()
chunks_after_phase1 = stats_phase1['chunks']
embeddings_after_phase1 = stats_phase1.get('embeddings', 0)
# Verify embeddings were skipped in phase 1
assert embeddings_after_phase1 == 0, f"Expected 0 embeddings after skip_embeddings=True, got {embeddings_after_phase1}"
# Phase 2: Generate missing embeddings
embedding_result = await services.indexing_coordinator.generate_missing_embeddings()
assert embedding_result['status'] in ['success', 'complete'], f"Embedding generation failed: {embedding_result.get('error')}"
# If status is 'complete', it means all chunks already have embeddings (unexpected but handle gracefully)
if embedding_result['status'] == 'success':
assert embedding_result['generated'] > 0, "Should generate embeddings when status is success"
elif embedding_result['status'] == 'complete':
# This shouldn't happen given we verified no embeddings exist, but handle it
assert embedding_result['generated'] == 0, "Should not generate embeddings when status is complete"
# Verify embeddings were created (only if we expected them to be generated)
stats_phase2 = await services.indexing_coordinator.get_stats()
embeddings_after_phase2 = stats_phase2.get('embeddings', 0)
embeddings_generated = embeddings_after_phase2 - embeddings_after_phase1
if embedding_result['status'] == 'success':
assert embeddings_after_phase2 > embeddings_after_phase1, "Should have more embeddings after generation"
assert embeddings_generated > 0, f"Expected embeddings to be generated, got {embeddings_generated}"
assert embeddings_after_phase2 == chunks_after_phase1, \
f"Final embeddings ({embeddings_after_phase2}) should match chunks ({chunks_after_phase1})"
elif embedding_result['status'] == 'complete':
# This means chunks already had embeddings (unexpected scenario)
# Just verify the system is in a consistent state
print(f"⚠️ Unexpected: chunks already had embeddings (phase1: {embeddings_after_phase1}, phase2: {embeddings_after_phase2})")
assert embeddings_after_phase2 >= embeddings_after_phase1, "Embedding count should not decrease"
@pytest.mark.asyncio
async def test_pipeline_error_recovery(pipeline_services, tmp_path):
"""Test that pipeline recovers from errors gracefully."""
services = pipeline_services
# Create file with valid content
test_file = tmp_path / "recovery_test.py"
test_file.write_text("""
def valid_function():
return "valid"
""")
# Process normally first
result = await services.indexing_coordinator.process_file(test_file, skip_embeddings=False)
assert result['status'] == 'success'
# Wait for processing
await asyncio.sleep(1.0)
# Verify system is still functional after any potential errors
stats = await services.indexing_coordinator.get_stats()
assert stats['chunks'] > 0
# Try processing another file to ensure system recovered
test_file2 = tmp_path / "recovery_test2.py"
test_file2.write_text("""
def another_valid_function():
return "also valid"
""")
result2 = await services.indexing_coordinator.process_file(test_file2, skip_embeddings=False)
assert result2['status'] == 'success', "System should recover and continue processing"
@pytest.mark.asyncio
async def test_embedding_service_direct_integration(pipeline_services, tmp_path):
"""Test EmbeddingService integration directly."""
services = pipeline_services
# Create test file and process it with skip_embeddings=True
test_file = tmp_path / "embedding_service_test.py"
test_file.write_text("""
def embedding_service_function():
'''Function for testing EmbeddingService directly.'''
return "embedding service test"
""")
# Process file to create chunks without embeddings
result = await services.indexing_coordinator.process_file(test_file, skip_embeddings=True)
assert result['status'] == 'success'
assert result['chunks'] > 0
# Use EmbeddingService directly to generate embeddings
embedding_service = services.embedding_service
# Generate missing embeddings
embedding_result = await embedding_service.generate_missing_embeddings()
assert embedding_result['status'] in ['success', 'complete'], f"EmbeddingService failed: {embedding_result.get('error')}"
# Handle both success and complete cases
if embedding_result['status'] == 'success':
assert embedding_result['generated'] > 0, "EmbeddingService should generate embeddings when status is success"
elif embedding_result['status'] == 'complete':
assert embedding_result['generated'] == 0, "EmbeddingService should not generate when status is complete"
# Verify consistency (only if embeddings were actually generated)
stats = await services.indexing_coordinator.get_stats()
if embedding_result['status'] == 'success':
assert stats.get('embeddings', 0) == stats['chunks'], \
"EmbeddingService should achieve embedding/chunk consistency when generating"
elif embedding_result['status'] == 'complete':
# When status is complete, we don't enforce consistency since no work was done
print(f"⚠️ Status 'complete': {stats.get('embeddings', 0)} embeddings, {stats['chunks']} chunks")
@pytest.mark.asyncio
async def test_pipeline_batch_processing(pipeline_services, tmp_path):
"""Test pipeline with batch processing of multiple files."""
services = pipeline_services
# Create multiple test files
test_files = []
for i in range(5):
test_file = tmp_path / f"batch_test_{i}.py"
test_file.write_text(f"""
def batch_function_{i}():
'''Function {i} for batch testing.'''
return "batch test {i}"
class BatchClass_{i}:
'''Class {i} for batch testing.'''
def method_{i}(self):
return "method {i}"
""")
test_files.append(test_file)
# Process all files in batch mode (skip embeddings initially)
processed_chunks = 0
for test_file in test_files:
result = await services.indexing_coordinator.process_file(test_file, skip_embeddings=True)
assert result['status'] == 'success'
processed_chunks += result['chunks']
# Verify chunks were created
stats_after_chunking = await services.indexing_coordinator.get_stats()
assert stats_after_chunking['chunks'] >= processed_chunks
# Generate embeddings for all chunks
embedding_result = await services.indexing_coordinator.generate_missing_embeddings()
assert embedding_result['status'] in ['success', 'complete']
# Handle both success and complete cases
if embedding_result['status'] == 'success':
assert embedding_result['generated'] > 0, "Should generate embeddings when status is success"
elif embedding_result['status'] == 'complete':
assert embedding_result['generated'] == 0, "Should not generate when status is complete"
# Wait for batch processing
await asyncio.sleep(3.0)
# Verify final consistency (only if embeddings were expected to be generated)
final_stats = await services.indexing_coordinator.get_stats()
if embedding_result['status'] == 'success':
assert final_stats.get('embeddings', 0) == final_stats['chunks'], \
f"Batch processing should achieve consistency when generating: " \
f"{final_stats.get('embeddings', 0)} embeddings vs {final_stats['chunks']} chunks"
elif embedding_result['status'] == 'complete':
# When status is complete, we don't enforce strict consistency
print(f"⚠️ Batch status 'complete': {final_stats.get('embeddings', 0)} embeddings, {final_stats['chunks']} chunks")
@pytest.mark.asyncio
async def test_pipeline_file_modification_embeddings(pipeline_services, tmp_path):
"""Test that file modifications properly update embeddings."""
services = pipeline_services
# Create initial file
test_file = tmp_path / "modification_test.py"
test_file.write_text("""
def original_function():
'''Original function.'''
return "original"
""")
# Process initial file
result1 = await services.indexing_coordinator.process_file(test_file, skip_embeddings=False)
assert result1['status'] == 'success'
# Wait for initial processing
await asyncio.sleep(2.0)
initial_stats = await services.indexing_coordinator.get_stats()
# Modify file by adding new function (truly additive)
original_content = test_file.read_text()
test_file.write_text(original_content + """
def new_function():
'''Newly added function.'''
return "new"
""")
# Process modified file
result2 = await services.indexing_coordinator.process_file(test_file, skip_embeddings=False)
assert result2['status'] == 'success'
# Wait for modification processing
await asyncio.sleep(3.0)
# Verify embeddings were updated/added
final_stats = await services.indexing_coordinator.get_stats()
# Should have same or more chunks (smart diff may replace if content changed)
assert final_stats['chunks'] >= initial_stats['chunks'], \
"Should have same or more chunks after file modification"
# Verify embedding consistency
if 'embeddings' in final_stats:
assert final_stats['embeddings'] == final_stats['chunks'], \
f"Modified file should maintain embedding consistency: " \
f"{final_stats['embeddings']} embeddings vs {final_stats['chunks']} chunks"
if __name__ == "__main__":
# Run pipeline integration tests
import subprocess
subprocess.run(["python", "-m", "pytest", __file__, "-v", "--tb=short"])