Skip to main content
Glama
instance_tracker.py (10.1 kB)
"""
Cross-platform instance tracking for OpenZIM MCP servers.

This module provides functionality to track running OpenZIM MCP server instances
using file-based tracking in the user's home directory, replacing the
platform-specific process detection approach.

Each running server writes one ``server_<pid>.json`` file into
``~/.openzim_mcp_instances``. Other servers read those files to discover
peers, detect configuration conflicts, and remove files whose process is gone.
"""

import json
import logging
import os
import platform
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


def safe_log(log_func: Any, message: str) -> None:
    """
    Safely log a message, handling cases where logging system is shut down.

    This is particularly important for atexit handlers that may run after
    the logging system has been shut down.

    Args:
        log_func: Callable accepting a single message (e.g. ``logger.info``).
        message: Text to pass to ``log_func``.
    """
    try:
        log_func(message)
    except Exception:
        # Catch all exceptions during logging, including:
        # - ValueError: I/O operation on closed file
        # - OSError: file descriptor issues
        # - AttributeError: logging objects may be None during shutdown
        # - Any other logging-related errors during shutdown
        pass


def _pid_exists(pid: Any) -> bool:
    """
    Return True if a process with the given PID currently exists.

    This is the single liveness probe shared by ``ServerInstance.is_alive``
    and ``InstanceTracker._is_process_running`` so both always agree.

    Args:
        pid: Candidate process id. Values that are not positive integers
            (for example data read from a damaged instance file) are
            reported as not running instead of raising.

    Returns:
        True when the process exists (even if it belongs to another user),
        False when it does not exist or the check could not be performed.
    """
    # PIDs come from JSON files on disk, so validate before touching the OS.
    # On POSIX, 0 and negative values address process groups, not a process.
    if isinstance(pid, bool) or not isinstance(pid, int) or pid <= 0:
        return False

    if platform.system() == "Windows":
        # os.kill(pid, 0) is not an existence probe on Windows (signal 0 is
        # CTRL_C_EVENT there), so ask tasklist instead.
        try:
            result = subprocess.run(
                ["tasklist", "/FI", f"PID eq {pid}"],
                capture_output=True,
                text=True,
                timeout=5,
            )
        except (subprocess.TimeoutExpired, OSError):
            return False
        # tasklist exits with 0 even when the filter matches nothing; only a
        # matching row contains the PID, the "no tasks" notice does not.
        return result.returncode == 0 and str(pid) in (result.stdout or "")

    try:
        # On Unix-like systems, sending signal 0 checks if process exists
        os.kill(pid, 0)
    except PermissionError:
        # Process exists but we don't have permission to signal it.
        # Must be handled before OSError, which is its base class.
        return True
    except (OSError, OverflowError):
        # ProcessLookupError (no such process), other OS failures, or a
        # value too large to be a real PID.
        return False
    return True


