#!/usr/bin/env python3
"""
Shared AAP Gateway API Client
"""
import logging
import os
from typing import Any, Dict, Optional
import atexit
import httpx
# Populate the process environment from a local .env file when the optional
# python-dotenv package is importable.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # python-dotenv is missing; rely on the existing environment

# Module-level logger
logger = logging.getLogger(__name__)

# Connection settings read once at import time. A GATEWAY_* variable that is
# unset or empty falls through to its AAP_* counterpart.
GATEWAY_BASE_URL = (
    os.environ.get("GATEWAY_BASE_URL")
    or os.environ.get("AAP_BASE_URL", "https://your-aap-gateway.example.com")
)
# Full API URL; when set, GatewayClient uses it instead of GATEWAY_BASE_URL.
GATEWAY_URL = os.environ.get("GATEWAY_URL")
GATEWAY_USERNAME = (
    os.environ.get("GATEWAY_USERNAME") or os.environ.get("AAP_USERNAME")
)
GATEWAY_PASSWORD = (
    os.environ.get("GATEWAY_PASSWORD") or os.environ.get("AAP_PASSWORD")
)
GATEWAY_TOKEN = os.environ.get("GATEWAY_TOKEN") or os.environ.get("AAP_TOKEN")
class GatewayClient:
    """HTTP client for the AAP Gateway API built on one pooled ``httpx.Client``.

    Attributes:
        api_base: Root URL every endpoint is appended to.
        base_url: Gateway host URL that ``api_base`` was derived from, or
            ``None`` when ``api_base`` came directly from ``GATEWAY_URL``.
        client: The underlying ``httpx.Client``.
    """

    def __init__(self, *, verify: bool = False, timeout: float = 30.0):
        """Resolve the API root and credentials, then open the HTTP client.

        Args:
            verify: Forwarded to ``httpx.Client(verify=...)``. The default
                ``False`` keeps the original behaviour of skipping TLS
                certificate verification; pass ``True`` to enable it.
            timeout: Timeout in seconds forwarded to ``httpx.Client``.

        Raises:
            ValueError: If neither a token nor a username/password pair is
                configured.
        """
        if GATEWAY_URL:
            # The full API URL was supplied directly, so there is no separate
            # base URL. Define the attribute anyway so it always exists.
            self.base_url = None
            self.api_base = GATEWAY_URL.rstrip("/")
        else:
            self.base_url = GATEWAY_BASE_URL.rstrip("/")
            self.api_base = f"{self.base_url}/api/gateway/v1"

        # A bearer token takes precedence over basic auth.
        auth = None
        headers = {"Content-Type": "application/json"}
        if GATEWAY_TOKEN:
            headers["Authorization"] = f"Bearer {GATEWAY_TOKEN}"
        elif GATEWAY_USERNAME and GATEWAY_PASSWORD:
            auth = (GATEWAY_USERNAME, GATEWAY_PASSWORD)
        else:
            raise ValueError("Either GATEWAY_TOKEN or both GATEWAY_USERNAME and GATEWAY_PASSWORD must be provided")

        limits = httpx.Limits(
            max_keepalive_connections=10,
            max_connections=20,
            keepalive_expiry=30.0,
        )
        self.client = httpx.Client(
            auth=auth,
            headers=headers,
            verify=verify,
            timeout=timeout,
            limits=limits,
            # NOTE: httpx needs its optional HTTP/2 extra installed for this.
            http2=True,
        )

    def close(self):
        """Close the underlying HTTP client if one was created."""
        # getattr guards against __init__ having failed before the client
        # attribute was assigned.
        client = getattr(self, "client", None)
        if client:
            client.close()

    def __enter__(self):
        """Return this client for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the client on leaving the ``with`` block."""
        self.close()

    def _url(self, endpoint: str) -> str:
        """Join ``endpoint`` onto ``api_base`` with exactly one slash between."""
        return f"{self.api_base}/{endpoint.lstrip('/')}"

    @staticmethod
    def _payload(response: "httpx.Response") -> Dict[str, Any]:
        """Raise on an HTTP error status, then decode the JSON body.

        A response with an empty body yields ``{}`` rather than a JSON
        decoding error.
        """
        response.raise_for_status()
        return response.json() if response.text else {}

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Make GET request to Gateway API and return the decoded JSON body."""
        return self._payload(self.client.get(self._url(endpoint), params=params))

    def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
        """Make POST request to Gateway API, sending ``data`` as JSON."""
        return self._payload(self.client.post(self._url(endpoint), json=data))

    def patch(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make PATCH request to Gateway API, sending ``data`` as JSON."""
        return self._payload(self.client.patch(self._url(endpoint), json=data))

    def delete(self, endpoint: str) -> Dict[str, Any]:
        """Make DELETE request to Gateway API; an empty body yields ``{}``."""
        return self._payload(self.client.delete(self._url(endpoint)))
# Process-wide client, created on first use and closed at interpreter exit
_gateway_client = None


def get_gateway_connector():
    """Return the shared GatewayClient, building it on the first call.

    The first call logs the configured GATEWAY_URL and whether GATEWAY_TOKEN
    is set (masked), constructs the client, and registers an atexit hook that
    closes it. Later calls return the same instance.
    """
    global _gateway_client
    if _gateway_client is not None:
        return _gateway_client

    # Debug aid: report which environment settings are visible
    logger.info(f"GATEWAY_URL: {os.getenv('GATEWAY_URL')}")
    logger.info(f"GATEWAY_TOKEN: {'*' * 10 if os.getenv('GATEWAY_TOKEN') else 'Not set'}")

    _gateway_client = GatewayClient()
    # Make sure the connection pool is released when the process exits
    atexit.register(_cleanup_gateway_client)
    return _gateway_client
def _cleanup_gateway_client():
    """Close the shared Gateway client, if any, and clear the global reference."""
    global _gateway_client
    if not _gateway_client:
        return
    # Close first; the reference is cleared only after close() returns.
    _gateway_client.close()
    _gateway_client = None