Skip to main content
Glama

MCP Template

manager.py7.25 kB
""" Registry manager for handling server registration lifecycle. """ import asyncio from typing import Any, Dict, Optional from ..core.logger import get_logger from ..core.settings import Settings from .base import RegistryClient from .http_client import HTTPRegistryClient class RegistryManager: """ Manages the lifecycle of server registration with the registry. Handles: - Initial registration - Periodic heartbeats - Graceful deregistration - Automatic reconnection on failures """ def __init__(self, settings: Settings, client: Optional[RegistryClient] = None): """ Initialize the registry manager. Args: settings: Application settings client: Optional custom registry client (defaults to HTTPRegistryClient) """ self.settings = settings self.logger = get_logger(__name__) # Create registry client self.client = client or HTTPRegistryClient(settings.registry) # State self.server_id: Optional[str] = None self.registered = False self._heartbeat_task: Optional[asyncio.Task] = None self._shutdown = False def _build_server_info(self) -> Dict[str, Any]: """ Build server information for registration. Returns: Dictionary containing server information """ return { "name": self.settings.server.name, "version": self.settings.server.version, "description": self.settings.server.description, "metadata": { "tags": self.settings.registry.metadata.tags, "capabilities": self.settings.registry.metadata.capabilities, "environment": self.settings.registry.metadata.environment.value, }, } async def register(self) -> bool: """ Register the server with the registry. Returns: True if registration was successful Raises: Exception: If registration fails after all retries """ if not self.settings.registry.enabled: self.logger.info("Registry integration disabled") return False try: server_info = self._build_server_info() result = await self.client.register(server_info) self.server_id = result.get("server_id") or result.get("id") self.registered = True self.logger.info( "Successfully registered with registry", server_id=self.server_id, ) # Start heartbeat if enabled if self.settings.registry.heartbeat.enabled: await self.start_heartbeat() return True except Exception as e: self.logger.error("Failed to register with registry", error=str(e)) raise async def deregister(self) -> bool: """ Deregister the server from the registry. Returns: True if deregistration was successful """ if not self.registered or not self.server_id: self.logger.debug("Server not registered, skipping deregistration") return True self._shutdown = True # Stop heartbeat await self.stop_heartbeat() try: result = await self.client.deregister(self.server_id) self.registered = False self.server_id = None self.logger.info("Successfully deregistered from registry") return result except Exception as e: self.logger.error("Failed to deregister from registry", error=str(e)) return False async def start_heartbeat(self) -> None: """Start the periodic heartbeat task.""" if self._heartbeat_task is not None: self.logger.warning("Heartbeat task already running") return self.logger.info( "Starting heartbeat task", interval=self.settings.registry.heartbeat.interval, ) self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) async def stop_heartbeat(self) -> None: """Stop the periodic heartbeat task.""" if self._heartbeat_task is None: return self.logger.info("Stopping heartbeat task") self._heartbeat_task.cancel() try: await self._heartbeat_task except asyncio.CancelledError: pass self._heartbeat_task = None async def _heartbeat_loop(self) -> None: """ Periodic heartbeat loop. Sends heartbeats at configured intervals and handles failures. """ interval = self.settings.registry.heartbeat.interval while not self._shutdown: try: await asyncio.sleep(interval) if not self.server_id: self.logger.warning("No server_id available for heartbeat") continue success = await self.client.heartbeat(self.server_id) if not success: self.logger.warning("Heartbeat failed, will retry") # Could implement reconnection logic here except asyncio.CancelledError: self.logger.debug("Heartbeat loop cancelled") break except Exception as e: self.logger.error("Error in heartbeat loop", error=str(e), exc_info=True) # Continue the loop even on errors async def update_metadata(self, metadata: Dict[str, Any]) -> bool: """ Update server metadata in the registry. Args: metadata: Updated metadata Returns: True if update was successful """ if not self.registered or not self.server_id: self.logger.warning("Server not registered, cannot update metadata") return False try: result = await self.client.update_metadata(self.server_id, metadata) if result: self.logger.info("Metadata updated successfully") return result except Exception as e: self.logger.error("Failed to update metadata", error=str(e)) return False async def health_check(self) -> Dict[str, Any]: """ Perform health check on registry connection. Returns: Dictionary containing health status """ health = { "registry_enabled": self.settings.registry.enabled, "registered": self.registered, "server_id": self.server_id, } if self.settings.registry.enabled: health["registry_reachable"] = await self.client.health_check() return health async def __aenter__(self) -> "RegistryManager": """Async context manager entry.""" if self.settings.registry.enabled: await self.register() return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Async context manager exit.""" if self.settings.registry.enabled: await self.deregister()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/riteshsonawala/mcp-template'

If you have feedback or need assistance with the MCP directory API, please join our Discord server