Skip to main content
Glama
test_config_watcher.py35.2 kB
"""Unit tests for ConfigWatcher file monitoring and hot reload.""" import json import logging import tempfile import threading import time from pathlib import Path from unittest.mock import Mock, patch import pytest from src.config_watcher import ConfigWatcher, _ConfigFileEventHandler class TestConfigWatcherInitialization: """Test cases for ConfigWatcher initialization.""" def test_initialization_with_absolute_paths(self, tmp_path): """Test initialization with absolute paths.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") callback1 = Mock() callback2 = Mock() watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=callback1, on_gateway_rules_changed=callback2, debounce_seconds=0.3 ) assert watcher.mcp_config_path == mcp_file.resolve() assert watcher.gateway_rules_path == rules_file.resolve() assert watcher.on_mcp_config_changed == callback1 assert watcher.on_gateway_rules_changed == callback2 assert watcher.debounce_seconds == 0.3 assert watcher.observer is None assert watcher._pending_timers == {} def test_initialization_with_relative_paths(self, tmp_path, monkeypatch): """Test initialization with relative paths.""" monkeypatch.chdir(tmp_path) mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") callback1 = Mock() callback2 = Mock() watcher = ConfigWatcher( mcp_config_path="mcp.json", gateway_rules_path="rules.json", on_mcp_config_changed=callback1, on_gateway_rules_changed=callback2 ) # Paths should be resolved to absolute assert watcher.mcp_config_path.is_absolute() assert watcher.gateway_rules_path.is_absolute() assert watcher.mcp_config_path.name == "mcp.json" assert watcher.gateway_rules_path.name == "rules.json" def test_initialization_default_debounce(self, tmp_path): """Test default debounce time.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) assert watcher.debounce_seconds == 0.1 def test_initialization_custom_debounce(self, tmp_path): """Test custom debounce time.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock(), debounce_seconds=0.5 ) assert watcher.debounce_seconds == 0.5 class TestConfigWatcherStartStop: """Test cases for ConfigWatcher start/stop lifecycle.""" def test_start_creates_observer(self, tmp_path): """Test that start creates and starts observer.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) watcher.start() assert watcher.observer is not None assert watcher.observer.is_alive() watcher.stop() def test_start_same_directory(self, tmp_path): """Test starting watcher when both files in same directory.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) watcher.start() # Should only schedule one directory watch assert watcher.observer is not None watcher.stop() def test_start_different_directories(self, tmp_path): """Test starting watcher when files in different directories.""" mcp_dir = tmp_path / "config" rules_dir = tmp_path / "rules" mcp_dir.mkdir() rules_dir.mkdir() mcp_file = mcp_dir / "mcp.json" rules_file = rules_dir / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) watcher.start() # Should schedule watches for both directories assert watcher.observer is not None watcher.stop() def test_start_already_running_raises_error(self, tmp_path): """Test that starting twice raises RuntimeError.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) watcher.start() with pytest.raises(RuntimeError) as exc_info: watcher.start() assert "already running" in str(exc_info.value) watcher.stop() def test_start_nonexistent_directory_raises_error(self, tmp_path): """Test error when directory doesn't exist.""" # Note: This test is skipped because watchdog may not raise an error immediately # for nonexistent directories on all platforms. The error would occur later when # the observer thread tries to watch the directory. pytest.skip("Watchdog behavior varies across platforms for nonexistent directories") def test_stop_cleans_up_observer(self, tmp_path): """Test that stop cleans up observer.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) watcher.start() assert watcher.observer is not None watcher.stop() assert watcher.observer is None def test_stop_cancels_pending_timers(self, tmp_path): """Test that stop cancels pending debounce timers.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") callback = Mock() watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=callback, on_gateway_rules_changed=Mock(), debounce_seconds=1.0 # Long debounce to test cancellation ) watcher.start() # Trigger a file change mcp_file.write_text('{"changed": true}') time.sleep(0.1) # Let event be detected # Stop before debounce timer fires watcher.stop() # Wait to ensure timer doesn't fire time.sleep(1.2) # Callback should not have been called since timer was cancelled callback.assert_not_called() def test_stop_idempotent(self, tmp_path): """Test that stop can be called multiple times safely.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) watcher.start() watcher.stop() watcher.stop() # Should not raise error watcher.stop() # Should not raise error def test_stop_without_start(self, tmp_path): """Test that stop without start is safe.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text("{}") rules_file.write_text("{}") watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) watcher.stop() # Should not raise error class TestConfigWatcherFileChangeDetection: """Test cases for detecting file changes.""" def test_detect_mcp_config_modification(self, tmp_path): """Test detection of MCP config file modification.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') callback_calls = [] def mcp_callback(path): callback_calls.append(("mcp", path)) watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.1 ) watcher.start() # Modify MCP config time.sleep(0.2) mcp_file.write_text('{"mcpServers": {"new": {}}}') # Wait for debounce + callback time.sleep(0.3) watcher.stop() # Callback should have been invoked assert len(callback_calls) > 0 assert callback_calls[0][0] == "mcp" assert mcp_file.resolve() == Path(callback_calls[0][1]) def test_detect_gateway_rules_modification(self, tmp_path): """Test detection of gateway rules file modification.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') callback_calls = [] def rules_callback(path): callback_calls.append(("rules", path)) watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=rules_callback, debounce_seconds=0.1 ) watcher.start() # Modify gateway rules time.sleep(0.2) rules_file.write_text('{"agents": {"new": {}}}') # Wait for debounce + callback time.sleep(0.3) watcher.stop() # Callback should have been invoked assert len(callback_calls) > 0 assert callback_calls[0][0] == "rules" assert rules_file.resolve() == Path(callback_calls[0][1]) def test_detect_file_creation(self, tmp_path): """Test detection when file is created.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" # Create directory but not files initially rules_file.write_text('{"agents": {}}') callback_calls = [] def mcp_callback(path): callback_calls.append(path) # Write initial file so watcher can start mcp_file.write_text('{}') watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.1 ) watcher.start() time.sleep(0.2) # Delete and recreate file (simulates some editors) mcp_file.unlink() time.sleep(0.1) mcp_file.write_text('{"created": true}') # Wait for debounce + callback time.sleep(0.3) watcher.stop() # Callback should have been invoked for creation assert len(callback_calls) > 0 def test_detect_atomic_write_move(self, tmp_path): """Test detection of atomic write (temp file + rename).""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') callback_calls = [] def mcp_callback(path): callback_calls.append(path) watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.1 ) watcher.start() time.sleep(0.2) # Simulate atomic write: write to temp, rename to target temp_file = tmp_path / "mcp.json.tmp" temp_file.write_text('{"mcpServers": {"new": {}}}') temp_file.replace(mcp_file) # Wait for debounce + callback time.sleep(0.3) watcher.stop() # Callback should have been invoked assert len(callback_calls) > 0 def test_ignores_other_files_in_directory(self, tmp_path): """Test that changes to other files in directory are ignored.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" other_file = tmp_path / "other.txt" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') mcp_callback = Mock() rules_callback = Mock() watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=rules_callback, debounce_seconds=0.1 ) watcher.start() time.sleep(0.2) # Clear any initialization events mcp_callback.reset_mock() rules_callback.reset_mock() # Modify other file other_file.write_text("This should be ignored") # Wait and verify callback NOT called time.sleep(0.3) watcher.stop() mcp_callback.assert_not_called() rules_callback.assert_not_called() def test_ignores_directory_events(self, tmp_path): """Test that directory events are ignored.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') mcp_callback = Mock() rules_callback = Mock() watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=rules_callback, debounce_seconds=0.1 ) watcher.start() time.sleep(0.2) # Clear any initialization events mcp_callback.reset_mock() rules_callback.reset_mock() # Create subdirectory (directory event) subdir = tmp_path / "subdir" subdir.mkdir() # Wait and verify callback NOT called time.sleep(0.3) watcher.stop() mcp_callback.assert_not_called() rules_callback.assert_not_called() class TestConfigWatcherDebouncing: """Test cases for debouncing behavior.""" def test_debouncing_single_callback_for_rapid_changes(self, tmp_path): """Test that rapid changes trigger only one callback.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') callback_calls = [] def mcp_callback(path): callback_calls.append(path) watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.3 ) watcher.start() time.sleep(0.2) # Make rapid changes for i in range(5): mcp_file.write_text(f'{{"mcpServers": {{"change": {i}}}}}') time.sleep(0.05) # Less than debounce time # Wait for debounce period after last change time.sleep(0.5) watcher.stop() # Should only have one callback despite multiple changes assert len(callback_calls) == 1 def test_debouncing_resets_timer_on_new_change(self, tmp_path): """Test that new change resets debounce timer.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') callback_calls = [] callback_times = [] def mcp_callback(path): callback_calls.append(path) callback_times.append(time.time()) watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.3 ) watcher.start() time.sleep(0.2) start_time = time.time() # First change mcp_file.write_text('{"mcpServers": {"v": 1}}') time.sleep(0.2) # Wait less than debounce # Second change (should reset timer) mcp_file.write_text('{"mcpServers": {"v": 2}}') # Wait for debounce period time.sleep(0.5) watcher.stop() # Should only have one callback assert len(callback_calls) == 1 # Callback should have fired after debounce from second change # (not from first change) assert callback_times[0] - start_time >= 0.5 def test_debouncing_independent_per_file(self, tmp_path): """Test that debouncing is independent for each file.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') mcp_calls = [] rules_calls = [] def mcp_callback(path): mcp_calls.append(path) def rules_callback(path): rules_calls.append(path) watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=rules_callback, debounce_seconds=0.2 ) watcher.start() time.sleep(0.3) # Change both files mcp_file.write_text('{"mcpServers": {"new": {}}}') time.sleep(0.05) rules_file.write_text('{"agents": {"new": {}}}') # Wait for debounce time.sleep(0.5) watcher.stop() # Both callbacks should have been invoked at least once assert len(mcp_calls) >= 1 assert len(rules_calls) >= 1 class TestConfigWatcherCallbackExecution: """Test cases for callback invocation.""" def test_callback_receives_correct_path(self, tmp_path): """Test that callback receives the correct file path.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') received_path = None def mcp_callback(path): nonlocal received_path received_path = path watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.1 ) watcher.start() time.sleep(0.2) mcp_file.write_text('{"mcpServers": {"changed": true}}') time.sleep(0.3) watcher.stop() assert received_path is not None assert Path(received_path) == mcp_file.resolve() def test_callback_exception_doesnt_crash_watcher(self, tmp_path): """Test that callback exception doesn't crash watcher.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') call_count = [0] def failing_callback(path): call_count[0] += 1 raise Exception("Callback error") watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=failing_callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.1 ) watcher.start() time.sleep(0.2) # First change mcp_file.write_text('{"mcpServers": {"v": 1}}') time.sleep(0.3) # Watcher should still be running despite exception assert watcher.observer is not None assert watcher.observer.is_alive() # Make another change to verify watcher still works time.sleep(0.2) mcp_file.write_text('{"mcpServers": {"v": 2}}') time.sleep(0.3) watcher.stop() # At least one change should have triggered callback (despite exceptions) # Note: File system events can be triggered multiple times, so we just verify >= 1 assert call_count[0] >= 1 def test_callback_runs_in_separate_thread(self, tmp_path): """Test that callback runs in separate thread from main.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') main_thread_id = threading.get_ident() callback_thread_id = None def mcp_callback(path): nonlocal callback_thread_id callback_thread_id = threading.get_ident() watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.1 ) watcher.start() time.sleep(0.2) mcp_file.write_text('{"mcpServers": {"changed": true}}') time.sleep(0.3) watcher.stop() assert callback_thread_id is not None assert callback_thread_id != main_thread_id class TestConfigWatcherThreadSafety: """Test cases for thread safety.""" def test_concurrent_file_changes(self, tmp_path): """Test handling concurrent changes to different files.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') mcp_calls = [] rules_calls = [] mcp_lock = threading.Lock() rules_lock = threading.Lock() def mcp_callback(path): with mcp_lock: mcp_calls.append(path) def rules_callback(path): with rules_lock: rules_calls.append(path) watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=rules_callback, debounce_seconds=0.1 ) watcher.start() time.sleep(0.2) # Change both files simultaneously def change_mcp(): for i in range(3): mcp_file.write_text(f'{{"mcpServers": {{"v": {i}}}}}') time.sleep(0.05) def change_rules(): for i in range(3): rules_file.write_text(f'{{"agents": {{"v": {i}}}}}') time.sleep(0.05) thread1 = threading.Thread(target=change_mcp) thread2 = threading.Thread(target=change_rules) thread1.start() thread2.start() thread1.join() thread2.join() # Wait for debounce time.sleep(0.3) watcher.stop() # Both callbacks should have been invoked with mcp_lock: assert len(mcp_calls) >= 1 with rules_lock: assert len(rules_calls) >= 1 def test_stop_while_callback_running(self, tmp_path): """Test stopping watcher while callback is executing.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{"mcpServers": {}}') rules_file.write_text('{"agents": {}}') callback_started = threading.Event() callback_finished = threading.Event() def slow_callback(path): callback_started.set() time.sleep(0.5) # Simulate slow callback callback_finished.set() watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=slow_callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.1 ) watcher.start() time.sleep(0.2) mcp_file.write_text('{"mcpServers": {"changed": true}}') # Wait for callback to start callback_started.wait(timeout=1.0) # Stop while callback is running watcher.stop() # Callback should still complete assert callback_finished.wait(timeout=1.0) class TestEventHandler: """Test cases for _ConfigFileEventHandler.""" def test_handler_forwards_modified_events(self, tmp_path): """Test that handler forwards file modification events.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{}') rules_file.write_text('{}') watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) handler = _ConfigFileEventHandler(watcher) # Mock event event = Mock() event.is_directory = False event.src_path = str(mcp_file) # Manually call handler (without starting watcher) with patch.object(watcher, '_handle_file_change') as mock_handle: handler.on_modified(event) mock_handle.assert_called_once() # Verify path was resolved call_args = mock_handle.call_args[0][0] assert call_args == mcp_file.resolve() def test_handler_forwards_created_events(self, tmp_path): """Test that handler forwards file creation events.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{}') rules_file.write_text('{}') watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) handler = _ConfigFileEventHandler(watcher) event = Mock() event.is_directory = False event.src_path = str(mcp_file) with patch.object(watcher, '_handle_file_change') as mock_handle: handler.on_created(event) mock_handle.assert_called_once() def test_handler_forwards_moved_events(self, tmp_path): """Test that handler forwards file move events.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{}') rules_file.write_text('{}') watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) handler = _ConfigFileEventHandler(watcher) event = Mock() event.is_directory = False event.dest_path = str(mcp_file) with patch.object(watcher, '_handle_file_change') as mock_handle: handler.on_moved(event) mock_handle.assert_called_once() def test_handler_ignores_directory_events(self, tmp_path): """Test that handler ignores directory events.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{}') rules_file.write_text('{}') watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) handler = _ConfigFileEventHandler(watcher) event = Mock() event.is_directory = True event.src_path = str(tmp_path) with patch.object(watcher, '_handle_file_change') as mock_handle: handler.on_modified(event) mock_handle.assert_not_called() def test_handler_error_handling(self, tmp_path): """Test that handler catches and logs exceptions.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{}') rules_file.write_text('{}') watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=Mock(), on_gateway_rules_changed=Mock() ) handler = _ConfigFileEventHandler(watcher) event = Mock() event.is_directory = False event.src_path = str(mcp_file) # Make _handle_file_change raise exception with patch.object(watcher, '_handle_file_change', side_effect=Exception("Test error")): # Should not raise exception handler.on_modified(event) class TestConfigWatcherEdgeCases: """Test edge cases and boundary conditions.""" def test_very_short_debounce(self, tmp_path): """Test with very short debounce time.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{}') rules_file.write_text('{}') callback_calls = [] def callback(path): callback_calls.append(path) watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.01 # Very short ) watcher.start() time.sleep(0.1) mcp_file.write_text('{"changed": true}') time.sleep(0.1) watcher.stop() assert len(callback_calls) > 0 def test_zero_debounce(self, tmp_path): """Test with zero debounce time.""" mcp_file = tmp_path / "mcp.json" rules_file = tmp_path / "rules.json" mcp_file.write_text('{}') rules_file.write_text('{}') callback_calls = [] def callback(path): callback_calls.append(path) watcher = ConfigWatcher( mcp_config_path=str(mcp_file), gateway_rules_path=str(rules_file), on_mcp_config_changed=callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.0 ) watcher.start() time.sleep(0.1) mcp_file.write_text('{"changed": true}') time.sleep(0.1) watcher.stop() assert len(callback_calls) > 0 def test_same_file_for_both_configs(self, tmp_path): """Test behavior when same file is used for both configs. Note: When the same file is used for both configs, the ConfigWatcher will handle each change event twice (once for mcp_config and once for gateway_rules), but debouncing may coalesce them. The exact behavior depends on the file system event timing. """ config_file = tmp_path / "config.json" config_file.write_text('{}') mcp_calls = [] rules_calls = [] def mcp_callback(path): mcp_calls.append(path) def rules_callback(path): rules_calls.append(path) watcher = ConfigWatcher( mcp_config_path=str(config_file), gateway_rules_path=str(config_file), # Same file on_mcp_config_changed=mcp_callback, on_gateway_rules_changed=rules_callback, debounce_seconds=0.1 ) watcher.start() time.sleep(0.3) config_file.write_text('{"changed": true}') time.sleep(0.5) watcher.stop() # At least one callback should be invoked (could be both) # The exact behavior depends on how the file system and debouncing interact total_calls = len(mcp_calls) + len(rules_calls) assert total_calls >= 1, f"Expected at least 1 callback, got {total_calls}" def test_symlink_handling(self, tmp_path): """Test handling of symlinked config files.""" # Create actual files actual_mcp = tmp_path / "actual_mcp.json" actual_rules = tmp_path / "actual_rules.json" actual_mcp.write_text('{"mcpServers": {}}') actual_rules.write_text('{"agents": {}}') # Create symlinks link_mcp = tmp_path / "mcp.json" link_rules = tmp_path / "rules.json" link_mcp.symlink_to(actual_mcp) link_rules.symlink_to(actual_rules) callback_calls = [] def callback(path): callback_calls.append(path) watcher = ConfigWatcher( mcp_config_path=str(link_mcp), gateway_rules_path=str(link_rules), on_mcp_config_changed=callback, on_gateway_rules_changed=Mock(), debounce_seconds=0.1 ) watcher.start() time.sleep(0.2) # Modify through symlink link_mcp.write_text('{"mcpServers": {"new": {}}}') time.sleep(0.3) watcher.stop() # Should detect change assert len(callback_calls) > 0

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/roddutra/agent-mcp-gateway'

If you have feedback or need assistance with the MCP directory API, please join our Discord server