import pytest
from unittest.mock import patch, MagicMock, AsyncMock
from pathlib import Path
import asyncio
import time
from typing import Any, cast
from code_flow.core.models import FunctionElement
from code_flow.mcp_server.analyzer import MCPAnalyzer
@pytest.fixture
def mock_core_components():
    """Patch the analyzer module's Python extractor, graph builder and vector store.

    Yields:
        A 3-tuple ``(extractor_patch, builder_patch, store_patch)`` holding the
        mocks that replace ``TreeSitterPythonExtractor``, ``CallGraphBuilder``
        and ``CodeVectorStore`` inside ``code_flow.mcp_server.analyzer``. The
        patches remain active until the requesting test finishes.
    """
    with patch('code_flow.mcp_server.analyzer.TreeSitterPythonExtractor') as extractor_patch:
        with patch('code_flow.mcp_server.analyzer.CallGraphBuilder') as builder_patch:
            with patch('code_flow.mcp_server.analyzer.CodeVectorStore') as store_patch:
                yield extractor_patch, builder_patch, store_patch
@pytest.fixture
def mock_core_components_rust():
    """Patch the analyzer module's Rust extractor, graph builder and vector store.

    Yields:
        A 3-tuple ``(extractor_patch, builder_patch, store_patch)`` holding the
        mocks that replace ``TreeSitterRustExtractor``, ``CallGraphBuilder``
        and ``CodeVectorStore`` inside ``code_flow.mcp_server.analyzer``. The
        patches remain active until the requesting test finishes.
    """
    with patch('code_flow.mcp_server.analyzer.TreeSitterRustExtractor') as extractor_patch:
        with patch('code_flow.mcp_server.analyzer.CallGraphBuilder') as builder_patch:
            with patch('code_flow.mcp_server.analyzer.CodeVectorStore') as store_patch:
                yield extractor_patch, builder_patch, store_patch
def test_mcp_analyzer_init(mock_core_components):
    """Constructing MCPAnalyzer from a config dict wires up the mocked components.

    The config used here has no ``chroma_dir`` entry; the test expects the
    resulting analyzer to expose the config unchanged, the patched extractor
    and builder instances, and no vector store.
    """
    extractor_patch, builder_patch, _store_patch = mock_core_components
    settings = {
        'project_root': '.',
        'watch_directories': ['.'],
        'memory_dir': './.codeflow/memory',
        'reports_dir': './.codeflow/reports',
        'memory_enabled': False
    }

    analyzer = MCPAnalyzer(settings)

    assert analyzer.config == settings
    # The analyzer should hold the instances produced by the patched classes.
    assert analyzer.extractor == extractor_patch.return_value
    assert analyzer.builder == builder_patch.return_value
    assert analyzer.vector_store is None
def test_mcp_analyzer_init_rust(mock_core_components_rust):
    """Constructing MCPAnalyzer with ``language: rust`` uses the Rust extractor.

    The fixture patches ``TreeSitterRustExtractor``; the test expects the
    analyzer's extractor to be that patch's instance, the builder to be the
    patched builder instance, and no vector store to be present.
    """
    extractor_patch, builder_patch, _store_patch = mock_core_components_rust
    settings = {
        'project_root': '.',
        'watch_directories': ['.'],
        'memory_dir': './.codeflow/memory',
        'reports_dir': './.codeflow/reports',
        'language': 'rust',
        'memory_enabled': False
    }

    analyzer = MCPAnalyzer(settings)

    assert analyzer.config == settings
    # The analyzer should hold the instances produced by the patched classes.
    assert analyzer.extractor == extractor_patch.return_value
    assert analyzer.builder == builder_patch.return_value
    assert analyzer.vector_store is None
