Skip to main content
Glama

baidu-ai-search

Official
by baidubce
http_client.py (8.48 kB)
import asyncio
import json
import logging
from typing import Any, Dict, Optional, Union, List, Tuple
from urllib.parse import urljoin

import aiohttp
from aiohttp import ClientTimeout

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class HTTPClient:
    """
    HTTP client with retry mechanism, timeout handling, and error reporting.

    HTTP error responses, connection failures and timeouts are retried with
    exponential backoff (2 ** attempt seconds). Failures are never raised to
    the caller of ``request``; they are reported in the returned dictionary.
    """

    # Placeholder used when the body of an error response cannot be read.
    _UNREADABLE_BODY = "Could not read response body"

    def __init__(
        self,
        base_url: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        default_timeout: int = 30,
        max_retries: int = 3,
    ):
        """
        Initialize the HTTP client.

        Args:
            base_url: Base URL for all requests
            headers: Default headers to include in all requests
            default_timeout: Default timeout in seconds
            max_retries: Maximum number of retry attempts
        """
        self.base_url = base_url
        self.headers = headers or {}
        self.default_timeout = default_timeout
        self.max_retries = max_retries
        # Created lazily by _ensure_session() on the first request.
        self.session: Optional[aiohttp.ClientSession] = None

    async def _ensure_session(self) -> None:
        """Ensure aiohttp session exists and is open"""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession(
                timeout=ClientTimeout(total=self.default_timeout),
                headers=self.headers,
            )

    async def close(self) -> None:
        """Close the HTTP session"""
        if self.session and not self.session.closed:
            await self.session.close()
            self.session = None

    @classmethod
    async def _read_error_body(cls, response: aiohttp.ClientResponse) -> Any:
        """Return the error response body as JSON, else text, else a placeholder."""
        # `except Exception` (not a bare `except`) so that task cancellation
        # and KeyboardInterrupt are not swallowed while reading the body.
        try:
            return await response.json()
        except Exception:
            try:
                return await response.text()
            except Exception:
                return cls._UNREADABLE_BODY

    @staticmethod
    async def _backoff(attempt: int, reason: str) -> None:
        """Log ``reason`` and sleep 2 ** attempt seconds before the next try."""
        wait_time = 2 ** attempt  # Exponential backoff
        logger.warning("%s Retrying in %s seconds...", reason, wait_time)
        await asyncio.sleep(wait_time)

    async def request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, Any]] = None,
        data: Optional[str] = None,
        json_data: Optional[Dict[str, Any]] = None,
        timeout: Optional[int] = None,
        max_retries: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Make an HTTP request with retry mechanism.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE, etc.)
            url: URL path (will be joined with base_url if provided)
            params: Query parameters
            headers: Additional headers (will be merged with default headers)
            data: Request body as string
            json_data: Request body as JSON (will be serialized); takes
                precedence over ``data`` whenever it is not None
            timeout: Request timeout in seconds (overrides default_timeout;
                None or 0 falls back to default_timeout)
            max_retries: Maximum retry attempts (overrides default
                max_retries; negative values are treated as 0)

        Returns:
            On success: ``{'success': True, 'status_code', 'content_type',
            'data'}`` where ``data`` is parsed JSON for JSON responses and
            text otherwise. On failure: ``{'success': False, 'error': {...}}``
            with ``type`` one of ``http_error``, ``network_error``,
            ``timeout_error`` or ``unexpected_error``.
        """
        # Use provided values or fall back to defaults
        timeout = timeout or self.default_timeout
        if max_retries is None:
            max_retries = self.max_retries
        # A negative budget must still yield one attempt; otherwise the loop
        # below would never run and the method would return None.
        max_retries = max(0, max_retries)

        # Merge headers
        merged_headers = dict(self.headers)
        if headers:
            merged_headers.update(headers)

        # Handle JSON data. `is not None` so that an empty payload such as
        # {} is still serialized and sent instead of being dropped.
        if json_data is not None:
            data = json.dumps(json_data)
            merged_headers['Content-Type'] = 'application/json'

        # Construct full URL
        full_url = urljoin(self.base_url, url) if self.base_url else url

        await self._ensure_session()

        # Retry loop: every path below either returns or increments
        # `retries` and continues.
        retries = 0
        while True:
            # Reset per attempt; replaced when an error body is actually read.
            error_body: Any = self._UNREADABLE_BODY
            try:
                logger.info(
                    "Making %s request to %s (Attempt %s/%s)",
                    method, full_url, retries + 1, max_retries + 1,
                )

                async with self.session.request(
                    method=method.upper(),
                    url=full_url,
                    params=params,
                    headers=merged_headers,
                    data=data,
                    timeout=ClientTimeout(total=timeout),
                ) as response:
                    if response.status >= 400:
                        # Read the body while the response context is still
                        # open; once the context exits the connection is
                        # released and the body can no longer be read.
                        error_body = await self._read_error_body(response)
                    response.raise_for_status()

                    # Handle different response content types
                    content_type = response.headers.get('content-type', '')
                    if 'application/json' in content_type:
                        result = await response.json()
                    else:
                        result = await response.text()

                    return {
                        'success': True,
                        'status_code': response.status,
                        'content_type': content_type,
                        'data': result,
                    }

            except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError) as e:
                is_http_error = isinstance(e, aiohttp.ClientResponseError)
                error_message = str(e)
                if is_http_error:
                    error_message = f"{error_message} - {error_body}"

                retries += 1
                if retries <= max_retries:
                    await self._backoff(retries, f"Request failed: {error_message}.")
                    continue

                logger.error(
                    "Request failed after %s attempts: %s",
                    max_retries + 1, error_message,
                )
                return {
                    'success': False,
                    'error': {
                        'type': 'http_error' if is_http_error else 'network_error',
                        'status_code': e.status if is_http_error else None,
                        'detail': error_message,
                    },
                }

            except aiohttp.ClientError as e:
                # Other aiohttp client errors are reported without retrying.
                logger.error("Client error: %s", e)
                return {
                    'success': False,
                    'error': {
                        'type': 'network_error',
                        'detail': str(e),
                    },
                }

            except asyncio.TimeoutError:
                retries += 1
                if retries <= max_retries:
                    await self._backoff(
                        retries, f"Request timed out after {timeout} seconds."
                    )
                    continue

                logger.error("Request timed out after %s attempts", max_retries + 1)
                return {
                    'success': False,
                    'error': {
                        'type': 'timeout_error',
                        'detail': f"Request timed out after {timeout} seconds",
                    },
                }

            except Exception as e:
                # Last-resort boundary: report instead of raising, matching
                # the dictionary-based error contract of this method.
                logger.error("Unexpected error: %s", e)
                return {
                    'success': False,
                    'error': {
                        'type': 'unexpected_error',
                        'detail': str(e),
                    },
                }

    async def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """Convenience method for GET requests"""
        return await self.request("GET", url, **kwargs)

    async def post(self, url: str, **kwargs) -> Dict[str, Any]:
        """Convenience method for POST requests"""
        return await self.request("POST", url, **kwargs)

    async def put(self, url: str, **kwargs) -> Dict[str, Any]:
        """Convenience method for PUT requests"""
        return await self.request("PUT", url, **kwargs)

    async def patch(self, url: str, **kwargs) -> Dict[str, Any]:
        """Convenience method for PATCH requests"""
        return await self.request("PATCH", url, **kwargs)

    async def delete(self, url: str, **kwargs) -> Dict[str, Any]:
        """Convenience method for DELETE requests"""
        return await self.request("DELETE", url, **kwargs)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/baidubce/app-builder'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.