class ServerInstance:
    """Represents an OpenZIM MCP server instance."""

    def __init__(
        self,
        pid: int,
        config_hash: str,
        allowed_directories: List[str],
        start_time: float,
        server_name: str = "openzim-mcp",
    ):
        """
        Create an instance record.

        Args:
            pid: Process id of the server.
            config_hash: Hash identifying the server configuration.
            allowed_directories: Directories the server is allowed to access.
            start_time: Start time as a POSIX timestamp (seconds).
            server_name: Human-readable server name.
        """
        self.pid = pid
        self.config_hash = config_hash
        self.allowed_directories = allowed_directories
        self.start_time = start_time
        self.server_name = server_name
        # A freshly constructed record counts as "just seen".
        self.last_heartbeat = time.time()

    def to_dict(self) -> Dict[str, Any]:
        """Convert instance to dictionary for JSON serialization."""
        return {
            "pid": self.pid,
            "config_hash": self.config_hash,
            "allowed_directories": self.allowed_directories,
            "start_time": self.start_time,
            "server_name": self.server_name,
            "last_heartbeat": self.last_heartbeat,
            # ISO-8601 (UTC) copies of the timestamps for human readers;
            # from_dict() ignores them.
            "start_time_iso": datetime.fromtimestamp(
                self.start_time, tz=timezone.utc
            ).isoformat(),
            "last_heartbeat_iso": datetime.fromtimestamp(
                self.last_heartbeat, tz=timezone.utc
            ).isoformat(),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ServerInstance":
        """
        Create instance from dictionary.

        Args:
            data: Mapping as produced by ``to_dict``. ``server_name`` and
                ``last_heartbeat`` are optional.

        Raises:
            KeyError: If a required key (``pid``, ``config_hash``,
                ``allowed_directories``, ``start_time``) is missing.
        """
        instance = cls(
            pid=data["pid"],
            config_hash=data["config_hash"],
            allowed_directories=data["allowed_directories"],
            start_time=data["start_time"],
            server_name=data.get("server_name", "openzim-mcp"),
        )
        # Records without a heartbeat fall back to their start time.
        instance.last_heartbeat = data.get("last_heartbeat", data["start_time"])
        return instance

    def is_alive(self) -> bool:
        """Check if the process is still running."""
        return _pid_exists(self.pid)

    def update_heartbeat(self) -> None:
        """Update the last heartbeat timestamp."""
        self.last_heartbeat = time.time()


class InstanceTracker:
    """Manages OpenZIM MCP server instance tracking using file-based storage."""

    def __init__(self) -> None:
        """Ensure the tracking directory exists in the user's home directory."""
        self.instances_dir = Path.home() / ".openzim_mcp_instances"
        self.instances_dir.mkdir(parents=True, exist_ok=True)
        self.current_instance: Optional[ServerInstance] = None

    def register_instance(
        self,
        config_hash: str,
        allowed_directories: List[str],
        server_name: str = "openzim-mcp",
    ) -> ServerInstance:
        """
        Register a new server instance for the current process.

        A failure to write the instance file is logged as a warning and does
        not prevent the instance from being returned.

        Args:
            config_hash: Hash identifying the server configuration.
            allowed_directories: Directories the server is allowed to access.
            server_name: Human-readable server name.

        Returns:
            The record describing the current process.
        """
        pid = os.getpid()
        start_time = time.time()

        instance = ServerInstance(
            pid=pid,
            config_hash=config_hash,
            allowed_directories=allowed_directories,
            start_time=start_time,
            server_name=server_name,
        )

        # Save instance file
        instance_file = self.instances_dir / f"server_{pid}.json"
        try:
            with open(instance_file, "w", encoding="utf-8") as f:
                json.dump(instance.to_dict(), f, indent=2)
            safe_log(
                logger.info,
                f"Registered server instance: PID {pid}, config hash {config_hash[:8]}",
            )
        except Exception as e:
            safe_log(logger.warning, f"Failed to register instance: {e}")

        self.current_instance = instance
        return instance

    def unregister_instance(
        self, pid: Optional[int] = None, silent: bool = False
    ) -> None:
        """
        Unregister a server instance.

        Args:
            pid: Process id whose file should be removed; defaults to the
                current process.
            silent: When True, emit no log messages (useful during shutdown).
        """
        if pid is None:
            pid = os.getpid()

        instance_file = self.instances_dir / f"server_{pid}.json"
        try:
            if instance_file.exists():
                instance_file.unlink()
                if not silent:
                    safe_log(logger.info, f"Unregistered server instance: PID {pid}")
        except Exception as e:
            if not silent:
                safe_log(logger.warning, f"Failed to unregister instance: {e}")

        if self.current_instance and self.current_instance.pid == pid:
            self.current_instance = None

    def get_all_instances(self) -> List[ServerInstance]:
        """
        Get all registered server instances.

        Files that cannot be read or parsed are logged and deleted.
        """
        instances = []

        for instance_file in self.instances_dir.glob("server_*.json"):
            try:
                with open(instance_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                instances.append(ServerInstance.from_dict(data))
            except Exception as e:
                logger.warning(f"Failed to load instance from {instance_file}: {e}")
                # Clean up corrupted files
                try:
                    instance_file.unlink()
                except Exception:
                    pass

        return instances

    def get_active_instances(self) -> List[ServerInstance]:
        """
        Get only active (running) server instances.

        Files belonging to processes that are no longer running are removed
        as a side effect.
        """
        active_instances = []

        for instance in self.get_all_instances():
            if self._is_process_running(instance.pid):
                active_instances.append(instance)
            else:
                # Clean up stale instance files
                self.unregister_instance(instance.pid)

        return active_instances

    def detect_conflicts(self, current_config_hash: str) -> List[Dict[str, Any]]:
        """
        Detect potential conflicts with other server instances.

        Args:
            current_config_hash: Configuration hash of the calling server.

        Returns:
            One dict per other running instance with keys ``type``,
            ``instance`` and ``severity`` (plus ``details`` when the
            configuration hashes differ).
        """
        own_pid = os.getpid()
        conflicts = []

        for instance in self.get_active_instances():
            if instance.pid == own_pid:
                continue  # Skip current instance

            conflict_info: Dict[str, Any] = {
                "type": "multiple_instances",
                "instance": instance.to_dict(),
                "severity": "warning",
            }

            # Check for configuration conflicts
            if instance.config_hash != current_config_hash:
                conflict_info["type"] = "configuration_mismatch"
                conflict_info["severity"] = "high"
                conflict_info["details"] = "Different server configurations detected"

            conflicts.append(conflict_info)

        return conflicts

    def cleanup_stale_instances(self) -> int:
        """Clean up stale instance files and return count of cleaned files."""
        cleaned_count = 0

        for instance_file in self.instances_dir.glob("server_*.json"):
            try:
                with open(instance_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                instance = ServerInstance.from_dict(data)

                if not self._is_process_running(instance.pid):
                    instance_file.unlink()
                    cleaned_count += 1
                    logger.debug(f"Cleaned up stale instance file: {instance_file}")
            except Exception:
                # If we can't read the file, it's probably corrupted
                try:
                    instance_file.unlink()
                    cleaned_count += 1
                    logger.debug(f"Cleaned up corrupted instance file: {instance_file}")
                except Exception:
                    pass

        return cleaned_count

    def update_heartbeat(self) -> None:
        """Update heartbeat for current instance and rewrite its file."""
        if not self.current_instance:
            return

        self.current_instance.update_heartbeat()

        # Update the instance file
        instance_file = (
            self.instances_dir / f"server_{self.current_instance.pid}.json"
        )
        try:
            with open(instance_file, "w", encoding="utf-8") as f:
                json.dump(self.current_instance.to_dict(), f, indent=2)
        except Exception as e:
            logger.warning(f"Failed to update heartbeat: {e}")

    def _is_process_running(self, pid: int) -> bool:
        """Check if a process is running by PID."""
        return _pid_exists(pid)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cameronrye/openzim-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.