"""UniFi Network Controller API client.
This module provides an async HTTP client for interacting with the UniFi Network
Controller API. It handles authentication, session management, SSL certificate
validation, and provides a clean interface for making API requests.
Key Features:
- Async HTTP client using aiohttp
- Automatic authentication and session management
- Support for self-signed SSL certificates
- Session cookie persistence
- Structured logging with sensitive data redaction
- Retry logic with exponential backoff
- Automatic re-authentication on session expiry
- Response caching with configurable TTL per endpoint type
- Automatic cache invalidation on write operations
"""
import asyncio
import hashlib
import json
import logging
import ssl
import time
from typing import Any, Dict, Optional
from urllib.parse import urljoin
import aiohttp
import certifi
from cachetools import TTLCache
from unifi_mcp.config.loader import UniFiConfig
from unifi_mcp.utils.logging import (
get_logger,
log_with_redaction,
set_correlation_id,
)
from unifi_mcp.utils.retry import (
RetryConfig,
retry_async,
RetryableError,
NonRetryableError,
)
from unifi_mcp.api import (
ControllerType,
ControllerDetector,
EndpointRouter,
ResponseNormalizer,
)
class UniFiClientError(Exception):
    """Root of the UniFi client exception hierarchy; all client errors derive from it."""
class AuthenticationError(UniFiClientError):
    """Login or credential failure; not retryable."""
class ConnectionError(UniFiClientError, RetryableError):
    """Raised when connection to controller fails.

    NOTE: this name shadows the builtin ``ConnectionError`` inside this
    module; code importing it elsewhere should alias it to avoid confusion.
    Inherits RetryableError so retry logic treats it as transient.
    """
    pass
class TimeoutError(UniFiClientError, RetryableError):
    """Raised when request times out.

    NOTE: this name shadows the builtin ``TimeoutError`` (and, on 3.11+,
    ``asyncio.TimeoutError``) inside this module; the request helpers catch
    ``asyncio.TimeoutError`` explicitly and re-raise this class instead.
    Inherits RetryableError so retry logic treats it as transient.
    """
    pass
class RateLimitError(UniFiClientError, RetryableError):
    """HTTP 429 from the controller; retryable after backoff."""
class SessionExpiredError(UniFiClientError, RetryableError):
    """Session cookie no longer valid; retry path re-authenticates."""
class UniFiClient:
"""Async client for UniFi Network Controller API.
This client handles:
- HTTPS connection to UniFi controller
- Authentication with username/password
- Session cookie management
- SSL certificate validation (including self-signed)
- Structured logging with redaction
Example:
>>> config = UniFiConfig(
... host="192.168.1.1",
... port=443,
... username="admin",
... password="secret",
... site="default",
... verify_ssl=False
... )
>>> client = UniFiClient(config)
>>> await client.connect()
>>> devices = await client.get("/api/s/default/stat/device")
>>> await client.close()
"""
    def __init__(self, config: UniFiConfig, server_config: Optional[Dict[str, Any]] = None):
        """Initialize the UniFi client.

        Args:
            config: UniFi controller configuration
            server_config: Optional server configuration for caching and performance settings
        """
        self.config = config
        self.logger = get_logger(__name__)
        # Prefer API-key auth whenever a non-blank key is configured.
        self.use_api_key = bool(config.api_key and config.api_key.strip())
        # Build base URL (different for API key vs session auth)
        if self.use_api_key:
            # UniFi OS API uses /proxy/network prefix
            self.base_url = f"https://{config.host}:{config.port}/proxy/network"
        else:
            # Traditional UniFi Controller API
            self.base_url = f"https://{config.host}:{config.port}"
        # Session management: session is created lazily by connect().
        self.session: Optional[aiohttp.ClientSession] = None
        self.authenticated = False
        self._auth_lock = asyncio.Lock()  # Prevent concurrent re-authentication
        # SSL context (verification on/off per config.verify_ssl)
        self.ssl_context = self._create_ssl_context()
        # Performance configuration (timeouts, concurrency, pool sizes)
        self._setup_performance_config(server_config)
        # Retry configuration: 3 attempts, exponential backoff capped at 30s.
        self.retry_config = RetryConfig(
            max_attempts=3,
            backoff_factor=2.0,
            max_backoff=30,
            initial_backoff=1.0
        )
        # Cache configuration (must run after _setup_performance_config)
        self._setup_cache(server_config)
        # Concurrent request limiting
        self._request_semaphore = asyncio.Semaphore(self.max_concurrent_requests)
        # API abstraction layer components (initialized after connection)
        self._controller_detector: Optional[ControllerDetector] = None
        self._endpoint_router: Optional[EndpointRouter] = None
        self._response_normalizer: Optional[ResponseNormalizer] = None
        auth_method = "API Key" if self.use_api_key else "Username/Password"
        self.logger.info(
            "UniFi client initialized",
            extra={
                "host": config.host,
                "port": config.port,
                "site": config.site,
                "verify_ssl": config.verify_ssl,
                "auth_method": auth_method,
                "cache_enabled": self.cache_enabled,
                "connection_timeout": self.connection_timeout,
                "request_timeout": self.request_timeout,
                "max_concurrent_requests": self.max_concurrent_requests,
            }
        )