def test_mcp_analyzer_init_with_existing_store():
    """MCPAnalyzer creates a vector store when the ChromaDB path is reported to exist.

    ``Path.exists`` is forced to ``True`` and ``CodeVectorStore`` is replaced
    by a mock so that the keyword arguments passed to its constructor can be
    inspected.
    """
    settings = {
        'project_root': '.',
        'watch_directories': ['.'],
        'chroma_dir': './.codeflow/chroma',
        'memory_dir': './.codeflow/memory',
        'reports_dir': './.codeflow/reports',
        'embedding_model': 'all-MiniLM-L6-v2',
        'memory_enabled': False
    }

    with patch('code_flow.mcp_server.analyzer.Path.exists', return_value=True), \
            patch('code_flow.mcp_server.analyzer.CodeVectorStore') as store_patch:
        analyzer = MCPAnalyzer(settings)

        # Inspect the keyword arguments of the most recent constructor call.
        store_kwargs = store_patch.call_args.kwargs
        persist_dir = Path(store_kwargs["persist_directory"]).as_posix()
        assert persist_dir.endswith(".codeflow/chroma")
        assert store_kwargs["embedding_model_name"] == "all-MiniLM-L6-v2"
        assert store_kwargs["max_tokens"] == 256
        assert analyzer.vector_store is not None
@pytest.mark.asyncio
async def test_analyze(mock_core_components):
    """``analyze()`` extracts from the directory and feeds the elements to the builder.

    The extractor, structured extractor, builder and (when present) vector
    store methods are replaced with mocks, and the watchdog ``Observer`` is
    patched for the duration of the call.
    """
    _extractor_patch, _builder_patch, _store_patch = mock_core_components
    settings = {
        'project_root': '.',
        'watch_directories': ['.'],
        'chroma_dir': './.codeflow/chroma',
        'memory_dir': './.codeflow/memory',
        'reports_dir': './.codeflow/reports',
        'memory_enabled': False
    }

    # The single element the stubbed extractor will report.
    sample_element = FunctionElement(
        name='test_func',
        kind='function',
        file_path='test.py',
        line_start=1,
        line_end=5,
        full_source='def test_func(): pass',
        parameters=[],
        return_type=None,
        is_async=False,
        docstring=None,
        is_method=False,
        class_name=None
    )
    extracted = [sample_element]

    analyzer = MCPAnalyzer(settings)
    analyzer.start_background_cleanup = MagicMock()

    # Stub out extraction.
    analyzer.extractor.extract_from_directory = MagicMock(return_value=extracted)
    analyzer.structured_extractor.extract_from_directory = MagicMock(return_value=[])

    # Stub out graph building.
    analyzer.builder.build_from_elements = MagicMock()
    analyzer.builder.functions = {'test.test_func': MagicMock()}

    # Stub the batch writers only when a vector store was created.
    if analyzer.vector_store:
        analyzer.vector_store.add_function_nodes_batch = MagicMock()
        analyzer.vector_store.add_edges_batch = MagicMock()

    with patch('code_flow.mcp_server.analyzer.watchdog.observers.Observer') as observer_patch:
        observer_patch.return_value = MagicMock()
        await analyzer.analyze()

    # Extraction ran once and its output reached the builder.
    analyzer.extractor.extract_from_directory.assert_called_once()
    analyzer.builder.build_from_elements.assert_called_once_with(extracted)

    # With a vector store, both batch writers must have been used.
    if analyzer.vector_store:
        cast(Any, analyzer.vector_store.add_function_nodes_batch).assert_called()
        cast(Any, analyzer.vector_store.add_edges_batch).assert_called()
@pytest.mark.asyncio
async def test_analyze_no_vector_store(mock_core_components):
    """``analyze()`` completes without error when the analyzer has no vector store."""
    _extractor_patch, _builder_patch, _store_patch = mock_core_components
    settings = {
        'project_root': '.',
        'watch_directories': ['.'],
        'memory_dir': './.codeflow/memory',
        'reports_dir': './.codeflow/reports',
        'memory_enabled': False
    }

    analyzer = MCPAnalyzer(settings)
    analyzer.start_background_cleanup = MagicMock()
    # Precondition for this scenario.
    assert analyzer.vector_store is None

    # Stub extraction and graph building so no real work is done.
    analyzer.extractor.extract_from_directory = MagicMock(return_value=[])
    analyzer.structured_extractor.extract_from_directory = MagicMock(return_value=[])
    analyzer.builder.build_from_elements = MagicMock()

    # The call itself must not raise.
    with patch('code_flow.mcp_server.analyzer.watchdog.observers.Observer') as observer_patch:
        observer_patch.return_value = MagicMock()
        await analyzer.analyze()

    analyzer.extractor.extract_from_directory.assert_called_once()
    analyzer.builder.build_from_elements.assert_called_once()
