Skip to main content
Glama

Mutation Clinical Trial Matching MCP

by pickleton89
http_client.py (22.5 kB)
""" Unified HTTP client supporting both sync and async execution patterns. This module provides a single HTTP client interface that can operate in either synchronous (using requests) or asynchronous (using httpx) mode, eliminating code duplication between sync and async implementations. """ import asyncio import logging import time import warnings from collections.abc import Callable from typing import Any import httpx import requests from clinicaltrials.config import get_global_config from utils.circuit_breaker import async_circuit_breaker, circuit_breaker from utils.metrics import gauge, histogram, increment from utils.retry import async_exponential_backoff_retry, exponential_backoff_retry logger = logging.getLogger(__name__) class HttpResponse: """Unified response wrapper for both requests and httpx responses.""" def __init__(self, response: requests.Response | httpx.Response): self._response = response self._is_async = isinstance(response, httpx.Response) @property def status_code(self) -> int: return self._response.status_code @property def headers(self) -> dict[str, str]: return dict(self._response.headers) @property def text(self) -> str: return self._response.text def json(self) -> dict[str, Any]: return self._response.json() def raise_for_status(self) -> None: self._response.raise_for_status() class UnifiedHttpClient: """HTTP client supporting both sync and async execution.""" def __init__( self, async_mode: bool = False, service_name: str = "generic", base_url: str | None = None, headers: dict[str, str] | None = None, timeout_config: dict[str, int | float] | None = None, retry_config: dict[str, Any] | None = None, circuit_breaker_config: dict[str, Any] | None = None, **kwargs ): """ Initialize unified HTTP client. 
Args: async_mode: Whether to use async (httpx) or sync (requests) mode service_name: Name for metrics and circuit breaker identification base_url: Base URL for requests headers: Default headers to include with requests timeout_config: Timeout configuration dict retry_config: Retry configuration dict circuit_breaker_config: Circuit breaker configuration dict **kwargs: Additional configuration passed to underlying client """ self.async_mode = async_mode self.service_name = service_name self.base_url = base_url # Load global configuration try: self.config = get_global_config() except ValueError as e: logger.warning(f"Failed to load global config: {e}. Using defaults.") self.config = None # Set up headers self.default_headers = self._setup_headers(headers) # Set up timeout configuration self.timeout_config = self._setup_timeout_config(timeout_config) # Set up retry configuration self.retry_config = self._setup_retry_config(retry_config) # Set up circuit breaker configuration self.circuit_breaker_config = self._setup_circuit_breaker_config(circuit_breaker_config) # Initialize the underlying client self._client = None self._session = None self._setup_client(**kwargs) def _setup_headers(self, headers: dict[str, str] | None) -> dict[str, str]: """Set up default headers with config-based fallbacks.""" default_headers = { "Accept": "application/json", "User-Agent": getattr(self.config, 'user_agent', 'UnifiedHttpClient/1.0') } if headers: default_headers.update(headers) return default_headers def _setup_timeout_config(self, timeout_config: dict[str, int | float] | None) -> dict[str, int | float]: """Set up timeout configuration with config-based defaults.""" if timeout_config: return timeout_config if self.async_mode: return { 'connect': getattr(self.config, 'http_connect_timeout', 5.0), 'read': getattr(self.config, 'http_read_timeout', 30.0), 'write': getattr(self.config, 'http_write_timeout', 10.0), 'pool': getattr(self.config, 'http_pool_timeout', 5.0), } else: return { 
'timeout': getattr(self.config, 'clinicaltrials_timeout', 10.0) } def _setup_retry_config(self, retry_config: dict[str, Any] | None) -> dict[str, Any]: """Set up retry configuration with config-based defaults.""" if retry_config: return retry_config return { 'max_retries': getattr(self.config, 'max_retries', 3), 'initial_delay': getattr(self.config, 'retry_initial_delay', 1.0), 'backoff_factor': getattr(self.config, 'retry_backoff_factor', 2.0), 'max_delay': getattr(self.config, 'retry_max_delay', 60.0), 'jitter': getattr(self.config, 'retry_jitter', True), 'retry_on_status_codes': (429, 500, 502, 503, 504), } def _setup_circuit_breaker_config(self, circuit_breaker_config: dict[str, Any] | None) -> dict[str, Any]: """Set up circuit breaker configuration with config-based defaults.""" if circuit_breaker_config: return circuit_breaker_config return { 'name': f"{self.service_name}_http_client", 'failure_threshold': getattr(self.config, 'circuit_breaker_failure_threshold', 5), 'recovery_timeout': getattr(self.config, 'circuit_breaker_recovery_timeout', 60.0), } def _setup_client(self, **kwargs): """Initialize the underlying HTTP client based on mode.""" if self.async_mode: self._setup_async_client(**kwargs) else: self._setup_sync_client(**kwargs) def _setup_async_client(self, **kwargs): """Set up async httpx client.""" # Create timeout object timeout = httpx.Timeout( connect=self.timeout_config['connect'], read=self.timeout_config['read'], write=self.timeout_config['write'], pool=self.timeout_config['pool'], ) # Create limits object limits = httpx.Limits( max_connections=getattr(self.config, 'http_max_connections', 100), max_keepalive_connections=getattr(self.config, 'http_max_keepalive_connections', 20), keepalive_expiry=getattr(self.config, 'http_keepalive_expiry', 60.0), ) # Set up client configuration client_config = { 'base_url': self.base_url, 'headers': self.default_headers, 'timeout': timeout, 'limits': limits, **kwargs } self._client = 
httpx.AsyncClient(**client_config) def _setup_sync_client(self, **kwargs): """Set up sync requests session.""" self._session = requests.Session() self._session.headers.update(self.default_headers) # Store timeout for use in requests self._sync_timeout = self.timeout_config['timeout'] @property def is_async(self) -> bool: """Check if client is in async mode.""" return self.async_mode def _apply_retry_decorator(self, func: Callable) -> Callable: """Apply appropriate retry decorator based on mode.""" if self.async_mode: return async_exponential_backoff_retry(**self.retry_config)(func) else: return exponential_backoff_retry(**self.retry_config)(func) def _apply_circuit_breaker_decorator(self, func: Callable) -> Callable: """Apply appropriate circuit breaker decorator based on mode.""" config = self.circuit_breaker_config if self.async_mode: return async_circuit_breaker( name=config['name'], failure_threshold=config.get('failure_threshold', 5), recovery_timeout=config.get('recovery_timeout', 60) )(func) else: return circuit_breaker( name=config['name'], failure_threshold=config.get('failure_threshold', 5), recovery_timeout=config.get('recovery_timeout', 60) )(func) def request( self, method: str, url: str, headers: dict[str, str] | None = None, params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, data: Any | None = None, **kwargs ) -> HttpResponse: """ Unified request method - sync or async based on mode. This method automatically detects the execution context and routes to the appropriate implementation. """ if self.async_mode: # Check if we're in an async context try: loop = asyncio.get_running_loop() # We're in an async context, but this is the sync method warnings.warn( "Using sync request() method in async context. 
" "Consider using arequest() for better performance.", RuntimeWarning, stacklevel=2 ) # Run the async version in the current loop return loop.run_until_complete( self.arequest(method, url, headers=headers, params=params, json=json, data=data, **kwargs) ) except RuntimeError: # No event loop running, use sync fallback return self._sync_request_fallback(method, url, headers=headers, params=params, json=json, data=data, **kwargs) else: return self._sync_request(method, url, headers=headers, params=params, json=json, data=data, **kwargs) async def arequest( self, method: str, url: str, headers: dict[str, str] | None = None, params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, data: Any | None = None, **kwargs ) -> HttpResponse: """Explicit async request method.""" if not self.async_mode: raise RuntimeError("Cannot use arequest() when async_mode=False") return await self._async_request(method, url, headers=headers, params=params, json=json, data=data, **kwargs) def _sync_request( self, method: str, url: str, headers: dict[str, str] | None = None, params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, data: Any | None = None, **kwargs ) -> HttpResponse: """Internal sync request implementation.""" @self._apply_circuit_breaker_decorator @self._apply_retry_decorator def _make_request(): # Merge headers request_headers = self.default_headers.copy() if headers: request_headers.update(headers) # Start timing start_time = time.time() try: # Make the request response = self._session.request( method=method, url=url, headers=request_headers, params=params, json=json, data=data, timeout=self._sync_timeout, **kwargs ) # Record metrics request_duration = time.time() - start_time increment("http_requests_total", tags={ "service": self.service_name, "method": method, "status_code": str(response.status_code) }) histogram("http_request_duration", request_duration, tags={ "service": self.service_name, "method": method }) 
gauge("http_last_request_duration", request_duration, tags={ "service": self.service_name }) logger.info( f"HTTP {method} request completed", extra={ "action": "http_request_completed", "service": self.service_name, "method": method, "url": url, "status_code": response.status_code, "duration": request_duration } ) return HttpResponse(response) except Exception as e: request_duration = time.time() - start_time increment("http_errors_total", tags={ "service": self.service_name, "method": method, "error_type": type(e).__name__ }) histogram("http_request_duration", request_duration, tags={ "service": self.service_name, "method": method, "error": "true" }) logger.error( f"HTTP {method} request failed", extra={ "action": "http_request_failed", "service": self.service_name, "method": method, "url": url, "error": str(e), "error_type": type(e).__name__, "duration": request_duration } ) raise return _make_request() async def _async_request( self, method: str, url: str, headers: dict[str, str] | None = None, params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, data: Any | None = None, **kwargs ) -> HttpResponse: """Internal async request implementation.""" @self._apply_circuit_breaker_decorator @self._apply_retry_decorator async def _make_request(): # Merge headers request_headers = self.default_headers.copy() if headers: request_headers.update(headers) # Start timing start_time = time.time() try: # Make the request response = await self._client.request( method=method, url=url, headers=request_headers, params=params, json=json, data=data, **kwargs ) # Record metrics request_duration = time.time() - start_time increment("http_requests_total", tags={ "service": self.service_name, "method": method, "status_code": str(response.status_code) }) histogram("http_request_duration", request_duration, tags={ "service": self.service_name, "method": method }) gauge("http_last_request_duration", request_duration, tags={ "service": self.service_name }) logger.info( 
f"HTTP {method} request completed", extra={ "action": "async_http_request_completed", "service": self.service_name, "method": method, "url": url, "status_code": response.status_code, "duration": request_duration } ) return HttpResponse(response) except Exception as e: request_duration = time.time() - start_time increment("http_errors_total", tags={ "service": self.service_name, "method": method, "error_type": type(e).__name__ }) histogram("http_request_duration", request_duration, tags={ "service": self.service_name, "method": method, "error": "true" }) logger.error( f"HTTP {method} request failed", extra={ "action": "async_http_request_failed", "service": self.service_name, "method": method, "url": url, "error": str(e), "error_type": type(e).__name__, "duration": request_duration } ) raise return await _make_request() def _sync_request_fallback( self, method: str, url: str, headers: dict[str, str] | None = None, params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, data: Any | None = None, **kwargs ) -> HttpResponse: """Fallback sync request when async client is configured but no event loop exists.""" logger.warning( "Using sync fallback for async-configured client", extra={ "action": "http_sync_fallback", "service": self.service_name, "method": method, "url": url } ) # Temporarily create a sync session for this request with requests.Session() as session: session.headers.update(self.default_headers) start_time = time.time() try: response = session.request( method=method, url=url, headers=headers, params=params, json=json, data=data, timeout=self.timeout_config.get('read', 30.0), **kwargs ) time.time() - start_time increment("http_fallback_requests_total", tags={ "service": self.service_name, "method": method }) return HttpResponse(response) except Exception as e: time.time() - start_time increment("http_fallback_errors_total", tags={ "service": self.service_name, "method": method, "error_type": type(e).__name__ }) raise # Convenience methods 
for common HTTP verbs def get(self, url: str, **kwargs) -> HttpResponse: """Convenience method for GET requests.""" return self.request("GET", url, **kwargs) def post(self, url: str, **kwargs) -> HttpResponse: """Convenience method for POST requests.""" return self.request("POST", url, **kwargs) def put(self, url: str, **kwargs) -> HttpResponse: """Convenience method for PUT requests.""" return self.request("PUT", url, **kwargs) def delete(self, url: str, **kwargs) -> HttpResponse: """Convenience method for DELETE requests.""" return self.request("DELETE", url, **kwargs) async def aget(self, url: str, **kwargs) -> HttpResponse: """Convenience method for async GET requests.""" return await self.arequest("GET", url, **kwargs) async def apost(self, url: str, **kwargs) -> HttpResponse: """Convenience method for async POST requests.""" return await self.arequest("POST", url, **kwargs) async def aput(self, url: str, **kwargs) -> HttpResponse: """Convenience method for async PUT requests.""" return await self.arequest("PUT", url, **kwargs) async def adelete(self, url: str, **kwargs) -> HttpResponse: """Convenience method for async DELETE requests.""" return await self.arequest("DELETE", url, **kwargs) def close(self): """Close the underlying client/session.""" if self.async_mode and self._client: # For async clients, this needs to be called from an async context asyncio.create_task(self._client.aclose()) elif self._session: self._session.close() async def aclose(self): """Async close method.""" if self.async_mode and self._client: await self._client.aclose() elif self._session: self._session.close() def __enter__(self): """Context manager support for sync mode.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager cleanup for sync mode.""" self.close() async def __aenter__(self): """Async context manager support.""" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager cleanup.""" await self.aclose() # 
# Factory functions for common service types


