manager.py•7.25 kB
"""
Registry manager for handling server registration lifecycle.
"""
import asyncio
from typing import Any, Dict, Optional
from ..core.logger import get_logger
from ..core.settings import Settings
from .base import RegistryClient
from .http_client import HTTPRegistryClient
class RegistryManager:
"""
Manages the lifecycle of server registration with the registry.
Handles:
- Initial registration
- Periodic heartbeats
- Graceful deregistration
- Automatic reconnection on failures
"""
def __init__(self, settings: Settings, client: Optional[RegistryClient] = None):
"""
Initialize the registry manager.
Args:
settings: Application settings
client: Optional custom registry client (defaults to HTTPRegistryClient)
"""
self.settings = settings
self.logger = get_logger(__name__)
# Create registry client
self.client = client or HTTPRegistryClient(settings.registry)
# State
self.server_id: Optional[str] = None
self.registered = False
self._heartbeat_task: Optional[asyncio.Task] = None
self._shutdown = False
def _build_server_info(self) -> Dict[str, Any]:
"""
Build server information for registration.
Returns:
Dictionary containing server information
"""
return {
"name": self.settings.server.name,
"version": self.settings.server.version,
"description": self.settings.server.description,
"metadata": {
"tags": self.settings.registry.metadata.tags,
"capabilities": self.settings.registry.metadata.capabilities,
"environment": self.settings.registry.metadata.environment.value,
},
}
async def register(self) -> bool:
"""
Register the server with the registry.
Returns:
True if registration was successful
Raises:
Exception: If registration fails after all retries
"""
if not self.settings.registry.enabled:
self.logger.info("Registry integration disabled")
return False
try:
server_info = self._build_server_info()
result = await self.client.register(server_info)
self.server_id = result.get("server_id") or result.get("id")
self.registered = True
self.logger.info(
"Successfully registered with registry",
server_id=self.server_id,
)
# Start heartbeat if enabled
if self.settings.registry.heartbeat.enabled:
await self.start_heartbeat()
return True
except Exception as e:
self.logger.error("Failed to register with registry", error=str(e))
raise
async def deregister(self) -> bool:
"""
Deregister the server from the registry.
Returns:
True if deregistration was successful
"""
if not self.registered or not self.server_id:
self.logger.debug("Server not registered, skipping deregistration")
return True
self._shutdown = True
# Stop heartbeat
await self.stop_heartbeat()
try:
result = await self.client.deregister(self.server_id)
self.registered = False
self.server_id = None
self.logger.info("Successfully deregistered from registry")
return result
except Exception as e:
self.logger.error("Failed to deregister from registry", error=str(e))
return False
async def start_heartbeat(self) -> None:
"""Start the periodic heartbeat task."""
if self._heartbeat_task is not None:
self.logger.warning("Heartbeat task already running")
return
self.logger.info(
"Starting heartbeat task",
interval=self.settings.registry.heartbeat.interval,
)
self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
async def stop_heartbeat(self) -> None:
"""Stop the periodic heartbeat task."""
if self._heartbeat_task is None:
return
self.logger.info("Stopping heartbeat task")
self._heartbeat_task.cancel()
try:
await self._heartbeat_task
except asyncio.CancelledError:
pass
self._heartbeat_task = None
async def _heartbeat_loop(self) -> None:
"""
Periodic heartbeat loop.
Sends heartbeats at configured intervals and handles failures.
"""
interval = self.settings.registry.heartbeat.interval
while not self._shutdown:
try:
await asyncio.sleep(interval)
if not self.server_id:
self.logger.warning("No server_id available for heartbeat")
continue
success = await self.client.heartbeat(self.server_id)
if not success:
self.logger.warning("Heartbeat failed, will retry")
# Could implement reconnection logic here
except asyncio.CancelledError:
self.logger.debug("Heartbeat loop cancelled")
break
except Exception as e:
self.logger.error("Error in heartbeat loop", error=str(e), exc_info=True)
# Continue the loop even on errors
async def update_metadata(self, metadata: Dict[str, Any]) -> bool:
"""
Update server metadata in the registry.
Args:
metadata: Updated metadata
Returns:
True if update was successful
"""
if not self.registered or not self.server_id:
self.logger.warning("Server not registered, cannot update metadata")
return False
try:
result = await self.client.update_metadata(self.server_id, metadata)
if result:
self.logger.info("Metadata updated successfully")
return result
except Exception as e:
self.logger.error("Failed to update metadata", error=str(e))
return False
async def health_check(self) -> Dict[str, Any]:
"""
Perform health check on registry connection.
Returns:
Dictionary containing health status
"""
health = {
"registry_enabled": self.settings.registry.enabled,
"registered": self.registered,
"server_id": self.server_id,
}
if self.settings.registry.enabled:
health["registry_reachable"] = await self.client.health_check()
return health
async def __aenter__(self) -> "RegistryManager":
"""Async context manager entry."""
if self.settings.registry.enabled:
await self.register()
return self
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Async context manager exit."""
if self.settings.registry.enabled:
await self.deregister()