@pytest.mark.asyncio
async def test_populate_vector_store(mock_core_components):
    """``_populate_vector_store`` writes nodes and edges to the store in one batch each.

    The builder is given one function node and one edge, and ``open`` is
    patched so that reading the node's file yields a fixed source string.
    """
    _extractor_patch, _builder_patch, store_patch = mock_core_components
    settings = {
        'project_root': '.',
        'watch_directories': ['.'],
        'chroma_dir': './.codeflow/chroma',
        'memory_dir': './.codeflow/memory',
        'reports_dir': './.codeflow/reports',
        'embedding_model': 'all-MiniLM-L6-v2',
        'memory_enabled': False
    }

    # Make the patched store class hand back an instance we can inspect.
    store_instance = MagicMock()
    store_patch.return_value = store_instance

    with patch('code_flow.mcp_server.analyzer.Path.exists', return_value=True):
        analyzer = MCPAnalyzer(settings)

    # One function node and one edge for the builder to expose.
    func_node = MagicMock()
    func_node.file_path = 'test.py'
    analyzer.builder.functions = {'test.func': func_node}
    analyzer.builder.edges = [MagicMock()]

    # Any file opened as a context manager reads back this source text.
    with patch('builtins.open', MagicMock()) as open_patch:
        fake_file = MagicMock()
        fake_file.read.return_value = 'def func(): pass'
        open_patch.return_value.__enter__.return_value = fake_file

        analyzer._populate_vector_store()

    store_instance.add_function_nodes_batch.assert_called_once()
    store_instance.add_edges_batch.assert_called_once()
@pytest.mark.asyncio
async def test_watcher_handler_on_modified(mock_core_components):
    """A single ``on_modified`` event leads to one extraction and one stored node.

    The analyzer is bound to the running event loop with a 0.05 s debounce;
    the test dispatches one event, sleeps 0.2 s, and then checks the extractor,
    the builder's function table and the vector store.
    """
    _extractor_patch, _builder_patch, _store_patch = mock_core_components
    settings = {
        'project_root': '.',
        'watch_directories': ['.'],
        'chroma_dir': './.codeflow/chroma',
        'memory_dir': './.codeflow/memory',
        'reports_dir': './.codeflow/reports',
        'incremental_debounce_seconds': 0.05,
        'memory_enabled': False
    }

    analyzer = MCPAnalyzer(settings)
    analyzer.loop = asyncio.get_running_loop()
    analyzer._is_ignored_path = MagicMock(return_value=False)

    # The element the stubbed per-file extractor will report.
    new_element = FunctionElement(
        name='new_func',
        kind='function',
        file_path='modified.py',
        line_start=1,
        line_end=5,
        full_source='def new_func(): pass',
        parameters=[],
        return_type=None,
        is_async=False,
        docstring=None,
        is_method=False,
        class_name=None
    )
    extracted = [new_element]
    # Attach the fully qualified name the test later looks up in the builder.
    cast(Any, extracted[0]).fqn = 'new_func'

    analyzer.extractor.extract_from_file = MagicMock(return_value=extracted)
    analyzer.builder.functions = {}  # nothing known before the event

    # Replace the vector store with a mock that reports no existing nodes.
    analyzer.vector_store = MagicMock()
    analyzer.vector_store.get_all_nodes.return_value = []

    from code_flow.mcp_server.analyzer import WatcherHandler
    handler = WatcherHandler(analyzer=analyzer)

    # A file (not directory) modification event for test.py.
    fs_event = MagicMock()
    fs_event.src_path = 'test.py'
    fs_event.is_directory = False

    # Any file opened as a context manager reads back this source text.
    with patch('builtins.open', MagicMock()) as open_patch:
        fake_file = MagicMock()
        fake_file.read.return_value = 'def new_func(): pass'
        open_patch.return_value.__enter__.return_value = fake_file

        handler.on_modified(fs_event)

        # Give the scheduled work time to run while open() is still patched.
        await asyncio.sleep(0.2)

    # The file was extracted exactly once.
    analyzer.extractor.extract_from_file.assert_called_once_with(Path('test.py'))

    # The new function is registered with the builder.
    assert 'new_func' in analyzer.builder.functions

    # The mocked vector store received exactly one node.
    analyzer.vector_store.add_function_node.assert_called_once()
