#!/usr/bin/env python3
"""
Shared AAP Controller API Client
"""
import logging
import os
from typing import Any, Dict, Optional, Union
from urllib.parse import urljoin
import atexit
import httpx
# Load environment variables from a .env file when python-dotenv is installed.
# This has to happen before the os.getenv() reads below so that they can see
# any values the .env file provides.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass # dotenv not available, use system environment
# Configure logging
logger = logging.getLogger(__name__)
# Configuration (all values are read once, at import time)
# Controller host root; only used by AAPClient when AAP_URL is not set.
# The default is a placeholder and must be overridden for real use.
AAP_BASE_URL = os.getenv("AAP_BASE_URL", "https://your-aap-controller.example.com")
# Full API root URL; when set, AAPClient uses it instead of AAP_BASE_URL.
AAP_URL = os.getenv("AAP_URL") # Support for full API URL
# Credentials: AAPClient prefers AAP_TOKEN and falls back to the
# username/password pair only when no token is set.
AAP_USERNAME = os.getenv("AAP_USERNAME")
AAP_PASSWORD = os.getenv("AAP_PASSWORD")
AAP_TOKEN = os.getenv("AAP_TOKEN")
# API version path segment appended after "/api/controller/" by AAPClient.
AAP_API_VERSION = os.getenv("AAP_API_VERSION", "v2")
class AAPClient:
    """HTTP client for the AAP Controller API backed by one pooled ``httpx.Client``.

    Settings come from the module-level ``AAP_*`` values: ``AAP_URL`` (a full
    API root) wins over ``AAP_BASE_URL`` + ``AAP_API_VERSION``, and
    ``AAP_TOKEN`` (bearer header) wins over ``AAP_USERNAME``/``AAP_PASSWORD``.
    Instances can be used as context managers; leaving the ``with`` block
    closes the underlying client.
    """

    def __init__(self):
        """Resolve the API root, pick an auth scheme and open the HTTP client.

        Raises:
            ValueError: if neither a token nor a username/password pair is
                configured.
        """
        # base_url is always assigned so the attribute exists no matter which
        # setting supplied the API root.
        self.base_url = AAP_BASE_URL.rstrip("/")
        if AAP_URL:
            # AAP_URL already points at the API root; use it verbatim.
            self.api_base = AAP_URL.rstrip("/")
        else:
            self.api_base = f"{self.base_url}/api/controller/{AAP_API_VERSION}"

        # Setup authentication: a token takes precedence over user/password.
        auth = None
        headers = {"Content-Type": "application/json"}
        if AAP_TOKEN:
            headers["Authorization"] = f"Bearer {AAP_TOKEN}"
        elif AAP_USERNAME and AAP_PASSWORD:
            auth = (AAP_USERNAME, AAP_PASSWORD)
        else:
            raise ValueError("Either AAP_TOKEN or both AAP_USERNAME and AAP_PASSWORD must be provided")

        # Connection pool settings shared by every request made by this client.
        limits = httpx.Limits(
            max_keepalive_connections=10,
            max_connections=20,
            keepalive_expiry=30.0,
        )
        self.client = httpx.Client(
            auth=auth,
            headers=headers,
            # NOTE(review): TLS certificate verification is deliberately off
            # ("as requested"); this leaves connections open to interception.
            verify=False,
            timeout=30.0,
            limits=limits,
            # NOTE(review): http2=True needs httpx's optional HTTP/2 support
            # to be installed - confirm for the deployment environment.
            http2=True,
        )

    def close(self):
        """Close the HTTP client if one was created; safe to call on a bare instance."""
        client = getattr(self, "client", None)
        if client:
            client.close()

    def __enter__(self):
        """Return the client itself for use inside a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the client on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def _build_url(self, endpoint: str) -> str:
        """Join ``endpoint`` onto the API root, ignoring any leading slashes."""
        return f"{self.api_base}/{endpoint.lstrip('/')}"

    def _send_json(self, send, endpoint: str, data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Send ``data`` as a JSON body with ``send`` and decode the reply.

        ``send`` is a bound client method such as ``self.client.post``.
        A successful response with an empty body yields ``{}`` (matching
        ``delete``) instead of failing JSON decoding.

        Raises:
            Exception: on an HTTP error status (message carries the status,
                URL and the server's error body) or on any other request
                failure. The original error is attached as ``__cause__``.
        """
        url = self._build_url(endpoint)
        try:
            response = send(url, json=data)
            response.raise_for_status()
            return response.json() if response.text else {}
        except httpx.HTTPStatusError as e:
            # Surface the server's error body; it usually explains validation
            # failures (e.g. on survey creation) far better than the status.
            try:
                error_detail = f" - Details: {e.response.json()}"
            except ValueError:
                # Body was not JSON; include it verbatim instead.
                error_detail = f" - Response text: {e.response.text}"
            raise Exception(
                f"HTTP {e.response.status_code} {e.response.reason_phrase} for {url}{error_detail}"
            ) from e
        except Exception as e:
            raise Exception(f"Request failed for {url}: {str(e)}") from e

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Make GET request to AAP API and return the decoded JSON body.

        Errors from the HTTP client (including non-2xx statuses via
        ``raise_for_status``) propagate unchanged.
        """
        response = self.client.get(self._build_url(endpoint), params=params)
        response.raise_for_status()
        return response.json()

    def get_stdout(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Make GET request to an AAP stdout endpoint, asking for plain text.

        ``format=txt`` is added to the query (overriding any ``format`` the
        caller passed). The caller's ``params`` dict is never modified.

        Returns:
            ``{"content", "format": "txt", "status": "success"}`` for any
            ``text/*`` response; otherwise the decoded JSON body, or the same
            wrapper with ``"format": "unknown"`` when the body is not JSON.
        """
        # Work on a copy so the caller's dict is not mutated.
        query = dict(params) if params else {}
        query["format"] = "txt"
        response = self.client.get(self._build_url(endpoint), params=query)
        response.raise_for_status()

        # Check content type to determine how to parse
        content_type = response.headers.get("content-type", "").lower()
        if "text/" in content_type:
            return {
                "content": response.text,
                "format": "txt",
                "status": "success",
            }
        try:
            return response.json()
        except ValueError:
            # Not JSON either: hand the raw text back rather than failing.
            return {
                "content": response.text,
                "format": "unknown",
                "status": "success",
            }

    def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
        """Make POST request to AAP API.

        Returns the decoded JSON body, or ``{}`` when the response is empty.

        Raises:
            Exception: with the status, URL and server error details on an
                HTTP error, or a "Request failed" message otherwise.
        """
        return self._send_json(self.client.post, endpoint, data)

    def patch(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make PATCH request to AAP API.

        Returns the decoded JSON body, or ``{}`` when the response is empty.

        Raises:
            Exception: with the status, URL and server error details on an
                HTTP error, or a "Request failed" message otherwise.
        """
        return self._send_json(self.client.patch, endpoint, data)

    def delete(self, endpoint: str) -> Dict[str, Any]:
        """Make DELETE request to AAP API.

        Returns the decoded JSON body, or ``{}`` when the response is empty.
        Errors from the HTTP client propagate unchanged.
        """
        response = self.client.delete(self._build_url(endpoint))
        response.raise_for_status()
        return response.json() if response.text else {}
# Module-wide client instance: built lazily on first request, closed at exit
_aap_client = None


def get_aap_connector():
    """Return the shared AAP client, building it on the first call.

    The first call logs the configured URL and whether a token is set (the
    token value itself is masked), constructs the client and registers its
    cleanup with ``atexit``. Later calls return the same instance.
    """
    global _aap_client
    if _aap_client is not None:
        return _aap_client

    # Debug: report which environment settings are visible
    configured_url = os.getenv('AAP_URL')
    logger.info(f"AAP_URL: {configured_url}")
    token_display = 'Not set'
    if os.getenv('AAP_TOKEN'):
        token_display = '*' * 10
    logger.info(f"AAP_TOKEN: {token_display}")

    _aap_client = AAPClient()
    # Make sure the client is closed when the interpreter shuts down
    atexit.register(_cleanup_aap_client)
    return _aap_client
def _cleanup_aap_client():
"""Cleanup function for proper client shutdown"""
global _aap_client
if _aap_client:
_aap_client.close()
_aap_client = None