kyc_api_client.py•5.42 kB
"""KYC API client with retry logic and error handling."""
from typing import Any, Dict, Optional
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
from src.utils.logger import get_logger
logger = get_logger(__name__)
class APIError(Exception):
"""Base API error."""
pass
class ValidationError(APIError):
"""Validation error."""
pass
class ServiceUnavailableError(APIError):
"""Service unavailable error."""
pass
class KYCAPIClient:
"""
HTTP client for KYC API with retry logic and error handling.
"""
def __init__(
self,
base_url: str,
api_key: str,
jwt_token: str,
timeout: int = 30,
max_connections: int = 100,
):
"""
Initialize KYC API client.
Args:
base_url: Base URL for KYC API
api_key: API key for authentication
jwt_token: JWT token for authentication
timeout: Request timeout in seconds
max_connections: Maximum number of connections
"""
self.base_url = base_url
self.api_key = api_key
self.jwt_token = jwt_token
self.timeout = timeout
limits = httpx.Limits(
max_connections=max_connections,
max_keepalive_connections=20,
)
self.client = httpx.AsyncClient(
base_url=base_url,
timeout=timeout,
limits=limits,
headers={
"Authorization": jwt_token,
"x-api-key": api_key,
"Content-Type": "application/json",
},
)
logger.info("kyc_api_client_initialized", base_url=base_url)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
reraise=True,
)
async def post(
self,
endpoint: str,
data: Dict[str, Any],
headers: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""
Make POST request with retry logic.
Args:
endpoint: API endpoint
data: Request payload
headers: Additional headers (optional)
Returns:
Response data
Raises:
ValidationError: For 422 validation errors
ServiceUnavailableError: For 503 service unavailable
APIError: For other API errors
"""
try:
logger.debug("api_request", method="POST", endpoint=endpoint)
response = await self.client.post(
endpoint, json=data, headers=headers or {}
)
response.raise_for_status()
result = response.json()
logger.debug("api_response", status_code=response.status_code)
return result
except httpx.HTTPStatusError as e:
logger.error(
"api_error",
status_code=e.response.status_code,
endpoint=endpoint,
error=str(e),
)
raise self._map_error(e)
except httpx.RequestError as e:
logger.error("api_request_error", endpoint=endpoint, error=str(e))
raise APIError(f"Request failed: {str(e)}")
async def get(
self, endpoint: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Make GET request.
Args:
endpoint: API endpoint
params: Query parameters (optional)
Returns:
Response data
Raises:
APIError: For API errors
"""
try:
logger.debug("api_request", method="GET", endpoint=endpoint)
response = await self.client.get(endpoint, params=params or {})
response.raise_for_status()
result = response.json()
logger.debug("api_response", status_code=response.status_code)
return result
except httpx.HTTPStatusError as e:
logger.error(
"api_error",
status_code=e.response.status_code,
endpoint=endpoint,
error=str(e),
)
raise self._map_error(e)
except httpx.RequestError as e:
logger.error("api_request_error", endpoint=endpoint, error=str(e))
raise APIError(f"Request failed: {str(e)}")
def _map_error(self, error: httpx.HTTPStatusError) -> Exception:
"""
Map HTTP errors to domain-specific exceptions.
Args:
error: HTTP status error
Returns:
Mapped exception
"""
status_code = error.response.status_code
try:
error_data = error.response.json()
message = error_data.get("message", str(error))
except Exception:
message = str(error)
if status_code == 422:
return ValidationError(message)
elif status_code == 503:
return ServiceUnavailableError("KYC service temporarily unavailable")
elif status_code >= 500:
return ServiceUnavailableError(f"Server error: {message}")
else:
return APIError(f"API error ({status_code}): {message}")
async def close(self) -> None:
"""Close the HTTP client."""
await self.client.aclose()
logger.info("kyc_api_client_closed")