@pytest.mark.asyncio
async def test_watcher_handler_rapid_duplicate_events_debounced(mock_core_components):
    """Three back-to-back events for the same file yield a single extraction pass.

    With a 0.05 s debounce configured, the test fires ``on_modified`` three
    times without yielding, sleeps 0.25 s, and expects exactly one extractor
    call and one ``add_function_node`` call.
    """
    _extractor_patch, _builder_patch, _store_patch = mock_core_components
    settings = {
        'project_root': '.',
        'watch_directories': ['.'],
        'chroma_dir': './.codeflow/chroma',
        'memory_dir': './.codeflow/memory',
        'reports_dir': './.codeflow/reports',
        'incremental_debounce_seconds': 0.05,
        'memory_enabled': False
    }

    analyzer = MCPAnalyzer(settings)
    analyzer.loop = asyncio.get_running_loop()
    analyzer._is_ignored_path = MagicMock(return_value=False)

    # The element the stubbed per-file extractor will report.
    new_element = FunctionElement(
        name='new_func',
        kind='function',
        file_path='test.py',
        line_start=1,
        line_end=5,
        full_source='def new_func(): pass',
        parameters=[],
        return_type=None,
        is_async=False,
        docstring=None,
        is_method=False,
        class_name=None
    )
    extracted = [new_element]
    cast(Any, extracted[0]).fqn = 'new_func'

    analyzer.extractor.extract_from_file = MagicMock(return_value=extracted)
    analyzer.builder.functions = {}
    analyzer.vector_store = MagicMock()
    analyzer.vector_store.add_function_node = MagicMock()

    from code_flow.mcp_server.analyzer import WatcherHandler
    handler = WatcherHandler(analyzer=analyzer)

    # A file (not directory) modification event for test.py.
    fs_event = MagicMock()
    fs_event.src_path = 'test.py'
    fs_event.is_directory = False

    with patch('builtins.open', MagicMock()) as open_patch:
        fake_file = MagicMock()
        fake_file.read.return_value = 'def new_func(): pass'
        open_patch.return_value.__enter__.return_value = fake_file

        # Fire the same event three times in immediate succession.
        for _ in range(3):
            handler.on_modified(fs_event)

        await asyncio.sleep(0.25)

    analyzer.extractor.extract_from_file.assert_called_once_with(Path('test.py'))
    analyzer.vector_store.add_function_node.assert_called_once()
@pytest.mark.asyncio
async def test_inflight_dedupe_schedules_single_follow_up(mock_core_components):
    """A second update for a file already being processed triggers one follow-up only.

    Two ``_run_incremental_update('test.py')`` tasks are started 0.01 s apart
    while extraction is stubbed to block for 0.05 s. The test expects a single
    extractor call and a single ``schedule_incremental_update('test.py')`` call.
    """
    _extractor_patch, _builder_patch, _store_patch = mock_core_components
    settings = {
        'project_root': '.',
        'watch_directories': ['.'],
        'chroma_dir': './.codeflow/chroma',
        'memory_dir': './.codeflow/memory',
        'reports_dir': './.codeflow/reports',
        'incremental_debounce_seconds': 0.01,
        'incremental_inflight_dedupe_enabled': True,
        'incremental_max_pending_per_file': 1,
        'memory_enabled': False
    }

    analyzer = MCPAnalyzer(settings)
    analyzer._is_ignored_path = MagicMock(return_value=False)
    # Record follow-up requests instead of actually scheduling them.
    analyzer.schedule_incremental_update = MagicMock()

    def _blocking_extract(_path):
        # Hold the first pass open long enough for the second task to start.
        time.sleep(0.05)
        return []

    extract_stub = MagicMock(side_effect=_blocking_extract)
    analyzer.extractor.extract_from_file = extract_stub

    first_pass = asyncio.create_task(analyzer._run_incremental_update('test.py'))
    await asyncio.sleep(0.01)
    second_pass = asyncio.create_task(analyzer._run_incremental_update('test.py'))
    await asyncio.gather(first_pass, second_pass)

    analyzer.extractor.extract_from_file.assert_called_once_with(Path('test.py'))
    analyzer.schedule_incremental_update.assert_called_once_with('test.py')