@property
def controller_type(self) -> ControllerType:
"""Get the detected controller type.
Returns:
ControllerType indicating whether this is UniFi OS or traditional controller.
Returns UNKNOWN if detection hasn't been performed yet.
"""
if self._controller_detector is None:
return ControllerType.UNKNOWN
cached = self._controller_detector.get_cached_type()
return cached if cached is not None else ControllerType.UNKNOWN
def _setup_performance_config(self, server_config: Optional[Dict[str, Any]] = None) -> None:
"""Set up performance configuration.
Args:
server_config: Optional server configuration for performance settings
"""
# Get performance config
perf_config = (server_config or {}).get("performance", {})
# Connection and read timeouts (in seconds)
self.connection_timeout = perf_config.get("connection_timeout", 10)
self.request_timeout = perf_config.get("request_timeout", 30)
# Maximum concurrent requests
self.max_concurrent_requests = perf_config.get("max_concurrent_requests", 10)
# Connection pooling settings
self.connection_limit = perf_config.get("connection_limit", 100)
self.connection_limit_per_host = perf_config.get("connection_limit_per_host", 10)
self.logger.debug(
"Performance configuration initialized",
extra={
"connection_timeout": self.connection_timeout,
"request_timeout": self.request_timeout,
"max_concurrent_requests": self.max_concurrent_requests,
"connection_limit": self.connection_limit,
"connection_limit_per_host": self.connection_limit_per_host,
}
)
def _setup_cache(self, server_config: Optional[Dict[str, Any]] = None) -> None:
"""Set up response caching with TTL.
Args:
server_config: Optional server configuration for cache settings
"""
# Get performance config
perf_config = (server_config or {}).get("performance", {})
# Default cache TTL
self.default_cache_ttl = perf_config.get("cache_ttl", 30)
# Endpoint-specific TTL configuration
self.cache_ttl_by_endpoint = perf_config.get("cache_ttl_by_endpoint", {
"devices": 30,
"clients": 30,
"networks": 60,
"wlans": 60,
"firewall": 60,
"routes": 60,
"port_forwards": 60,
"stats": 10,
"alerts": 10,
"health": 10,
})
# Create cache (maxsize=100 entries, TTL will be per-entry)
# We use the longest TTL as the default, individual entries can have shorter TTLs
max_ttl = max(self.cache_ttl_by_endpoint.values())
self.cache: TTLCache = TTLCache(maxsize=100, ttl=max_ttl)
self.cache_enabled = True
self.logger.debug(
"Cache initialized",
extra={
"default_ttl": self.default_cache_ttl,
"max_size": 100,
"endpoint_ttls": self.cache_ttl_by_endpoint,
}
)
def _get_cache_key(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> str:
"""Generate cache key from endpoint and parameters.
Args:
endpoint: API endpoint
params: Optional query parameters
Returns:
Cache key string
"""
# Create a deterministic key from endpoint and params
key_parts = [endpoint]
if params:
# Sort params for consistent key generation
sorted_params = sorted(params.items())
params_str = json.dumps(sorted_params, sort_keys=True)
key_parts.append(params_str)
# Create hash for shorter keys
key_string = "|".join(key_parts)
return hashlib.md5(key_string.encode()).hexdigest()
def _get_endpoint_type(self, endpoint: str) -> Optional[str]:
"""Determine endpoint type for TTL lookup.
Args:
endpoint: API endpoint
Returns:
Endpoint type string or None if not categorized
"""
# Map endpoint patterns to types
endpoint_lower = endpoint.lower()
if "/stat/device" in endpoint_lower:
return "devices"
elif "/stat/sta" in endpoint_lower or "/stat/client" in endpoint_lower:
return "clients"
elif "/rest/networkconf" in endpoint_lower or "/list/networkconf" in endpoint_lower:
return "networks"
elif "/rest/wlanconf" in endpoint_lower or "/list/wlanconf" in endpoint_lower:
return "wlans"
elif "/rest/firewallrule" in endpoint_lower or "/list/firewallrule" in endpoint_lower:
return "firewall"
elif "/rest/routing" in endpoint_lower or "/list/routing" in endpoint_lower:
return "routes"
elif "/rest/portforward" in endpoint_lower or "/list/portforward" in endpoint_lower:
return "port_forwards"
elif "/stat/" in endpoint_lower and ("health" in endpoint_lower or "sysinfo" in endpoint_lower):
return "health"
elif "/stat/" in endpoint_lower:
return "stats"
elif "/list/alarm" in endpoint_lower or "/list/event" in endpoint_lower:
return "alerts"
return None
def _get_cache_ttl(self, endpoint: str) -> int:
"""Get cache TTL for a specific endpoint.
Args:
endpoint: API endpoint
Returns:
TTL in seconds
"""
endpoint_type = self._get_endpoint_type(endpoint)
if endpoint_type and endpoint_type in self.cache_ttl_by_endpoint:
return self.cache_ttl_by_endpoint[endpoint_type]
return self.default_cache_ttl
def invalidate_cache(self, endpoint_pattern: Optional[str] = None) -> None:
"""Invalidate cache entries.
Args:
endpoint_pattern: Optional pattern to match endpoints. If None, clears all cache.
"""
if endpoint_pattern is None:
# Clear entire cache
self.cache.clear()
self.logger.info("Cache cleared (all entries)")
else:
# Clear specific entries (not implemented in TTLCache, would need custom implementation)
# For now, just clear all cache on write operations
self.cache.clear()
self.logger.info(
"Cache invalidated",
extra={"pattern": endpoint_pattern}
)
def _create_ssl_context(self) -> ssl.SSLContext:
"""Create SSL context based on configuration.
Returns:
Configured SSL context
"""
if self.config.verify_ssl:
# Use default SSL context with certificate verification
ssl_context = ssl.create_default_context(cafile=certifi.where())
self.logger.debug("SSL certificate verification enabled")
else:
# Create context that accepts self-signed certificates
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self.logger.warning(
"SSL certificate verification disabled - accepting self-signed certificates"
)
return ssl_context
    async def connect(self) -> None:
        """Initialize HTTP session and authenticate with the controller.

        This method:
        1. Creates an aiohttp ClientSession (pooling + keep-alive)
        2. Authenticates with the UniFi controller (if using username/password)
        3. Stores session cookies for subsequent requests (session auth only)
        4. Initializes the API abstraction layer and detects controller type

        For API key authentication, no login is required - the key is sent
        with each request via the X-API-KEY header.

        Raises:
            ConnectionError: If connection to controller fails
            AuthenticationError: If authentication fails
        """
        # Generate correlation ID for this connection attempt
        correlation_id = set_correlation_id()
        try:
            # Create aiohttp session with connection pooling and keep-alive
            connector = aiohttp.TCPConnector(
                ssl=self.ssl_context,
                limit=self.connection_limit,  # Total connection pool size
                limit_per_host=self.connection_limit_per_host,  # Connections per host
                ttl_dns_cache=300,  # DNS cache TTL (5 minutes)
                force_close=False,  # Enable keep-alive
                enable_cleanup_closed=True,  # Clean up closed connections
            )
            # Set up default headers for API key authentication
            headers = {}
            if self.use_api_key:
                headers["X-API-KEY"] = self.config.api_key
                headers["Accept"] = "application/json"
            # Session-wide timeouts: no total cap here because each request
            # supplies its own ClientTimeout with a total limit.
            timeout = aiohttp.ClientTimeout(
                total=None,  # No total timeout (handled per-request)
                connect=self.connection_timeout,
                sock_read=self.request_timeout,
            )
            self.session = aiohttp.ClientSession(
                connector=connector,
                cookie_jar=aiohttp.CookieJar(),
                headers=headers if headers else None,
                timeout=timeout,
            )
            self.logger.info("HTTP session created", extra={"correlation_id": correlation_id})
            # Authenticate (only needed for username/password)
            if self.use_api_key:
                # API key auth doesn't require login
                self.authenticated = True
                self.logger.info(
                    "Using API key authentication - no login required",
                    extra={"correlation_id": correlation_id}
                )
            else:
                # Session-based auth requires login
                await self._authenticate()
            # Initialize API abstraction layer components
            self._controller_detector = ControllerDetector(self)
            self._endpoint_router = EndpointRouter()
            self._response_normalizer = ResponseNormalizer()
            # Detect controller type after authentication
            detected_type = await self._controller_detector.detect()
            self.logger.info(
                "Successfully connected to UniFi controller",
                extra={
                    "correlation_id": correlation_id,
                    "controller_type": detected_type.value,
                }
            )
        except aiohttp.ClientError as e:
            self.logger.error(
                f"Failed to connect to UniFi controller: {e}",
                extra={"correlation_id": correlation_id}
            )
            raise ConnectionError(f"Failed to connect to {self.base_url}: {e}")
        except Exception as e:
            # NOTE(review): on failure after session creation (e.g. failed
            # login) self.session stays open; callers appear expected to
            # call close() — confirm this is intended.
            self.logger.error(
                f"Unexpected error during connection: {e}",
                extra={"correlation_id": correlation_id}
            )
            raise
    async def _authenticate(self) -> None:
        """Authenticate with the UniFi controller (session-based auth only).

        Calls the login endpoint with the configured credentials; aiohttp's
        cookie jar stores the returned session cookie for later requests.
        A lock prevents concurrent authentication attempts.

        Raises:
            UniFiClientError: If connect() has not created a session yet
            AuthenticationError: If authentication fails
        """
        async with self._auth_lock:
            # Check if another coroutine already authenticated while we
            # were waiting on the lock.
            if self.authenticated:
                return
            if not self.session:
                raise UniFiClientError("Session not initialized. Call connect() first.")
            # urljoin is safe here: session-auth base URLs carry no path
            # component (the /proxy/network prefix is API-key auth only,
            # and API-key auth never reaches this method).
            login_url = urljoin(self.base_url, "/api/login")
            # Prepare login payload
            payload = {
                "username": self.config.username,
                "password": self.config.password,
            }
            # Log authentication attempt; log_with_redaction is responsible
            # for masking the password before it reaches log output.
            log_with_redaction(
                self.logger,
                "info",
                "Attempting authentication",
                {"username": self.config.username, "password": self.config.password}
            )
            try:
                async with self.session.post(login_url, json=payload) as response:
                    # Check response status
                    if response.status == 200:
                        # Authentication successful; cookie jar now holds
                        # the session cookie.
                        self.authenticated = True
                        self.logger.info(
                            "Authentication successful",
                            extra={"username": self.config.username}
                        )
                    elif response.status == 400:
                        # Bad request - likely invalid credentials
                        error_text = await response.text()
                        self.logger.error(
                            "Authentication failed - invalid credentials",
                            extra={"status": response.status, "response": error_text}
                        )
                        raise AuthenticationError(
                            "Invalid username or password. "
                            "Please verify UNIFI_USERNAME and UNIFI_PASSWORD environment variables."
                        )
                    elif response.status == 401:
                        # Unauthorized
                        self.logger.error(
                            "Authentication failed - unauthorized",
                            extra={"status": response.status}
                        )
                        raise AuthenticationError(
                            "Authentication failed. Please check your credentials."
                        )
                    else:
                        # Any other status is treated as a hard failure.
                        error_text = await response.text()
                        self.logger.error(
                            f"Authentication failed with status {response.status}",
                            extra={"status": response.status, "response": error_text}
                        )
                        raise AuthenticationError(
                            f"Authentication failed with status {response.status}: {error_text}"
                        )
            except aiohttp.ClientError as e:
                self.logger.error(f"Network error during authentication: {e}")
                raise AuthenticationError(f"Network error during authentication: {e}")
async def _ensure_authenticated(self) -> None:
"""Ensure the client is authenticated, re-authenticating if necessary.
This method is called before each API request to handle session expiry.
For API key authentication, this is a no-op.
Raises:
AuthenticationError: If re-authentication fails
"""
if self.use_api_key:
# API key auth doesn't expire
return
if not self.authenticated:
self.logger.info("Session expired, re-authenticating...")
await self._authenticate()
async def close(self) -> None:
"""Close the HTTP session and clean up resources.
This should be called when the client is no longer needed.
"""
if self.session:
await self.session.close()
self.session = None
self.authenticated = False
self.logger.info("HTTP session closed")
    async def __aenter__(self):
        """Async context manager entry: connect and return this client."""
        await self.connect()
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: always close the session (exceptions propagate)."""
        await self.close()
def _build_url(self, endpoint: str) -> str:
"""Build full URL from endpoint.
Args:
endpoint: API endpoint (e.g., "/api/s/{site}/stat/device")
Returns:
Full URL
Note:
For API key authentication, the base URL already includes /proxy/network,
so endpoints should start with /api/... and will become:
https://host/proxy/network/api/s/default/stat/device
"""
# Replace {site} placeholder with actual site
endpoint = endpoint.replace("{site}", self.config.site)
# Ensure endpoint starts with / for proper joining
if not endpoint.startswith("/"):
endpoint = "/" + endpoint
# Join with base URL
# urljoin replaces the path if endpoint starts with /, so we need to append manually
if self.base_url.endswith("/"):
return self.base_url.rstrip("/") + endpoint
else:
return self.base_url + endpoint
async def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None, use_cache: bool = True) -> Dict[str, Any]:
"""Make GET request to UniFi API with caching and retry logic.
This method automatically:
- Checks cache for recent responses
- Ensures authentication is valid
- Retries on transient errors (timeout, connection issues)
- Re-authenticates on session expiry
- Handles rate limiting
- Caches successful responses with endpoint-specific TTL
Args:
endpoint: API endpoint (e.g., "/api/s/{site}/stat/device")
params: Optional query parameters
use_cache: Whether to use caching for this request (default: True)
Returns:
JSON response data
Raises:
UniFiClientError: If request fails after retries
TimeoutError: If request times out after retries
RateLimitError: If rate limit is exceeded
SessionExpiredError: If session expires and re-auth fails
"""
if not self.session:
raise UniFiClientError("Not connected. Call connect() first.")
# Check cache if enabled
if use_cache and self.cache_enabled:
cache_key = self._get_cache_key(endpoint, params)
if cache_key in self.cache:
self.logger.debug(
f"Cache hit for {endpoint}",
extra={"endpoint": endpoint, "cache_key": cache_key}
)
return self.cache[cache_key]
self.logger.debug(
f"Cache miss for {endpoint}",
extra={"endpoint": endpoint, "cache_key": cache_key}
)
# Wrap the actual request in retry logic
result = await retry_async(
self._get_with_auth,
endpoint,
params,
config=self.retry_config
)
# Cache the result if caching is enabled
if use_cache and self.cache_enabled:
cache_key = self._get_cache_key(endpoint, params)
ttl = self._get_cache_ttl(endpoint)
# Note: TTLCache doesn't support per-item TTL in the standard way
# We're using a global TTL but documenting the intended behavior
# For production, consider using a custom cache implementation
self.cache[cache_key] = result
self.logger.debug(
f"Cached response for {endpoint}",
extra={
"endpoint": endpoint,
"cache_key": cache_key,
"ttl": ttl,
}
)
return result
    async def _get_with_auth(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Internal GET request with authentication check.

        Wrapped by retry logic in the public get() method, so the retryable
        errors raised here (SessionExpiredError, RateLimitError,
        ConnectionError, TimeoutError) trigger backoff-and-retry upstream.

        Args:
            endpoint: API endpoint, may contain a "{site}" placeholder
            params: Optional query parameters

        Returns:
            Parsed JSON response body

        Raises:
            SessionExpiredError: On HTTP 401 (retry path re-authenticates)
            RateLimitError: On HTTP 429
            ConnectionError: On HTTP 5xx or transport-level failures
            TimeoutError: When the request exceeds request_timeout
            UniFiClientError: On other client errors (4xx via raise_for_status)
        """
        # Ensure we're authenticated before making the request
        await self._ensure_authenticated()
        url = self._build_url(endpoint)
        # Use semaphore to limit concurrent requests
        async with self._request_semaphore:
            # Start timing (for duration logging and slow-request warnings)
            start_time = time.time()
            self.logger.debug(
                f"GET request to {endpoint}",
                extra={"params": params}
            )
            try:
                # Per-request timeout overrides the session-wide one and
                # adds a total cap.
                request_timeout = aiohttp.ClientTimeout(
                    total=self.request_timeout,
                    connect=self.connection_timeout,
                )
                async with self.session.get(url, params=params, timeout=request_timeout) as response:
                    # Handle different status codes; order matters (specific
                    # codes before the generic >= 500 and raise_for_status).
                    if response.status == 401:
                        # Session expired - mark as unauthenticated and raise retryable error
                        self.authenticated = False
                        self.logger.warning("Session expired (401), will re-authenticate on retry")
                        raise SessionExpiredError("Session expired, re-authentication required")
                    elif response.status == 429:
                        # Rate limit exceeded
                        retry_after = response.headers.get("Retry-After", "unknown")
                        self.logger.warning(
                            f"Rate limit exceeded (429), retry after: {retry_after}",
                            extra={"endpoint": endpoint, "retry_after": retry_after}
                        )
                        raise RateLimitError(f"Rate limit exceeded. Retry after: {retry_after}")
                    elif response.status >= 500:
                        # Server error - retryable
                        error_text = await response.text()
                        self.logger.warning(
                            f"Server error ({response.status}), will retry",
                            extra={"endpoint": endpoint, "status": response.status}
                        )
                        raise ConnectionError(f"Server error {response.status}: {error_text}")
                    # Raise for other error status codes (4xx except 401/429);
                    # surfaces below as aiohttp.ClientError -> UniFiClientError.
                    response.raise_for_status()
                    # Parse JSON response
                    data = await response.json()
                    # Calculate request duration
                    duration = time.time() - start_time
                    self.logger.debug(
                        f"GET request successful",
                        extra={
                            "endpoint": endpoint,
                            "status": response.status,
                            "duration_ms": round(duration * 1000, 2),
                        }
                    )
                    # Log slow requests as warnings (> 2 seconds)
                    if duration > 2.0:
                        self.logger.warning(
                            f"Slow GET request detected",
                            extra={
                                "endpoint": endpoint,
                                "duration_ms": round(duration * 1000, 2),
                                "threshold_ms": 2000,
                            }
                        )
                    return data
            except asyncio.TimeoutError as e:
                # aiohttp raises asyncio.TimeoutError; convert to this
                # module's retryable TimeoutError.
                duration = time.time() - start_time
                self.logger.warning(
                    f"Request timeout for {endpoint}",
                    extra={
                        "endpoint": endpoint,
                        "timeout": self.request_timeout,
                        "duration_ms": round(duration * 1000, 2),
                    }
                )
                raise TimeoutError(f"Request to {endpoint} timed out after {self.request_timeout} seconds")
            except aiohttp.ClientConnectionError as e:
                # Must precede the generic ClientError handler below.
                duration = time.time() - start_time
                self.logger.warning(
                    f"Connection error for {endpoint}: {e}",
                    extra={
                        "endpoint": endpoint,
                        "duration_ms": round(duration * 1000, 2),
                    }
                )
                raise ConnectionError(f"Connection error for {endpoint}: {e}")
            except aiohttp.ClientError as e:
                duration = time.time() - start_time
                self.logger.error(
                    f"GET request failed: {e}",
                    extra={
                        "endpoint": endpoint,
                        "duration_ms": round(duration * 1000, 2),
                    }
                )
                raise UniFiClientError(f"GET request to {endpoint} failed: {e}")
async def post(
self,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
json: Optional[Dict[str, Any]] = None,
invalidate_cache: bool = True
) -> Dict[str, Any]:
"""Make POST request to UniFi API with retry logic and cache invalidation.
This method automatically:
- Ensures authentication is valid
- Retries on transient errors (timeout, connection issues)
- Re-authenticates on session expiry
- Handles rate limiting
- Invalidates cache after successful write operations
Args:
endpoint: API endpoint
data: Optional form data
json: Optional JSON data
invalidate_cache: Whether to invalidate cache after write (default: True)
Returns:
JSON response data
Raises:
UniFiClientError: If request fails after retries
TimeoutError: If request times out after retries
RateLimitError: If rate limit is exceeded
SessionExpiredError: If session expires and re-auth fails
"""
if not self.session:
raise UniFiClientError("Not connected. Call connect() first.")
# Wrap the actual request in retry logic
result = await retry_async(
self._post_with_auth,
endpoint,
data,
json,
config=self.retry_config
)
# Invalidate cache after successful write operation
if invalidate_cache and self.cache_enabled:
self.invalidate_cache(endpoint)
self.logger.info(
"Cache invalidated after write operation",
extra={"endpoint": endpoint}
)
return result
    async def _post_with_auth(
        self,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Internal POST request with authentication check.

        Wrapped by retry logic in the public post() method; mirrors the
        status-code handling of _get_with_auth.

        Args:
            endpoint: API endpoint, may contain a "{site}" placeholder
            data: Optional form data
            json: Optional JSON data (note: parameter shadows the json
                module within this method; the module is not used here)

        Returns:
            Parsed JSON response body

        Raises:
            SessionExpiredError: On HTTP 401 (retry path re-authenticates)
            RateLimitError: On HTTP 429
            ConnectionError: On HTTP 5xx or transport-level failures
            TimeoutError: When the request exceeds request_timeout
            UniFiClientError: On other client errors (4xx via raise_for_status)
        """
        # Ensure we're authenticated before making the request
        await self._ensure_authenticated()
        url = self._build_url(endpoint)
        # Use semaphore to limit concurrent requests
        async with self._request_semaphore:
            # Start timing (for duration logging and slow-request warnings)
            start_time = time.time()
            # Redact sensitive data in logs
            log_data = data or json
            log_with_redaction(
                self.logger,
                "debug",
                f"POST request to {endpoint}",
                log_data
            )
            try:
                # Per-request timeout overrides the session-wide one and
                # adds a total cap.
                request_timeout = aiohttp.ClientTimeout(
                    total=self.request_timeout,
                    connect=self.connection_timeout,
                )
                async with self.session.post(
                    url,
                    data=data,
                    json=json,
                    timeout=request_timeout
                ) as response:
                    # Handle different status codes; order matters (specific
                    # codes before the generic >= 500 and raise_for_status).
                    if response.status == 401:
                        # Session expired - mark as unauthenticated and raise retryable error
                        self.authenticated = False
                        self.logger.warning("Session expired (401), will re-authenticate on retry")
                        raise SessionExpiredError("Session expired, re-authentication required")
                    elif response.status == 429:
                        # Rate limit exceeded
                        retry_after = response.headers.get("Retry-After", "unknown")
                        self.logger.warning(
                            f"Rate limit exceeded (429), retry after: {retry_after}",
                            extra={"endpoint": endpoint, "retry_after": retry_after}
                        )
                        raise RateLimitError(f"Rate limit exceeded. Retry after: {retry_after}")
                    elif response.status >= 500:
                        # Server error - retryable
                        error_text = await response.text()
                        self.logger.warning(
                            f"Server error ({response.status}), will retry",
                            extra={"endpoint": endpoint, "status": response.status}
                        )
                        raise ConnectionError(f"Server error {response.status}: {error_text}")
                    # Raise for other error status codes (4xx except 401/429);
                    # surfaces below as aiohttp.ClientError -> UniFiClientError.
                    response.raise_for_status()
                    # Parse JSON response
                    response_data = await response.json()
                    # Calculate request duration
                    duration = time.time() - start_time
                    self.logger.debug(
                        f"POST request successful",
                        extra={
                            "endpoint": endpoint,
                            "status": response.status,
                            "duration_ms": round(duration * 1000, 2),
                        }
                    )
                    # Log slow requests as warnings (> 2 seconds)
                    if duration > 2.0:
                        self.logger.warning(
                            f"Slow POST request detected",
                            extra={
                                "endpoint": endpoint,
                                "duration_ms": round(duration * 1000, 2),
                                "threshold_ms": 2000,
                            }
                        )
                    return response_data
            except asyncio.TimeoutError as e:
                # aiohttp raises asyncio.TimeoutError; convert to this
                # module's retryable TimeoutError.
                duration = time.time() - start_time
                self.logger.warning(
                    f"Request timeout for {endpoint}",
                    extra={
                        "endpoint": endpoint,
                        "timeout": self.request_timeout,
                        "duration_ms": round(duration * 1000, 2),
                    }
                )
                raise TimeoutError(f"Request to {endpoint} timed out after {self.request_timeout} seconds")
            except aiohttp.ClientConnectionError as e:
                # Must precede the generic ClientError handler below.
                duration = time.time() - start_time
                self.logger.warning(
                    f"Connection error for {endpoint}: {e}",
                    extra={
                        "endpoint": endpoint,
                        "duration_ms": round(duration * 1000, 2),
                    }
                )
                raise ConnectionError(f"Connection error for {endpoint}: {e}")
            except aiohttp.ClientError as e:
                duration = time.time() - start_time
                self.logger.error(
                    f"POST request failed: {e}",
                    extra={
                        "endpoint": endpoint,
                        "duration_ms": round(duration * 1000, 2),
                    }
                )
                raise UniFiClientError(f"POST request to {endpoint} failed: {e}")
    async def get_security_data(
        self,
        feature: str,
        normalize: bool = True,
        site: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Get security data with automatic endpoint routing and normalization.

        This method provides a high-level interface for retrieving security-related
        data (firewall rules, IPS status, traffic routes, port forwards) with
        automatic endpoint routing based on the detected controller type.

        The method:
        1. Uses the detected controller type to select the correct API endpoint
        2. Makes the request with automatic fallback to legacy endpoints
        3. Optionally normalizes the response to a consistent format

        Args:
            feature: Security feature to retrieve. Valid values:
                - "firewall_rules": Firewall rules/traffic routes
                - "ips_status": IPS/threat management status
                - "traffic_routes": Traffic routing rules
                - "port_forwards": Port forwarding rules
            normalize: Whether to normalize the response (default: True)
            site: Optional site name (uses client's default if not provided)

        Returns:
            Dictionary containing:
            - data: Response data (normalized if normalize=True)
            - endpoint_used: The endpoint that succeeded
            - api_version: "v2" or "v1"
            - fallback_used: Whether fallback was used
            - controller_type: Detected controller type

        Raises:
            UniFiClientError: If not connected or API layer not initialized
            ValueError: If feature is not recognized
            Exception: If both primary and fallback endpoints fail

        Example:
            >>> result = await client.get_security_data("firewall_rules")
            >>> for rule in result["data"]:
            ...     print(f"{rule['name']}: {rule['action']}")
        """
        if not self.session:
            raise UniFiClientError("Not connected. Call connect() first.")
        # The router is created at the end of connect(); None here means
        # connect() never ran or failed partway through.
        if self._endpoint_router is None:
            raise UniFiClientError(
                "API layer not initialized. Ensure connect() completed successfully."
            )
        site = site or self.config.site
        controller_type = self.controller_type
        self.logger.debug(
            f"Getting security data for feature '{feature}'",
            extra={
                "feature": feature,
                "controller_type": controller_type.value,
                "normalize": normalize,
                "site": site,
            }
        )
        # Use endpoint router to make request with fallback; the router is
        # expected to validate `feature` and raise ValueError if unknown.
        result = await self._endpoint_router.request_with_fallback(
            client=self,
            feature=feature,
            controller_type=controller_type,
            site=site,
        )
        # Add controller type to result
        result["controller_type"] = controller_type.value
        # Normalize response if requested
        if normalize and self._response_normalizer is not None:
            result["data"] = self._normalize_security_data(
                feature=feature,
                data=result["data"],
                controller_type=controller_type,
            )
        return result
def _normalize_security_data(
self,
feature: str,
data: Any,
controller_type: ControllerType,
) -> Any:
"""Normalize security data based on feature type.
Args:
feature: Security feature name
data: Raw API response data
controller_type: Controller type for normalization
Returns:
Normalized data (list of dicts or dict depending on feature)
"""
if self._response_normalizer is None:
return data
# Extract data from response wrapper if present
raw_data = data
if isinstance(data, dict):
# UniFi API often wraps data in {"data": [...]} or {"meta": {...}, "data": [...]}
if "data" in data:
raw_data = data["data"]
# Normalize based on feature type
if feature == "firewall_rules":
if isinstance(raw_data, list):
normalized = self._response_normalizer.normalize_firewall_rules(
raw_data, controller_type
)
return self._response_normalizer.to_dict_list(normalized)
return raw_data
elif feature == "ips_status":
if isinstance(raw_data, dict):
normalized = self._response_normalizer.normalize_ips_status(
raw_data, controller_type
)
return self._response_normalizer.to_dict(normalized)
elif isinstance(raw_data, list) and len(raw_data) > 0:
# Some endpoints return IPS settings as a list with one item
normalized = self._response_normalizer.normalize_ips_status(
raw_data[0], controller_type
)
return self._response_normalizer.to_dict(normalized)
return raw_data
elif feature == "traffic_routes":
if isinstance(raw_data, list):
normalized = self._response_normalizer.normalize_traffic_routes(
raw_data, controller_type
)
return self._response_normalizer.to_dict_list(normalized)
return raw_data
elif feature == "port_forwards":
if isinstance(raw_data, list):
normalized = self._response_normalizer.normalize_port_forwards(
raw_data, controller_type
)
return self._response_normalizer.to_dict_list(normalized)
return raw_data
else:
# Unknown feature, return as-is
self.logger.warning(
f"Unknown feature '{feature}' for normalization, returning raw data"
)
return raw_data