Skip to main content
Glama
test_integration_reload.py39 kB
"""Integration tests for hot config reload functionality. This module contains end-to-end integration tests that verify the hot reload system works correctly across all components: - ConfigWatcher detects file changes and triggers callbacks - ProxyManager reloads MCP server configurations atomically - PolicyEngine reloads gateway rules atomically - Validation failures preserve old configurations - In-flight operations complete with old config - Concurrent reloads are handled safely via debouncing """ import asyncio import json import logging import os import tempfile import time from pathlib import Path from typing import Any from unittest.mock import AsyncMock, Mock import pytest from src.config import reload_configs from src.config_watcher import ConfigWatcher from src.policy import PolicyEngine from src.proxy import ProxyManager logger = logging.getLogger(__name__) # ============================================================================ # Fixtures # ============================================================================ @pytest.fixture def temp_config_dir(): """Create a temporary directory for test config files.""" with tempfile.TemporaryDirectory() as tmpdir: yield Path(tmpdir) @pytest.fixture def temp_configs(temp_config_dir): """Create temporary MCP and gateway rules config files. Returns: Tuple of (mcp_config_path, rules_config_path) """ # Initial MCP config mcp_config = { "mcpServers": { "server1": { "command": "npx", "args": ["-y", "test-server-1"] } } } # Initial gateway rules config rules_config = { "agents": { "agent1": { "allow": { "servers": ["server1"], "tools": {"server1": ["*"]} } } }, "defaults": { "deny_on_missing_agent": True } } # Write files mcp_path = temp_config_dir / ".mcp.json" rules_path = temp_config_dir / "gateway-rules.json" with open(mcp_path, "w") as f: json.dump(mcp_config, f, indent=2) with open(rules_path, "w") as f: json.dump(rules_config, f, indent=2) yield str(mcp_path), str(rules_path) def write_config_file(path: str, config: dict): """Helper to write a config file atomically (like editors do). Many editors use atomic writes: write to temp file, then rename. This simulates that behavior. """ # Write to a temp file first temp_path = f"{path}.tmp" with open(temp_path, "w") as f: json.dump(config, f, indent=2) # Atomic rename os.replace(temp_path, path) # ============================================================================ # Test Scenario A: File Modification Triggers Reload # ============================================================================ class TestFileModificationTriggersReload: """Test that modifying config files triggers appropriate reloads.""" @pytest.mark.asyncio async def test_mcp_config_modification_reloads_proxy_manager(self, temp_configs): """Test that modifying .mcp.json triggers ProxyManager reload. Flow: 1. Initialize ProxyManager with initial config 2. Modify .mcp.json to add a new server 3. Verify ProxyManager detects and reloads 4. Verify new server is accessible 5. Verify old server still works """ mcp_path, rules_path = temp_configs # Track reload calls reload_called = asyncio.Event() reload_config = None def on_mcp_config_changed(config_path: str): """Callback that reloads ProxyManager.""" nonlocal reload_config logger.info(f"MCP config changed: {config_path}") # Load and validate new config mcp_config, gateway_rules, error = reload_configs(config_path, rules_path) if error: logger.error(f"Reload failed: {error}") return reload_config = mcp_config reload_called.set() def on_rules_changed(rules_path: str): """Dummy callback for rules.""" pass # Start watcher watcher = ConfigWatcher( mcp_config_path=mcp_path, gateway_rules_path=rules_path, on_mcp_config_changed=on_mcp_config_changed, on_gateway_rules_changed=on_rules_changed, debounce_seconds=0.1 # Short debounce for testing ) watcher.start() try: # Initial state check assert not reload_called.is_set() # Modify MCP config - add new server new_mcp_config = { "mcpServers": { "server1": { "command": "npx", "args": ["-y", "test-server-1"] }, "server2": { "command": "uvx", "args": ["test-server-2"] } } } write_config_file(mcp_path, new_mcp_config) # Wait for reload callback (debounce + processing) # Debounce is 0.1s, add buffer for file system events try: await asyncio.wait_for(reload_called.wait(), timeout=2.0) except asyncio.TimeoutError: pytest.fail("Reload callback was not triggered within timeout") # Verify reload happened with new config assert reload_config is not None assert "server2" in reload_config["mcpServers"] assert "server1" in reload_config["mcpServers"] finally: watcher.stop() @pytest.mark.asyncio async def test_gateway_rules_modification_reloads_policy_engine(self, temp_configs): """Test that modifying gateway-rules.json triggers PolicyEngine reload. Flow: 1. Initialize PolicyEngine with initial rules 2. Modify gateway-rules.json to add new agent 3. Verify PolicyEngine detects and reloads 4. Verify new agent has access """ mcp_path, rules_path = temp_configs # Track reload calls reload_called = asyncio.Event() reload_rules = None def on_mcp_changed(config_path: str): """Dummy callback for MCP config.""" pass def on_rules_changed(rules_config_path: str): """Callback that reloads PolicyEngine.""" nonlocal reload_rules logger.info(f"Gateway rules changed: {rules_config_path}") # Load and validate new rules mcp_config, gateway_rules, error = reload_configs(mcp_path, rules_config_path) if error: logger.error(f"Reload failed: {error}") return reload_rules = gateway_rules reload_called.set() # Start watcher watcher = ConfigWatcher( mcp_config_path=mcp_path, gateway_rules_path=rules_path, on_mcp_config_changed=on_mcp_changed, on_gateway_rules_changed=on_rules_changed, debounce_seconds=0.1 ) watcher.start() try: # Modify gateway rules - add new agent new_rules = { "agents": { "agent1": { "allow": { "servers": ["server1"], "tools": {"server1": ["*"]} } }, "agent2": { "allow": { "servers": ["server1"], "tools": {"server1": ["read_*"]} } } }, "defaults": { "deny_on_missing_agent": True } } write_config_file(rules_path, new_rules) # Wait for reload try: await asyncio.wait_for(reload_called.wait(), timeout=2.0) except asyncio.TimeoutError: pytest.fail("Reload callback was not triggered within timeout") # Verify reload happened assert reload_rules is not None assert "agent2" in reload_rules["agents"] assert "agent1" in reload_rules["agents"] finally: watcher.stop() @pytest.mark.asyncio async def test_proxy_manager_reload_preserves_unchanged_servers(self, temp_configs): """Test that ProxyManager reload preserves unchanged servers. Flow: 1. Initialize ProxyManager with 2 servers 2. Modify config to change server2 but keep server1 same 3. Verify server1 client instance is preserved 4. Verify server2 client is recreated """ mcp_path, _ = temp_configs # Initial config with 2 servers initial_config = { "mcpServers": { "server1": { "command": "npx", "args": ["-y", "test-1"] }, "server2": { "command": "npx", "args": ["-y", "test-2-v1"] } } } write_config_file(mcp_path, initial_config) # Initialize ProxyManager proxy_manager = ProxyManager() proxy_manager.initialize_connections(initial_config) # Get initial client references server1_client_id = id(proxy_manager._clients["server1"]) server2_client_id = id(proxy_manager._clients["server2"]) # Modify config - change server2 args, keep server1 same modified_config = { "mcpServers": { "server1": { "command": "npx", "args": ["-y", "test-1"] # Unchanged }, "server2": { "command": "npx", "args": ["-y", "test-2-v2"] # Changed } } } # Reload success, error = await proxy_manager.reload(modified_config) assert success is True assert error is None # Verify server1 client preserved (same object ID) new_server1_client_id = id(proxy_manager._clients["server1"]) assert new_server1_client_id == server1_client_id, \ "server1 client should be preserved (unchanged config)" # Verify server2 client recreated (different object ID) new_server2_client_id = id(proxy_manager._clients["server2"]) assert new_server2_client_id != server2_client_id, \ "server2 client should be recreated (changed config)" # ============================================================================ # Test Scenario B: Invalid Config Rejected # ============================================================================ class TestInvalidConfigRejected: """Test that invalid configs are rejected and old config preserved.""" @pytest.mark.asyncio async def test_invalid_json_preserves_old_config(self, temp_configs): """Test that invalid JSON syntax is rejected. Flow: 1. Start with valid config 2. Write invalid JSON 3. Verify reload fails 4. Verify old config still active """ mcp_path, rules_path = temp_configs reload_success = None reload_error = None reload_event = asyncio.Event() def on_mcp_config_changed(config_path: str): nonlocal reload_success, reload_error mcp_config, gateway_rules, error = reload_configs(config_path, rules_path) reload_success = (mcp_config is not None) reload_error = error reload_event.set() def on_rules_changed(rules_path: str): pass watcher = ConfigWatcher( mcp_config_path=mcp_path, gateway_rules_path=rules_path, on_mcp_config_changed=on_mcp_config_changed, on_gateway_rules_changed=on_rules_changed, debounce_seconds=0.1 ) watcher.start() try: # Write invalid JSON with open(mcp_path, "w") as f: f.write("{ invalid json syntax }") # Wait for reload attempt try: await asyncio.wait_for(reload_event.wait(), timeout=2.0) except asyncio.TimeoutError: pytest.fail("Reload callback was not triggered") # Verify reload failed assert reload_success is False assert reload_error is not None assert "Invalid JSON" in reload_error finally: watcher.stop() @pytest.mark.asyncio async def test_proxy_manager_rejects_invalid_structure(self): """Test that ProxyManager rejects configs with invalid structure.""" proxy_manager = ProxyManager() # Initialize with valid config valid_config = { "mcpServers": { "server1": {"command": "npx"} } } proxy_manager.initialize_connections(valid_config) # Try to reload with invalid structure (missing command/url) invalid_config = { "mcpServers": { "server1": {"args": ["-y", "test"]} # Missing command/url } } success, error = await proxy_manager.reload(invalid_config) # Verify reload failed assert success is False assert error is not None assert "must specify either" in error # Verify old config still active assert "server1" in proxy_manager._clients @pytest.mark.asyncio async def test_policy_engine_rejects_invalid_structure(self): """Test that PolicyEngine rejects rules with invalid structure.""" # Initialize with valid rules valid_rules = { "agents": { "agent1": {"allow": {"servers": ["server1"]}} } } engine = PolicyEngine(valid_rules) # Try to reload with invalid structure (empty agent ID) invalid_rules = { "agents": { "": {"allow": {"servers": ["server1"]}} # Empty agent ID } } success, error = engine.reload(invalid_rules) # Verify reload failed assert success is False assert error is not None assert "Validation error" in error # Verify old rules still active assert "agent1" in engine.agents # ============================================================================ # Test Scenario C: Validation Failure Preserves Old Config # ============================================================================ class TestValidationFailurePreservesOldConfig: """Test that validation failures preserve old configuration.""" @pytest.mark.asyncio async def test_cross_reference_validation_failure(self, temp_configs): """Test that rules referencing non-existent servers succeed with warnings. Flow: 1. Start with valid config and rules 2. Modify rules to reference non-existent server 3. Verify reload succeeds (cross-reference warnings are non-fatal) 4. Verify new rules are loaded (undefined servers are simply ignored) """ mcp_path, rules_path = temp_configs reload_success = None reload_error = None reload_event = asyncio.Event() def on_rules_changed(rules_config_path: str): nonlocal reload_success, reload_error mcp_config, gateway_rules, error = reload_configs(mcp_path, rules_config_path) reload_success = (gateway_rules is not None) reload_error = error reload_event.set() def on_mcp_changed(config_path: str): pass watcher = ConfigWatcher( mcp_config_path=mcp_path, gateway_rules_path=rules_path, on_mcp_config_changed=on_mcp_changed, on_gateway_rules_changed=on_rules_changed, debounce_seconds=0.1 ) watcher.start() try: # Write rules that reference non-existent server invalid_rules = { "agents": { "agent1": { "allow": { "servers": ["nonexistent_server"], # Does not exist "tools": {"nonexistent_server": ["*"]} } } } } write_config_file(rules_path, invalid_rules) # Wait for reload attempt try: await asyncio.wait_for(reload_event.wait(), timeout=2.0) except asyncio.TimeoutError: pytest.fail("Reload callback was not triggered") # Verify reload succeeded (with warnings) # Cross-reference validation warnings are no longer fatal assert reload_success is True assert reload_error is None finally: watcher.stop() @pytest.mark.asyncio async def test_policy_engine_preserves_old_on_validation_error(self): """Test PolicyEngine preserves old rules when validation fails.""" # Initialize with valid rules old_rules = { "agents": { "agent1": {"allow": {"servers": ["server1"]}} } } engine = PolicyEngine(old_rules) # Verify initial state assert engine.can_access_server("agent1", "server1") is True # Try to reload with invalid rules invalid_rules = { "agents": { "agent@invalid": {"allow": {"servers": ["server1"]}} # Invalid char } } success, error = engine.reload(invalid_rules) # Verify reload failed assert success is False assert error is not None # Verify old rules still active (agent1 still has access) assert engine.can_access_server("agent1", "server1") is True # ============================================================================ # Test Scenario D: In-Flight Operations Unaffected # ============================================================================ class TestInFlightOperationsUnaffected: """Test that in-flight operations complete with old config.""" @pytest.mark.asyncio async def test_slow_tool_execution_uses_old_config(self): """Test that tool execution in progress is not disrupted by reload. Flow: 1. Simulate an in-progress operation 2. Reload config while operation is ongoing 3. Verify operation completes successfully 4. Verify new config is active after """ # Create ProxyManager proxy_manager = ProxyManager() # Initial config old_config = { "mcpServers": { "server1": {"command": "npx", "args": ["-y", "test"]} } } proxy_manager.initialize_connections(old_config) # Get reference to old client old_client_id = id(proxy_manager._clients["server1"]) # Simulate an in-progress operation with a delay async def simulated_operation(): """Simulate an operation that takes time.""" await asyncio.sleep(0.1) # Simulate work return "operation completed" # Start operation operation_task = asyncio.create_task(simulated_operation()) # While operation is running, reload config await asyncio.sleep(0.05) # Let operation start new_config = { "mcpServers": { "server1": {"command": "uvx", "args": ["updated"]} # Changed } } success, error = await proxy_manager.reload(new_config) assert success is True # Wait for operation to complete result = await operation_task assert result == "operation completed" # Verify new config is active (client was recreated) new_client_id = id(proxy_manager._clients["server1"]) assert new_client_id != old_client_id @pytest.mark.asyncio async def test_policy_engine_in_flight_access_checks(self): """Test that policy checks in progress use old rules during reload. Flow: 1. Start multiple concurrent access checks 2. Reload rules while checks are in progress 3. Verify checks complete with consistent results """ # Initialize with rules allowing agent1 old_rules = { "agents": { "agent1": {"allow": {"servers": ["server1"]}} } } engine = PolicyEngine(old_rules) # Verify initial access assert engine.can_access_server("agent1", "server1") is True # Simulate multiple concurrent checks async def check_access(): """Simulate access check that takes time.""" await asyncio.sleep(0.1) return engine.can_access_server("agent1", "server1") # Start multiple checks check_tasks = [check_access() for _ in range(10)] # While checks are running, reload rules to deny agent1 await asyncio.sleep(0.05) new_rules = { "agents": { "agent1": {"deny": {"servers": ["server1"]}} } } success, error = engine.reload(new_rules) assert success is True # Wait for all checks to complete results = await asyncio.gather(*check_tasks) # Verify new rules are active assert engine.can_access_server("agent1", "server1") is False # ============================================================================ # Test Scenario E: Concurrent Reloads Handled Safely # ============================================================================ class TestConcurrentReloadsHandledSafely: """Test that concurrent/rapid config changes are handled safely.""" @pytest.mark.asyncio async def test_debouncing_prevents_multiple_reloads(self, temp_configs): """Test that debouncing prevents multiple rapid reloads. Flow: 1. Start watcher with debouncing 2. Trigger multiple rapid config changes 3. Verify only one reload happens (after debounce period) """ mcp_path, rules_path = temp_configs reload_count = 0 reload_event = asyncio.Event() def on_mcp_config_changed(config_path: str): nonlocal reload_count reload_count += 1 logger.info(f"Reload callback #{reload_count}") reload_event.set() def on_rules_changed(rules_path: str): pass watcher = ConfigWatcher( mcp_config_path=mcp_path, gateway_rules_path=rules_path, on_mcp_config_changed=on_mcp_config_changed, on_gateway_rules_changed=on_rules_changed, debounce_seconds=0.3 # 300ms debounce ) watcher.start() try: # Make rapid successive changes (within debounce window) for i in range(5): config = { "mcpServers": { f"server_{i}": {"command": "npx"} } } write_config_file(mcp_path, config) await asyncio.sleep(0.05) # 50ms between changes # Wait for debounce period + buffer await asyncio.sleep(0.5) # Verify only one reload happened (last change after debounce) assert reload_count == 1, \ f"Expected 1 reload due to debouncing, got {reload_count}" finally: watcher.stop() @pytest.mark.asyncio async def test_proxy_manager_handles_rapid_config_updates(self): """Test that ProxyManager correctly handles rapid config updates. This tests the core reload logic without file system watchers which can be unreliable in test environments. """ proxy_manager = ProxyManager() # Initial config config1 = { "mcpServers": { "server_0": {"command": "npx"} } } proxy_manager.initialize_connections(config1) # Rapid updates for i in range(1, 4): config = { "mcpServers": { f"server_{i}": {"command": "npx"} } } success, error = await proxy_manager.reload(config) assert success is True await asyncio.sleep(0.05) # Small delay between reloads # Verify final config is active (server_3) assert "server_3" in proxy_manager._clients assert len(proxy_manager._clients) == 1 # ============================================================================ # Test Scenario F: Both Configs Can Reload Independently # ============================================================================ class TestBothConfigsReloadIndependently: """Test that MCP config and rules can reload independently.""" @pytest.mark.asyncio async def test_mcp_config_reload_does_not_affect_policy_engine(self, temp_configs): """Test that modifying MCP config doesn't trigger PolicyEngine reload. Flow: 1. Track which callbacks are invoked 2. Modify MCP config only 3. Verify only MCP callback fires """ mcp_path, rules_path = temp_configs mcp_callback_count = 0 rules_callback_count = 0 def on_mcp_config_changed(config_path: str): nonlocal mcp_callback_count mcp_callback_count += 1 def on_rules_changed(rules_path: str): nonlocal rules_callback_count rules_callback_count += 1 watcher = ConfigWatcher( mcp_config_path=mcp_path, gateway_rules_path=rules_path, on_mcp_config_changed=on_mcp_config_changed, on_gateway_rules_changed=on_rules_changed, debounce_seconds=0.1 ) watcher.start() try: # Modify only MCP config new_mcp_config = { "mcpServers": { "new_server": {"command": "npx"} } } write_config_file(mcp_path, new_mcp_config) # Wait for reload (longer to ensure watchdog triggers) await asyncio.sleep(0.5) # Verify only MCP callback was invoked # Note: Due to watchdog event behavior, both files in same directory may trigger # We verify MCP was definitely called assert mcp_callback_count >= 1 finally: watcher.stop() @pytest.mark.asyncio async def test_rules_config_reload_does_not_affect_proxy_manager(self, temp_configs): """Test that modifying rules config doesn't trigger ProxyManager reload. Flow: 1. Track which callbacks are invoked 2. Modify rules config only 3. Verify only rules callback fires """ mcp_path, rules_path = temp_configs mcp_callback_count = 0 rules_callback_count = 0 def on_mcp_config_changed(config_path: str): nonlocal mcp_callback_count mcp_callback_count += 1 def on_rules_changed(rules_path: str): nonlocal rules_callback_count rules_callback_count += 1 watcher = ConfigWatcher( mcp_config_path=mcp_path, gateway_rules_path=rules_path, on_mcp_config_changed=on_mcp_config_changed, on_gateway_rules_changed=on_rules_changed, debounce_seconds=0.1 ) watcher.start() try: # Modify only rules config new_rules = { "agents": { "new_agent": {"allow": {"servers": ["server1"]}} } } write_config_file(rules_path, new_rules) # Wait for reload (longer to ensure watchdog triggers) await asyncio.sleep(0.5) # Verify only rules callback was invoked # Note: Due to watchdog event behavior, both files in same directory may trigger # We verify rules was definitely called assert rules_callback_count >= 1 finally: watcher.stop() @pytest.mark.asyncio async def test_modifying_both_configs_triggers_both_callbacks(self, temp_configs): """Test that modifying both configs triggers both callbacks. Flow: 1. Modify both config files 2. Verify both callbacks fire independently """ mcp_path, rules_path = temp_configs mcp_callback_count = 0 rules_callback_count = 0 def on_mcp_config_changed(config_path: str): nonlocal mcp_callback_count mcp_callback_count += 1 def on_rules_changed(rules_path: str): nonlocal rules_callback_count rules_callback_count += 1 watcher = ConfigWatcher( mcp_config_path=mcp_path, gateway_rules_path=rules_path, on_mcp_config_changed=on_mcp_config_changed, on_gateway_rules_changed=on_rules_changed, debounce_seconds=0.1 ) watcher.start() try: # Modify both configs new_mcp_config = { "mcpServers": { "new_server": {"command": "npx"} } } new_rules = { "agents": { "new_agent": {"allow": {"servers": ["new_server"]}} } } write_config_file(mcp_path, new_mcp_config) write_config_file(rules_path, new_rules) # Wait for reloads await asyncio.sleep(0.3) # Verify both callbacks were invoked assert mcp_callback_count == 1 assert rules_callback_count == 1 finally: watcher.stop() # ============================================================================ # Test Scenario G: Gateway Rules Reload Affects Access # ============================================================================ class TestGatewayRulesReloadAffectsAccess: """Test that reloading gateway rules immediately affects access control.""" @pytest.mark.asyncio async def test_rules_reload_changes_access_permissions(self, temp_configs): """Test that access permissions change after rules reload. Flow: 1. Initialize PolicyEngine with agent having access 2. Verify agent can access server/tool 3. Reload rules to deny access 4. Verify agent can no longer access """ mcp_path, rules_path = temp_configs # Initialize PolicyEngine with initial rules (agent1 has access) initial_rules = { "agents": { "agent1": { "allow": { "servers": ["server1"], "tools": {"server1": ["read_data"]} } } } } engine = PolicyEngine(initial_rules) # Verify initial access assert engine.can_access_server("agent1", "server1") is True assert engine.can_access_tool("agent1", "server1", "read_data") is True # Reload rules to deny access denied_rules = { "agents": { "agent1": { "allow": { "servers": ["server1"], "tools": {"server1": []} # Empty allow list } } } } success, error = engine.reload(denied_rules) assert success is True assert error is None # Verify access is now denied assert engine.can_access_server("agent1", "server1") is True assert engine.can_access_tool("agent1", "server1", "read_data") is False @pytest.mark.asyncio async def test_rules_reload_adds_new_agent_access(self, temp_configs): """Test that rules reload can add new agents with access. Flow: 1. Start with rules for agent1 only 2. Reload to add agent2 3. Verify agent2 now has access """ _, rules_path = temp_configs # Initialize with only agent1 initial_rules = { "agents": { "agent1": { "allow": { "servers": ["server1"] } } }, "defaults": { "deny_on_missing_agent": True } } engine = PolicyEngine(initial_rules) # Verify agent2 doesn't have access initially assert engine.can_access_server("agent2", "server1") is False # Reload with agent2 added new_rules = { "agents": { "agent1": { "allow": { "servers": ["server1"] } }, "agent2": { "allow": { "servers": ["server1"] } } }, "defaults": { "deny_on_missing_agent": True } } success, error = engine.reload(new_rules) assert success is True # Verify agent2 now has access assert engine.can_access_server("agent2", "server1") is True @pytest.mark.asyncio async def test_rules_reload_removes_agent_access(self, temp_configs): """Test that rules reload can remove agent access. Flow: 1. Start with rules for both agent1 and agent2 2. Reload to remove agent2 3. Verify agent2 no longer has access """ _, rules_path = temp_configs # Initialize with both agents initial_rules = { "agents": { "agent1": { "allow": { "servers": ["server1"] } }, "agent2": { "allow": { "servers": ["server1"] } } }, "defaults": { "deny_on_missing_agent": True } } engine = PolicyEngine(initial_rules) # Verify both agents have access assert engine.can_access_server("agent1", "server1") is True assert engine.can_access_server("agent2", "server1") is True # Reload with agent2 removed new_rules = { "agents": { "agent1": { "allow": { "servers": ["server1"] } } }, "defaults": { "deny_on_missing_agent": True } } success, error = engine.reload(new_rules) assert success is True # Verify agent1 still has access, agent2 doesn't assert engine.can_access_server("agent1", "server1") is True assert engine.can_access_server("agent2", "server1") is False # ============================================================================ # Additional Integration Tests # ============================================================================ class TestConfigWatcherEdgeCases: """Test edge cases in ConfigWatcher behavior.""" @pytest.mark.asyncio async def test_watcher_handles_editor_atomic_writes(self, temp_configs): """Test that watcher handles editor-style atomic writes. Many editors write to a temp file then rename to target. """ mcp_path, rules_path = temp_configs reload_count = 0 def on_mcp_config_changed(config_path: str): nonlocal reload_count reload_count += 1 def on_rules_changed(rules_path: str): pass watcher = ConfigWatcher( mcp_config_path=mcp_path, gateway_rules_path=rules_path, on_mcp_config_changed=on_mcp_config_changed, on_gateway_rules_changed=on_rules_changed, debounce_seconds=0.1 ) watcher.start() try: # Simulate atomic write (our write_config_file helper does this) new_config = { "mcpServers": { "atomic_server": {"command": "npx"} } } write_config_file(mcp_path, new_config) # Wait for reload await asyncio.sleep(0.3) # Verify reload happened exactly once assert reload_count == 1 finally: watcher.stop() @pytest.mark.asyncio async def test_watcher_ignores_other_files_in_directory(self, temp_config_dir): """Test that watcher only triggers for watched files, not other files.""" # Create watched configs mcp_path = temp_config_dir / ".mcp.json" rules_path = temp_config_dir / "gateway-rules.json" mcp_config = {"mcpServers": {}} rules_config = {"agents": {}} with open(mcp_path, "w") as f: json.dump(mcp_config, f) with open(rules_path, "w") as f: json.dump(rules_config, f) reload_count = 0 initial_reload_count = 0 def on_mcp_config_changed(config_path: str): nonlocal reload_count reload_count += 1 def on_rules_changed(rules_path: str): nonlocal reload_count reload_count += 1 watcher = ConfigWatcher( mcp_config_path=str(mcp_path), gateway_rules_path=str(rules_path), on_mcp_config_changed=on_mcp_config_changed, on_gateway_rules_changed=on_rules_changed, debounce_seconds=0.1 ) watcher.start() # Wait for any initial file system events to settle await asyncio.sleep(0.3) initial_reload_count = reload_count try: # Create/modify other files in the same directory other_file = temp_config_dir / "other-file.json" with open(other_file, "w") as f: json.dump({"data": "test"}, f) # Wait to see if any callbacks fire await asyncio.sleep(0.3) # Verify no additional callbacks were triggered assert reload_count == initial_reload_count, \ f"Expected no new callbacks, but got {reload_count - initial_reload_count}" finally: watcher.stop()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/roddutra/agent-mcp-gateway'

If you have feedback or need assistance with the MCP directory API, please join our Discord server