Skip to main content
Glama
aap_connector.py (6.75 kB)
#!/usr/bin/env python3
"""
Shared AAP Controller API Client

Provides ``AAPClient``, a thin synchronous wrapper around ``httpx.Client``
configured from environment variables, plus ``get_aap_connector()`` which
lazily creates a single process-wide instance and registers its cleanup
with ``atexit``.
"""

import atexit
import logging
import os
from typing import Any, Dict, Optional, Union
from urllib.parse import urljoin

import httpx

# Load environment variables from .env file
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # dotenv not available, use system environment

# Configure logging
logger = logging.getLogger(__name__)

# Configuration
AAP_BASE_URL = os.getenv("AAP_BASE_URL", "https://your-aap-controller.example.com")
AAP_URL = os.getenv("AAP_URL")  # Support for full API URL
AAP_USERNAME = os.getenv("AAP_USERNAME")
AAP_PASSWORD = os.getenv("AAP_PASSWORD")
AAP_TOKEN = os.getenv("AAP_TOKEN")
AAP_API_VERSION = os.getenv("AAP_API_VERSION", "v2")


class AAPClient:
    """HTTP client for AAP Controller API with optimized connection management.

    Can be used as a context manager; leaving the ``with`` block closes the
    underlying ``httpx.Client``.
    """

    def __init__(self):
        """Build the API base URL and the underlying ``httpx.Client``.

        Raises:
            ValueError: If neither ``AAP_TOKEN`` nor both ``AAP_USERNAME``
                and ``AAP_PASSWORD`` are set.
        """
        # Use AAP_URL if provided (full API URL), otherwise construct from base URL
        if AAP_URL:
            self.api_base = AAP_URL.rstrip("/")
        else:
            self.base_url = AAP_BASE_URL.rstrip("/")
            self.api_base = f"{self.base_url}/api/controller/{AAP_API_VERSION}"

        # Setup authentication: a bearer token takes precedence over basic auth
        auth = None
        headers = {"Content-Type": "application/json"}

        if AAP_TOKEN:
            headers["Authorization"] = f"Bearer {AAP_TOKEN}"
        elif AAP_USERNAME and AAP_PASSWORD:
            auth = (AAP_USERNAME, AAP_PASSWORD)
        else:
            raise ValueError("Either AAP_TOKEN or both AAP_USERNAME and AAP_PASSWORD must be provided")

        # Optimized connection settings
        limits = httpx.Limits(
            max_keepalive_connections=10,
            max_connections=20,
            keepalive_expiry=30.0,
        )

        # NOTE(review): verify=False turns off TLS certificate verification,
        # so the controller's identity is not checked. Kept as-is to preserve
        # existing behavior; consider making it configurable.
        self.client = httpx.Client(
            auth=auth,
            headers=headers,
            verify=False,  # Disable SSL verification as requested
            timeout=30.0,
            limits=limits,
            http2=True,  # Enable HTTP/2 for better performance
        )

    def close(self):
        """Properly close the HTTP client."""
        # __init__ may have raised before self.client was assigned.
        client = getattr(self, "client", None)
        if client:
            client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _url(self, endpoint: str) -> str:
        """Join *endpoint* onto the API base, ignoring leading slashes."""
        return f"{self.api_base}/{endpoint.lstrip('/')}"

    @staticmethod
    def _error_detail(response) -> str:
        """Describe an error response body: parsed JSON if possible, else raw text."""
        try:
            return f" - Details: {response.json()}"
        except ValueError:
            # JSON decoding errors are ValueError subclasses.
            return f" - Response text: {response.text}"

    def _send_json(self, send, endpoint: str, data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Send *data* as a JSON body via *send* and return the decoded reply.

        Args:
            send: Bound ``httpx.Client`` method to call (``post`` or ``patch``).
            endpoint: Path relative to the API base.
            data: JSON-serializable request body, or ``None``.

        Raises:
            Exception: On an HTTP error status (message includes the status
                and response body) or on any other request failure. The
                original exception is chained as ``__cause__``.
        """
        url = self._url(endpoint)
        try:
            response = send(url, json=data)
            response.raise_for_status()
            # A successful reply may have an empty body; mirror delete() and
            # return {} rather than reporting a JSON decode error as a failure.
            return response.json() if response.text else {}
        except httpx.HTTPStatusError as e:
            raise Exception(
                f"HTTP {e.response.status_code} {e.response.reason_phrase} for {url}"
                f"{self._error_detail(e.response)}"
            ) from e
        except Exception as e:
            raise Exception(f"Request failed for {url}: {str(e)}") from e

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Make GET request to AAP API.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        response = self.client.get(self._url(endpoint), params=params)
        response.raise_for_status()
        return response.json()

    def get_stdout(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Make GET request to AAP API stdout endpoint that returns text.

        ``format=txt`` is always added to the query string. A ``text/*``
        response is returned as ``{"content", "format", "status"}``; any
        other response is decoded as JSON, falling back to the same wrapper
        with ``format`` set to ``"unknown"`` if it is not valid JSON.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        # Copy so the caller's dict is not modified when format=txt is added.
        query = dict(params) if params else {}
        query["format"] = "txt"

        response = self.client.get(self._url(endpoint), params=query)
        response.raise_for_status()

        # Check content type to determine how to parse
        content_type = response.headers.get("content-type", "").lower()
        if "text/" in content_type:
            # Return as structured response with content
            return {
                "content": response.text,
                "format": "txt",
                "status": "success",
            }

        # Try to parse as JSON for other formats
        try:
            return response.json()
        except ValueError:
            # Fallback to text content
            return {
                "content": response.text,
                "format": "unknown",
                "status": "success",
            }

    def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
        """Make POST request to AAP API.

        Returns the decoded JSON reply, or ``{}`` if the reply body is empty.

        Raises:
            Exception: On an HTTP error status or any other request failure.
        """
        return self._send_json(self.client.post, endpoint, data)

    def patch(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Make PATCH request to AAP API.

        Returns the decoded JSON reply, or ``{}`` if the reply body is empty.

        Raises:
            Exception: On an HTTP error status or any other request failure.
        """
        return self._send_json(self.client.patch, endpoint, data)

    def delete(self, endpoint: str) -> Dict[str, Any]:
        """Make DELETE request to AAP API.

        Returns the decoded JSON reply, or ``{}`` if the reply body is empty.

        Raises:
            httpx.HTTPStatusError: If the response has an error status.
        """
        response = self.client.delete(self._url(endpoint))
        response.raise_for_status()
        return response.json() if response.text else {}


# Global client instance - lazy initialization with proper cleanup
_aap_client = None


def get_aap_connector() -> AAPClient:
    """Get or create the AAP client instance.

    The first call constructs the client and registers its cleanup with
    ``atexit``; later calls return the same instance.
    """
    global _aap_client
    if _aap_client is None:
        # Debug: Check environment variables (the token value is never logged)
        logger.info("AAP_URL: %s", os.getenv("AAP_URL"))
        logger.info("AAP_TOKEN: %s", "*" * 10 if os.getenv("AAP_TOKEN") else "Not set")

        _aap_client = AAPClient()
        # Register cleanup function
        atexit.register(_cleanup_aap_client)
    return _aap_client


def _cleanup_aap_client():
    """Cleanup function for proper client shutdown."""
    global _aap_client
    if _aap_client:
        _aap_client.close()
        _aap_client = None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anshulbehl/aap-mcp-pilot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server