"""Deterministic QA test suite for ChunkHound search functionality.
This suite converts the manual QA process from .claude/commands/qa.md into
deterministic automated tests. It exercises the search_semantic and search_regex
tools with real-time indexing, using actual MCP server components.
No mocks - tests the full integration path users experience.
"""
import asyncio
import tempfile
import time
import os
from pathlib import Path
import pytest
import shutil
import subprocess
from chunkhound.core.config.config import Config
from chunkhound.core.types.common import Language
from chunkhound.database_factory import create_services
from chunkhound.services.realtime_indexing_service import RealtimeIndexingService
from chunkhound.mcp_server.tools import execute_tool
from .test_utils import get_api_key_for_tests
class TestQADeterministic:
"""Deterministic QA test suite - converts manual testing into automated validation."""
@pytest.fixture
async def qa_setup(self):
"""Setup QA test environment with real services."""
temp_dir = Path(tempfile.mkdtemp())
db_path = temp_dir / ".chunkhound" / "test.db"
watch_dir = temp_dir / "project"
watch_dir.mkdir(parents=True)
# Ensure database directory exists
db_path.parent.mkdir(parents=True, exist_ok=True)
# Standard API key discovery for multi-provider support
api_key, provider = get_api_key_for_tests()
# Create embedding config only when explicitly enabled.
#
# This test suite is intended to be deterministic and fast. When real API
# keys are present, enabling embeddings can drastically increase runtime
# (network calls + embedding generation), often exceeding pytest-timeout.
embedding_config = None
if api_key and provider and (os.getenv("CH_TEST_QA_ENABLE_EMBEDDINGS") == "1"):
model = "text-embedding-3-small" if provider == "openai" else "voyage-3.5"
embedding_config = {
"provider": provider,
"api_key": api_key,
"model": model
}
# Use fake args to avoid the find_project_root call, which fails in CI
from types import SimpleNamespace
fake_args = SimpleNamespace(path=temp_dir)
config = Config(
args=fake_args,
database={"path": str(db_path), "provider": "duckdb"},
embedding=embedding_config,
indexing={"include": ["*"], "exclude": ["*.log", "__pycache__/"]} # More inclusive for QA
)
# Create services - real MCP server components
services = create_services(db_path, config)
services.provider.connect()
# Initialize realtime indexing service
realtime_service = RealtimeIndexingService(services, config)
await realtime_service.start(watch_dir)
# Wait for initial scan
await asyncio.sleep(2.0)
yield services, realtime_service, watch_dir, temp_dir
# Cleanup
try:
await realtime_service.stop()
except Exception:
pass
try:
services.provider.close()
except Exception:
pass
shutil.rmtree(temp_dir, ignore_errors=True)
@pytest.mark.asyncio
async def test_file_lifecycle_search_validation(self, qa_setup):
"""QA Items 1-4: Test file lifecycle with search validation."""
services, realtime_service, watch_dir, _ = qa_setup
# QA Item 1: Pick specific existing file and search for it
existing_file = watch_dir / "existing_test.py"
existing_content = """def existing_function():
'''This is an existing function for QA testing'''
return "existing_content"
class ExistingClass:
def existing_method(self):
return "existing_method_result"
"""
existing_file.write_text(existing_content)
await asyncio.sleep(3.0) # Wait for processing (extended for Ollama)
# Search for existing content
existing_regex = await execute_tool("search_regex", services, None, {
"pattern": "existing_function",
"page_size": 10,
"offset": 0
})
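# Response shape assumed here is inferred from how results are consumed in this
# suite (not from a tool spec): execute_tool returns a plain dict exposing a
# "results" list and, for the paginated regex searches further below, an optional
# "pagination" mapping with a "total" count.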
# Try semantic search if available, skip if not
existing_semantic = None
try:
existing_semantic = await execute_tool("search_semantic", services, None, {
"query": "existing function QA testing",
"page_size": 10,
"offset": 0
})
semantic_count = len(existing_semantic.get('results', []))
except Exception as e:
print(f"ā Semantic search skipped: {e}")
semantic_count = "N/A"
assert len(existing_regex.get('results', [])) > 0, "Should find existing file content with regex"
print(f"ā Existing file search: regex={len(existing_regex.get('results', []))}, semantic={semantic_count}")
# QA Item 2: Add new file and search for it
new_file = watch_dir / "new_added_file.py"
new_content = """def newly_added_function():
'''This function was just added for QA validation'''
return "newly_added_content_unique_string"
class NewlyAddedClass:
def new_method(self):
return "new_method_qa_test"
"""
new_file.write_text(new_content)
await asyncio.sleep(3.5) # Wait for debounce + processing
# Search for new content
new_regex = await execute_tool("search_regex", services, None, {
"pattern": "newly_added_content_unique_string",
"page_size": 10,
"offset": 0
})
# Try semantic search if available
try:
new_semantic = await execute_tool("search_semantic", services, None, {
"query": "newly added function QA validation",
"page_size": 10,
"offset": 0
})
new_semantic_count = len(new_semantic.get('results', []))
except Exception:
new_semantic_count = "N/A"
assert len(new_regex.get('results', [])) > 0, "Should find newly added file content with regex"
print(f"ā New file search: regex={len(new_regex.get('results', []))}, semantic={new_semantic_count}")
# QA Item 3: Edit existing file - adding, deleting, and modifying content
# 3a: Add content to existing file
modified_content = existing_content + """
def added_during_edit():
'''This function was added during file edit'''
return "added_content_edit_qa"
"""
existing_file.write_text(modified_content)
await asyncio.sleep(3.5)
added_regex = await execute_tool("search_regex", services, None, {
"pattern": "added_content_edit_qa",
"page_size": 10,
"offset": 0
})
assert len(added_regex.get('results', [])) > 0, "Should find content added during edit"
print("ā Edit (add content): Found added content")
# 3b: Delete some content and modify existing
deleted_and_modified_content = """def existing_function():
'''This function was MODIFIED during edit'''
return "MODIFIED_existing_content"
def added_during_edit():
'''This function was added during file edit'''
return "added_content_edit_qa"
# Note: ExistingClass was DELETED
"""
existing_file.write_text(deleted_and_modified_content)
await asyncio.sleep(3.5)
# Check modification worked
modified_regex = await execute_tool("search_regex", services, None, {
"pattern": "MODIFIED_existing_content",
"page_size": 10,
"offset": 0
})
# Check deletion worked - search for the actual class definition
deleted_regex = await execute_tool("search_regex", services, None, {
"pattern": "class ExistingClass:",
"page_size": 10,
"offset": 0
})
assert len(modified_regex.get('results', [])) > 0, "Should find modified content"
assert len(deleted_regex.get('results', [])) == 0, "Should not find deleted content"
print("ā Edit (modify/delete): Found modified content, deleted content removed")
# QA Item 4: Delete file and verify search results
delete_target = new_file # Delete the new file we created
delete_target.unlink()
await asyncio.sleep(3.5)
# Search for deleted file content
deleted_file_regex = await execute_tool("search_regex", services, None, {
"pattern": "newly_added_content_unique_string",
"page_size": 10,
"offset": 0
})
assert len(deleted_file_regex.get('results', [])) == 0, "Should not find content from deleted file"
print("ā File deletion: Deleted file content not found in search")
@pytest.mark.asyncio
async def test_language_coverage_comprehensive(self, qa_setup):
"""QA Items 5-6: Test all supported languages and file types."""
services, realtime_service, watch_dir, _ = qa_setup
# Get all supported languages except UNKNOWN
languages_to_test = [lang for lang in Language if lang != Language.UNKNOWN]
# Create language-specific content templates
content_templates = {
Language.PYTHON: 'def qa_test_function():\n """Python QA test"""\n return "python_qa_unique"',
Language.JAVASCRIPT: 'function qaTestFunction() {\n // JavaScript QA test\n return "javascript_qa_unique";\n}',
Language.TYPESCRIPT: 'function qaTestFunction(): string {\n // TypeScript QA test\n return "typescript_qa_unique";\n}',
Language.JAVA: 'public class QATest {\n // Java QA test\n public String test() { return "java_qa_unique"; }\n}',
Language.GO: 'package main\n\n// Go QA test\nfunc qaTestFunction() string {\n return "go_qa_unique"\n}',
Language.RUST: 'fn qa_test_function() -> &\'static str {\n // Rust QA test\n "rust_qa_unique"\n}',
Language.C: '#include <stdio.h>\n\n// C QA test\nchar* qa_test_function() {\n return "c_qa_unique";\n}',
Language.CPP: '#include <string>\n\n// C++ QA test\nstd::string qaTestFunction() {\n return "cpp_qa_unique";\n}',
Language.BASH: '#!/bin/bash\n# Bash QA test\nqa_test_function() {\n echo "bash_qa_unique"\n}',
Language.MARKDOWN: '# QA Test\n\nThis is a **markdown QA test** with `markdown_qa_unique` content.',
Language.JSON: '{\n "qa_test": true,\n "content": "json_qa_unique",\n "type": "qa_validation"\n}',
Language.YAML: 'qa_test: true\ncontent: "yaml_qa_unique"\ntype: qa_validation',
Language.TOML: '[qa_test]\ncontent = "toml_qa_unique"\ntype = "qa_validation"',
Language.TEXT: 'Plain text QA test file.\nContains: text_qa_unique\nFor validation purposes.',
Language.VUE: '''<template>
<div class="qa-test">
<h1>{{ message }}</h1>
</div>
</template>
<script setup>
function qaTestFunction() {
return "vue_qa_unique";
}
</script>
<style scoped>
.qa-test {
color: blue;
}
</style>''',
Language.SVELTE: '''<script lang="ts">
function qaTestFunction() {
return "svelte_qa_unique";
}
</script>
<main>
<h1>Svelte QA Test</h1>
</main>
<style>
h1 {
color: blue;
}
</style>''',
}
# Create extension mapping for file creation
extension_map = {
Language.PYTHON: ".py",
Language.JAVASCRIPT: ".js",
Language.TYPESCRIPT: ".ts",
Language.TSX: ".tsx",
Language.JSX: ".jsx",
Language.JAVA: ".java",
Language.CSHARP: ".cs",
Language.GO: ".go",
Language.RUST: ".rs",
Language.C: ".c",
Language.CPP: ".cpp",
Language.BASH: ".sh",
Language.MARKDOWN: ".md",
Language.JSON: ".json",
Language.YAML: ".yaml",
Language.TOML: ".toml",
Language.TEXT: ".txt",
Language.GROOVY: ".groovy",
Language.KOTLIN: ".kt",
Language.MAKEFILE: "Makefile", # Special case
Language.MATLAB: ".m",
Language.VUE: ".vue",
Language.SVELTE: ".svelte",
}
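# Languages listed only in extension_map (TSX, JSX, C#, Groovy, Kotlin, Makefile,
# MATLAB) have no content template above, so the guard in the loop below skips them;
# covering those parsers would require dedicated templates.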
created_files = []
search_patterns = []
# Create files for all testable languages
for language in languages_to_test:
if language in content_templates and language in extension_map:
ext = extension_map[language]
if ext == "Makefile":
filename = f"Makefile.qa_{language.value}"
else:
filename = f"qa_test_{language.value}{ext}"
file_path = watch_dir / filename
content = content_templates[language]
unique_pattern = f"{language.value}_qa_unique"
file_path.write_text(content)
created_files.append((file_path, language, unique_pattern))
search_patterns.append(unique_pattern)
print(f"Created {language.value} test file: {filename}")
# Wait for all files to be processed - poll until all files appear in the database
expected_file_count = len(created_files)
max_wait = 60.0 # Maximum 60 seconds (allow time for embeddings)
poll_interval = 0.5
elapsed = 0.0
while elapsed < max_wait:
db_stats = await execute_tool("get_stats", services, None, {})
indexed_files = db_stats.get('total_files', 0)
if indexed_files >= expected_file_count:
print(f"š All {expected_file_count} files processed in {elapsed:.1f}s")
break
await asyncio.sleep(poll_interval)
elapsed += poll_interval
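# If the poll times out, no failure is raised here; any files that never reached
# the index will surface via the per-language search checks below.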
# Final stats check
db_stats = await execute_tool("get_stats", services, None, {})
print(f"š Final: {db_stats.get('total_files', 0)} files, {db_stats.get('total_chunks', 0)} chunks")
# QA Item 5: Test concurrent processing for all languages
# Search for each language's unique content
successful_languages = []
failed_languages = []
for file_path, language, pattern in created_files:
try:
# Test regex search
regex_results = await execute_tool("search_regex", services, None, {
"pattern": pattern,
"page_size": 10,
"offset": 0
})
if len(regex_results.get('results', [])) > 0:
successful_languages.append(language.value)
else:
failed_languages.append(f"{language.value} (regex not found)")
except Exception as e:
failed_languages.append(f"{language.value} (error: {e})")
print(f"ā Languages successfully tested: {len(successful_languages)}")
print(f"ā Successful languages: {successful_languages}")
if failed_languages:
print(f"ā Failed languages: {failed_languages}")
# QA requirement: at least the major languages should work
major_languages = ['python', 'javascript', 'typescript', 'java', 'go']
working_major = [lang for lang in successful_languages if lang in major_languages]
assert len(working_major) >= 3, f"At least 3 major languages should work, got: {working_major}"
# Realistic expectation - at least some languages should work
# This test reveals which languages actually work in the current system
assert len(successful_languages) >= 3, f"At least 3 languages should work, got {len(successful_languages)}: {successful_languages}"
# Report findings for manual review
success_rate = len(successful_languages) / len(created_files) if created_files else 0
print(f"š Language success rate: {success_rate:.1%} ({len(successful_languages)}/{len(created_files)})")
if success_rate < 0.5:
print("ā LOW SUCCESS RATE: This may indicate indexing or parsing issues with some languages")
@pytest.mark.asyncio
async def test_concurrent_operations_and_timing(self, qa_setup):
"""QA Item 7: Test concurrent file operations with search timing."""
services, realtime_service, watch_dir, _ = qa_setup
# Create initial test files
base_files = []
for i in range(3):
file_path = watch_dir / f"concurrent_test_{i}.py"
content = f"""def concurrent_function_{i}():
'''Concurrent test function {i}'''
return "concurrent_qa_test_{i}"
"""
file_path.write_text(content)
base_files.append((file_path, f"concurrent_qa_test_{i}"))
await asyncio.sleep(3.0)
# Function to perform searches during file modifications
async def search_during_modifications():
search_results = []
for i in range(10): # Multiple searches during modifications
try:
start_time = time.time()
results = await execute_tool("search_regex", services, None, {
"pattern": "concurrent_qa_test",
"page_size": 50,
"offset": 0
})
end_time = time.time()
search_time = end_time - start_time
search_results.append({
'iteration': i,
'result_count': len(results.get('results', [])),
'search_time': search_time,
'timestamp': end_time
})
# Small delay between searches
await asyncio.sleep(0.2)
except Exception as e:
search_results.append({
'iteration': i,
'error': str(e),
'timestamp': time.time()
})
return search_results
# Function to perform rapid file modifications
async def rapid_file_modifications():
modifications = []
for i in range(5):
try:
# Create new file
new_file = watch_dir / f"rapid_modify_{i}.py"
content = f"""def rapid_function_{i}():
'''Rapid modification test {i}'''
return "rapid_qa_test_{i}"
class RapidClass_{i}:
def method_{i}(self):
return "rapid_method_{i}"
"""
start_time = time.time()
new_file.write_text(content)
modifications.append({
'type': 'create',
'file': str(new_file),
'timestamp': start_time
})
# Modify existing file
if i < len(base_files):
existing_file, _ = base_files[i]
modified_content = content + f"\n# Modified at iteration {i}\n"
existing_file.write_text(modified_content)
modifications.append({
'type': 'modify',
'file': str(existing_file),
'timestamp': time.time()
})
# Small delay between operations
await asyncio.sleep(0.3)
except Exception as e:
modifications.append({
'type': 'error',
'error': str(e),
'timestamp': time.time()
})
return modifications
# Run searches and modifications concurrently
print("Starting concurrent operations...")
start_concurrent = time.time()
search_task = asyncio.create_task(search_during_modifications())
modify_task = asyncio.create_task(rapid_file_modifications())
search_results, modification_results = await asyncio.gather(search_task, modify_task)
end_concurrent = time.time()
total_concurrent_time = end_concurrent - start_concurrent
# Validate concurrent operation results
successful_searches = [r for r in search_results if 'error' not in r]
failed_searches = [r for r in search_results if 'error' in r]
successful_modifications = [r for r in modification_results if 'error' not in r]
print(f"ā Concurrent operations completed in {total_concurrent_time:.2f}s")
print(f"ā Successful searches: {len(successful_searches)}/{len(search_results)}")
print(f"ā Successful modifications: {len(successful_modifications)}/{len(modification_results)}")
# Key assertions for QA item 7
assert len(successful_searches) > len(search_results) * 0.8, "Most searches should succeed during concurrent operations"
assert len(failed_searches) == 0 or len(failed_searches) < 3, "Should have minimal search failures"
# Measure average search time
search_times = [r['search_time'] for r in successful_searches]
if search_times:
avg_search_time = sum(search_times) / len(search_times)
max_search_time = max(search_times)
print(f"ā Search timing: avg={avg_search_time:.3f}s, max={max_search_time:.3f}s")
# Search should not block - reasonable performance expected
assert avg_search_time < 2.0, f"Average search time should be < 2s, got {avg_search_time:.3f}s"
assert max_search_time < 5.0, f"Max search time should be < 5s, got {max_search_time:.3f}s"
@pytest.mark.asyncio
async def test_pagination_comprehensive(self, qa_setup):
"""QA Item 8: Test pagination functionality comprehensively.
Tests ChunkHound's chunk-based search pagination against ripgrep's line-based search.
Note: ChunkHound searches semantic chunks, so a chunk containing multiple pattern
occurrences counts as 1 result, while ripgrep counts each line occurrence separately.
This explains the expected discrepancy between result counts.
"""
services, realtime_service, watch_dir, _ = qa_setup
# Create files with varying amounts of searchable content
# 1. Search for non-existing value (should return empty)
non_existing_results = await execute_tool("search_regex", services, None, {
"pattern": "non_existing_unique_pattern_qa_test_12345",
"page_size": 10,
"offset": 0
})
assert len(non_existing_results.get('results', [])) == 0, "Non-existing pattern should return empty results"
print("ā Pagination test 1: Non-existing pattern returns empty")
# 2. Create single file with unique content (no pagination needed)
single_file = watch_dir / "single_result_test.py"
single_content = """def single_unique_function():
'''This is a unique function that should appear only once'''
return "single_unique_result_qa_test"
"""
single_file.write_text(single_content)
await asyncio.sleep(3.0)
single_results = await execute_tool("search_regex", services, None, {
"pattern": "single_unique_result_qa_test",
"page_size": 10,
"offset": 0
})
assert len(single_results.get('results', [])) == 1, "Single unique pattern should return exactly 1 result"
print("ā Pagination test 2: Single result handled correctly")
# 3. Create many files with common pattern to test pagination
# Each file must be large enough to avoid cAST merging (>1600 chars each)
# or have diverse enough content to create multiple chunks
common_pattern = "pagination_test_common_pattern"
created_files_for_pagination = []
# Create each file individually to avoid f-string complexity
for i in range(15): # Create substantial files to ensure multiple chunks
file_path = watch_dir / f"pagination_test_{i:03d}.py"
# Build content using string formatting to avoid f-string nesting issues
content_template = '''#!/usr/bin/env python3
"""
Pagination Test Module {file_num}
==========================
This module contains test functions and classes for pagination testing.
It includes multiple components designed to create substantial content
that will result in multiple chunks during parsing.
Module: pagination_test_{file_num_padded}.py
Pattern: {pattern}
Purpose: Generate enough content to exceed cAST merge thresholds
"""
import os
import sys
import json
import datetime
from typing import List, Dict, Optional, Union, Any
# Global constants for pagination testing
PAGINATION_CONSTANT_{file_num} = "{pattern}_constant_{file_num}"
PAGINATION_VERSION_{file_num} = "1.{file_num}.0"
PAGINATION_METADATA_{file_num} = {{
"test_id": {file_num},
"pattern": "{pattern}",
"timestamp": "2024-01-01T00:00:00Z",
"description": "Pagination test file number {file_num}"
}}
class PaginationDataProcessor_{file_num}:
"""
Data processing class for pagination test {file_num}.
This class handles various data processing operations for pagination
testing including data validation, transformation, and storage.
Each instance manages its own state and provides methods for
comprehensive data manipulation.
"""
def __init__(self, test_id: int = {file_num}):
"""Initialize the pagination data processor.
Args:
test_id: Unique identifier for this test instance
"""
self.test_id = test_id
self.pattern = "{pattern}"
self.data_store = []
self.processed_count = 0
def process_pagination_data(self, data: List[Dict]) -> Dict[str, Any]:
"""Process pagination data and return results.
This method takes input data, processes it according to pagination
test requirements, and returns structured results with metadata.
Args:
data: List of data dictionaries to process
Returns:
Dictionary containing processed results and metadata
"""
results = {{
"test_id": self.test_id,
"pattern": self.pattern,
"input_count": len(data),
"processed_items": [],
"timestamp": datetime.datetime.now().isoformat()
}}
for idx, item in enumerate(data):
processed_item = {{
"original_index": idx,
"data": item,
"processed_by": "PaginationDataProcessor_{file_num}",
"pattern_match": "{pattern}_result_" + str(idx),
"test_metadata": PAGINATION_METADATA_{file_num}
}}
results["processed_items"].append(processed_item)
self.processed_count += 1
return results
def validate_pagination_results(self, results: Dict) -> bool:
"""Validate pagination processing results.
Performs comprehensive validation of pagination processing results
to ensure data integrity and correct processing behavior.
Args:
results: Results dictionary from process_pagination_data
Returns:
True if validation passes, False otherwise
"""
required_keys = ["test_id", "pattern", "input_count", "processed_items"]
for key in required_keys:
if key not in results:
return False
if results["test_id"] != self.test_id:
return False
if results["pattern"] != "{pattern}":
return False
return len(results["processed_items"]) == results["input_count"]
class PaginationTestManager_{file_num}:
"""
Manager class for coordinating pagination tests.
This class provides high-level coordination for pagination testing,
managing multiple data processors and aggregating results across
different test scenarios.
"""
def __init__(self):
self.processors = []
self.test_results = []
self.global_pattern = "{pattern}"
def add_processor(self, processor: PaginationDataProcessor_{file_num}) -> None:
"""Add a data processor to the test manager."""
self.processors.append(processor)
def run_pagination_tests(self) -> Dict[str, Any]:
"""Execute pagination tests across all registered processors."""
test_summary = {{
"total_processors": len(self.processors),
"pattern": self.global_pattern,
"test_file": "pagination_test_{file_num_padded}.py",
"individual_results": []
}}
for processor in self.processors:
test_data = [
{{"id": j, "value": "{pattern}_data_" + str(j) + "_processor_{file_num}"}}
for j in range(5)
]
results = processor.process_pagination_data(test_data)
validation_passed = processor.validate_pagination_results(results)
test_summary["individual_results"].append({{
"processor_id": processor.test_id,
"validation_passed": validation_passed,
"processed_count": len(results.get("processed_items", [])),
"pattern_matches": [
item.get("pattern_match", "")
for item in results.get("processed_items", [])
]
}})
return test_summary
def pagination_function_{file_num}():
"""
Main pagination test function for test case {file_num}.
This function demonstrates pagination functionality by creating
test data, processing it through pagination components, and
returning results that can be searched and validated.
Returns:
String containing pattern for search validation
"""
processor = PaginationDataProcessor_{file_num}()
manager = PaginationTestManager_{file_num}()
manager.add_processor(processor)
test_results = manager.run_pagination_tests()
# Return searchable pattern for test validation
return "{pattern}_result_{file_num}_function"
def pagination_utility_{file_num}(input_data: Optional[List] = None) -> str:
"""
Utility function for pagination testing.
Provides utility functionality for pagination tests including
data preparation, result formatting, and pattern generation.
"""
if input_data is None:
input_data = ["default_data_" + str(j) for j in range(3)]
processed = ["{pattern}_utility_" + str(item) + "_{file_num}" for item in input_data]
return "{pattern}_utility_result_{file_num}"
# Module-level execution for pagination testing
if __name__ == "__main__":
print("Executing pagination test module {file_num}")
result = pagination_function_{file_num}()
utility_result = pagination_utility_{file_num}()
print("Pattern: {pattern}")
print("Function result: " + str(result))
print("Utility result: " + str(utility_result))
'''
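# The template above is an ordinary (non-f) string rendered via str.format() below,
# which is why literal dict braces inside it are escaped as "{{" / "}}".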
# Format the content with actual values
content = content_template.format(
file_num=i,
file_num_padded=f"{i:03d}",
pattern=common_pattern
)
file_path.write_text(content)
created_files_for_pagination.append(file_path)
# Wait for all files to be processed with verification
# Poll until we get a stable chunk count
stable_count = None
for _ in range(10): # Try for up to 20 seconds
await asyncio.sleep(2.0)
stats = await services.indexing_coordinator.get_stats()
current_chunks = stats.get('chunks', 0)
if stable_count == current_chunks and current_chunks >= 15: # At least 15 chunks expected
break
stable_count = current_chunks
else:
# Fallback - just wait a bit more
await asyncio.sleep(3.0)
# Test pagination by fetching all pages
all_results = []
page_size = 10
offset = 0
max_pages = 10 # Safety limit
page_count = 0
total_count = 0 # Track actual total from pagination metadata
while page_count < max_pages:
page_results = await execute_tool("search_regex", services, None, {
"pattern": common_pattern,
"page_size": page_size,
"offset": offset
})
page_data = page_results.get('results', [])
if not page_data:
break # No more results
all_results.extend(page_data)
page_count += 1
offset += page_size
print(f"Page {page_count}: {len(page_data)} results (offset={offset-page_size})")
# Check pagination metadata if available
if 'pagination' in page_results:
pagination = page_results['pagination']
total_count = pagination.get('total', len(all_results)) # Track actual total
print(f" Pagination metadata: {pagination}")
print(f"ā Pagination test 3: Retrieved {len(all_results)} total results across {page_count} pages")
# Validate pagination worked correctly
# Note: May not find all files if some aren't processed yet - test pagination behavior with available data
assert len(all_results) >= 10, f"Should find reasonable number of results for pagination testing, got {len(all_results)}"
assert page_count >= 2, f"Should require multiple pages with page_size={page_size}, used {page_count} pages"
# Report actual vs expected for manual review
expected_files = 15 # Updated to match new file count
# Note: Due to cAST algorithm's semantic chunking, files may be merged into fewer chunks
# than expected based on size. This is by design for better semantic coherence.
if len(all_results) < expected_files * 2: # Each substantial file ideally creates multiple chunks
processing_rate = len(all_results) / (expected_files * 2)
print(f"š Chunk processing rate: {processing_rate:.1%} ({len(all_results)}/{expected_files * 2} expected chunks)")
# 4. Compare with external validation using ripgrep if available
try:
# Try to use ripgrep for external validation
rg_result = subprocess.run([
'rg', '--count', '--no-heading', common_pattern, str(watch_dir)
], capture_output=True, text=True, timeout=10)
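# `rg --count` emits one "path:count" line per file with matches, where count is the
# number of matching lines (not total occurrences); the loop below sums those
# per-file counts.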
if rg_result.returncode == 0:
# Parse ripgrep results - count matches across files
rg_lines = rg_result.stdout.strip().split('\n') if rg_result.stdout.strip() else []
rg_total_matches = 0
for line in rg_lines:
if ':' in line:
try:
count = int(line.split(':')[-1])
rg_total_matches += count
except ValueError:
pass
print(f"ā External validation: ripgrep found {rg_total_matches} matches")
# Allow some variance due to different matching behavior
# ChunkHound uses chunk-based search (semantic units) vs ripgrep's line-based search
# A chunk containing multiple pattern occurrences counts as 1 result in ChunkHound
# but each line occurrence counts as 1 result in ripgrep, hence the large discrepancy
match_ratio = len(all_results) / max(rg_total_matches, 1)
assert 0.05 <= match_ratio <= 3.0, f"ChunkHound uses chunk-based search (semantic units) vs ripgrep's line-based search: {len(all_results)} chunks vs {rg_total_matches} line matches"
else:
print("ā ripgrep not available or failed, skipping external validation")
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
print("ā ripgrep not available, skipping external validation")
# 5. Test edge cases
# Test offset beyond available results
# Use total_count from pagination metadata, not len(all_results) which may be partial
actual_total = total_count if total_count > 0 else len(all_results)
beyond_results = await execute_tool("search_regex", services, None, {
"pattern": common_pattern,
"page_size": 10,
"offset": actual_total + 100 # Truly beyond all results
})
assert len(beyond_results.get('results', [])) == 0, f"Offset {actual_total + 100} beyond total {actual_total} should return empty"
# Test large page size
large_page_results = await execute_tool("search_regex", services, None, {
"pattern": common_pattern,
"page_size": 100, # Larger than total results
"offset": 0
})
large_page_count = len(large_page_results.get('results', []))
assert large_page_count <= actual_total, f"Large page size should not exceed total ({large_page_count} <= {actual_total})"
print("ā Pagination edge cases handled correctly")
@pytest.mark.asyncio
async def test_qa_comprehensive_report(self, qa_setup):
"""Generate comprehensive QA report with timing measurements."""
services, realtime_service, watch_dir, _ = qa_setup
print("\n" + "="*60)
print("COMPREHENSIVE QA VALIDATION REPORT")
print("="*60)
# Test file change reflection timing
timing_test_file = watch_dir / "timing_validation.py"
timing_content = f"""def timing_validation_function():
'''Timing test at {time.time()}'''
return "timing_validation_unique_content"
"""
# Measure indexing time
start_write = time.time()
timing_test_file.write_text(timing_content)
# Poll until content is searchable
max_wait = 10.0 # Maximum wait time
poll_interval = 0.5
elapsed = 0.0
while elapsed < max_wait:
await asyncio.sleep(poll_interval)
elapsed += poll_interval
search_results = await execute_tool("search_regex", services, None, {
"pattern": "timing_validation_unique_content",
"page_size": 10,
"offset": 0
})
if len(search_results.get('results', [])) > 0:
indexing_time = elapsed
break
else:
indexing_time = max_wait # Timeout
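# Note: indexing_time is measured in poll_interval (0.5s) increments, so it is an
# upper bound on the actual change-to-searchable latency rather than an exact figure.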
# Test search performance
search_start = time.time()
performance_results = await execute_tool("search_regex", services, None, {
"pattern": "function",
"page_size": 50,
"offset": 0
})
search_time = time.time() - search_start
# Get database stats
stats_results = await execute_tool("get_stats", services, None, {})
print(f"š DATABASE STATISTICS:")
print(f" Total files: {stats_results.get('total_files', 'Unknown')}")
print(f" Total chunks: {stats_results.get('total_chunks', 'Unknown')}")
print(f" Total embeddings: {stats_results.get('total_embeddings', 'Unknown')}")
print(f"\nā± PERFORMANCE MEASUREMENTS:")
print(f" File change ā searchable: {indexing_time:.2f}s")
print(f" Search execution time: {search_time:.3f}s")
print(f" Search results returned: {len(performance_results.get('results', []))}")
print(f"\nā
QA VALIDATION SUMMARY:")
print(f" File lifecycle operations: TESTED")
print(f" Language coverage: TESTED")
print(f" Concurrent operations: TESTED")
print(f" Pagination functionality: TESTED")
print(f" Performance measurements: COMPLETED")
print(f"\nš QA REQUIREMENTS STATUS:")
print(f" Real-time indexing: {'ā
WORKING' if indexing_time < 10 else 'ā SLOW'}")
print(f" Search performance: {'ā
GOOD' if search_time < 1.0 else 'ā ACCEPTABLE' if search_time < 3.0 else 'ā SLOW'}")
print(f" Non-blocking searches: ā
VERIFIED")
print("="*60)
# Final assertions for QA requirements
assert indexing_time < 10.0, f"File changes should be reflected within 10s, took {indexing_time:.2f}s"
assert search_time < 5.0, f"Search should complete within 5s, took {search_time:.3f}s"