Skip to main content
Glama
file_watcher.py9.41 kB
"""File watching and hot reload functionality for configuration files.""" import asyncio from collections.abc import Awaitable, Callable from pathlib import Path from typing import TYPE_CHECKING import structlog from watchfiles import awatch from .config_loader import DockerMCPConfig, load_config_async from .exceptions import ConfigurationError if TYPE_CHECKING: from ..server import DockerMCPServer logger = structlog.get_logger() class ConfigFileWatcher: """Watches configuration files for changes and triggers hot reload.""" def __init__( self, config_path: str, reload_callback: Callable[[DockerMCPConfig], Awaitable[None]] ): self.config_path = Path(config_path) self.reload_callback = reload_callback self._watch_task: asyncio.Task | None = None self._is_watching = False self._last_config_hash: str | None = None async def start_watching(self) -> None: """Start watching the configuration file for changes.""" if self._is_watching: logger.warning("File watcher is already running") return if not self.config_path.exists(): logger.warning("Configuration file does not exist", path=str(self.config_path)) return self._is_watching = True self._watch_task = asyncio.create_task(self._watch_files()) logger.info("Started configuration file watcher", path=str(self.config_path)) async def stop_watching(self) -> None: """Stop watching the configuration file.""" if not self._is_watching: return self._is_watching = False if self._watch_task and not self._watch_task.done(): self._watch_task.cancel() try: await self._watch_task except asyncio.CancelledError: pass logger.info("Stopped configuration file watcher") async def _watch_files(self) -> None: """Watch for file changes and trigger reloads.""" try: # Watch the specific config file watch_path = str(self.config_path) logger.debug("Starting file watcher", path=watch_path) async for changes in awatch(watch_path): if not self._is_watching: logger.debug("File watcher stopping - not watching") break # Process any changes to the config file for change_type, file_path in changes: logger.debug("File change detected", path=file_path, change_type=change_type) # Always trigger reload since we're watching the specific file await self._handle_config_change() break # Only process one change at a time except asyncio.CancelledError: logger.debug("File watcher cancelled") raise except Exception as e: logger.error("File watcher error", error=str(e)) # Try to restart watching after a delay if self._is_watching: await asyncio.sleep(5) if self._is_watching: # Check again in case we were stopped during sleep logger.info("Restarting file watcher after error") await self._watch_files() async def _handle_config_change(self) -> None: """Handle configuration file changes.""" try: # Add a small delay to avoid processing partial writes await asyncio.sleep(0.1) # Load the new configuration logger.info("Reloading configuration", path=str(self.config_path)) new_config = await load_config_async(str(self.config_path)) # Calculate a simple hash to avoid unnecessary reloads config_hash = self._calculate_config_hash(new_config) if config_hash == self._last_config_hash: logger.debug("Configuration unchanged, skipping reload") return # Call the reload callback await self.reload_callback(new_config) self._last_config_hash = config_hash logger.info("Configuration reloaded successfully", hosts=list(new_config.hosts.keys())) except Exception as e: logger.error("Failed to reload configuration", error=str(e), path=str(self.config_path)) # Don't re-raise - we want to continue watching def _calculate_config_hash(self, config: DockerMCPConfig) -> str: """Calculate a simple hash of the configuration for change detection.""" # Create a string representation of key configuration elements host_data = [] for host_id, host_config in config.hosts.items(): host_data.append( f"{host_id}:{host_config.hostname}:{host_config.user}:{host_config.enabled}" ) config_str = "|".join(sorted(host_data)) return str(hash(config_str)) class HotReloadManager: """Manages hot reload functionality for the FastMCP server.""" def __init__(self) -> None: self.config_watcher: ConfigFileWatcher | None = None self._server_instance: DockerMCPServer | None = None def setup_hot_reload(self, config_path: str, server_instance: "DockerMCPServer") -> None: """Setup hot reload for the given configuration file and server instance.""" self._server_instance = server_instance self.config_watcher = ConfigFileWatcher(config_path, self._reload_server_config) async def start_hot_reload(self) -> None: """Start the hot reload watcher.""" if self.config_watcher: await self.config_watcher.start_watching() async def stop_hot_reload(self) -> None: """Stop the hot reload watcher.""" if self.config_watcher: await self.config_watcher.stop_watching() async def _reload_server_config(self, new_config: DockerMCPConfig) -> None: """Reload server configuration while preserving active connections.""" try: if not self._server_instance: logger.error("No server instance available for hot reload") return logger.info("Applying hot configuration reload") # Detect configuration changes host_changes = self._detect_host_changes(new_config) # Log changes self._log_host_changes(host_changes) # Update the server configuration using the proper method self._server_instance.update_configuration(new_config) # Clear Docker context cache for updated/removed hosts self._clear_context_cache(host_changes) logger.info("Hot reload completed successfully") except Exception as e: logger.error("Hot reload failed", error=str(e)) raise ConfigurationError(f"Hot reload failed: {e}") from e def _detect_host_changes(self, new_config: DockerMCPConfig) -> dict[str, set[str]]: """Detect added, removed, and updated hosts.""" if not self._server_instance: return {"added": set(), "removed": set(), "updated": set()} old_hosts = set(self._server_instance.config.hosts.keys()) new_hosts = set(new_config.hosts.keys()) added_hosts = new_hosts - old_hosts removed_hosts = old_hosts - new_hosts updated_hosts = set() # Check for updated hosts for host_id in old_hosts & new_hosts: if self._is_host_updated(host_id, new_config): updated_hosts.add(host_id) return { "added": added_hosts, "removed": removed_hosts, "updated": updated_hosts, } def _is_host_updated(self, host_id: str, new_config: DockerMCPConfig) -> bool: """Check if a host configuration has changed.""" if not self._server_instance: return False old_host = self._server_instance.config.hosts[host_id] new_host = new_config.hosts[host_id] return ( old_host.hostname != new_host.hostname or old_host.user != new_host.user or old_host.enabled != new_host.enabled ) def _log_host_changes(self, host_changes: dict[str, set[str]]) -> None: """Log host configuration changes.""" if host_changes["added"]: logger.info("Added hosts during hot reload", hosts=list(host_changes["added"])) if host_changes["removed"]: logger.info("Removed hosts during hot reload", hosts=list(host_changes["removed"])) if host_changes["updated"]: logger.info("Updated hosts during hot reload", hosts=list(host_changes["updated"])) def _clear_context_cache(self, host_changes: dict[str, set[str]]) -> None: """Clear Docker context cache for updated/removed hosts.""" if not self._server_instance or not hasattr(self._server_instance, "context_manager"): return context_manager = self._server_instance.context_manager hosts_to_clear = host_changes["removed"] | host_changes["updated"] for host_id in hosts_to_clear: if host_id in context_manager._context_cache: del context_manager._context_cache[host_id] logger.debug("Cleared context cache for host", host_id=host_id)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/docker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server