#!/usr/bin/env python3
"""
Unimus Client Library
This module provides a REST API client for the Unimus network configuration management system.
Based on Unimus API v.2 documentation.
"""
import requests
import base64
import binascii
import re
import time
import hashlib
import json
import functools
import threading
from typing import Any, Dict, List, Optional, Union
import logging
# Cache backend imports
try:
from cachetools import TTLCache
except ImportError:
TTLCache = None
try:
from diskcache import Cache as DiskCache
except ImportError:
DiskCache = None
# Caching is usable if at least one backend library imported successfully
CACHE_AVAILABLE = TTLCache is not None or DiskCache is not None
logger = logging.getLogger(__name__)
def generate_cache_key(func, *args, **kwargs) -> str:
"""
Generate a unique cache key for a function call.
Args:
func: The function being called
*args: Positional arguments (excluding 'self')
**kwargs: Keyword arguments
Returns:
MD5 hash string representing the unique cache key
"""
# Sort kwargs for consistent key generation
kwargs_sorted = sorted(kwargs.items())
# Create a representation of the function call
call_representation = {
'func': func.__name__,
'args': args, # Skip 'self' as it's not passed here
'kwargs': kwargs_sorted
}
# Serialize to stable JSON string
try:
serialized_call = json.dumps(call_representation, sort_keys=True, default=str)
except (TypeError, ValueError) as e:
# Fallback for non-serializable objects
logger.warning(f"Failed to serialize cache key data: {e}, using string representation")
serialized_call = str(call_representation)
# Generate MD5 hash for compact and unique key
return hashlib.md5(serialized_call.encode('utf-8')).hexdigest()
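# Usage sketch (illustrative only): two calls with identical arguments produce the
# same key, so cached results can be looked up deterministically.
#
#   key_a = generate_cache_key(UnimusRestClient.get_devices, {'managed': True})
#   key_b = generate_cache_key(UnimusRestClient.get_devices, {'managed': True})
#   assert key_a == key_b  # identical calls -> identical 32-char MD5 hex key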
def unimus_cache(ttl_category: str = 'default'):
"""
Decorator for caching Unimus API responses.
Args:
ttl_category: Category for TTL lookup in cache configuration
Returns:
Decorated function with caching capability
"""
def decorator(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
# Check if caching is available and enabled
if not CACHE_AVAILABLE or not hasattr(self, 'cache_enabled') or not self.cache_enabled:
return func(self, *args, **kwargs)
# Get cache backend and TTL
cache_backend = getattr(self, 'cache_backend', None)
if cache_backend is None:
return func(self, *args, **kwargs)
# Get TTL for this operation
ttl_config = getattr(self, 'cache_ttl_config', None)
if ttl_config:
ttl = getattr(ttl_config, ttl_category, ttl_config.default)
else:
ttl = 300 # Default fallback
# Generate cache key
cache_key = generate_cache_key(func, *args, **kwargs)
# Try to get from cache
try:
if hasattr(cache_backend, '__contains__') and hasattr(cache_backend, '__getitem__'): # TTLCache and dict-like objects
if cache_key in cache_backend:
result = cache_backend[cache_key]
# Update cache statistics
if hasattr(self, 'cache_stats'):
self.cache_stats['hits'] += 1
logger.debug(f"Cache hit for {func.__name__} (key: {cache_key[:8]}...)")
return result
elif hasattr(cache_backend, 'get'): # DiskCache
result = cache_backend.get(cache_key)
if result is not None:
# Update cache statistics
if hasattr(self, 'cache_stats'):
self.cache_stats['hits'] += 1
logger.debug(f"Cache hit for {func.__name__} (key: {cache_key[:8]}...)")
return result
except Exception as e:
logger.warning(f"Cache retrieval error for {func.__name__}: {e}")
# Cache miss - execute original function
if hasattr(self, 'cache_stats'):
self.cache_stats['misses'] += 1
logger.debug(f"Cache miss for {func.__name__} (key: {cache_key[:8]}...), executing function")
result = func(self, *args, **kwargs)
# Store result in cache
try:
if hasattr(cache_backend, '__setitem__'): # TTLCache and dict-like objects
cache_backend[cache_key] = result
logger.debug(f"Cached result for {func.__name__} (backend default TTL applies)")
elif hasattr(cache_backend, 'set'): # DiskCache
cache_backend.set(cache_key, result, expire=ttl)
logger.debug(f"Cached result for {func.__name__} (TTL: {ttl}s)")
except Exception as e:
logger.warning(f"Cache storage error for {func.__name__}: {e}")
return result
return wrapper
return decorator
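# Usage sketch (illustrative): the decorator is applied to the read methods of
# UnimusRestClient below, e.g.
#
#   @unimus_cache(ttl_category='devices')
#   def get_devices(self, filters=None):
#       ...
#
# The wrapper is a no-op when cachetools/diskcache are missing or caching is
# disabled, so decorated methods behave identically without a cache backend.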
class UnimusError(Exception):
"""Base exception for Unimus API errors."""
pass
class UnimusAuthenticationError(UnimusError):
"""Authentication related errors (401)."""
pass
class UnimusNotFoundError(UnimusError):
"""Resource not found errors (404)."""
pass
class UnimusValidationError(UnimusError):
"""Validation errors (400, 422)."""
pass
class UnimusRestClient:
"""
Unimus client implementation using the REST API v.2.
This client provides read-only access to Unimus devices, backups, and related data.
All methods handle pagination transparently where applicable.
"""
def __init__(self, url: str, token: str, verify_ssl: bool = True, timeout: int = 30, config=None):
"""
Initialize the REST API client.
Args:
url: The base URL of the Unimus instance (e.g., 'https://unimus.example.com')
token: API token for authentication
verify_ssl: Whether to verify SSL certificates
timeout: Request timeout in seconds
config: Optional UnimusConfig instance for cache configuration
"""
self.base_url = url.rstrip('/')
self.api_url = f"{self.base_url}/api/v2"
self.token = token
self.verify_ssl = verify_ssl
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
'Accept': 'application/json',
})
# Initialize caching
self.config = config
self._setup_cache()
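# Usage sketch (illustrative values): instantiate the client against a Unimus
# instance and verify connectivity before issuing further calls.
#
#   client = UnimusRestClient(
#       url="https://unimus.example.com",   # hypothetical host
#       token="API_TOKEN",
#       verify_ssl=True,
#       timeout=30,
#   )
#   if client.validate_connection():
#       devices = client.get_devices()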
def _setup_cache(self):
"""
Initialize cache backend based on configuration.
"""
# Default cache settings (disabled)
self.cache_enabled = False
self.cache_backend = None
self.cache_ttl_config = None
self.cache_stats = {
'hits': 0,
'misses': 0,
'total_requests': 0
}
# Check if caching is available and configured
if not CACHE_AVAILABLE:
logger.info("Cache libraries not available, caching disabled")
return
if not self.config or not hasattr(self.config, 'cache'):
logger.debug("No cache configuration provided, caching disabled")
return
cache_config = self.config.cache
if not cache_config.enabled:
logger.debug("Cache explicitly disabled in configuration")
return
# Store TTL configuration
self.cache_ttl_config = cache_config.ttl
try:
# Setup cache backend
if cache_config.backend == 'disk':
if DiskCache is None:
logger.error("Disk cache requested but diskcache not available")
return
# Create cache directory if needed
import os
cache_path = cache_config.path or "/tmp/unimus_mcp_cache"
os.makedirs(cache_path, exist_ok=True)
# Initialize DiskCache with size limit
size_limit_bytes = cache_config.size_limit_mb * 1024 * 1024
self.cache_backend = DiskCache(cache_path, size_limit=size_limit_bytes)
logger.info(f"Initialized disk cache at {cache_path} (limit: {cache_config.size_limit_mb}MB)")
else: # memory backend
if TTLCache is None:
logger.error("Memory cache requested but cachetools not available")
return
# Initialize TTLCache with default TTL
# Note: TTLCache uses a global TTL for all items
self.cache_backend = TTLCache(
maxsize=cache_config.max_items,
ttl=cache_config.ttl.default
)
logger.info(f"Initialized memory cache (max items: {cache_config.max_items}, default TTL: {cache_config.ttl.default}s)")
# Enable caching
self.cache_enabled = True
# Initialize statistics tracking
if cache_config.enable_stats:
self.cache_stats.update({
'hits': 0,
'misses': 0,
'total_requests': 0,
'backend': cache_config.backend,
'max_items': cache_config.max_items,
'size_limit_mb': cache_config.size_limit_mb
})
logger.info(f"Cache system initialized: {cache_config.backend} backend, stats: {cache_config.enable_stats}")
except Exception as e:
logger.error(f"Failed to initialize cache: {e}")
self.cache_enabled = False
self.cache_backend = None
def get_cache_stats(self) -> Dict[str, Any]:
"""
Get cache performance statistics.
Returns:
Dictionary with cache statistics
"""
if not self.cache_enabled:
return {'enabled': False, 'message': 'Caching is disabled'}
stats = self.cache_stats.copy()
stats['enabled'] = True
stats['total_requests'] = stats['hits'] + stats['misses']
if stats['total_requests'] > 0:
stats['hit_ratio'] = stats['hits'] / stats['total_requests']
else:
stats['hit_ratio'] = 0.0
# Add backend-specific stats
try:
if hasattr(self.cache_backend, '__len__'):
stats['current_size'] = len(self.cache_backend)
if hasattr(self.cache_backend, 'currsize'): # TTLCache
stats['current_size'] = self.cache_backend.currsize
stats['max_size'] = self.cache_backend.maxsize
elif hasattr(self.cache_backend, 'volume'): # DiskCache
stats['volume'] = self.cache_backend.volume()
stats['size_mb'] = self.cache_backend.volume() / (1024 * 1024)
except Exception as e:
logger.debug(f"Error getting cache backend stats: {e}")
return stats
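# Usage sketch (illustrative): inspect cache effectiveness after a few calls.
#
#   client.get_devices(); client.get_devices()   # second call should hit the cache
#   stats = client.get_cache_stats()
#   print(stats.get('hit_ratio'), stats.get('current_size'))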
def clear_cache(self):
"""
Clear all cached data.
"""
if not self.cache_enabled or not self.cache_backend:
return
try:
# Both TTLCache and DiskCache expose clear(), which removes all cached items
if hasattr(self.cache_backend, 'clear'):
self.cache_backend.clear()
logger.info("Cache cleared successfully")
# Reset statistics
self.cache_stats.update({
'hits': 0,
'misses': 0,
'total_requests': 0
})
except Exception as e:
logger.error(f"Failed to clear cache: {e}")
def _build_url(self, endpoint: str) -> str:
"""Build the full URL for an API request."""
endpoint = endpoint.strip('/')
return f"{self.api_url}/{endpoint}"
def _handle_response(self, response: requests.Response) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
"""
Handle API response and raise appropriate exceptions for errors.
Args:
response: The requests Response object
Returns:
Parsed JSON response data
Raises:
UnimusAuthenticationError: For 401 errors
UnimusNotFoundError: For 404 errors
UnimusValidationError: For 400/422 errors
UnimusError: For other API errors
"""
try:
data = response.json()
except ValueError:
# Non-JSON response
response.raise_for_status()
return {}
if response.status_code >= 400:
error_msg = data.get('message', f'HTTP {response.status_code}')
if response.status_code == 401:
raise UnimusAuthenticationError(f"Authentication failed: {error_msg}")
elif response.status_code == 404:
raise UnimusNotFoundError(f"Resource not found: {error_msg}")
elif response.status_code in (400, 422):
raise UnimusValidationError(f"Validation error: {error_msg}")
else:
raise UnimusError(f"API error ({response.status_code}): {error_msg}")
return data
def _decode_backup_content(self, backup: Dict[str, Any]) -> Dict[str, Any]:
"""
Decode backup content from base64 to readable format.
TEXT types become readable strings, BINARY types remain base64.
Args:
backup: Backup object with 'bytes' field containing base64 data
Returns:
Backup object with 'content' field and 'content_type' indicator
Note:
If base64 decoding or UTF-8 decoding fails for TEXT backups,
falls back to returning the original base64 content with
content_type='base64' and logs a warning.
"""
if not backup.get('bytes'):
return backup
backup_copy = backup.copy()
if backup.get('type') == 'TEXT':
try:
decoded = base64.b64decode(backup['bytes']).decode('utf-8')
backup_copy['content'] = decoded
backup_copy['content_type'] = 'text'
except (binascii.Error, UnicodeDecodeError) as e:
logger.warning(f"Failed to decode TEXT backup {backup.get('id', 'unknown')}: {e}")
backup_copy['content'] = backup['bytes'] # Fallback to base64
backup_copy['content_type'] = 'base64'
else: # BINARY type
backup_copy['content'] = backup['bytes']
backup_copy['content_type'] = 'base64'
# Remove original bytes field
backup_copy.pop('bytes', None)
return backup_copy
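# Worked example (hypothetical data): a TEXT backup arrives base64-encoded in the
# 'bytes' field and is returned with a decoded 'content' field.
#
#   raw = {'id': 1, 'type': 'TEXT', 'bytes': base64.b64encode(b'hostname R1').decode()}
#   decoded = client._decode_backup_content(raw)
#   # decoded == {'id': 1, 'type': 'TEXT', 'content': 'hostname R1', 'content_type': 'text'}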
def _process_backup_list(self, backups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Process a list of backups to decode their content."""
return [self._decode_backup_content(backup) for backup in backups]
def _process_backup_response(self, backup: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""Process a single backup response to decode its content."""
if backup is None:
return None
return self._decode_backup_content(backup)
def _get_paginated_data(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
"""
Fetch all pages of data transparently.
Args:
endpoint: API endpoint to query
params: Query parameters
Returns:
Complete list of all items across all pages
"""
all_items = []
page = 0
params = params or {}
while True:
current_params = {**params, 'page': page, 'size': 50} # Use max page size for efficiency
response = self.session.get(
self._build_url(endpoint),
params=current_params,
verify=self.verify_ssl,
timeout=self.timeout
)
data = self._handle_response(response)
# Handle single item responses (no pagination)
if 'data' not in data:
return [data] if data else []
items = data['data']
if not isinstance(items, list):
return [items]
all_items.extend(items)
# Check if we have more pages
paginator = data.get('paginator', {})
current_page = paginator.get('page', 0)
total_pages = paginator.get('totalPages', 1)
if current_page >= total_pages - 1: # 0-based indexing
break
page += 1
return all_items
def _get_single_item(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Fetch a single item from the API.
Args:
endpoint: API endpoint to query
params: Query parameters
Returns:
Single item data
"""
response = self.session.get(
self._build_url(endpoint),
params=params,
verify=self.verify_ssl,
timeout=self.timeout
)
data = self._handle_response(response)
# Handle wrapped responses
if 'data' in data:
return data['data']
return data
@unimus_cache(ttl_category='health')
def get_health(self) -> Dict[str, str]:
"""
Get Unimus health status.
Returns:
Health status information containing 'status' field.
Status can be: 'OK', 'LICENSING_UNREACHABLE', or 'ERROR'
"""
return self._get_single_item('health')
@unimus_cache(ttl_category='devices')
def get_devices(self, filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
"""
Get all devices with optional filtering.
Automatically handles pagination to return all results.
Args:
filters: Optional filters to apply. Common filters include:
- managed: Boolean, filter by managed state
- vendor: String, filter by vendor name
- type: String, filter by device type
- site_id: Number, filter by site ID
- zone_id: String, filter by zone ID
Returns:
List of device objects
"""
return self._get_paginated_data('devices', filters)
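# Usage sketch (filter names as documented above; illustrative values):
#
#   all_devices = client.get_devices()
#   cisco_managed = client.get_devices({'managed': True, 'vendor': 'Cisco'})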
@unimus_cache(ttl_category='enhanced_metadata') # Single TTL category; basic and enrich_metadata=True lookups share the enhanced_metadata TTL
def get_device_by_id(
self,
device_id: int,
include_connections: Optional[bool] = None, # Backward compatibility
include_attributes: Optional[List[str]] = None, # New flexible option
enrich_metadata: bool = False # New Phase 2 feature
) -> Dict[str, Any]:
"""
Get a specific device by its ID with flexible attribute selection and enhanced metadata.
Args:
device_id: The numeric ID of the device
include_connections: Whether to include connection information (backward-compatibility option)
If both this and include_attributes are None, defaults to True
include_attributes: List of specific attributes to include. Options:
- 'schedule' or 's': Include backup schedule information
- 'connections' or 'c': Include connection details (SSH/TELNET)
- None: Use include_connections parameter or default behavior
enrich_metadata: Whether to add calculated metadata fields (Phase 2 feature)
Adds: backupAge, lastSeen, configurationHealth, etc.
Returns:
Device object with detailed information and optional enriched metadata
Raises:
UnimusNotFoundError: If device with given ID doesn't exist
UnimusValidationError: If device_id is invalid
Examples:
# Backward compatibility - include all (schedule + connections)
device = client.get_device_by_id(123)
device = client.get_device_by_id(123, include_connections=True)
# New flexible selection - only schedule
device = client.get_device_by_id(123, include_attributes=['schedule'])
device = client.get_device_by_id(123, include_attributes=['s'])
# New flexible selection - only connections
device = client.get_device_by_id(123, include_attributes=['connections'])
device = client.get_device_by_id(123, include_attributes=['c'])
# New flexible selection - specific combination
device = client.get_device_by_id(123, include_attributes=['schedule', 'connections'])
# Basic device info only (no additional attributes)
device = client.get_device_by_id(123, include_attributes=[])
device = client.get_device_by_id(123, include_connections=False)
# Enhanced metadata with calculated fields (Phase 2)
device = client.get_device_by_id(123, enrich_metadata=True)
# Returns additional fields: backupAge, lastSeen, configurationHealth, etc.
"""
params = {}
# Handle attribute selection with backward compatibility
if include_attributes is not None:
# New flexible attribute selection
if include_attributes: # Non-empty list
attr_parts = []
for attr in include_attributes:
if attr.lower() in ('schedule', 's'):
attr_parts.append('s')
elif attr.lower() in ('connections', 'c'):
attr_parts.append('c')
else:
logger.warning(f"Unknown attribute '{attr}' ignored. Valid options: 'schedule', 's', 'connections', 'c'")
if attr_parts:
# Remove duplicates while preserving order
unique_attrs = []
for attr in attr_parts:
if attr not in unique_attrs:
unique_attrs.append(attr)
params['attr'] = ','.join(unique_attrs)
# Empty list means no additional attributes
else:
# Backward compatibility mode
if include_connections is None:
include_connections = True # Default behavior
if include_connections:
params['attr'] = 's,c' # schedule and connections
device = self._get_single_item(f'devices/{device_id}', params)
# Phase 2: Add enriched metadata if requested
if enrich_metadata:
device = self._enrich_device_metadata(device)
return device
def get_device_by_address(self, address: str, zone_id: Optional[str] = None) -> Dict[str, Any]:
"""
Get a device by its address (IP or hostname).
Args:
address: The device address (IP address or hostname)
zone_id: Optional zone ID to limit search scope
Returns:
Device object
Raises:
UnimusNotFoundError: If device with given address doesn't exist
"""
params = {'attr': 's,c'} # Include schedule and connections
if zone_id:
params['zoneId'] = zone_id
return self._get_single_item(f'devices/findByAddress/{address}', params)
@unimus_cache(ttl_category='backups')
def get_device_backups(self, device_id: int, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Get all backups for a specific device.
Backups are returned in descending order by creation time (newest first).
Args:
device_id: The numeric ID of the device
limit: Optional limit on number of results to return
Returns:
List of backup objects for the device, with decoded content
Raises:
UnimusNotFoundError: If device with given ID doesn't exist
"""
if limit:
# Use single request with limit
params = {'size': limit}
response = self.session.get(
self._build_url(f'devices/{device_id}/backups'),
params=params,
verify=self.verify_ssl,
timeout=self.timeout
)
data = self._handle_response(response)
backups = data.get('data', [])
return self._process_backup_list(backups)
else:
# Use paginated approach for all results
backups = self._get_paginated_data(f'devices/{device_id}/backups')
return self._process_backup_list(backups)
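# Usage sketch (illustrative): fetch only the five most recent backups for a
# device instead of paginating through its full history.
#
#   recent = client.get_device_backups(device_id=123, limit=5)
#   for b in recent:
#       print(b['id'], b.get('content_type'))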
def get_device_latest_backup(self, device_id: int) -> Optional[Dict[str, Any]]:
"""
Get the latest backup for a specific device.
Args:
device_id: The numeric ID of the device
Returns:
Latest backup object with decoded content or None if no backups exist
Raises:
UnimusNotFoundError: If device with given ID doesn't exist
"""
try:
backup = self._get_single_item(f'devices/{device_id}/backups/latest')
return self._process_backup_response(backup)
except UnimusNotFoundError:
# Device exists but has no backups
return None
def get_backup_by_id(self, backup_id: int) -> Dict[str, Any]:
"""
Get a specific backup by its ID.
Args:
backup_id: The numeric ID of the backup
Returns:
Backup object with decoded content
Raises:
UnimusNotFoundError: If backup with given ID doesn't exist
UnimusError: For other API errors
"""
backup = self._get_single_item(f'backups/{backup_id}')
return self._process_backup_response(backup)
def get_devices_by_description(self, description: str, exact_match: bool = False, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Get devices by description content.
**Performance Warning**: When exact_match=True, this function retrieves ALL devices
that partially match the description from the API and filters them in Python to find
exact matches. This is necessary because the Unimus API only supports partial matching.
In environments with many devices, this can be inefficient and slow.
Args:
description: Description text to search for
exact_match: If True, match description exactly. If False, partial match (default)
**Warning: exact_match=True may be slow with large device inventories**
limit: Optional limit on number of results to return. Recommended when using
exact_match=True to improve performance
Returns:
List of device objects matching the description criteria
Performance Tips:
- Use partial matching (exact_match=False) when possible for better performance
- Always specify a limit parameter when using exact_match=True
- Consider filtering devices by other criteria first (vendor, type, zone)
Examples:
# Fast partial match
devices = client.get_devices_by_description("router", exact_match=False)
# Exact match with performance optimization
devices = client.get_devices_by_description("Main Router", exact_match=True, limit=10)
"""
if exact_match:
# Log performance warning when using exact_match without limit
if limit is None:
logger.warning(
f"Performance warning: get_devices_by_description() called with exact_match=True "
f"but no limit specified. This will fetch ALL devices matching '{description}' "
f"and filter them in Python, which may be slow with large device inventories. "
f"Consider adding a limit parameter for better performance."
)
# For exact match, get all devices and filter manually
# This is needed because the API only supports partial matching
all_devices = self._get_paginated_data(f'devices/findByDescription/{description}')
filtered_devices = [
device for device in all_devices
if device.get('description', '') == description
]
if limit:
return filtered_devices[:limit]
return filtered_devices
else:
# Use API's built-in partial matching
if limit:
# Use single request with limit
params = {'size': limit}
response = self.session.get(
self._build_url(f'devices/findByDescription/{description}'),
params=params,
verify=self.verify_ssl,
timeout=self.timeout
)
data = self._handle_response(response)
return data.get('data', [])
else:
# Use paginated approach for all results
return self._get_paginated_data(f'devices/findByDescription/{description}')
@unimus_cache(ttl_category='backups')
def get_latest_backups(self, device_ids: List[int]) -> List[Dict[str, Any]]:
"""
Get the latest backups for multiple devices.
Args:
device_ids: List of device IDs to get backups for
Returns:
List of objects containing deviceId, address, and backup information with decoded content
Raises:
UnimusValidationError: If device_ids list is empty or contains invalid IDs
"""
if not device_ids:
raise UnimusValidationError("device_ids list cannot be empty")
if not all(isinstance(id, int) and id > 0 for id in device_ids):
raise UnimusValidationError("All device_ids must be positive integers")
# Convert list to comma-separated string as required by API
device_ids_str = ','.join(map(str, device_ids))
params = {'id': device_ids_str}
backup_responses = self._get_paginated_data('devices/backups/latest', params)
# Process each backup response to decode content
processed_responses = []
for response in backup_responses:
processed_response = response.copy()
if 'backup' in response and response['backup']:
processed_response['backup'] = self._decode_backup_content(response['backup'])
processed_responses.append(processed_response)
return processed_responses
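# Usage sketch (illustrative IDs): bulk-fetch the newest backup per device in a
# single call rather than looping over get_device_latest_backup().
#
#   latest = client.get_latest_backups([101, 102, 103])
#   for entry in latest:
#       print(entry['deviceId'], entry.get('backup', {}).get('content_type'))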
def get_backup_diff(self, orig_id: int, rev_id: int) -> Dict[str, Any]:
"""
Get differences between two backup configurations.
Args:
orig_id: ID of the backup treated as the original
rev_id: ID of the backup treated as the revised version
Returns:
Dictionary containing diff information with added/removed/changed lines
Raises:
UnimusValidationError: If backup IDs are invalid
UnimusNotFoundError: If one or both backups don't exist
"""
if not isinstance(orig_id, int) or orig_id <= 0:
raise UnimusValidationError("orig_id must be a positive integer")
if not isinstance(rev_id, int) or rev_id <= 0:
raise UnimusValidationError("rev_id must be a positive integer")
params = {'origId': orig_id, 'revId': rev_id}
response = self.session.get(
self._build_url('backups/diff'),
params=params,
verify=self.verify_ssl,
timeout=self.timeout
)
return self._handle_response(response)
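# Usage sketch (illustrative IDs): compare two revisions of a device's
# configuration; orig_id is treated as the baseline.
#
#   diff = client.get_backup_diff(orig_id=5001, rev_id=5002)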
def get_devices_with_changed_backups(self, since: Optional[int] = None, until: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Get devices that had backup changes within a specific time range.
Args:
since: Start of time range in seconds (unix timestamp). If None, defaults to 0
until: End of time range in seconds (unix timestamp). If None, defaults to current time
Returns:
List of device objects that had backup changes, each containing:
- id: Device ID
- createTime: Device creation time in seconds
- address: Hostname, IPv4 or IPv6 address
- description: Device description
Raises:
UnimusValidationError: If time range parameters are invalid
Note:
If no backups have been changed for any device in the time range,
returns an empty list with HTTP 200 status.
"""
params = {}
if since is not None:
if not isinstance(since, int) or since < 0:
raise UnimusValidationError("since must be a non-negative integer")
params['since'] = since
if until is not None:
if not isinstance(until, int) or until < 0:
raise UnimusValidationError("until must be a non-negative integer")
params['until'] = until
if since is not None and until is not None and since > until:
raise UnimusValidationError("since must be less than or equal to until")
return self._get_paginated_data('devices/findByChangedBackup', params)
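# Usage sketch (illustrative): list devices whose backups changed in the last
# 24 hours using unix-timestamp bounds.
#
#   now = int(time.time())
#   changed = client.get_devices_with_changed_backups(since=now - 86400, until=now)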
@unimus_cache(ttl_category='schedules')
def get_schedules(self) -> List[Dict[str, Any]]:
"""
Get a list of all schedules in Unimus.
Returns:
List of schedule objects containing schedule information
Raises:
UnimusError: If the request fails
"""
return self._get_paginated_data('schedules')
def get_schedule_by_id(self, schedule_id: int) -> Dict[str, Any]:
"""
Get an individual schedule by ID.
Args:
schedule_id: The schedule ID to retrieve
Returns:
Schedule object containing detailed schedule information
Raises:
UnimusValidationError: If schedule_id is invalid
UnimusNotFoundError: If schedule doesn't exist
"""
if not isinstance(schedule_id, int) or schedule_id <= 0:
raise UnimusValidationError("schedule_id must be a positive integer")
response = self.session.get(
self._build_url(f'schedules/{schedule_id}'),
verify=self.verify_ssl,
timeout=self.timeout
)
return self._handle_response(response)
@unimus_cache(ttl_category='backup_search')
def search_backup_content(
self,
pattern: str,
device_filters: Optional[Dict[str, Any]] = None,
context_lines: int = 2,
limit: Optional[int] = None,
since: Optional[int] = None,
until: Optional[int] = None
) -> List[Dict[str, Any]]:
"""
Search through backup configurations for specific patterns.
**Performance Warning**: This function retrieves backup content from devices
and searches through them client-side. For large device inventories, this can
be resource-intensive. Use device_filters and limit parameters to optimize performance.
Args:
pattern: Regular expression pattern to search for
device_filters: Optional filters to limit device scope:
- vendor: String, filter by vendor name (e.g., "Cisco", "MikroTik")
- type: String, filter by device type (e.g., "IOS", "RouterOS")
- managed: Boolean, filter by managed state
- site_id: Number, filter by site ID
- zone_id: String, filter by zone ID
context_lines: Number of context lines to include around matches (default: 2)
limit: Optional limit on number of devices to search (recommended for performance)
since: Start time range for backup filtering (unix timestamp)
until: End time range for backup filtering (unix timestamp)
Returns:
List of search result objects, each containing:
- device: Device information (id, address, description, vendor, type)
- backup: Backup information (id, validSince, validUntil, type)
- matches: List of pattern matches with:
- line_number: Line number of the match
- line_content: Content of the matching line
- context_before: Lines before the match
- context_after: Lines after the match
- match_groups: Regex capture groups if any
Raises:
UnimusValidationError: If pattern is invalid regex or parameters are invalid
UnimusError: If API requests fail
Performance Tips:
- Use specific device_filters to reduce search scope
- Set a reasonable limit parameter for large inventories
- Use time filters (since/until) to limit backup age
- Consider partial matches before exact regex patterns
Examples:
# Search for interface configurations
results = client.search_backup_content(
pattern=r"interface GigabitEthernet\d+/\d+",
device_filters={"vendor": "Cisco"},
limit=50
)
# Search for VLAN configurations with context
results = client.search_backup_content(
pattern=r"vlan \d+",
context_lines=3,
device_filters={"type": "IOS"}
)
"""
# Validate regex pattern
try:
compiled_pattern = re.compile(pattern, re.IGNORECASE | re.MULTILINE)
except re.error as e:
raise UnimusValidationError(f"Invalid regex pattern: {str(e)}")
# Validate parameters
if context_lines < 0:
raise UnimusValidationError("context_lines must be non-negative")
if limit is not None and (not isinstance(limit, int) or limit <= 0):
raise UnimusValidationError("limit must be a positive integer")
# Log performance warning for unrestricted searches
if not device_filters and limit is None:
logger.warning(
f"Performance warning: search_backup_content() called without device_filters "
f"or limit parameters. This will search through ALL devices and may be slow "
f"with large inventories. Consider adding device_filters or limit for better performance."
)
# Get filtered devices
devices = self.get_devices(device_filters)
if limit:
devices = devices[:limit]
if not devices:
logger.info("No devices found matching the specified filters")
return []
logger.info(f"Searching backup content for {len(devices)} devices with pattern: {pattern}")
search_results = []
devices_searched = 0
devices_with_matches = 0
for device in devices:
device_id = device['id']
devices_searched += 1
try:
# Get latest backup for this device
latest_backup = self.get_device_latest_backup(device_id)
if not latest_backup:
logger.debug(f"Device {device_id} ({device.get('address', 'unknown')}) has no backups")
continue
# Skip if backup doesn't match time filters
if since is not None or until is not None:
backup_time = latest_backup.get('validSince', 0)
if since is not None and backup_time < since:
continue
if until is not None and backup_time > until:
continue
# Only search TEXT backups with decoded content
if latest_backup.get('type') != 'TEXT' or not latest_backup.get('content'):
logger.debug(f"Device {device_id} backup is not searchable (type: {latest_backup.get('type', 'unknown')})")
continue
backup_content = latest_backup['content']
lines = backup_content.split('\n')
# Search for pattern matches
device_matches = []
for line_num, line in enumerate(lines, 1):
match = compiled_pattern.search(line)
if match:
# Get context lines
start_idx = max(0, line_num - 1 - context_lines)
end_idx = min(len(lines), line_num + context_lines)
context_before = lines[start_idx:line_num-1] if line_num > 1 else []
context_after = lines[line_num:end_idx] if line_num < len(lines) else []
match_info = {
'line_number': line_num,
'line_content': line,
'context_before': context_before,
'context_after': context_after,
'match_groups': list(match.groups())
}
device_matches.append(match_info)
if device_matches:
devices_with_matches += 1
search_result = {
'device': {
'id': device['id'],
'address': device.get('address', ''),
'description': device.get('description', ''),
'vendor': device.get('vendor', ''),
'type': device.get('type', ''),
'model': device.get('model', '')
},
'backup': {
'id': latest_backup['id'],
'validSince': latest_backup.get('validSince'),
'validUntil': latest_backup.get('validUntil'),
'type': latest_backup['type']
},
'matches': device_matches
}
search_results.append(search_result)
except UnimusNotFoundError:
logger.debug(f"Device {device_id} not found or not accessible")
continue
except Exception as e:
logger.warning(f"Error searching device {device_id} ({device.get('address', 'unknown')}): {str(e)}")
continue
logger.info(
f"Backup content search completed: {devices_searched} devices searched, "
f"{devices_with_matches} devices with matches, {len(search_results)} total results"
)
return search_results
def validate_connection(self) -> bool:
"""
Validate that the connection to Unimus is working.
Returns:
True if connection is successful, False otherwise
"""
try:
health = self.get_health()
return health.get('status') in ['OK', 'LICENSING_UNREACHABLE']
except Exception as e:
logger.error(f"Connection validation failed: {e}")
return False
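# Usage sketch: gate startup on a successful health check.
#
#   if not client.validate_connection():
#       raise RuntimeError("Unimus is unreachable or the API token is invalid")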
def _enrich_device_metadata(self, device: Dict[str, Any]) -> Dict[str, Any]:
"""
Enrich device data with calculated metadata fields (Phase 2 feature).
Adds computed fields such as backup health, timing information,
and connectivity status to the device object.
Args:
device: Base device object from API
Returns:
Device object with additional enriched metadata fields
"""
enriched_device = device.copy()
current_time = int(time.time())
device_id = device.get('id')
if not device_id:
logger.warning("Cannot enrich metadata: device ID missing")
return enriched_device
try:
# Backup-related metadata
backup_metadata = self._calculate_backup_metadata(device_id, current_time)
enriched_device.update(backup_metadata)
# Connectivity and timing metadata
connectivity_metadata = self._calculate_connectivity_metadata(device, current_time)
enriched_device.update(connectivity_metadata)
# Configuration health metadata
config_metadata = self._calculate_configuration_metadata(device_id, current_time)
enriched_device.update(config_metadata)
# Add metadata generation timestamp
enriched_device['_metadata'] = {
'enriched_at': current_time,
'enrichment_version': '0.5.0-phase2'
}
except Exception as e:
logger.warning(f"Failed to enrich metadata for device {device_id}: {e}")
# Add error indicator but don't fail the request
enriched_device['_metadata_error'] = str(e)
return enriched_device
def _calculate_backup_metadata(self, device_id: int, current_time: int) -> Dict[str, Any]:
"""Calculate backup-related metadata."""
metadata = {}
try:
# Get latest backup for timing analysis
latest_backup = self.get_device_latest_backup(device_id)
if latest_backup:
valid_since = latest_backup.get('validSince', 0)
valid_until = latest_backup.get('validUntil')
# Calculate backup age
metadata['backupAge'] = current_time - valid_since if valid_since else None
metadata['lastBackupTime'] = valid_since
metadata['lastBackupId'] = latest_backup.get('id')
metadata['backupType'] = latest_backup.get('type')
# Backup freshness indicator
if metadata['backupAge'] is not None:
if metadata['backupAge'] < 86400: # 1 day
metadata['backupFreshness'] = 'fresh'
elif metadata['backupAge'] < 604800: # 1 week
metadata['backupFreshness'] = 'recent'
elif metadata['backupAge'] < 2592000: # 1 month
metadata['backupFreshness'] = 'aging'
else:
metadata['backupFreshness'] = 'stale'
# Configuration validity period
if valid_until:
metadata['configurationStabilityPeriod'] = valid_until - valid_since
else:
metadata['configurationStabilityPeriod'] = current_time - valid_since
else:
metadata['backupAge'] = None
metadata['lastBackupTime'] = None
metadata['backupFreshness'] = 'none'
# Get backup count (limited to avoid performance issues)
backups = self.get_device_backups(device_id, limit=50)
metadata['backupCount'] = len(backups)
metadata['recentBackupCount'] = len(backups) # Limited sample
except Exception as e:
logger.debug(f"Failed to calculate backup metadata for device {device_id}: {e}")
metadata['backupAge'] = None
metadata['backupCount'] = 0
return metadata
def _calculate_connectivity_metadata(self, device: Dict[str, Any], current_time: int) -> Dict[str, Any]:
"""Calculate connectivity and status metadata."""
metadata = {}
# Basic status analysis
last_job_status = device.get('lastJobStatus', 'UNKNOWN')
metadata['lastJobStatus'] = last_job_status
metadata['deviceHealth'] = self._determine_device_health(last_job_status)
# Connection type analysis
connections = device.get('connections', [])
if connections:
connection_types = [conn.get('type') for conn in connections if conn.get('type')]
metadata['connectionTypes'] = list(set(connection_types))
metadata['primaryConnectionType'] = connection_types[0] if connection_types else None
# Port analysis
ports = [conn.get('port') for conn in connections if conn.get('port')]
metadata['connectionPorts'] = ports
else:
metadata['connectionTypes'] = []
metadata['primaryConnectionType'] = None
metadata['connectionPorts'] = []
# Device age analysis
create_time = device.get('createTime')
if create_time:
device_age = current_time - create_time
metadata['deviceAge'] = device_age
# Device lifecycle stage
if device_age < 86400: # 1 day
metadata['deviceLifecycle'] = 'new'
elif device_age < 2592000: # 1 month
metadata['deviceLifecycle'] = 'active'
elif device_age < 31536000: # 1 year
metadata['deviceLifecycle'] = 'established'
else:
metadata['deviceLifecycle'] = 'mature'
else:
metadata['deviceAge'] = None
metadata['deviceLifecycle'] = 'unknown'
return metadata
def _calculate_configuration_metadata(self, device_id: int, current_time: int) -> Dict[str, Any]:
"""Calculate configuration health and change metadata."""
metadata = {}
try:
# Get recent configuration changes (last 30 days)
thirty_days_ago = current_time - (30 * 24 * 60 * 60)
changed_devices = self.get_devices_with_changed_backups(since=thirty_days_ago)
# Check if this device has recent changes
device_has_changes = any(d.get('id') == device_id for d in changed_devices)
metadata['hasRecentChanges'] = device_has_changes
if device_has_changes:
# Find this device in the changed devices list
device_change_info = next((d for d in changed_devices if d.get('id') == device_id), {})
backups_in_period = device_change_info.get('backups', [])
metadata['recentChangeCount'] = len(backups_in_period)
# Calculate change frequency
if len(backups_in_period) > 1:
change_frequency = len(backups_in_period) / 30 # changes per day
metadata['changeFrequency'] = round(change_frequency, 3)
if change_frequency > 1:
metadata['configurationStability'] = 'unstable'
elif change_frequency > 0.1:
metadata['configurationStability'] = 'moderate'
else:
metadata['configurationStability'] = 'stable'
else:
metadata['changeFrequency'] = 0
metadata['configurationStability'] = 'stable'
else:
metadata['recentChangeCount'] = 0
metadata['changeFrequency'] = 0
metadata['configurationStability'] = 'stable'
except Exception as e:
logger.debug(f"Failed to calculate configuration metadata for device {device_id}: {e}")
metadata['hasRecentChanges'] = None
metadata['configurationStability'] = 'unknown'
return metadata
def _determine_device_health(self, last_job_status: str) -> str:
"""Determine overall device health based on last job status."""
status_mapping = {
'SUCCESSFUL': 'healthy',
'FAILED': 'unhealthy',
'UNKNOWN': 'unknown'
}
return status_mapping.get(last_job_status, 'unknown')
@unimus_cache(ttl_category='relationships')
def get_device_relationships(
self,
device_id: int,
include_network_neighbors: bool = True,
include_zone_peers: bool = True,
include_connection_analysis: bool = True
) -> Dict[str, Any]:
"""
Analyze and discover device relationships and topology connections (Phase 3 feature).
Discovers device relationships through:
- Network topology analysis (IP ranges, subnets)
- Zone-based device groupings
- Connection pattern analysis
- Configuration-based neighbor discovery
Args:
device_id: The device ID to analyze relationships for
include_network_neighbors: Whether to discover network neighbors via IP analysis
include_zone_peers: Whether to include devices in the same zone
include_connection_analysis: Whether to analyze connection patterns
Returns:
Dictionary containing relationship analysis:
- device: Base device information
- networkNeighbors: Devices in same network segments
- zonePeers: Devices in same zone
- connectionPatterns: Analysis of connection methods and paths
- topologyInsights: Network topology analysis
- relationshipMetadata: Calculated relationship metrics
Raises:
UnimusNotFoundError: If device with given ID doesn't exist
"""
try:
# Get the target device with full details
target_device = self.get_device_by_id(device_id, include_attributes=['connections'], enrich_metadata=True)
relationships = {
'device': target_device,
'networkNeighbors': [],
'zonePeers': [],
'connectionPatterns': {},
'topologyInsights': {},
'relationshipMetadata': {}
}
# Get all devices for relationship analysis
all_devices = self.get_devices()
if include_network_neighbors:
relationships['networkNeighbors'] = self._discover_network_neighbors(target_device, all_devices)
if include_zone_peers:
relationships['zonePeers'] = self._discover_zone_peers(target_device, all_devices)
if include_connection_analysis:
relationships['connectionPatterns'] = self._analyze_connection_patterns(target_device, all_devices)
# Generate topology insights
relationships['topologyInsights'] = self._generate_topology_insights(target_device, relationships)
# Calculate relationship metadata
relationships['relationshipMetadata'] = self._calculate_relationship_metadata(relationships)
return relationships
except UnimusNotFoundError:
raise
except Exception as e:
raise UnimusError(f"Failed to analyze device relationships: {str(e)}")
def _discover_network_neighbors(self, target_device: Dict[str, Any], all_devices: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Discover devices in the same network segments."""
import ipaddress
neighbors = []
target_address = target_device.get('address')
if not target_address:
return neighbors
try:
# Try to parse as IP address for network analysis
target_ip = ipaddress.ip_address(target_address)
for device in all_devices:
if device.get('id') == target_device.get('id'):
continue # Skip self
device_address = device.get('address')
if not device_address:
continue
try:
device_ip = ipaddress.ip_address(device_address)
# Check whether the two addresses share a subnet at any of several common mask lengths
network_relationship = self._analyze_network_relationship(target_ip, device_ip)
if network_relationship['is_related']:
neighbor_info = {
'device': {
'id': device.get('id'),
'address': device.get('address'),
'description': device.get('description'),
'vendor': device.get('vendor'),
'type': device.get('type'),
'managed': device.get('managed')
},
'networkRelationship': network_relationship
}
neighbors.append(neighbor_info)
except ValueError:
# Not a valid IP address, skip network analysis
continue
except ValueError:
# Target address is not a valid IP, cannot do network analysis
logger.debug(f"Target device address '{target_address}' is not a valid IP for network analysis")
return neighbors
def _analyze_network_relationship(self, ip1: 'ipaddress.IPv4Address', ip2: 'ipaddress.IPv4Address') -> Dict[str, Any]:
"""Analyze the network relationship between two IP addresses."""
import ipaddress
relationship = {
'is_related': False,
'relationship_type': 'unrelated',
'subnet_mask': None,
'network_distance': None
}
# Check common subnet masks, most specific first, so the first match (where we break) is also the most specific shared subnet
common_subnets = [28, 27, 26, 25, 24, 22, 20, 16]
for subnet_bits in common_subnets:
try:
# Create networks with different subnet masks
network1 = ipaddress.ip_network(f"{ip1}/{subnet_bits}", strict=False)
network2 = ipaddress.ip_network(f"{ip2}/{subnet_bits}", strict=False)
if network1 == network2:
relationship['is_related'] = True
relationship['relationship_type'] = f"same_subnet_/{subnet_bits}"
relationship['subnet_mask'] = subnet_bits
# Calculate network "distance" (difference in host portion)
host_bits = 32 - subnet_bits
max_hosts = 2 ** host_bits
host1 = int(ip1) & ((1 << host_bits) - 1)
host2 = int(ip2) & ((1 << host_bits) - 1)
relationship['network_distance'] = abs(host1 - host2)
break # Use the most specific subnet match
except ValueError:
continue
return relationship
def _discover_zone_peers(self, target_device: Dict[str, Any], all_devices: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Discover devices in the same zone."""
zone_peers = []
target_zone_id = target_device.get('zoneId')
if not target_zone_id:
return zone_peers
for device in all_devices:
if device.get('id') == target_device.get('id'):
continue # Skip self
if device.get('zoneId') == target_zone_id:
peer_info = {
'device': {
'id': device.get('id'),
'address': device.get('address'),
'description': device.get('description'),
'vendor': device.get('vendor'),
'type': device.get('type'),
'managed': device.get('managed'),
'lastJobStatus': device.get('lastJobStatus')
},
'zoneRelationship': {
'shared_zone_id': target_zone_id,
'relationship_type': 'zone_peer'
}
}
zone_peers.append(peer_info)
return zone_peers
def _analyze_connection_patterns(self, target_device: Dict[str, Any], all_devices: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze connection patterns and methods."""
patterns = {
'target_connections': [],
'similar_connection_devices': [],
'connection_statistics': {},
'security_analysis': {}
}
target_connections = target_device.get('connections', [])
patterns['target_connections'] = target_connections
if not target_connections:
return patterns
# Analyze target device connections
target_connection_types = [conn.get('type') for conn in target_connections]
target_ports = [conn.get('port') for conn in target_connections if conn.get('port')]
similar_devices = []
connection_type_counts = {}
port_usage_counts = {}
for device in all_devices:
if device.get('id') == target_device.get('id'):
continue
device_connections = device.get('connections', [])
if not device_connections:
continue
device_connection_types = [conn.get('type') for conn in device_connections]
device_ports = [conn.get('port') for conn in device_connections if conn.get('port')]
# Count connection types
for conn_type in device_connection_types:
connection_type_counts[conn_type] = connection_type_counts.get(conn_type, 0) + 1
# Count port usage
for port in device_ports:
port_usage_counts[port] = port_usage_counts.get(port, 0) + 1
# Find devices with similar connection patterns
if any(ct in target_connection_types for ct in device_connection_types):
similarity_score = len(set(target_connection_types) & set(device_connection_types)) / len(set(target_connection_types + device_connection_types))
similar_device = {
'device': {
'id': device.get('id'),
'address': device.get('address'),
'description': device.get('description'),
'vendor': device.get('vendor'),
'type': device.get('type')
},
'connection_similarity': {
'similarity_score': round(similarity_score, 3),
'shared_connection_types': list(set(target_connection_types) & set(device_connection_types)),
'shared_ports': list(set(target_ports) & set(device_ports))
}
}
similar_devices.append(similar_device)
# Sort by similarity score
similar_devices.sort(key=lambda x: x['connection_similarity']['similarity_score'], reverse=True)
patterns['similar_connection_devices'] = similar_devices[:10] # Top 10 most similar
# Connection statistics
patterns['connection_statistics'] = {
'connection_type_distribution': connection_type_counts,
'port_usage_distribution': port_usage_counts,
'total_devices_analyzed': len(all_devices)
}
# Security analysis
patterns['security_analysis'] = self._analyze_connection_security(target_connections, connection_type_counts)
return patterns
def _analyze_connection_security(self, target_connections: List[Dict[str, Any]], type_distribution: Dict[str, int]) -> Dict[str, Any]:
"""Analyze connection security patterns."""
security_analysis = {
'security_score': 'unknown',
'secure_protocols': 0,
'insecure_protocols': 0,
'recommendations': []
}
secure_protocols = ['SSH']
insecure_protocols = ['TELNET']
for connection in target_connections:
conn_type = connection.get('type', '').upper()
if conn_type in secure_protocols:
security_analysis['secure_protocols'] += 1
elif conn_type in insecure_protocols:
security_analysis['insecure_protocols'] += 1
total_connections = len(target_connections)
if total_connections > 0:
secure_ratio = security_analysis['secure_protocols'] / total_connections
if secure_ratio >= 1.0:
security_analysis['security_score'] = 'excellent'
elif secure_ratio >= 0.8:
security_analysis['security_score'] = 'good'
elif secure_ratio >= 0.5:
security_analysis['security_score'] = 'moderate'
else:
security_analysis['security_score'] = 'poor'
if security_analysis['insecure_protocols'] > 0:
security_analysis['recommendations'].append("Consider migrating from TELNET to SSH for better security")
# Analyze against network-wide patterns
total_ssh = type_distribution.get('SSH', 0)
total_telnet = type_distribution.get('TELNET', 0)
if total_telnet > total_ssh:
security_analysis['recommendations'].append("Network has more TELNET than SSH connections - consider security audit")
return security_analysis
def _generate_topology_insights(self, target_device: Dict[str, Any], relationships: Dict[str, Any]) -> Dict[str, Any]:
"""Generate high-level topology insights."""
insights = {
'network_position': 'unknown',
'connectivity_role': 'unknown',
'zone_importance': 'unknown',
'topology_metrics': {}
}
# Analyze network position
neighbor_count = len(relationships.get('networkNeighbors', []))
zone_peer_count = len(relationships.get('zonePeers', []))
if neighbor_count > 10:
insights['network_position'] = 'highly_connected'
elif neighbor_count > 5:
insights['network_position'] = 'moderately_connected'
elif neighbor_count > 0:
insights['network_position'] = 'lightly_connected'
else:
insights['network_position'] = 'isolated'
# Analyze connectivity role
connection_patterns = relationships.get('connectionPatterns', {})
similar_devices_count = len(connection_patterns.get('similar_connection_devices', []))
if similar_devices_count > 20:
insights['connectivity_role'] = 'infrastructure_hub'
elif similar_devices_count > 10:
insights['connectivity_role'] = 'network_node'
elif similar_devices_count > 0:
insights['connectivity_role'] = 'endpoint'
else:
insights['connectivity_role'] = 'standalone'
# Analyze zone importance
if zone_peer_count > 50:
insights['zone_importance'] = 'critical'
elif zone_peer_count > 20:
insights['zone_importance'] = 'important'
elif zone_peer_count > 5:
insights['zone_importance'] = 'moderate'
else:
insights['zone_importance'] = 'minimal'
# Topology metrics
insights['topology_metrics'] = {
'network_neighbor_count': neighbor_count,
'zone_peer_count': zone_peer_count,
'connection_similarity_count': similar_devices_count,
'total_relationships': neighbor_count + zone_peer_count
}
return insights
def _calculate_relationship_metadata(self, relationships: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate comprehensive relationship metadata."""
metadata = {
'analysis_timestamp': int(time.time()),
'analysis_version': '0.5.0-phase3',
'relationship_summary': {},
'network_analysis': {},
'zone_analysis': {},
'connectivity_analysis': {}
}
# Relationship summary
network_neighbors = relationships.get('networkNeighbors', [])
zone_peers = relationships.get('zonePeers', [])
connection_patterns = relationships.get('connectionPatterns', {})
metadata['relationship_summary'] = {
'total_network_neighbors': len(network_neighbors),
'total_zone_peers': len(zone_peers),
'total_connection_similarities': len(connection_patterns.get('similar_connection_devices', [])),
'has_relationships': len(network_neighbors) > 0 or len(zone_peers) > 0
}
# Network analysis metadata
if network_neighbors:
subnet_types = [neighbor.get('networkRelationship', {}).get('relationship_type') for neighbor in network_neighbors]
most_common_subnet = max(set(subnet_types), key=subnet_types.count) if subnet_types else None
metadata['network_analysis'] = {
'most_common_subnet_relationship': most_common_subnet,
'subnet_diversity': len(set(subnet_types)),
'average_network_distance': sum(
neighbor.get('networkRelationship', {}).get('network_distance', 0)
for neighbor in network_neighbors
) / len(network_neighbors) if network_neighbors else 0
}
# Zone analysis metadata
if zone_peers:
zone_vendors = [peer.get('device', {}).get('vendor') for peer in zone_peers]
zone_types = [peer.get('device', {}).get('type') for peer in zone_peers]
metadata['zone_analysis'] = {
'vendor_diversity': len(set(filter(None, zone_vendors))),
'type_diversity': len(set(filter(None, zone_types))),
'managed_percentage': (
sum(1 for peer in zone_peers if peer.get('device', {}).get('managed')) / len(zone_peers) * 100
) if zone_peers else 0
}
# Connectivity analysis metadata
connection_stats = connection_patterns.get('connection_statistics', {})
security_analysis = connection_patterns.get('security_analysis', {})
metadata['connectivity_analysis'] = {
'connection_type_count': len(connection_stats.get('connection_type_distribution', {})),
'security_score': security_analysis.get('security_score', 'unknown'),
'has_security_recommendations': len(security_analysis.get('recommendations', [])) > 0
}
return metadata
@unimus_cache(ttl_category='topology')
def get_network_topology_analysis(
self,
zone_id: Optional[str] = None,
include_clusters: bool = True,
include_security_analysis: bool = True
) -> Dict[str, Any]:
"""
Analyze network topology and device clusters across the entire infrastructure (Phase 3 feature).
Provides comprehensive network-wide topology analysis including:
- Device cluster identification and mapping
- Network segment analysis
- Zone-based topology insights
- Security pattern analysis across the infrastructure
Args:
zone_id: Optional zone ID to limit analysis scope (default: analyze all zones)
include_clusters: Whether to perform device clustering analysis
include_security_analysis: Whether to include security pattern analysis
Returns:
Dictionary containing comprehensive topology analysis:
- networkOverview: High-level network statistics and metrics
- deviceClusters: Identified device clusters and groupings
- networkSegments: Discovered network segments and subnets
- zoneTopology: Zone-based topology analysis
- securityPatterns: Network-wide security analysis
- topologyMetadata: Analysis metadata and insights
Raises:
UnimusError: If topology analysis fails
"""
try:
# Get all devices for analysis
device_filters = {'zoneId': zone_id} if zone_id else {}
all_devices = self.get_devices(device_filters)
topology_analysis = {
'networkOverview': {},
'deviceClusters': [],
'networkSegments': [],
'zoneTopology': {},
'securityPatterns': {},
'topologyMetadata': {}
}
# Generate network overview
topology_analysis['networkOverview'] = self._generate_network_overview(all_devices)
if include_clusters:
topology_analysis['deviceClusters'] = self._identify_device_clusters(all_devices)
topology_analysis['networkSegments'] = self._analyze_network_segments(all_devices)
# Zone topology analysis
topology_analysis['zoneTopology'] = self._analyze_zone_topology(all_devices)
if include_security_analysis:
topology_analysis['securityPatterns'] = self._analyze_network_security_patterns(all_devices)
# Generate topology metadata
topology_analysis['topologyMetadata'] = self._generate_topology_metadata(topology_analysis, all_devices)
return topology_analysis
except Exception as e:
raise UnimusError(f"Failed to analyze network topology: {str(e)}")
def _generate_network_overview(self, devices: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Generate high-level network overview statistics."""
overview = {
'total_devices': len(devices),
'managed_devices': 0,
'vendor_distribution': {},
'type_distribution': {},
'zone_distribution': {},
'connection_distribution': {},
'health_distribution': {}
}
for device in devices:
# Count managed devices
if device.get('managed'):
overview['managed_devices'] += 1
# Vendor distribution
vendor = device.get('vendor')
if vendor:
overview['vendor_distribution'][vendor] = overview['vendor_distribution'].get(vendor, 0) + 1
# Type distribution
device_type = device.get('type')
if device_type:
overview['type_distribution'][device_type] = overview['type_distribution'].get(device_type, 0) + 1
# Zone distribution
zone_id = device.get('zoneId')
if zone_id:
overview['zone_distribution'][zone_id] = overview['zone_distribution'].get(zone_id, 0) + 1
# Health distribution
health_status = device.get('lastJobStatus', 'UNKNOWN')
overview['health_distribution'][health_status] = overview['health_distribution'].get(health_status, 0) + 1
# Connection distribution (requires connections data)
connections = device.get('connections', [])
for connection in connections:
conn_type = connection.get('type')
if conn_type:
overview['connection_distribution'][conn_type] = overview['connection_distribution'].get(conn_type, 0) + 1
# Calculate percentages
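# e.g. 45 managed devices out of 60 total -> managed_percentage == 75.0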
total = overview['total_devices']
if total > 0:
overview['managed_percentage'] = round((overview['managed_devices'] / total) * 100, 1)
else:
overview['managed_percentage'] = 0
return overview
def _identify_device_clusters(self, devices: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Identify device clusters based on various criteria."""
import ipaddress
clusters = []
# Group devices by network clusters
network_clusters = {}
vendor_clusters = {}
zone_clusters = {}
for device in devices:
device_address = device.get('address')
vendor = device.get('vendor')
zone_id = device.get('zoneId')
# Network-based clustering
if device_address:
try:
ip = ipaddress.ip_address(device_address)
# Use /24 subnet for clustering
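# e.g. 192.0.2.17 and 192.0.2.45 both normalize to 192.0.2.0/24 (strict=False masks the host bits)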
network = ipaddress.ip_network(f"{ip}/24", strict=False)
network_key = str(network)
if network_key not in network_clusters:
network_clusters[network_key] = {
'cluster_type': 'network_subnet',
'cluster_id': network_key,
'devices': []
}
network_clusters[network_key]['devices'].append({
'id': device.get('id'),
'address': device.get('address'),
'description': device.get('description'),
'vendor': device.get('vendor'),
'type': device.get('type')
})
except ValueError:
# Not a valid IP address
pass
# Vendor-based clustering
if vendor:
if vendor not in vendor_clusters:
vendor_clusters[vendor] = {
'cluster_type': 'vendor_group',
'cluster_id': vendor,
'devices': []
}
vendor_clusters[vendor]['devices'].append({
'id': device.get('id'),
'address': device.get('address'),
'description': device.get('description'),
'vendor': device.get('vendor'),
'type': device.get('type')
})
# Zone-based clustering
if zone_id:
if zone_id not in zone_clusters:
zone_clusters[zone_id] = {
'cluster_type': 'zone_group',
'cluster_id': zone_id,
'devices': []
}
zone_clusters[zone_id]['devices'].append({
'id': device.get('id'),
'address': device.get('address'),
'description': device.get('description'),
'vendor': device.get('vendor'),
'type': device.get('type')
})
# Add clusters with meaningful device counts
for cluster in network_clusters.values():
if len(cluster['devices']) >= 2: # At least 2 devices to form a cluster
cluster['device_count'] = len(cluster['devices'])
clusters.append(cluster)
for cluster in vendor_clusters.values():
if len(cluster['devices']) >= 3: # At least 3 devices to form a vendor cluster
cluster['device_count'] = len(cluster['devices'])
clusters.append(cluster)
for cluster in zone_clusters.values():
if len(cluster['devices']) >= 2: # At least 2 devices to form a zone cluster
cluster['device_count'] = len(cluster['devices'])
clusters.append(cluster)
# Sort clusters by device count
clusters.sort(key=lambda x: x['device_count'], reverse=True)
return clusters
def _analyze_network_segments(self, devices: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Analyze network segments and subnets."""
import ipaddress
segments = {}
for device in devices:
device_address = device.get('address')
if not device_address:
continue
try:
ip = ipaddress.ip_address(device_address)
# Analyze different subnet sizes
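# Each device is tallied once per prefix length, so segments at different masks intentionally
# overlap (e.g. 10.1.2.3 contributes to both 10.1.0.0/16 and 10.1.2.0/24)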
for subnet_bits in [16, 20, 22, 24, 25, 26, 27, 28]:
network = ipaddress.ip_network(f"{ip}/{subnet_bits}", strict=False)
network_key = f"{network}_{subnet_bits}"
if network_key not in segments:
segments[network_key] = {
'network': str(network),
'subnet_mask': subnet_bits,
'device_count': 0,
'devices': [],
'vendor_diversity': set(),
'type_diversity': set()
}
segments[network_key]['device_count'] += 1
segments[network_key]['devices'].append({
'id': device.get('id'),
'address': device.get('address'),
'description': device.get('description')
})
if device.get('vendor'):
segments[network_key]['vendor_diversity'].add(device.get('vendor'))
if device.get('type'):
segments[network_key]['type_diversity'].add(device.get('type'))
except ValueError:
# Not a valid IP address
continue
# Convert to list and filter meaningful segments
segment_list = []
for segment_data in segments.values():
if segment_data['device_count'] >= 2: # At least 2 devices
segment_data['vendor_diversity'] = len(segment_data['vendor_diversity'])
segment_data['type_diversity'] = len(segment_data['type_diversity'])
segment_list.append(segment_data)
# Sort by device count
segment_list.sort(key=lambda x: x['device_count'], reverse=True)
return segment_list[:20] # Return top 20 segments
def _analyze_zone_topology(self, devices: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze topology patterns within and across zones."""
zone_analysis = {}
zones = {}
for device in devices:
zone_id = device.get('zoneId')
if not zone_id:
continue
if zone_id not in zones:
zones[zone_id] = {
'zone_id': zone_id,
'device_count': 0,
'managed_count': 0,
'vendors': set(),
'types': set(),
'health_status': {},
'connections': {}
}
zone_data = zones[zone_id]
zone_data['device_count'] += 1
if device.get('managed'):
zone_data['managed_count'] += 1
if device.get('vendor'):
zone_data['vendors'].add(device.get('vendor'))
if device.get('type'):
zone_data['types'].add(device.get('type'))
# Health status distribution
health = device.get('lastJobStatus', 'UNKNOWN')
zone_data['health_status'][health] = zone_data['health_status'].get(health, 0) + 1
# Connection analysis
connections = device.get('connections', [])
for connection in connections:
conn_type = connection.get('type')
if conn_type:
zone_data['connections'][conn_type] = zone_data['connections'].get(conn_type, 0) + 1
# Convert sets to counts and calculate percentages
for zone_id, zone_data in zones.items():
zone_data['vendor_diversity'] = len(zone_data['vendors'])
zone_data['type_diversity'] = len(zone_data['types'])
zone_data['managed_percentage'] = round(
(zone_data['managed_count'] / zone_data['device_count']) * 100, 1
) if zone_data['device_count'] > 0 else 0
# Remove sets (not JSON serializable)
del zone_data['vendors']
del zone_data['types']
zone_analysis['zones'] = zones
zone_analysis['total_zones'] = len(zones)
zone_analysis['total_devices'] = sum(z['device_count'] for z in zones.values())
return zone_analysis
def _analyze_network_security_patterns(self, devices: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze security patterns across the network."""
security_patterns = {
'connection_security': {},
'protocol_distribution': {},
'security_recommendations': [],
'risk_assessment': {}
}
total_connections = 0
secure_connections = 0
insecure_connections = 0
protocol_counts = {}
for device in devices:
connections = device.get('connections', [])
for connection in connections:
total_connections += 1
conn_type = (connection.get('type') or '').upper()
# Count protocols
protocol_counts[conn_type] = protocol_counts.get(conn_type, 0) + 1
# Security classification
if conn_type == 'SSH':
secure_connections += 1
elif conn_type == 'TELNET':
insecure_connections += 1
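# Any other connection type is still counted in protocol_counts but is not classified as secure or insecure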
# Calculate security metrics
if total_connections > 0:
security_patterns['connection_security'] = {
'total_connections': total_connections,
'secure_connections': secure_connections,
'insecure_connections': insecure_connections,
'secure_percentage': round((secure_connections / total_connections) * 100, 1),
'insecure_percentage': round((insecure_connections / total_connections) * 100, 1)
}
security_patterns['protocol_distribution'] = protocol_counts
# Generate recommendations
if total_connections > 0:
insecure_ratio = insecure_connections / total_connections
if insecure_ratio > 0.5:
security_patterns['security_recommendations'].append(
"CRITICAL: More than 50% of connections use insecure protocols (TELNET). Immediate migration to SSH recommended."
)
elif insecure_ratio > 0.25:
security_patterns['security_recommendations'].append(
"WARNING: More than 25% of connections use insecure protocols. Plan migration to SSH."
)
elif insecure_ratio > 0:
security_patterns['security_recommendations'].append(
"INFO: Some TELNET connections detected. Consider migrating remaining devices to SSH."
)
else:
security_patterns['security_recommendations'].append(
"EXCELLENT: No insecure (TELNET) connections detected."
)
# Risk assessment
if total_connections > 0:
if insecure_ratio > 0.5:
risk_level = 'HIGH'
elif insecure_ratio > 0.25:
risk_level = 'MEDIUM'
elif insecure_ratio > 0:
risk_level = 'LOW'
else:
risk_level = 'MINIMAL'
security_patterns['risk_assessment'] = {
'overall_risk_level': risk_level,
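# Note: despite the key name, this is the share of connections (not devices) using insecure protocols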
'insecure_device_ratio': round(insecure_ratio * 100, 1),
'security_score': round((secure_connections / total_connections) * 100, 1)
}
return security_patterns
def _generate_topology_metadata(self, topology_analysis: Dict[str, Any], devices: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Generate comprehensive topology analysis metadata."""
metadata = {
'analysis_timestamp': int(time.time()),
'analysis_version': '0.5.0-phase3',
'devices_analyzed': len(devices),
'analysis_scope': {},
'cluster_insights': {},
'network_insights': {},
'security_insights': {}
}
# Analysis scope
overview = topology_analysis.get('networkOverview', {})
metadata['analysis_scope'] = {
'total_devices': overview.get('total_devices', 0),
'managed_devices': overview.get('managed_devices', 0),
'zones_analyzed': len(overview.get('zone_distribution', {})),
'vendors_present': len(overview.get('vendor_distribution', {})),
'device_types_present': len(overview.get('type_distribution', {}))
}
# Cluster insights
clusters = topology_analysis.get('deviceClusters', [])
if clusters:
cluster_types = [cluster.get('cluster_type') for cluster in clusters]
largest_cluster = max(clusters, key=lambda x: x.get('device_count', 0))
metadata['cluster_insights'] = {
'total_clusters': len(clusters),
'cluster_types': list(set(cluster_types)),
'largest_cluster_size': largest_cluster.get('device_count', 0),
'largest_cluster_type': largest_cluster.get('cluster_type'),
'average_cluster_size': round(
sum(cluster.get('device_count', 0) for cluster in clusters) / len(clusters), 1
) if clusters else 0
}
# Network insights
segments = topology_analysis.get('networkSegments', [])
if segments:
subnet_masks = [segment.get('subnet_mask') for segment in segments]
most_common_mask = max(set(subnet_masks), key=subnet_masks.count) if subnet_masks else None
metadata['network_insights'] = {
'network_segments_discovered': len(segments),
'most_common_subnet_mask': most_common_mask,
'largest_segment_size': max(segment.get('device_count', 0) for segment in segments),
'average_segment_size': round(
sum(segment.get('device_count', 0) for segment in segments) / len(segments), 1
) if segments else 0
}
# Security insights
security = topology_analysis.get('securityPatterns', {})
if security:
risk_assessment = security.get('risk_assessment', {})
metadata['security_insights'] = {
'overall_risk_level': risk_assessment.get('overall_risk_level', 'UNKNOWN'),
'security_score': risk_assessment.get('security_score', 0),
'has_security_recommendations': len(security.get('security_recommendations', [])) > 0,
'total_protocols_in_use': len(security.get('protocol_distribution', {}))
}
return metadata