# api_client.py (9.97 kB)
"""HTTP client for B4B API with retry logic and connection pooling."""
from typing import Any
import httpx
import structlog
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from ..config import get_config
from ..utils.errors import MCPAuthenticationError, MCPAuthorizationError
# Module-level structlog logger; B4BAPIClient binds its component context onto it.
logger = structlog.get_logger(__name__)
class B4BAPIClient:
    """
    Async HTTP client for B4B API with retry logic and connection pooling.

    This client implements:
    - Lazy creation of one pooled ``httpx.AsyncClient`` per instance (share a
      single instance, e.g. via ``get_api_client()``, to reuse connections)
    - Automatic retry with exponential backoff on timeouts and network errors
    - Connection pooling for performance
    - Structured logging for observability
    - Translation of 401/403 responses into MCP-specific exceptions
    """

    def __init__(self) -> None:
        """Initialize the API client with configuration."""
        self.config = get_config()
        # Created lazily on first use by _get_client().
        self._client: httpx.AsyncClient | None = None
        self._log = logger.bind(component="api_client")

    async def _get_client(self) -> httpx.AsyncClient:
        """
        Get or create the HTTP client instance.

        Returns:
            The pooled ``httpx.AsyncClient`` owned by this instance.
        """
        if self._client is None:
            self._client = httpx.AsyncClient(
                base_url=self.config.api_base_url,
                timeout=httpx.Timeout(
                    timeout=self.config.api_timeout,
                    connect=self.config.api_connect_timeout,
                ),
                limits=httpx.Limits(
                    max_keepalive_connections=20,
                    max_connections=100,
                ),
                follow_redirects=True,
            )
            self._log.info(
                "http_client_initialized",
                base_url=self.config.api_base_url,
                timeout=self.config.api_timeout,
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client and cleanup resources."""
        if self._client is not None:
            await self._client.aclose()
            # Reset so a later request transparently creates a fresh client.
            self._client = None
            self._log.info("http_client_closed")

    def _handle_response_errors(self, response: httpx.Response) -> None:
        """
        Handle HTTP error responses and translate to appropriate exceptions.

        Args:
            response: HTTP response object

        Raises:
            MCPAuthenticationError: For 401 responses
            MCPAuthorizationError: For 403 responses
            httpx.HTTPStatusError: For other error status codes
        """
        if response.status_code == 401:
            self._log.warning("authentication_failed", url=str(response.url))
            raise MCPAuthenticationError(
                "Authentication failed. Your token may be invalid or expired."
            )
        if response.status_code == 403:
            self._log.warning("authorization_failed", url=str(response.url))
            raise MCPAuthorizationError(
                "Access denied. You do not have permission to access this resource."
            )
        # Let httpx handle other status codes
        response.raise_for_status()

    async def _request(
        self,
        method: str,
        path: str,
        *,
        token: str,
        log: Any,
        params: dict[str, Any] | None = None,
        json: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """
        Perform a single request attempt shared by all public verb methods.

        Retrying is NOT done here; the ``@retry`` decorator on each public
        method re-invokes this helper for every attempt.

        Args:
            method: HTTP method name (e.g. "GET")
            path: API endpoint path (relative to base URL)
            token: JWT authentication token sent as a Bearer header
            log: Logger already bound with per-request context
            params: Query parameters
            json: JSON body data

        Returns:
            JSON response as dictionary; an empty dict when the response
            has no body (e.g. 204 No Content)

        Raises:
            MCPAuthenticationError: If authentication fails
            MCPAuthorizationError: If authorization fails
            httpx.HTTPError: For other HTTP errors
        """
        client = await self._get_client()
        log.debug("api_request_started")
        try:
            response = await client.request(
                method,
                path,
                headers={"Authorization": f"Bearer {token}"},
                params=params,
                json=json,
            )
            self._handle_response_errors(response)
            # A successful response may legitimately carry no body (204 No
            # Content is common for DELETE/PUT); response.json() would raise
            # on it, so map an empty body to an empty dict instead.
            data = response.json() if response.content else {}
            log.info("api_request_succeeded", status_code=response.status_code)
            return data
        except (httpx.TimeoutException, httpx.NetworkError) as e:
            # Transient failure: re-raised so the caller's retry policy can
            # decide whether another attempt follows.
            log.warning("api_request_retrying", error=str(e))
            raise
        except Exception as e:
            log.error("api_request_failed", error=str(e), error_type=type(e).__name__)
            raise

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)),
        reraise=True,
    )
    async def get(
        self,
        path: str,
        *,
        token: str,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """
        Send a GET request with automatic retry.

        Args:
            path: API endpoint path (relative to base URL)
            token: JWT authentication token
            params: Query parameters

        Returns:
            JSON response as dictionary (empty dict for an empty body)

        Raises:
            MCPAuthenticationError: If authentication fails
            MCPAuthorizationError: If authorization fails
            httpx.HTTPError: For other HTTP errors
        """
        log = self._log.bind(method="GET", path=path, has_params=params is not None)
        return await self._request("GET", path, token=token, log=log, params=params)

    # NOTE(review): POST is retried on timeouts; if the server already
    # processed a timed-out request, a retry may repeat a non-idempotent
    # operation -- confirm the target endpoints tolerate this.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)),
        reraise=True,
    )
    async def post(
        self,
        path: str,
        *,
        token: str,
        json: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """
        Send a POST request with automatic retry.

        Args:
            path: API endpoint path (relative to base URL)
            token: JWT authentication token
            json: JSON body data

        Returns:
            JSON response as dictionary (empty dict for an empty body)

        Raises:
            MCPAuthenticationError: If authentication fails
            MCPAuthorizationError: If authorization fails
            httpx.HTTPError: For other HTTP errors
        """
        log = self._log.bind(method="POST", path=path, has_body=json is not None)
        return await self._request("POST", path, token=token, log=log, json=json)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)),
        reraise=True,
    )
    async def put(
        self,
        path: str,
        *,
        token: str,
        json: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """
        Send a PUT request with automatic retry.

        Args:
            path: API endpoint path (relative to base URL)
            token: JWT authentication token
            json: JSON body data

        Returns:
            JSON response as dictionary (empty dict for an empty body)

        Raises:
            MCPAuthenticationError: If authentication fails
            MCPAuthorizationError: If authorization fails
            httpx.HTTPError: For other HTTP errors
        """
        log = self._log.bind(method="PUT", path=path, has_body=json is not None)
        return await self._request("PUT", path, token=token, log=log, json=json)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)),
        reraise=True,
    )
    async def delete(
        self,
        path: str,
        *,
        token: str,
    ) -> dict[str, Any]:
        """
        Send a DELETE request with automatic retry.

        Args:
            path: API endpoint path (relative to base URL)
            token: JWT authentication token

        Returns:
            JSON response as dictionary (empty dict for an empty body,
            e.g. 204 No Content)

        Raises:
            MCPAuthenticationError: If authentication fails
            MCPAuthorizationError: If authorization fails
            httpx.HTTPError: For other HTTP errors
        """
        log = self._log.bind(method="DELETE", path=path)
        return await self._request("DELETE", path, token=token, log=log)
# Process-wide shared client; stays None until get_api_client() is first called.
_api_client: B4BAPIClient | None = None


def get_api_client() -> B4BAPIClient:
    """
    Return the shared B4BAPIClient, constructing it on the first call.

    Returns:
        The module-level B4BAPIClient instance (the same object on every call)
    """
    global _api_client
    if _api_client is not None:
        return _api_client
    _api_client = B4BAPIClient()
    return _api_client