Skip to main content
Glama
by zachegner
base.py7.88 kB
"""Base EPA API client with retry logic and error handling.""" import asyncio import json import logging from typing import Dict, List, Optional, Any, Union from urllib.parse import urlencode import httpx from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type from config import settings logger = logging.getLogger(__name__) class EPAAPIError(Exception): """Base exception for EPA API errors.""" pass class EPATimeoutError(EPAAPIError): """EPA API timeout error.""" pass class EPANetworkError(EPAAPIError): """EPA API network error.""" pass class EPAClient: """Base client for EPA Envirofacts API with retry logic.""" def __init__(self, base_url: Optional[str] = None, timeout: Optional[int] = None): """Initialize EPA client. Args: base_url: EPA API base URL (defaults to settings) timeout: Request timeout in seconds (defaults to settings) """ self.base_url = base_url or settings.epa_api_base_url.rstrip('/') self.timeout = timeout or settings.request_timeout # Create httpx client with timeout self.client = httpx.AsyncClient( timeout=httpx.Timeout(self.timeout), headers={ 'User-Agent': 'epa-envirofacts-mcp/1.0', 'Accept': 'application/json' } ) def _build_query_url( self, table: str, filters: Optional[Dict[str, Any]] = None, joins: Optional[List[str]] = None, sort: Optional[str] = None, limit: int = 100, offset: int = 0, format_type: str = "JSON" ) -> str: """Build EPA API query URL. Args: table: EPA table name (e.g., 'frs.frs_facilities') filters: Dictionary of filters {column: {operator: value}} joins: List of join tables sort: Sort column limit: Maximum results offset: Starting offset format_type: Response format (JSON, CSV, etc.) Returns: Complete EPA API URL Example: filters = { 'state_abbr': {'equals': 'CA'}, 'facility_name': {'contains': 'Chemical'} } """ # Start with base URL and table url_parts = [self.base_url, table] # Add filters if filters: for column, conditions in filters.items(): for operator, value in conditions.items(): url_parts.extend([column, operator, str(value)]) # Add joins if joins: for join_table in joins: url_parts.append('join') url_parts.append(join_table) # Add pagination url_parts.append(f"{offset + 1}:{offset + limit}") # Add sort if sort: url_parts.append(sort) # Add format url_parts.append(format_type) return '/'.join(url_parts) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=8), retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError, httpx.HTTPStatusError)), reraise=True ) async def _execute_query(self, url: str) -> Dict[str, Any]: """Execute EPA API query with retry logic. Args: url: Complete EPA API URL Returns: Parsed JSON response Raises: EPATimeoutError: If request times out EPANetworkError: If network error occurs EPAAPIError: If API returns error """ try: logger.debug(f"Executing EPA API query: {url}") response = await self.client.get(url) response.raise_for_status() # Parse JSON response try: data = response.json() except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON response: {e}") raise EPAAPIError(f"Invalid JSON response: {e}") logger.debug(f"EPA API query successful, returned {len(data)} records") return data except httpx.TimeoutException as e: logger.warning(f"EPA API timeout: {e}") raise EPATimeoutError(f"Request timed out after {self.timeout}s: {e}") except httpx.NetworkError as e: logger.warning(f"EPA API network error: {e}") raise EPANetworkError(f"Network error: {e}") except httpx.HTTPStatusError as e: logger.error(f"EPA API HTTP error {e.response.status_code}: {e}") if e.response.status_code == 429: raise EPAAPIError("Rate limit exceeded - please try again later") elif e.response.status_code >= 500: raise EPAAPIError(f"EPA API server error: {e.response.status_code}") else: raise EPAAPIError(f"EPA API error {e.response.status_code}: {e}") async def query_table( self, table: str, filters: Optional[Dict[str, Any]] = None, joins: Optional[List[str]] = None, sort: Optional[str] = None, limit: int = 100, offset: int = 0 ) -> List[Dict[str, Any]]: """Query EPA table with filters and pagination. Args: table: EPA table name filters: Dictionary of filters joins: List of join tables sort: Sort column limit: Maximum results offset: Starting offset Returns: List of records from EPA API Raises: EPAAPIError: If query fails """ url = self._build_query_url(table, filters, joins, sort, limit, offset) try: data = await self._execute_query(url) # EPA API returns a list of records if isinstance(data, list): return data elif isinstance(data, dict) and 'results' in data: return data['results'] else: logger.warning(f"Unexpected EPA API response format: {type(data)}") return [] except Exception as e: logger.error(f"Failed to query EPA table {table}: {e}") raise EPAAPIError(f"Query failed: {e}") async def health_check(self) -> Dict[str, Any]: """Check EPA API connectivity. Returns: Health status information """ try: # Simple test query to FRS facility_site table test_url = self._build_query_url('frs.frs_facility_site', limit=1) await self._execute_query(test_url) return { "status": "healthy", "api_reachable": True, "base_url": self.base_url, "timeout": self.timeout } except Exception as e: logger.error(f"EPA API health check failed: {e}") return { "status": "unhealthy", "api_reachable": False, "error": str(e), "base_url": self.base_url, "timeout": self.timeout } async def close(self): """Close the HTTP client.""" await self.client.aclose() async def __aenter__(self): """Async context manager entry.""" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/zachegner/envirofacts-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server