Skip to main content
Glama

OpenAPI Lambda MCP Server

by ingeno
http_client.py10 kB
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """HTTP client utilities for the OpenAPI MCP Server. This module provides enhanced HTTP client functionality with retry logic and other improvements. It can use different backends based on configuration. """ import asyncio import httpx from awslabs.openapi_mcp_server import logger from awslabs.openapi_mcp_server.utils.config import ( HTTP_MAX_CONNECTIONS, HTTP_MAX_KEEPALIVE, USE_TENACITY, ) from awslabs.openapi_mcp_server.utils.metrics_provider import api_call_timer from typing import Any, Dict, Optional, Union # Try to import tenacity if enabled TENACITY_AVAILABLE = False tenacity = None if USE_TENACITY: try: import tenacity TENACITY_AVAILABLE = True logger.info('tenacity retry logic enabled') except ImportError: logger.warning('tenacity requested but not installed. Install with: pip install tenacity') class HttpClientFactory: """Factory for creating HTTP clients with enhanced functionality.""" @staticmethod def create_client( base_url: str, headers: Optional[Dict[str, str]] = None, auth: Optional[httpx.Auth] = None, cookies: Optional[Dict[str, str]] = None, timeout: Union[float, httpx.Timeout] = 30.0, follow_redirects: bool = True, max_connections: Optional[int] = None, max_keepalive: Optional[int] = None, ) -> httpx.AsyncClient: """Create an HTTP client with enhanced functionality. Args: base_url: Base URL for the client headers: Optional headers to include in requests auth: Optional authentication to use cookies: Optional cookies to include in requests timeout: Request timeout in seconds follow_redirects: Whether to follow redirects max_connections: Maximum number of connections (defaults to config value) max_keepalive: Maximum number of keepalive connections (defaults to config value) Returns: httpx.AsyncClient: The HTTP client """ # Use configuration values if not explicitly provided max_connections = max_connections if max_connections is not None else HTTP_MAX_CONNECTIONS max_keepalive = max_keepalive if max_keepalive is not None else HTTP_MAX_KEEPALIVE # Log detailed auth information if auth: auth_type = type(auth).__name__ has_session_manager = hasattr(auth, 'session_manager') if auth else False logger.debug(f'Creating HTTP client with auth type: {auth_type}') # For CognitoAuth, verify the session manager and token if has_session_manager and hasattr(auth, 'session_manager'): session_manager = getattr(auth, 'session_manager') is_authenticated = ( session_manager.is_authenticated() if hasattr(session_manager, 'is_authenticated') else False ) logger.debug(f'Auth has session_manager, authenticated: {is_authenticated}') # Try to get and log the token if hasattr(session_manager, 'get_access_token'): token = session_manager.get_access_token() has_token = token is not None logger.debug(f'Session manager has access token: {has_token}') if token: # Mask token for security masked_token = ( token[:10] + '...' + token[-10:] if len(token) > 30 else token ) logger.debug(f'Access token from session manager: {masked_token}') # Add token to default headers if not already there if headers is None: headers = {} # Only add if not already in headers if 'Authorization' not in headers: headers['Authorization'] = f'Bearer {token}' logger.debug('Added Authorization header from session token') # Log the final headers that will be used (safely) if headers: safe_headers = {} for key, value in headers.items(): if key.lower() == 'authorization' and value: if isinstance(value, str) and value.startswith('Bearer '): token_part = value[7:] masked_token = ( token_part[:10] + '...' + token_part[-10:] if len(token_part) > 30 else token_part ) safe_headers[key] = f'Bearer {masked_token}' else: safe_headers[key] = '[MASKED]' else: safe_headers[key] = value logger.debug(f'Creating client with headers: {safe_headers}') # Create client with connection pooling client = httpx.AsyncClient( base_url=base_url, headers=headers, auth=auth, cookies=cookies, timeout=timeout if isinstance(timeout, httpx.Timeout) else httpx.Timeout(timeout), follow_redirects=follow_redirects, limits=httpx.Limits( max_connections=max_connections, max_keepalive_connections=max_keepalive, ), ) logger.info( f'Created HTTP client for {base_url} with max_connections={max_connections}, ' f'max_keepalive={max_keepalive}' ) # Verify client has auth after creation client_has_auth = hasattr(client, 'auth') and client.auth is not None logger.debug(f'Created client has auth: {client_has_auth}') if client_has_auth: logger.debug(f'Client auth type: {type(client.auth).__name__}') return client async def make_request_with_retry( client: httpx.AsyncClient, method: str, url: str, max_retries: int = 3, retry_delay: float = 1.0, **kwargs: Any, ) -> httpx.Response: """Make an HTTP request with retry logic. Args: client: The HTTP client method: HTTP method url: URL to request max_retries: Maximum number of retries retry_delay: Base delay between retries in seconds **kwargs: Additional arguments to pass to the request Returns: httpx.Response: The HTTP response Raises: httpx.HTTPError: If the request fails after all retries """ # Use tenacity if available and enabled if USE_TENACITY and TENACITY_AVAILABLE and tenacity is not None: @tenacity.retry( stop=tenacity.stop_after_attempt(max_retries), wait=tenacity.wait_exponential( multiplier=retry_delay, min=retry_delay, max=retry_delay * 10 ), retry=tenacity.retry_if_exception_type((httpx.TimeoutException, httpx.ConnectError)), before_sleep=lambda retry_state: logger.warning( f'Request failed, retrying ({retry_state.attempt_number}/{max_retries}): {retry_state.outcome.exception() if retry_state.outcome else "Unknown error"}' ), ) @api_call_timer async def _make_request(): response = await client.request(method, url, **kwargs) response.raise_for_status() return response return await _make_request() # Otherwise, use simple retry logic @api_call_timer async def _make_request_simple(): for attempt in range(max_retries): try: response = await client.request(method, url, **kwargs) response.raise_for_status() return response except (httpx.TimeoutException, httpx.ConnectError) as e: if attempt < max_retries - 1: delay = retry_delay * (2**attempt) # Exponential backoff logger.warning(f'Request failed, retrying ({attempt + 1}/{max_retries}): {e}') await asyncio.sleep(delay) else: logger.error(f'Request failed after {max_retries} attempts: {e}') raise except httpx.HTTPStatusError as e: # Don't retry on status errors (4xx, 5xx) logger.error(f'Request failed with status {e.response.status_code}: {e}') raise # This should never be reached raise RuntimeError('Unexpected error in retry logic') return await _make_request_simple() # Simple function for making a single request without retries @api_call_timer async def make_request( client: httpx.AsyncClient, method: str, url: str, **kwargs: Any, ) -> httpx.Response: """Make an HTTP request without retry logic. Args: client: The HTTP client method: HTTP method url: URL to request **kwargs: Additional arguments to pass to the request Returns: httpx.Response: The HTTP response Raises: httpx.HTTPError: If the request fails """ response = await client.request(method, url, **kwargs) response.raise_for_status() return response

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ingeno/mcp-openapi-lambda'

If you have feedback or need assistance with the MCP directory API, please join our Discord server