http_client.py•7.08 kB
"""
HTTP-based registry client implementation with retry logic.
"""
from typing import Any, Dict, Optional
import httpx
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from ..core.logger import get_logger
from ..core.settings import AuthType, RegistrySettings
from .base import RegistryClient
class HTTPRegistryClient(RegistryClient):
"""
HTTP-based registry client with retry logic and authentication support.
"""
def __init__(self, settings: RegistrySettings):
"""
Initialize the HTTP registry client.
Args:
settings: Registry configuration settings
"""
self.settings = settings
self.logger = get_logger(__name__, registry_url=settings.url)
self.base_url = settings.url.rstrip("/")
# Create HTTP client
self.client = httpx.AsyncClient(
timeout=settings.connection.timeout,
headers=self._build_headers(),
)
def _build_headers(self) -> Dict[str, str]:
"""
Build HTTP headers including authentication.
Returns:
Dictionary of HTTP headers
"""
headers = {
"Content-Type": "application/json",
"User-Agent": "mcp-template/0.1.0",
}
# Add authentication headers
if self.settings.auth.type == AuthType.API_KEY and self.settings.auth.api_key:
headers["X-API-Key"] = self.settings.auth.api_key
elif self.settings.auth.type == AuthType.BEARER and self.settings.auth.api_key:
headers["Authorization"] = f"Bearer {self.settings.auth.api_key}"
return headers
def _create_retry_decorator(self) -> Any:
"""
Create retry decorator based on configuration.
Returns:
Tenacity retry decorator
"""
return retry(
stop=stop_after_attempt(self.settings.connection.retry.max_attempts),
wait=wait_exponential(
multiplier=self.settings.connection.retry.initial_delay,
max=self.settings.connection.retry.max_delay,
exp_base=self.settings.connection.retry.exponential_base,
),
retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)),
reraise=True,
)
async def register(self, server_info: Dict[str, Any]) -> Dict[str, Any]:
"""
Register the MCP server with the registry.
Args:
server_info: Server information to register
Returns:
Registration response from registry including server_id
Raises:
httpx.HTTPError: If registration fails
"""
self.logger.info("Registering server with registry", server_info=server_info)
@self._create_retry_decorator()
async def _register() -> Dict[str, Any]:
response = await self.client.post(
f"{self.base_url}/servers",
json=server_info,
)
response.raise_for_status()
return response.json()
try:
result = await _register()
self.logger.info(
"Server registered successfully", server_id=result.get("server_id")
)
return result
except Exception as e:
self.logger.error("Failed to register server", error=str(e))
raise
async def deregister(self, server_id: str) -> bool:
"""
Deregister the MCP server from the registry.
Args:
server_id: Unique server identifier
Returns:
True if deregistration was successful
"""
self.logger.info("Deregistering server", server_id=server_id)
@self._create_retry_decorator()
async def _deregister() -> httpx.Response:
response = await self.client.delete(f"{self.base_url}/servers/{server_id}")
response.raise_for_status()
return response
try:
await _deregister()
self.logger.info("Server deregistered successfully", server_id=server_id)
return True
except Exception as e:
self.logger.error("Failed to deregister server", error=str(e), server_id=server_id)
return False
async def heartbeat(self, server_id: str) -> bool:
"""
Send heartbeat to maintain registration.
Args:
server_id: Unique server identifier
Returns:
True if heartbeat was successful
"""
self.logger.debug("Sending heartbeat", server_id=server_id)
@self._create_retry_decorator()
async def _heartbeat() -> httpx.Response:
response = await self.client.post(
f"{self.base_url}/servers/{server_id}/heartbeat"
)
response.raise_for_status()
return response
try:
await _heartbeat()
self.logger.debug("Heartbeat sent successfully", server_id=server_id)
return True
except Exception as e:
self.logger.warning("Heartbeat failed", error=str(e), server_id=server_id)
return False
async def update_metadata(self, server_id: str, metadata: Dict[str, Any]) -> bool:
"""
Update server metadata in the registry.
Args:
server_id: Unique server identifier
metadata: Updated metadata
Returns:
True if update was successful
"""
self.logger.info("Updating server metadata", server_id=server_id)
@self._create_retry_decorator()
async def _update() -> httpx.Response:
response = await self.client.patch(
f"{self.base_url}/servers/{server_id}",
json={"metadata": metadata},
)
response.raise_for_status()
return response
try:
await _update()
self.logger.info("Metadata updated successfully", server_id=server_id)
return True
except Exception as e:
self.logger.error("Failed to update metadata", error=str(e), server_id=server_id)
return False
async def health_check(self) -> bool:
"""
Check if registry is accessible.
Returns:
True if registry is healthy
"""
try:
response = await self.client.get(f"{self.base_url}/health")
return response.status_code == 200
except Exception as e:
self.logger.warning("Registry health check failed", error=str(e))
return False
async def close(self) -> None:
"""Close the HTTP client."""
await self.client.aclose()
async def __aenter__(self) -> "HTTPRegistryClient":
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Async context manager exit."""
await self.close()