"""Integration tests that use actual MCP server components.
These tests verify the real integration path that users experience:
Filesystem Event → Watchdog → AsyncHandler → IndexingCoordinator → Database → Search Tools
"""
import asyncio
import os
import shutil
import tempfile
import time
from pathlib import Path
import pytest
from chunkhound.core.config.config import Config
from chunkhound.database_factory import create_services
from chunkhound.mcp_server.tools import execute_tool
from chunkhound.services.realtime_indexing_service import RealtimeIndexingService
from tests.utils.windows_compat import (
get_fs_event_timeout,
should_use_polling,
wait_for_regex_searchable,
)
from .test_utils import (
    build_embedding_config_from_dict,
    create_embedding_manager_for_tests,
    get_api_key_for_tests,
    get_embedding_config_for_tests,
)
class TestMCPIntegration:
"""Test real MCP server integration with realtime indexing."""
@pytest.fixture
async def mcp_setup(self):
"""Setup MCP server with real services and temp directory."""
# Get embedding config using centralized helper
config_dict = get_embedding_config_for_tests()
embedding_config = build_embedding_config_from_dict(config_dict)
temp_dir = Path(tempfile.mkdtemp())
db_path = temp_dir / ".chunkhound" / "test.db"
watch_dir = temp_dir / "project"
watch_dir.mkdir(parents=True)
# Ensure database directory exists
db_path.parent.mkdir(parents=True, exist_ok=True)
# Use fake args to prevent find_project_root call that fails in CI
from types import SimpleNamespace
fake_args = SimpleNamespace(path=temp_dir)
config = Config(
args=fake_args,
database={"path": str(db_path), "provider": "duckdb"},
embedding=embedding_config,
indexing={"include": ["*.py", "*.js"], "exclude": ["*.log"]}
)
# Create embedding manager if API key is available
# create_services() handles None manager gracefully
embedding_manager = create_embedding_manager_for_tests(config_dict)
# Create services - this is what MCP server uses
services = create_services(db_path, config, embedding_manager)
services.provider.connect()
# Initialize realtime indexing service (what MCP server should do)
# Use polling mode on Windows CI where watchdog is unreliable
force_polling = should_use_polling()
realtime_service = RealtimeIndexingService(services, config, force_polling=force_polling)
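        # start() installs the watchdog observer (or its polling fallback) and
        # begins the initial scan of watch_dir that the tests below wait on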
await realtime_service.start(watch_dir)
yield services, realtime_service, watch_dir, temp_dir, embedding_manager
# Cleanup
try:
await realtime_service.stop()
except Exception:
pass
try:
services.provider.disconnect()
except Exception:
pass
shutil.rmtree(temp_dir, ignore_errors=True)
@pytest.mark.skipif(get_api_key_for_tests()[0] is None, reason="No API key available")
@pytest.mark.asyncio
async def test_mcp_semantic_search_finds_new_files(self, mcp_setup):
"""Test that MCP semantic search finds newly created files."""
services, realtime_service, watch_dir, _, embedding_manager = mcp_setup
# Wait for initial scan
await asyncio.sleep(1.0)
# Get initial search results using MCP tool execution
initial_results = await execute_tool(
tool_name="search",
services=services,
embedding_manager=embedding_manager,
arguments={
"type": "semantic",
"query": "unique_mcp_test_function",
"page_size": 10,
"offset": 0
}
)
initial_count = len(initial_results.get('results', []))
# Create new file with unique content
new_file = watch_dir / "mcp_test.py"
new_file.write_text("""
def unique_mcp_test_function():
'''This is a unique function for MCP integration testing'''
return "mcp_realtime_success"
""")
        # Poll until the new file is semantically searchable; debounce plus
        # embedding generation can take a while, especially on CI
        deadline = time.monotonic() + get_fs_event_timeout()
        new_count = initial_count
        while time.monotonic() < deadline:
            new_results = await execute_tool(
                tool_name="search",
                services=services,
                embedding_manager=embedding_manager,
                arguments={
                    "type": "semantic",
                    "query": "unique_mcp_test_function",
                    "page_size": 10,
                    "offset": 0
                }
            )
            new_count = len(new_results.get('results', []))
            if new_count > initial_count:
                break
            await asyncio.sleep(0.3)
        assert new_count > initial_count, \
            f"MCP semantic search should find new file (was {initial_count}, now {new_count})"
@pytest.mark.asyncio
async def test_mcp_regex_search_finds_modified_files(self, mcp_setup):
"""Test that MCP regex search finds modified file content."""
services, realtime_service, watch_dir, _, _ = mcp_setup
# Create initial file
test_file = watch_dir / "modify_test.py"
test_file.write_text("def initial_function(): pass")
# Wait for initial content to be searchable (handles Windows CI polling delay)
found = await wait_for_regex_searchable(services, "initial_function", timeout=get_fs_event_timeout())
assert found, "Initial content should be found"
# Modify file with new unique content
test_file.write_text("""
def initial_function(): pass
def modified_unique_regex_pattern():
'''Added by modification - should be found by regex'''
return "modification_success"
""")
# Wait for modified content to be searchable (handles Windows CI polling delay)
found = await wait_for_regex_searchable(services, "modified_unique_regex_pattern", timeout=get_fs_event_timeout())
assert found, "MCP regex search should find modified content"
@pytest.mark.asyncio
async def test_mcp_database_stats_change_with_realtime(self, mcp_setup):
"""Test that database stats reflect real-time indexing changes."""
services, realtime_service, watch_dir, _, _ = mcp_setup
# Wait for initial scan
await asyncio.sleep(1.0)
# Get initial stats directly from database provider
initial_stats = services.provider.get_stats()
initial_files = initial_stats.get('files', 0)
initial_chunks = initial_stats.get('chunks', 0)
# Create multiple new files
for i in range(3):
new_file = watch_dir / f"stats_test_{i}.py"
new_file.write_text(f"""
def stats_test_function_{i}():
'''File {i} for testing database stats updates'''
return "stats_test_{i}"
class StatsTestClass_{i}:
def method_{i}(self):
pass
""")
        # Poll until all three new files show up in the stats
        timeout = get_fs_event_timeout() * 1.5  # Extra margin for multiple files
        deadline = time.monotonic() + timeout
        updated_stats = None
        while time.monotonic() < deadline:
            updated_stats = services.provider.get_stats()
            if updated_stats.get('files', 0) >= initial_files + 3:
                break
            await asyncio.sleep(0.3)
updated_files = updated_stats.get('files', 0) if updated_stats else 0
updated_chunks = updated_stats.get('chunks', 0) if updated_stats else 0
assert updated_files > initial_files, \
f"File count should increase (was {initial_files}, now {updated_files})"
assert updated_chunks > initial_chunks, \
f"Chunk count should increase (was {initial_chunks}, now {updated_chunks})"
@pytest.mark.asyncio
async def test_mcp_search_after_file_deletion(self, mcp_setup):
"""Test that MCP search handles file deletions correctly."""
services, realtime_service, watch_dir, _, _ = mcp_setup
# Create file with unique content
delete_file = watch_dir / "delete_test.py"
delete_file.write_text("""
def delete_test_unique_function():
'''This function will be deleted'''
return "to_be_deleted"
""")
        # Wait for the content to become searchable (handles Windows CI polling delay)
        found = await wait_for_regex_searchable(
            services, "delete_test_unique_function", timeout=get_fs_event_timeout()
        )
        assert found, "Content should be indexed before deletion"
        # Verify the content is also reachable through the MCP tool path
before_delete = await execute_tool(
tool_name="search",
services=services,
embedding_manager=None,
arguments={
"type": "regex",
"query": "delete_test_unique_function",
"page_size": 10,
"offset": 0
}
)
assert len(before_delete.get('results', [])) > 0, "Content should be found before deletion"
# Delete the file
delete_file.unlink()
        # Poll until the deletion has been processed and the content disappears
        deadline = time.monotonic() + get_fs_event_timeout()
        after_delete = None
        while time.monotonic() < deadline:
            after_delete = await execute_tool(
                tool_name="search",
                services=services,
                embedding_manager=None,
                arguments={
                    "type": "regex",
                    "query": "delete_test_unique_function",
                    "page_size": 10,
                    "offset": 0
                }
            )
            if len(after_delete.get('results', [])) == 0:
                break
            await asyncio.sleep(0.3)
        assert after_delete is not None and len(after_delete.get('results', [])) == 0, \
            "Content should not be found after deletion"
@pytest.mark.asyncio
async def test_file_modification_detection_comprehensive(self, mcp_setup):
"""Comprehensive test to reproduce file modification detection issues."""
services, realtime_service, watch_dir, _, _ = mcp_setup
# Create initial file
test_file = watch_dir / "comprehensive_modify_test.py"
initial_content = """def original_function():
return "version_1"
"""
test_file.write_text(initial_content)
# Wait for initial content to be searchable (handles Windows CI polling delay)
found = await wait_for_regex_searchable(services, "original_function", timeout=get_fs_event_timeout())
assert found, "Initial content should be indexed"
        # Double-check via a direct provider regex query
initial_results = services.provider.search_chunks_regex("original_function")
assert len(initial_results) > 0, "Initial content should be indexed"
# Get initial file record
initial_record = services.provider.get_file_by_path(str(test_file.resolve()))
assert initial_record is not None, "Initial file should exist"
# Get chunk count for initial state
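        # (the catch-all ".*" pattern scoped to file_path matches every chunk stored for this file)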
initial_chunks = services.provider.search_chunks_regex(".*", file_path=str(test_file.resolve()))
initial_chunk_count = len(initial_chunks)
print(f"Initial state: chunks={initial_chunk_count}")
# Modify the file - change existing and add new content
modified_content = """def original_function():
return "version_2" # CHANGED
def newly_added_function():
'''This function was added during modification'''
return "modification_detected"
class NewlyAddedClass:
'''This class was added to test modification detection'''
def new_method(self):
return "class_method_added"
"""
test_file.write_text(modified_content)
        # Touch the file after a short pause so the modification time changes
        # even on filesystems with coarse timestamp resolution
        await asyncio.sleep(0.1)
        test_file.touch()
# Wait for new content to be searchable (handles Windows CI polling delay)
found = await wait_for_regex_searchable(services, "newly_added_function", timeout=get_fs_event_timeout())
assert found, "Modified content should be searchable"
# Check if modification was detected
modified_record = services.provider.get_file_by_path(str(test_file.resolve()))
assert modified_record is not None, "Modified file should still exist"
# Get chunk count for modified state
modified_chunks = services.provider.search_chunks_regex(".*", file_path=str(test_file.resolve()))
modified_chunk_count = len(modified_chunks)
print(f"Modified state: chunks={modified_chunk_count}")
# Key assertions for content-based change detection
assert modified_chunk_count >= initial_chunk_count, \
f"Chunk count should not decrease (was {initial_chunk_count}, now {modified_chunk_count})"
# Check if new content is searchable
new_func_results = services.provider.search_chunks_regex("newly_added_function")
assert len(new_func_results) > 0, "New function should be searchable after modification"
new_class_results = services.provider.search_chunks_regex("NewlyAddedClass")
assert len(new_class_results) > 0, "New class should be indexed after modification"
# Check that content-based deduplication works - old version replaced by new
v1_results = services.provider.search_chunks_regex("version_1")
v2_results = services.provider.search_chunks_regex("version_2")
assert len(v1_results) == 0, "Old version_1 should be replaced via content-based chunk deduplication"
assert len(v2_results) > 0, "New version_2 should be indexed"
@pytest.mark.asyncio
async def test_file_modification_with_filesystem_ops(self, mcp_setup):
"""Test modification using different filesystem operations to ensure OS detection."""
services, realtime_service, watch_dir, _, _ = mcp_setup
test_file = watch_dir / "fs_ops_test.py"
# Create with explicit file operations
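        # flush + fsync force the bytes to disk so the change is fully visible
        # before the filesystem event is handled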
with open(test_file, 'w') as f:
f.write("def func(): return 'initial'")
f.flush()
os.fsync(f.fileno())
# Wait for initial content to be searchable (handles Windows CI polling delay)
found = await wait_for_regex_searchable(services, "func.*initial", timeout=get_fs_event_timeout())
assert found, "Initial content should be indexed"
# Modify with explicit operations and different content
with open(test_file, 'w') as f:
f.write("def func(): return 'modified'\ndef new_func(): return 'added'")
f.flush()
os.fsync(f.fileno())
        # Also bump the mtime explicitly so the change cannot be missed
        current_time = time.time()
        os.utime(test_file, (current_time, current_time))
# Wait for modified content to be searchable (handles Windows CI polling delay)
found = await wait_for_regex_searchable(services, "new_func.*added", timeout=get_fs_event_timeout())
assert found, "Added content should be indexed"
# Verify modification was detected
modified_results = services.provider.search_chunks_regex("func.*modified")
assert len(modified_results) > 0, "Modified content should be indexed"
# Original should be gone
old_results = services.provider.search_chunks_regex("func.*initial")
assert len(old_results) == 0, "Original content should be replaced"