import asyncio
import os
from pathlib import Path
import pytest
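# Skip this entire module when the optional duckdb dependency is not installed.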
duckdb = pytest.importorskip("duckdb")
def test_duckdb_as_model_mtime_roundtrip(tmp_path: Path):
from chunkhound.core.models.file import File
from chunkhound.core.types.common import Language
from chunkhound.providers.database.duckdb_provider import DuckDBProvider
db_path = tmp_path / "db.duckdb"
provider = DuckDBProvider(db_path, base_directory=tmp_path)
provider.connect()
    # Create a file and insert it as a File model (simulating a prior index)
p = tmp_path / "file.txt"
p.write_text("hello")
st = p.stat()
f = File(
path=(p.relative_to(tmp_path)).as_posix(),
mtime=float(st.st_mtime),
language=Language.TEXT,
size_bytes=int(st.st_size),
)
file_id = provider.insert_file(f)
assert file_id > 0
    # Fetch it back as a model and verify mtime is an epoch float (not a datetime)
rec = provider.get_file_by_path(f.path, as_model=True)
assert rec is not None
assert isinstance(rec.mtime, float)
assert abs(rec.mtime - float(st.st_mtime)) < 1e-3
def test_indexing_coordinator_skips_with_duckdb(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
# Heavy but valuable end-to-end check when duckdb is available
from chunkhound.core.models.file import File
from chunkhound.core.types.common import Language
from chunkhound.providers.database.duckdb_provider import DuckDBProvider
from chunkhound.services.indexing_coordinator import IndexingCoordinator
db_path = tmp_path / "db.duckdb"
provider = DuckDBProvider(db_path, base_directory=tmp_path)
provider.connect()
    # Create files and insert their prior indexed state, including content hashes
from chunkhound.utils.hashing import compute_file_hash
files = []
for i in range(2):
p = tmp_path / f"f{i}.txt"
p.write_text("hello")
st = p.stat()
# Compute content hash to simulate a previous index with checksum
content_hash = compute_file_hash(p)
f = File(
path=(p.relative_to(tmp_path)).as_posix(),
mtime=float(st.st_mtime),
language=Language.TEXT,
size_bytes=int(st.st_size),
content_hash=content_hash,
)
provider.insert_file(f)
files.append(p)
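    # On-disk size and mtime now match the stored records, so the coordinator should skip both files.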
coord = IndexingCoordinator(database_provider=provider, base_directory=tmp_path)
    # Stub out batch processing to avoid real parsing; just record which files would have been parsed
called = []
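    # Replacement for _process_files_in_batches; it is set as a plain instance attribute below,
    # so it is called unbound and needs no self parameter.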
async def _fake(files, config_file_size_threshold_kb=20, parse_task=None, on_batch=None):
called.append(list(files))
return []
monkeypatch.setattr(coord, "_process_files_in_batches", _fake)
result = asyncio.run(
coord.process_directory(
tmp_path, patterns=["**/*.txt"], exclude_patterns=[], config_file_size_threshold_kb=20
)
)
assert result["files_processed"] == 0
assert result.get("skipped_unchanged", 0) == 2
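    # Expect the stub to have been invoked, but with an empty list: both files were filtered out before parsing.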
assert called and len(called[0]) == 0
def test_checksums_saved_on_first_index(tmp_path: Path):
"""Verify checksums are computed and saved during first indexing pass."""
from chunkhound.providers.database.duckdb_provider import DuckDBProvider
from chunkhound.services.indexing_coordinator import IndexingCoordinator
from chunkhound.utils.hashing import compute_file_hash
db_path = tmp_path / "db.duckdb"
provider = DuckDBProvider(db_path, base_directory=tmp_path)
provider.connect()
# Create test files
test_files = []
for i in range(3):
p = tmp_path / f"test{i}.py"
p.write_text(f"# Test file {i}\nprint('hello')\n")
test_files.append(p)
coord = IndexingCoordinator(database_provider=provider, base_directory=tmp_path)
# First indexing pass - files are new
result1 = asyncio.run(
coord.process_directory(tmp_path, patterns=["**/*.py"], exclude_patterns=[])
)
assert result1["files_processed"] == 3, "Should process 3 new files"
assert result1.get("skipped_unchanged", 0) == 0, "No files should be skipped on first pass"
# Verify checksums were saved in database
for p in test_files:
rel_path = p.relative_to(tmp_path).as_posix()
db_file = provider.get_file_by_path(rel_path, as_model=False)
assert db_file is not None, f"File {rel_path} should exist in DB"
# Verify checksum was saved
db_hash = db_file.get("content_hash")
assert db_hash is not None, f"File {rel_path} should have checksum saved"
# Verify checksum matches actual file content
expected_hash = compute_file_hash(p)
assert db_hash == expected_hash, f"Checksum for {rel_path} should match actual content"
# Second indexing pass - files unchanged
result2 = asyncio.run(
coord.process_directory(tmp_path, patterns=["**/*.py"], exclude_patterns=[])
)
    # With the fix, files are already skipped on the second pass (not only the third)
assert result2["files_processed"] == 0, "No files should be processed on second pass"
assert result2.get("skipped_unchanged", 0) == 3, "All 3 files should be skipped on second pass"
def test_hash_computed_only_once(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
"""Verify hash is computed on first index, then cached on subsequent runs (performance optimization)."""
from chunkhound.providers.database.duckdb_provider import DuckDBProvider
from chunkhound.services.indexing_coordinator import IndexingCoordinator
from chunkhound.utils.hashing import compute_file_hash
db_path = tmp_path / "db.duckdb"
provider = DuckDBProvider(db_path, base_directory=tmp_path)
provider.connect()
# Create test file
test_file = tmp_path / "test.py"
test_file.write_text("# Test file\nprint('hello')\n")
# Track calls to compute_file_hash
hash_calls = []
original_compute_hash = compute_file_hash
def tracked_compute_hash(path):
hash_calls.append(str(path))
return original_compute_hash(path)
    # Patch the name at its import site in indexing_coordinator so the coordinator calls the wrapper
monkeypatch.setattr("chunkhound.services.indexing_coordinator.compute_file_hash", tracked_compute_hash)
coord = IndexingCoordinator(database_provider=provider, base_directory=tmp_path)
# First run: hash should be computed
hash_calls.clear()
result1 = asyncio.run(
coord.process_directory(tmp_path, patterns=["**/*.py"], exclude_patterns=[])
)
assert result1["files_processed"] == 1
assert len(hash_calls) >= 1, "Hash should be computed on first run"
# Second run: hash should NOT be recomputed (uses cached DB value)
hash_calls.clear()
result2 = asyncio.run(
coord.process_directory(tmp_path, patterns=["**/*.py"], exclude_patterns=[])
)
assert result2["files_processed"] == 0, "File should be skipped on second run"
assert result2.get("skipped_unchanged", 0) == 1, "File should be marked as unchanged"
assert len(hash_calls) == 0, "Hash should NOT be recomputed on second run (performance optimization)"
def test_content_change_with_preserved_metadata_is_skipped(tmp_path: Path):
"""Document behavior when content changes but size/mtime are artificially preserved.
This is an edge case where filesystem metadata (mtime+size) matches but content
has changed. ChunkHound prioritizes performance and trusts mtime+size matches,
accepting this rare edge case as a trade-off.
To detect this edge case, use --force-reindex or modify the file such that
mtime changes (normal file editing behavior).
"""
import time
from chunkhound.providers.database.duckdb_provider import DuckDBProvider
from chunkhound.services.indexing_coordinator import IndexingCoordinator
db_path = tmp_path / "db.duckdb"
provider = DuckDBProvider(db_path, base_directory=tmp_path)
provider.connect()
# Create test file
test_file = tmp_path / "test.py"
test_file.write_text("print('hello')")
original_mtime = test_file.stat().st_mtime
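    # Remember the original mtime so it can be restored after the content is rewritten below.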
coord = IndexingCoordinator(database_provider=provider, base_directory=tmp_path)
# First index
result1 = asyncio.run(
coord.process_directory(tmp_path, patterns=["**/*.py"], exclude_patterns=[])
)
assert result1["files_processed"] == 1
# Modify content with same size and restore mtime (artificial edge case)
time.sleep(0.1)
test_file.write_text("print('world')") # Same length, different content
os.utime(test_file, (original_mtime, original_mtime)) # Artificially restore mtime
# Second index: file is SKIPPED because mtime+size match (fast path)
# Edge case trade-off: We trust filesystem metadata for performance
result2 = asyncio.run(
coord.process_directory(tmp_path, patterns=["**/*.py"], exclude_patterns=[])
)
assert result2["files_processed"] == 0, "File skipped when mtime+size match (even if content changed)"
assert result2.get("skipped_unchanged", 0) == 1, "Edge case: trusts mtime+size over content hash"
def test_hash_failure_does_not_block_indexing(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
"""Verify that files are still indexed even if hash computation fails."""
from chunkhound.providers.database.duckdb_provider import DuckDBProvider
from chunkhound.services.indexing_coordinator import IndexingCoordinator
db_path = tmp_path / "db.duckdb"
provider = DuckDBProvider(db_path, base_directory=tmp_path)
provider.connect()
# Create test file
test_file = tmp_path / "test.py"
test_file.write_text("# Test file\nprint('hello')\n")
# Make hash computation fail
def failing_hash(path):
raise OSError("Simulated hash failure")
    # Patch the name at its import site in indexing_coordinator so the coordinator calls the failing stub
monkeypatch.setattr("chunkhound.services.indexing_coordinator.compute_file_hash", failing_hash)
coord = IndexingCoordinator(database_provider=provider, base_directory=tmp_path)
# First run: hash fails, but file should still be indexed
result1 = asyncio.run(
coord.process_directory(tmp_path, patterns=["**/*.py"], exclude_patterns=[])
)
assert result1["files_processed"] == 1, "File should be indexed despite hash failure"
# Verify file was indexed with None hash
db_file = provider.get_file_by_path("test.py", as_model=False)
assert db_file is not None
assert db_file.get("content_hash") is None, "Hash should be None when computation fails"
def test_changed_file_gets_hash_computed(tmp_path: Path):
"""Verify that changed files get hashes computed for skip optimization on next run.
    This verifies the fix for a bug where changed files (differing size or mtime)
    did not get a new hash computed, causing them to be reindexed on every run.
"""
from chunkhound.providers.database.duckdb_provider import DuckDBProvider
from chunkhound.services.indexing_coordinator import IndexingCoordinator
from chunkhound.utils.hashing import compute_file_hash
db_path = tmp_path / "db.duckdb"
provider = DuckDBProvider(db_path, base_directory=tmp_path)
provider.connect()
# Create test file with initial content
test_file = tmp_path / "test.py"
initial_content = "# Initial version\nprint('v1')\n"
test_file.write_text(initial_content)
coord = IndexingCoordinator(database_provider=provider, base_directory=tmp_path)
# First index: file is new, hash should be computed and stored
result1 = asyncio.run(
coord.process_directory(tmp_path, patterns=["**/*.py"], exclude_patterns=[])
)
assert result1["files_processed"] == 1, "Should process new file"
# Verify hash was saved
db_file1 = provider.get_file_by_path("test.py", as_model=False)
assert db_file1 is not None
hash1 = db_file1.get("content_hash")
assert hash1 is not None, "Hash should be saved on first index"
assert hash1 == compute_file_hash(test_file), "Hash should match file content"
# Modify file content (changes size and mtime)
modified_content = "# Modified version\nprint('v2')\nprint('extra line')\n"
test_file.write_text(modified_content)
# Second index: file changed, should reindex AND compute new hash
result2 = asyncio.run(
coord.process_directory(tmp_path, patterns=["**/*.py"], exclude_patterns=[])
)
assert result2["files_processed"] == 1, "Should reindex changed file"
# CRITICAL: Verify new hash was computed and saved (not None!)
db_file2 = provider.get_file_by_path("test.py", as_model=False)
assert db_file2 is not None
hash2 = db_file2.get("content_hash")
assert hash2 is not None, "Hash should be computed for changed file (bug fix verification)"
assert hash2 != hash1, "Hash should be different after content change"
assert hash2 == compute_file_hash(test_file), "Hash should match new file content"
# Third index: file unchanged, should skip using cached hash
result3 = asyncio.run(
coord.process_directory(tmp_path, patterns=["**/*.py"], exclude_patterns=[])
)
assert result3["files_processed"] == 0, "Should skip unchanged file"
assert result3.get("skipped_unchanged", 0) == 1, "File should be skipped using cached hash"
# Verify hash is still the same (no unnecessary recomputation)
db_file3 = provider.get_file_by_path("test.py", as_model=False)
assert db_file3 is not None
hash3 = db_file3.get("content_hash")
assert hash3 == hash2, "Hash should remain unchanged when file is unchanged"