def create_clinicaltrials_client(async_mode: bool = False) -> UnifiedHttpClient:
    """Build a UnifiedHttpClient preset for the ClinicalTrials.gov API.

    Args:
        async_mode: Forwarded to ``UnifiedHttpClient`` to select async or
            sync operation.

    Returns:
        A client named ``"clinicaltrials"`` whose base URL is the
        ClinicalTrials.gov API root and which asks for JSON responses.
    """
    json_headers = {"Accept": "application/json"}
    return UnifiedHttpClient(
        async_mode=async_mode,
        service_name="clinicaltrials",
        base_url="https://clinicaltrials.gov/api/",
        headers=json_headers,
    )


def create_anthropic_client(async_mode: bool = False, api_key: str | None = None) -> UnifiedHttpClient:
    """Build a UnifiedHttpClient preset for the Anthropic API.

    Args:
        async_mode: Forwarded to ``UnifiedHttpClient`` to select async or
            sync operation.
        api_key: When truthy, sent as the ``x-api-key`` header; otherwise
            that header is left out.

    Returns:
        A client named ``"anthropic"`` with the JSON content type and the
        pinned ``anthropic-version`` header set.
    """
    base_headers = {
        "content-type": "application/json",
        "anthropic-version": "2023-06-01",
    }
    # Only attach the credential header when a key was actually provided.
    auth_headers = {"x-api-key": api_key} if api_key else {}

    return UnifiedHttpClient(
        async_mode=async_mode,
        service_name="anthropic",
        base_url="https://api.anthropic.com/",
        headers={**base_headers, **auth_headers},
    )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pickleton89/mutation-clinical-trial-matching-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.