Skip to main content
Glama

Unimus MCP Server

unimus_client.py93.8 kB
#!/usr/bin/env python3 """ Unimus Client Library This module provides a REST API client for Unimus network configuration management system. Based on Unimus API v.2 documentation. """ import requests import base64 import binascii import re import time import hashlib import json import functools import threading from typing import Any, Dict, List, Optional, Union import logging # Cache backend imports try: from cachetools import TTLCache from diskcache import Cache as DiskCache CACHE_AVAILABLE = True except ImportError: TTLCache = None DiskCache = None CACHE_AVAILABLE = False logger = logging.getLogger(__name__) def generate_cache_key(func, *args, **kwargs) -> str: """ Generate a unique cache key for a function call. Args: func: The function being called *args: Positional arguments (excluding 'self') **kwargs: Keyword arguments Returns: MD5 hash string representing the unique cache key """ # Sort kwargs for consistent key generation kwargs_sorted = sorted(kwargs.items()) # Create a representation of the function call call_representation = { 'func': func.__name__, 'args': args, # Skip 'self' as it's not passed here 'kwargs': kwargs_sorted } # Serialize to stable JSON string try: serialized_call = json.dumps(call_representation, sort_keys=True, default=str) except (TypeError, ValueError) as e: # Fallback for non-serializable objects logger.warning(f"Failed to serialize cache key data: {e}, using string representation") serialized_call = str(call_representation) # Generate MD5 hash for compact and unique key return hashlib.md5(serialized_call.encode('utf-8')).hexdigest() def unimus_cache(ttl_category: str = 'default'): """ Decorator for caching Unimus API responses. Args: ttl_category: Category for TTL lookup in cache configuration Returns: Decorated function with caching capability """ def decorator(func): @functools.wraps(func) def wrapper(self, *args, **kwargs): # Check if caching is available and enabled if not CACHE_AVAILABLE or not hasattr(self, 'cache_enabled') or not self.cache_enabled: return func(self, *args, **kwargs) # Get cache backend and TTL cache_backend = getattr(self, 'cache_backend', None) if cache_backend is None: return func(self, *args, **kwargs) # Get TTL for this operation ttl_config = getattr(self, 'cache_ttl_config', None) if ttl_config: ttl = getattr(ttl_config, ttl_category, ttl_config.default) else: ttl = 300 # Default fallback # Generate cache key cache_key = generate_cache_key(func, *args, **kwargs) # Try to get from cache try: if hasattr(cache_backend, '__contains__') and hasattr(cache_backend, '__getitem__'): # TTLCache and dict-like objects if cache_key in cache_backend: result = cache_backend[cache_key] # Update cache statistics if hasattr(self, 'cache_stats'): self.cache_stats['hits'] += 1 logger.debug(f"Cache hit for {func.__name__} (key: {cache_key[:8]}...)") return result elif hasattr(cache_backend, 'get'): # DiskCache result = cache_backend.get(cache_key) if result is not None: # Update cache statistics if hasattr(self, 'cache_stats'): self.cache_stats['hits'] += 1 logger.debug(f"Cache hit for {func.__name__} (key: {cache_key[:8]}...)") return result except Exception as e: logger.warning(f"Cache retrieval error for {func.__name__}: {e}") # Cache miss - execute original function if hasattr(self, 'cache_stats'): self.cache_stats['misses'] += 1 logger.debug(f"Cache miss for {func.__name__} (key: {cache_key[:8]}...), executing function") result = func(self, *args, **kwargs) # Store result in cache try: if hasattr(cache_backend, '__setitem__'): # TTLCache and dict-like objects cache_backend[cache_key] = result logger.debug(f"Cached result for {func.__name__} (TTL: {ttl}s)") elif hasattr(cache_backend, 'set'): # DiskCache cache_backend.set(cache_key, result, expire=ttl) logger.debug(f"Cached result for {func.__name__} (TTL: {ttl}s)") except Exception as e: logger.warning(f"Cache storage error for {func.__name__}: {e}") return result return wrapper return decorator class UnimusError(Exception): """Base exception for Unimus API errors.""" pass class UnimusAuthenticationError(UnimusError): """Authentication related errors (401).""" pass class UnimusNotFoundError(UnimusError): """Resource not found errors (404).""" pass class UnimusValidationError(UnimusError): """Validation errors (400, 422).""" pass class UnimusRestClient: """ Unimus client implementation using the REST API v.2. This client provides read-only access to Unimus devices, backups, and related data. All methods handle pagination transparently where applicable. """ def __init__(self, url: str, token: str, verify_ssl: bool = True, timeout: int = 30, config=None): """ Initialize the REST API client. Args: url: The base URL of the Unimus instance (e.g., 'https://unimus.example.com') token: API token for authentication verify_ssl: Whether to verify SSL certificates timeout: Request timeout in seconds config: Optional UnimusConfig instance for cache configuration """ self.base_url = url.rstrip('/') self.api_url = f"{self.base_url}/api/v2" self.token = token self.verify_ssl = verify_ssl self.timeout = timeout self.session = requests.Session() self.session.headers.update({ 'Authorization': f'Bearer {token}', 'Content-Type': 'application/json', 'Accept': 'application/json', }) # Initialize caching self.config = config self._setup_cache() def _setup_cache(self): """ Initialize cache backend based on configuration. """ # Default cache settings (disabled) self.cache_enabled = False self.cache_backend = None self.cache_ttl_config = None self.cache_stats = { 'hits': 0, 'misses': 0, 'total_requests': 0 } # Check if caching is available and configured if not CACHE_AVAILABLE: logger.info("Cache libraries not available, caching disabled") return if not self.config or not hasattr(self.config, 'cache'): logger.debug("No cache configuration provided, caching disabled") return cache_config = self.config.cache if not cache_config.enabled: logger.debug("Cache explicitly disabled in configuration") return # Store TTL configuration self.cache_ttl_config = cache_config.ttl try: # Setup cache backend if cache_config.backend == 'disk': if DiskCache is None: logger.error("Disk cache requested but diskcache not available") return # Create cache directory if needed import os cache_path = cache_config.path or "/tmp/unimus_mcp_cache" os.makedirs(cache_path, exist_ok=True) # Initialize DiskCache with size limit size_limit_bytes = cache_config.size_limit_mb * 1024 * 1024 self.cache_backend = DiskCache(cache_path, size_limit=size_limit_bytes) logger.info(f"Initialized disk cache at {cache_path} (limit: {cache_config.size_limit_mb}MB)") else: # memory backend if TTLCache is None: logger.error("Memory cache requested but cachetools not available") return # Initialize TTLCache with default TTL # Note: TTLCache uses a global TTL for all items self.cache_backend = TTLCache( maxsize=cache_config.max_items, ttl=cache_config.ttl.default ) logger.info(f"Initialized memory cache (max items: {cache_config.max_items}, default TTL: {cache_config.ttl.default}s)") # Enable caching self.cache_enabled = True # Initialize statistics tracking if cache_config.enable_stats: self.cache_stats.update({ 'hits': 0, 'misses': 0, 'total_requests': 0, 'backend': cache_config.backend, 'max_items': cache_config.max_items, 'size_limit_mb': cache_config.size_limit_mb }) logger.info(f"Cache system initialized: {cache_config.backend} backend, stats: {cache_config.enable_stats}") except Exception as e: logger.error(f"Failed to initialize cache: {e}") self.cache_enabled = False self.cache_backend = None def get_cache_stats(self) -> Dict[str, Any]: """ Get cache performance statistics. Returns: Dictionary with cache statistics """ if not self.cache_enabled: return {'enabled': False, 'message': 'Caching is disabled'} stats = self.cache_stats.copy() stats['enabled'] = True stats['total_requests'] = stats['hits'] + stats['misses'] if stats['total_requests'] > 0: stats['hit_ratio'] = stats['hits'] / stats['total_requests'] else: stats['hit_ratio'] = 0.0 # Add backend-specific stats try: if hasattr(self.cache_backend, '__len__'): stats['current_size'] = len(self.cache_backend) if hasattr(self.cache_backend, 'currsize'): # TTLCache stats['current_size'] = self.cache_backend.currsize stats['max_size'] = self.cache_backend.maxsize elif hasattr(self.cache_backend, 'volume'): # DiskCache stats['volume'] = self.cache_backend.volume() stats['size_mb'] = self.cache_backend.volume() / (1024 * 1024) except Exception as e: logger.debug(f"Error getting cache backend stats: {e}") return stats def clear_cache(self): """ Clear all cached data. """ if not self.cache_enabled or not self.cache_backend: return try: if hasattr(self.cache_backend, 'clear'): self.cache_backend.clear() elif hasattr(self.cache_backend, 'evict'): # DiskCache self.cache_backend.evict(0) # Evict all items logger.info("Cache cleared successfully") # Reset statistics self.cache_stats.update({ 'hits': 0, 'misses': 0, 'total_requests': 0 }) except Exception as e: logger.error(f"Failed to clear cache: {e}") def _build_url(self, endpoint: str) -> str: """Build the full URL for an API request.""" endpoint = endpoint.strip('/') return f"{self.api_url}/{endpoint}" def _handle_response(self, response: requests.Response) -> Union[Dict[str, Any], List[Dict[str, Any]]]: """ Handle API response and raise appropriate exceptions for errors. Args: response: The requests Response object Returns: Parsed JSON response data Raises: UnimusAuthenticationError: For 401 errors UnimusNotFoundError: For 404 errors UnimusValidationError: For 400/422 errors UnimusError: For other API errors """ try: data = response.json() except ValueError: # Non-JSON response response.raise_for_status() return {} if response.status_code >= 400: error_msg = data.get('message', f'HTTP {response.status_code}') if response.status_code == 401: raise UnimusAuthenticationError(f"Authentication failed: {error_msg}") elif response.status_code == 404: raise UnimusNotFoundError(f"Resource not found: {error_msg}") elif response.status_code in (400, 422): raise UnimusValidationError(f"Validation error: {error_msg}") else: raise UnimusError(f"API error ({response.status_code}): {error_msg}") return data def _decode_backup_content(self, backup: Dict[str, Any]) -> Dict[str, Any]: """ Decode backup content from base64 to readable format. TEXT types become readable strings, BINARY types remain base64. Args: backup: Backup object with 'bytes' field containing base64 data Returns: Backup object with 'content' field and 'content_type' indicator Note: If base64 decoding or UTF-8 decoding fails for TEXT backups, falls back to returning the original base64 content with content_type='base64' and logs a warning. """ if not backup.get('bytes'): return backup backup_copy = backup.copy() if backup.get('type') == 'TEXT': try: decoded = base64.b64decode(backup['bytes']).decode('utf-8') backup_copy['content'] = decoded backup_copy['content_type'] = 'text' except (binascii.Error, UnicodeDecodeError) as e: logger.warning(f"Failed to decode TEXT backup {backup.get('id', 'unknown')}: {e}") backup_copy['content'] = backup['bytes'] # Fallback to base64 backup_copy['content_type'] = 'base64' else: # BINARY type backup_copy['content'] = backup['bytes'] backup_copy['content_type'] = 'base64' # Remove original bytes field backup_copy.pop('bytes', None) return backup_copy def _process_backup_list(self, backups: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Process a list of backups to decode their content.""" return [self._decode_backup_content(backup) for backup in backups] def _process_backup_response(self, backup: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """Process a single backup response to decode its content.""" if backup is None: return None return self._decode_backup_content(backup) def _get_paginated_data(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: """ Fetch all pages of data transparently. Args: endpoint: API endpoint to query params: Query parameters Returns: Complete list of all items across all pages """ all_items = [] page = 0 params = params or {} while True: current_params = {**params, 'page': page, 'size': 50} # Use max page size for efficiency response = self.session.get( self._build_url(endpoint), params=current_params, verify=self.verify_ssl, timeout=self.timeout ) data = self._handle_response(response) # Handle single item responses (no pagination) if 'data' not in data: return [data] if data else [] items = data['data'] if not isinstance(items, list): return [items] all_items.extend(items) # Check if we have more pages paginator = data.get('paginator', {}) current_page = paginator.get('page', 0) total_pages = paginator.get('totalPages', 1) if current_page >= total_pages - 1: # 0-based indexing break page += 1 return all_items def _get_single_item(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Fetch a single item from the API. Args: endpoint: API endpoint to query params: Query parameters Returns: Single item data """ response = self.session.get( self._build_url(endpoint), params=params, verify=self.verify_ssl, timeout=self.timeout ) data = self._handle_response(response) # Handle wrapped responses if 'data' in data: return data['data'] return data @unimus_cache(ttl_category='health') def get_health(self) -> Dict[str, str]: """ Get Unimus health status. Returns: Health status information containing 'status' field. Status can be: 'OK', 'LICENSING_UNREACHABLE', or 'ERROR' """ return self._get_single_item('health') @unimus_cache(ttl_category='devices') def get_devices(self, filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: """ Get all devices with optional filtering. Automatically handles pagination to return all results. Args: filters: Optional filters to apply. Common filters include: - managed: Boolean, filter by managed state - vendor: String, filter by vendor name - type: String, filter by device type - site_id: Number, filter by site ID - zone_id: String, filter by zone ID Returns: List of device objects """ return self._get_paginated_data('devices', filters) @unimus_cache(ttl_category='enhanced_metadata') # Use enhanced_metadata for enrich_metadata=True, device_metadata for basic def get_device_by_id( self, device_id: int, include_connections: Optional[bool] = None, # Backward compatibility include_attributes: Optional[List[str]] = None, # New flexible option enrich_metadata: bool = False # New Phase 2 feature ) -> Dict[str, Any]: """ Get a specific device by its ID with flexible attribute selection and enhanced metadata. Args: device_id: The numeric ID of the device include_connections: Whether to include connection information (backward compatibility) If None and include_attributes is None, defaults to True include_attributes: List of specific attributes to include. Options: - 'schedule' or 's': Include backup schedule information - 'connections' or 'c': Include connection details (SSH/TELNET) - None: Use include_connections parameter or default behavior enrich_metadata: Whether to add calculated metadata fields (Phase 2 feature) Adds: backupAge, lastSeen, configurationHealth, etc. Returns: Device object with detailed information and optional enriched metadata Raises: UnimusNotFoundError: If device with given ID doesn't exist UnimusValidationError: If device_id is invalid Examples: # Backward compatibility - include all (schedule + connections) device = client.get_device_by_id(123) device = client.get_device_by_id(123, include_connections=True) # New flexible selection - only schedule device = client.get_device_by_id(123, include_attributes=['schedule']) device = client.get_device_by_id(123, include_attributes=['s']) # New flexible selection - only connections device = client.get_device_by_id(123, include_attributes=['connections']) device = client.get_device_by_id(123, include_attributes=['c']) # New flexible selection - specific combination device = client.get_device_by_id(123, include_attributes=['schedule', 'connections']) # Basic device info only (no additional attributes) device = client.get_device_by_id(123, include_attributes=[]) device = client.get_device_by_id(123, include_connections=False) # Enhanced metadata with calculated fields (Phase 2) device = client.get_device_by_id(123, enrich_metadata=True) # Returns additional fields: backupAge, lastSeen, configurationHealth, etc. """ params = {} # Handle attribute selection with backward compatibility if include_attributes is not None: # New flexible attribute selection if include_attributes: # Non-empty list attr_parts = [] for attr in include_attributes: if attr.lower() in ('schedule', 's'): attr_parts.append('s') elif attr.lower() in ('connections', 'c'): attr_parts.append('c') else: logger.warning(f"Unknown attribute '{attr}' ignored. Valid options: 'schedule', 's', 'connections', 'c'") if attr_parts: # Remove duplicates while preserving order unique_attrs = [] for attr in attr_parts: if attr not in unique_attrs: unique_attrs.append(attr) params['attr'] = ','.join(unique_attrs) # Empty list means no additional attributes else: # Backward compatibility mode if include_connections is None: include_connections = True # Default behavior if include_connections: params['attr'] = 's,c' # schedule and connections device = self._get_single_item(f'devices/{device_id}', params) # Phase 2: Add enriched metadata if requested if enrich_metadata: device = self._enrich_device_metadata(device) return device def get_device_by_address(self, address: str, zone_id: Optional[str] = None) -> Dict[str, Any]: """ Get a device by its address (IP or hostname). Args: address: The device address (IP address or hostname) zone_id: Optional zone ID to limit search scope Returns: Device object Raises: UnimusNotFoundError: If device with given address doesn't exist """ params = {'attr': 's,c'} # Include schedule and connections if zone_id: params['zoneId'] = zone_id return self._get_single_item(f'devices/findByAddress/{address}', params) @unimus_cache(ttl_category='backups') def get_device_backups(self, device_id: int, limit: Optional[int] = None) -> List[Dict[str, Any]]: """ Get all backups for a specific device. Backups are returned in descending order by creation time (newest first). Args: device_id: The numeric ID of the device limit: Optional limit on number of results to return Returns: List of backup objects for the device, with decoded content Raises: UnimusNotFoundError: If device with given ID doesn't exist """ if limit: # Use single request with limit params = {'size': limit} response = self.session.get( self._build_url(f'devices/{device_id}/backups'), params=params, verify=self.verify_ssl, timeout=self.timeout ) data = self._handle_response(response) backups = data.get('data', []) return self._process_backup_list(backups) else: # Use paginated approach for all results backups = self._get_paginated_data(f'devices/{device_id}/backups') return self._process_backup_list(backups) def get_device_latest_backup(self, device_id: int) -> Optional[Dict[str, Any]]: """ Get the latest backup for a specific device. Args: device_id: The numeric ID of the device Returns: Latest backup object with decoded content or None if no backups exist Raises: UnimusNotFoundError: If device with given ID doesn't exist """ try: backup = self._get_single_item(f'devices/{device_id}/backups/latest') return self._process_backup_response(backup) except UnimusNotFoundError: # Device exists but has no backups return None def get_backup_by_id(self, backup_id: int) -> Dict[str, Any]: """ Get a specific backup by its ID. Args: backup_id: The numeric ID of the backup Returns: Backup object with decoded content Raises: UnimusNotFoundError: If backup with given ID doesn't exist UnimusError: For other API errors """ backup = self._get_single_item(f'backups/{backup_id}') return self._process_backup_response(backup) def get_devices_by_description(self, description: str, exact_match: bool = False, limit: Optional[int] = None) -> List[Dict[str, Any]]: """ Get devices by description content. **Performance Warning**: When exact_match=True, this function retrieves ALL devices that partially match the description from the API and filters them in Python to find exact matches. This is necessary because the Unimus API only supports partial matching. In environments with many devices, this can be inefficient and slow. Args: description: Description text to search for exact_match: If True, match description exactly. If False, partial match (default) **Warning: exact_match=True may be slow with large device inventories** limit: Optional limit on number of results to return. Recommended when using exact_match=True to improve performance Returns: List of device objects matching the description criteria Performance Tips: - Use partial matching (exact_match=False) when possible for better performance - Always specify a limit parameter when using exact_match=True - Consider filtering devices by other criteria first (vendor, type, zone) Examples: # Fast partial match devices = client.get_devices_by_description("router", exact_match=False) # Exact match with performance optimization devices = client.get_devices_by_description("Main Router", exact_match=True, limit=10) """ if exact_match: # Log performance warning when using exact_match without limit if limit is None: logger.warning( f"Performance warning: get_devices_by_description() called with exact_match=True " f"but no limit specified. This will fetch ALL devices matching '{description}' " f"and filter them in Python, which may be slow with large device inventories. " f"Consider adding a limit parameter for better performance." ) # For exact match, get all devices and filter manually # This is needed because the API only supports partial matching all_devices = self._get_paginated_data(f'devices/findByDescription/{description}') filtered_devices = [ device for device in all_devices if device.get('description', '') == description ] if limit: return filtered_devices[:limit] return filtered_devices else: # Use API's built-in partial matching if limit: # Use single request with limit params = {'size': limit} response = self.session.get( self._build_url(f'devices/findByDescription/{description}'), params=params, verify=self.verify_ssl, timeout=self.timeout ) data = self._handle_response(response) return data.get('data', []) else: # Use paginated approach for all results return self._get_paginated_data(f'devices/findByDescription/{description}') @unimus_cache(ttl_category='backups') def get_latest_backups(self, device_ids: List[int]) -> List[Dict[str, Any]]: """ Get the latest backups for multiple devices. Args: device_ids: List of device IDs to get backups for Returns: List of objects containing deviceId, address, and backup information with decoded content Raises: UnimusValidationError: If device_ids list is empty or contains invalid IDs """ if not device_ids: raise UnimusValidationError("device_ids list cannot be empty") if not all(isinstance(id, int) and id > 0 for id in device_ids): raise UnimusValidationError("All device_ids must be positive integers") # Convert list to comma-separated string as required by API device_ids_str = ','.join(map(str, device_ids)) params = {'id': device_ids_str} backup_responses = self._get_paginated_data('devices/backups/latest', params) # Process each backup response to decode content processed_responses = [] for response in backup_responses: processed_response = response.copy() if 'backup' in response and response['backup']: processed_response['backup'] = self._decode_backup_content(response['backup']) processed_responses.append(processed_response) return processed_responses def get_backup_diff(self, orig_id: int, rev_id: int) -> Dict[str, Any]: """ Get differences between two backup configurations. Args: orig_id: ID of the backup that will be considered as original rev_id: ID of the backup that will be considered as revised Returns: Dictionary containing diff information with added/removed/changed lines Raises: UnimusValidationError: If backup IDs are invalid UnimusNotFoundError: If one or both backups don't exist """ if not isinstance(orig_id, int) or orig_id <= 0: raise UnimusValidationError("orig_id must be a positive integer") if not isinstance(rev_id, int) or rev_id <= 0: raise UnimusValidationError("rev_id must be a positive integer") params = {'origId': orig_id, 'revId': rev_id} response = self.session.get( self._build_url('backups/diff'), params=params, verify=self.verify_ssl, timeout=self.timeout ) return self._handle_response(response) def get_devices_with_changed_backups(self, since: Optional[int] = None, until: Optional[int] = None) -> List[Dict[str, Any]]: """ Get devices that had backup changes within a specific time range. Args: since: Start of time range in seconds (unix timestamp). If None, defaults to 0 until: End of time range in seconds (unix timestamp). If None, defaults to current time Returns: List of device objects that had backup changes, each containing: - id: Device ID - createTime: Device creation time in seconds - address: Hostname, IPv4 or IPv6 address - description: Device description Raises: UnimusValidationError: If time range parameters are invalid Note: If no backups have been changed for any device in the time range, returns an empty list with HTTP 200 status. """ params = {} if since is not None: if not isinstance(since, int) or since < 0: raise UnimusValidationError("since must be a non-negative integer") params['since'] = since if until is not None: if not isinstance(until, int) or until < 0: raise UnimusValidationError("until must be a non-negative integer") params['until'] = until if since is not None and until is not None and since > until: raise UnimusValidationError("since must be less than or equal to until") return self._get_paginated_data('devices/findByChangedBackup', params) @unimus_cache(ttl_category='schedules') def get_schedules(self) -> List[Dict[str, Any]]: """ Get a list of all schedules in Unimus. Returns: List of schedule objects containing schedule information Raises: UnimusError: If the request fails """ return self._get_paginated_data('schedules') def get_schedule_by_id(self, schedule_id: int) -> Dict[str, Any]: """ Get an individual schedule by ID. Args: schedule_id: The schedule ID to retrieve Returns: Schedule object containing detailed schedule information Raises: UnimusValidationError: If schedule_id is invalid UnimusNotFoundError: If schedule doesn't exist """ if not isinstance(schedule_id, int) or schedule_id <= 0: raise UnimusValidationError("schedule_id must be a positive integer") response = self.session.get( self._build_url(f'schedules/{schedule_id}'), verify=self.verify_ssl, timeout=self.timeout ) return self._handle_response(response) @unimus_cache(ttl_category='backup_search') def search_backup_content( self, pattern: str, device_filters: Optional[Dict[str, Any]] = None, context_lines: int = 2, limit: Optional[int] = None, since: Optional[int] = None, until: Optional[int] = None ) -> List[Dict[str, Any]]: """ Search through backup configurations for specific patterns. **Performance Warning**: This function retrieves backup content from devices and searches through them client-side. For large device inventories, this can be resource-intensive. Use device_filters and limit parameters to optimize performance. Args: pattern: Regular expression pattern to search for device_filters: Optional filters to limit device scope: - vendor: String, filter by vendor name (e.g., "Cisco", "MikroTik") - type: String, filter by device type (e.g., "IOS", "RouterOS") - managed: Boolean, filter by managed state - site_id: Number, filter by site ID - zone_id: String, filter by zone ID context_lines: Number of context lines to include around matches (default: 2) limit: Optional limit on number of devices to search (recommended for performance) since: Start time range for backup filtering (unix timestamp) until: End time range for backup filtering (unix timestamp) Returns: List of search result objects, each containing: - device: Device information (id, address, description, vendor, type) - backup: Backup information (id, validSince, validUntil, type) - matches: List of pattern matches with: - line_number: Line number of the match - line_content: Content of the matching line - context_before: Lines before the match - context_after: Lines after the match - match_groups: Regex capture groups if any Raises: UnimusValidationError: If pattern is invalid regex or parameters are invalid UnimusError: If API requests fail Performance Tips: - Use specific device_filters to reduce search scope - Set a reasonable limit parameter for large inventories - Use time filters (since/until) to limit backup age - Consider partial matches before exact regex patterns Examples: # Search for interface configurations results = client.search_backup_content( pattern=r"interface GigabitEthernet\d+/\d+", device_filters={"vendor": "Cisco"}, limit=50 ) # Search for VLAN configurations with context results = client.search_backup_content( pattern=r"vlan \d+", context_lines=3, device_filters={"type": "IOS"} ) """ # Validate regex pattern try: compiled_pattern = re.compile(pattern, re.IGNORECASE | re.MULTILINE) except re.error as e: raise UnimusValidationError(f"Invalid regex pattern: {str(e)}") # Validate parameters if context_lines < 0: raise UnimusValidationError("context_lines must be non-negative") if limit is not None and (not isinstance(limit, int) or limit <= 0): raise UnimusValidationError("limit must be a positive integer") # Log performance warning for unrestricted searches if not device_filters and limit is None: logger.warning( f"Performance warning: search_backup_content() called without device_filters " f"or limit parameters. This will search through ALL devices and may be slow " f"with large inventories. Consider adding device_filters or limit for better performance." ) # Get filtered devices devices = self.get_devices(device_filters) if limit: devices = devices[:limit] if not devices: logger.info("No devices found matching the specified filters") return [] logger.info(f"Searching backup content for {len(devices)} devices with pattern: {pattern}") search_results = [] devices_searched = 0 devices_with_matches = 0 for device in devices: device_id = device['id'] devices_searched += 1 try: # Get latest backup for this device latest_backup = self.get_device_latest_backup(device_id) if not latest_backup: logger.debug(f"Device {device_id} ({device.get('address', 'unknown')}) has no backups") continue # Skip if backup doesn't match time filters if since is not None or until is not None: backup_time = latest_backup.get('validSince', 0) if since is not None and backup_time < since: continue if until is not None and backup_time > until: continue # Only search TEXT backups with decoded content if latest_backup.get('type') != 'TEXT' or not latest_backup.get('content'): logger.debug(f"Device {device_id} backup is not searchable (type: {latest_backup.get('type', 'unknown')})") continue backup_content = latest_backup['content'] lines = backup_content.split('\n') # Search for pattern matches device_matches = [] for line_num, line in enumerate(lines, 1): match = compiled_pattern.search(line) if match: # Get context lines start_idx = max(0, line_num - 1 - context_lines) end_idx = min(len(lines), line_num + context_lines) context_before = lines[start_idx:line_num-1] if line_num > 1 else [] context_after = lines[line_num:end_idx] if line_num < len(lines) else [] match_info = { 'line_number': line_num, 'line_content': line, 'context_before': context_before, 'context_after': context_after, 'match_groups': match.groups() if match.groups() else [] } device_matches.append(match_info) if device_matches: devices_with_matches += 1 search_result = { 'device': { 'id': device['id'], 'address': device.get('address', ''), 'description': device.get('description', ''), 'vendor': device.get('vendor', ''), 'type': device.get('type', ''), 'model': device.get('model', '') }, 'backup': { 'id': latest_backup['id'], 'validSince': latest_backup.get('validSince'), 'validUntil': latest_backup.get('validUntil'), 'type': latest_backup['type'] }, 'matches': device_matches } search_results.append(search_result) except UnimusNotFoundError: logger.debug(f"Device {device_id} not found or not accessible") continue except Exception as e: logger.warning(f"Error searching device {device_id} ({device.get('address', 'unknown')}): {str(e)}") continue logger.info( f"Backup content search completed: {devices_searched} devices searched, " f"{devices_with_matches} devices with matches, {len(search_results)} total results" ) return search_results def validate_connection(self) -> bool: """ Validate that the connection to Unimus is working. Returns: True if connection is successful, False otherwise """ try: health = self.get_health() return health.get('status') in ['OK', 'LICENSING_UNREACHABLE'] except Exception as e: logger.error(f"Connection validation failed: {e}") return False def _enrich_device_metadata(self, device: Dict[str, Any]) -> Dict[str, Any]: """ Enrich device data with calculated metadata fields (Phase 2 feature). Adds computed fields such as backup health, timing information, and connectivity status to the device object. Args: device: Base device object from API Returns: Device object with additional enriched metadata fields """ enriched_device = device.copy() current_time = int(time.time()) device_id = device.get('id') if not device_id: logger.warning("Cannot enrich metadata: device ID missing") return enriched_device try: # Backup-related metadata backup_metadata = self._calculate_backup_metadata(device_id, current_time) enriched_device.update(backup_metadata) # Connectivity and timing metadata connectivity_metadata = self._calculate_connectivity_metadata(device, current_time) enriched_device.update(connectivity_metadata) # Configuration health metadata config_metadata = self._calculate_configuration_metadata(device_id, current_time) enriched_device.update(config_metadata) # Add metadata generation timestamp enriched_device['_metadata'] = { 'enriched_at': current_time, 'enrichment_version': '0.5.0-phase2' } except Exception as e: logger.warning(f"Failed to enrich metadata for device {device_id}: {e}") # Add error indicator but don't fail the request enriched_device['_metadata_error'] = str(e) return enriched_device def _calculate_backup_metadata(self, device_id: int, current_time: int) -> Dict[str, Any]: """Calculate backup-related metadata.""" metadata = {} try: # Get latest backup for timing analysis latest_backup = self.get_device_latest_backup(device_id) if latest_backup: valid_since = latest_backup.get('validSince', 0) valid_until = latest_backup.get('validUntil') # Calculate backup age metadata['backupAge'] = current_time - valid_since if valid_since else None metadata['lastBackupTime'] = valid_since metadata['lastBackupId'] = latest_backup.get('id') metadata['backupType'] = latest_backup.get('type') # Backup freshness indicator if metadata['backupAge']: if metadata['backupAge'] < 86400: # 1 day metadata['backupFreshness'] = 'fresh' elif metadata['backupAge'] < 604800: # 1 week metadata['backupFreshness'] = 'recent' elif metadata['backupAge'] < 2592000: # 1 month metadata['backupFreshness'] = 'aging' else: metadata['backupFreshness'] = 'stale' # Configuration validity period if valid_until: metadata['configurationStabilityPeriod'] = valid_until - valid_since else: metadata['configurationStabilityPeriod'] = current_time - valid_since else: metadata['backupAge'] = None metadata['lastBackupTime'] = None metadata['backupFreshness'] = 'none' # Get backup count (limited to avoid performance issues) backups = self.get_device_backups(device_id, limit=50) metadata['backupCount'] = len(backups) metadata['recentBackupCount'] = len(backups) # Limited sample except Exception as e: logger.debug(f"Failed to calculate backup metadata for device {device_id}: {e}") metadata['backupAge'] = None metadata['backupCount'] = 0 return metadata def _calculate_connectivity_metadata(self, device: Dict[str, Any], current_time: int) -> Dict[str, Any]: """Calculate connectivity and status metadata.""" metadata = {} # Basic status analysis last_job_status = device.get('lastJobStatus', 'UNKNOWN') metadata['lastJobStatus'] = last_job_status metadata['deviceHealth'] = self._determine_device_health(last_job_status) # Connection type analysis connections = device.get('connections', []) if connections: connection_types = [conn.get('type') for conn in connections if conn.get('type')] metadata['connectionTypes'] = list(set(connection_types)) metadata['primaryConnectionType'] = connection_types[0] if connection_types else None # Port analysis ports = [conn.get('port') for conn in connections if conn.get('port')] metadata['connectionPorts'] = ports else: metadata['connectionTypes'] = [] metadata['primaryConnectionType'] = None metadata['connectionPorts'] = [] # Device age analysis create_time = device.get('createTime') if create_time: device_age = current_time - create_time metadata['deviceAge'] = device_age # Device lifecycle stage if device_age < 86400: # 1 day metadata['deviceLifecycle'] = 'new' elif device_age < 2592000: # 1 month metadata['deviceLifecycle'] = 'active' elif device_age < 31536000: # 1 year metadata['deviceLifecycle'] = 'established' else: metadata['deviceLifecycle'] = 'mature' else: metadata['deviceAge'] = None metadata['deviceLifecycle'] = 'unknown' return metadata def _calculate_configuration_metadata(self, device_id: int, current_time: int) -> Dict[str, Any]: """Calculate configuration health and change metadata.""" metadata = {} try: # Get recent configuration changes (last 30 days) thirty_days_ago = current_time - (30 * 24 * 60 * 60) changed_devices = self.get_devices_with_changed_backups(since=thirty_days_ago) # Check if this device has recent changes device_has_changes = any(d.get('id') == device_id for d in changed_devices) metadata['hasRecentChanges'] = device_has_changes if device_has_changes: # Find this device in the changed devices list device_change_info = next((d for d in changed_devices if d.get('id') == device_id), {}) backups_in_period = device_change_info.get('backups', []) metadata['recentChangeCount'] = len(backups_in_period) # Calculate change frequency if len(backups_in_period) > 1: change_frequency = len(backups_in_period) / 30 # changes per day metadata['changeFrequency'] = round(change_frequency, 3) if change_frequency > 1: metadata['configurationStability'] = 'unstable' elif change_frequency > 0.1: metadata['configurationStability'] = 'moderate' else: metadata['configurationStability'] = 'stable' else: metadata['changeFrequency'] = 0 metadata['configurationStability'] = 'stable' else: metadata['recentChangeCount'] = 0 metadata['changeFrequency'] = 0 metadata['configurationStability'] = 'stable' except Exception as e: logger.debug(f"Failed to calculate configuration metadata for device {device_id}: {e}") metadata['hasRecentChanges'] = None metadata['configurationStability'] = 'unknown' return metadata def _determine_device_health(self, last_job_status: str) -> str: """Determine overall device health based on last job status.""" status_mapping = { 'SUCCESSFUL': 'healthy', 'FAILED': 'unhealthy', 'UNKNOWN': 'unknown' } return status_mapping.get(last_job_status, 'unknown') @unimus_cache(ttl_category='relationships') def get_device_relationships( self, device_id: int, include_network_neighbors: bool = True, include_zone_peers: bool = True, include_connection_analysis: bool = True ) -> Dict[str, Any]: """ Analyze and discover device relationships and topology connections (Phase 3 feature). Discovers device relationships through: - Network topology analysis (IP ranges, subnets) - Zone-based device groupings - Connection pattern analysis - Configuration-based neighbor discovery Args: device_id: The device ID to analyze relationships for include_network_neighbors: Whether to discover network neighbors via IP analysis include_zone_peers: Whether to include devices in the same zone include_connection_analysis: Whether to analyze connection patterns Returns: Dictionary containing relationship analysis: - device: Base device information - networkNeighbors: Devices in same network segments - zonePeers: Devices in same zone - connectionPatterns: Analysis of connection methods and paths - topologyInsights: Network topology analysis - relationshipMetadata: Calculated relationship metrics Raises: UnimusNotFoundError: If device with given ID doesn't exist """ try: # Get the target device with full details target_device = self.get_device_by_id(device_id, include_attributes=['connections'], enrich_metadata=True) relationships = { 'device': target_device, 'networkNeighbors': [], 'zonePeers': [], 'connectionPatterns': {}, 'topologyInsights': {}, 'relationshipMetadata': {} } # Get all devices for relationship analysis all_devices = self.get_devices() if include_network_neighbors: relationships['networkNeighbors'] = self._discover_network_neighbors(target_device, all_devices) if include_zone_peers: relationships['zonePeers'] = self._discover_zone_peers(target_device, all_devices) if include_connection_analysis: relationships['connectionPatterns'] = self._analyze_connection_patterns(target_device, all_devices) # Generate topology insights relationships['topologyInsights'] = self._generate_topology_insights(target_device, relationships) # Calculate relationship metadata relationships['relationshipMetadata'] = self._calculate_relationship_metadata(relationships) return relationships except UnimusNotFoundError: raise except Exception as e: raise UnimusError(f"Failed to analyze device relationships: {str(e)}") def _discover_network_neighbors(self, target_device: Dict[str, Any], all_devices: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Discover devices in the same network segments.""" import ipaddress neighbors = [] target_address = target_device.get('address') if not target_address: return neighbors try: # Try to parse as IP address for network analysis target_ip = ipaddress.ip_address(target_address) for device in all_devices: if device.get('id') == target_device.get('id'): continue # Skip self device_address = device.get('address') if not device_address: continue try: device_ip = ipaddress.ip_address(device_address) # Check if in same network (different subnet assumptions) network_relationship = self._analyze_network_relationship(target_ip, device_ip) if network_relationship['is_related']: neighbor_info = { 'device': { 'id': device.get('id'), 'address': device.get('address'), 'description': device.get('description'), 'vendor': device.get('vendor'), 'type': device.get('type'), 'managed': device.get('managed') }, 'networkRelationship': network_relationship } neighbors.append(neighbor_info) except ValueError: # Not a valid IP address, skip network analysis continue except ValueError: # Target address is not a valid IP, cannot do network analysis logger.debug(f"Target device address '{target_address}' is not a valid IP for network analysis") return neighbors def _analyze_network_relationship(self, ip1: 'ipaddress.IPv4Address', ip2: 'ipaddress.IPv4Address') -> Dict[str, Any]: """Analyze the network relationship between two IP addresses.""" import ipaddress relationship = { 'is_related': False, 'relationship_type': 'unrelated', 'subnet_mask': None, 'network_distance': None } # Check common subnet masks common_subnets = [24, 25, 26, 27, 28, 16, 20, 22] for subnet_bits in common_subnets: try: # Create networks with different subnet masks network1 = ipaddress.ip_network(f"{ip1}/{subnet_bits}", strict=False) network2 = ipaddress.ip_network(f"{ip2}/{subnet_bits}", strict=False) if network1 == network2: relationship['is_related'] = True relationship['relationship_type'] = f"same_subnet_/{subnet_bits}" relationship['subnet_mask'] = subnet_bits # Calculate network "distance" (difference in host portion) host_bits = 32 - subnet_bits max_hosts = 2 ** host_bits host1 = int(ip1) & ((1 << host_bits) - 1) host2 = int(ip2) & ((1 << host_bits) - 1) relationship['network_distance'] = abs(host1 - host2) break # Use the most specific subnet match except ValueError: continue return relationship def _discover_zone_peers(self, target_device: Dict[str, Any], all_devices: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Discover devices in the same zone.""" zone_peers = [] target_zone_id = target_device.get('zoneId') if not target_zone_id: return zone_peers for device in all_devices: if device.get('id') == target_device.get('id'): continue # Skip self if device.get('zoneId') == target_zone_id: peer_info = { 'device': { 'id': device.get('id'), 'address': device.get('address'), 'description': device.get('description'), 'vendor': device.get('vendor'), 'type': device.get('type'), 'managed': device.get('managed'), 'lastJobStatus': device.get('lastJobStatus') }, 'zoneRelationship': { 'shared_zone_id': target_zone_id, 'relationship_type': 'zone_peer' } } zone_peers.append(peer_info) return zone_peers def _analyze_connection_patterns(self, target_device: Dict[str, Any], all_devices: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze connection patterns and methods.""" patterns = { 'target_connections': [], 'similar_connection_devices': [], 'connection_statistics': {}, 'security_analysis': {} } target_connections = target_device.get('connections', []) patterns['target_connections'] = target_connections if not target_connections: return patterns # Analyze target device connections target_connection_types = [conn.get('type') for conn in target_connections] target_ports = [conn.get('port') for conn in target_connections if conn.get('port')] similar_devices = [] connection_type_counts = {} port_usage_counts = {} for device in all_devices: if device.get('id') == target_device.get('id'): continue device_connections = device.get('connections', []) if not device_connections: continue device_connection_types = [conn.get('type') for conn in device_connections] device_ports = [conn.get('port') for conn in device_connections if conn.get('port')] # Count connection types for conn_type in device_connection_types: connection_type_counts[conn_type] = connection_type_counts.get(conn_type, 0) + 1 # Count port usage for port in device_ports: port_usage_counts[port] = port_usage_counts.get(port, 0) + 1 # Find devices with similar connection patterns if any(ct in target_connection_types for ct in device_connection_types): similarity_score = len(set(target_connection_types) & set(device_connection_types)) / len(set(target_connection_types + device_connection_types)) similar_device = { 'device': { 'id': device.get('id'), 'address': device.get('address'), 'description': device.get('description'), 'vendor': device.get('vendor'), 'type': device.get('type') }, 'connection_similarity': { 'similarity_score': round(similarity_score, 3), 'shared_connection_types': list(set(target_connection_types) & set(device_connection_types)), 'shared_ports': list(set(target_ports) & set(device_ports)) } } similar_devices.append(similar_device) # Sort by similarity score similar_devices.sort(key=lambda x: x['connection_similarity']['similarity_score'], reverse=True) patterns['similar_connection_devices'] = similar_devices[:10] # Top 10 most similar # Connection statistics patterns['connection_statistics'] = { 'connection_type_distribution': connection_type_counts, 'port_usage_distribution': port_usage_counts, 'total_devices_analyzed': len(all_devices) } # Security analysis patterns['security_analysis'] = self._analyze_connection_security(target_connections, connection_type_counts) return patterns def _analyze_connection_security(self, target_connections: List[Dict[str, Any]], type_distribution: Dict[str, int]) -> Dict[str, Any]: """Analyze connection security patterns.""" security_analysis = { 'security_score': 'unknown', 'secure_protocols': 0, 'insecure_protocols': 0, 'recommendations': [] } secure_protocols = ['SSH'] insecure_protocols = ['TELNET'] for connection in target_connections: conn_type = connection.get('type', '').upper() if conn_type in secure_protocols: security_analysis['secure_protocols'] += 1 elif conn_type in insecure_protocols: security_analysis['insecure_protocols'] += 1 total_connections = len(target_connections) if total_connections > 0: secure_ratio = security_analysis['secure_protocols'] / total_connections if secure_ratio >= 1.0: security_analysis['security_score'] = 'excellent' elif secure_ratio >= 0.8: security_analysis['security_score'] = 'good' elif secure_ratio >= 0.5: security_analysis['security_score'] = 'moderate' else: security_analysis['security_score'] = 'poor' if security_analysis['insecure_protocols'] > 0: security_analysis['recommendations'].append("Consider migrating from TELNET to SSH for better security") # Analyze against network-wide patterns total_ssh = type_distribution.get('SSH', 0) total_telnet = type_distribution.get('TELNET', 0) if total_telnet > total_ssh: security_analysis['recommendations'].append("Network has more TELNET than SSH connections - consider security audit") return security_analysis def _generate_topology_insights(self, target_device: Dict[str, Any], relationships: Dict[str, Any]) -> Dict[str, Any]: """Generate high-level topology insights.""" insights = { 'network_position': 'unknown', 'connectivity_role': 'unknown', 'zone_importance': 'unknown', 'topology_metrics': {} } # Analyze network position neighbor_count = len(relationships.get('networkNeighbors', [])) zone_peer_count = len(relationships.get('zonePeers', [])) if neighbor_count > 10: insights['network_position'] = 'highly_connected' elif neighbor_count > 5: insights['network_position'] = 'moderately_connected' elif neighbor_count > 0: insights['network_position'] = 'lightly_connected' else: insights['network_position'] = 'isolated' # Analyze connectivity role connection_patterns = relationships.get('connectionPatterns', {}) similar_devices_count = len(connection_patterns.get('similar_connection_devices', [])) if similar_devices_count > 20: insights['connectivity_role'] = 'infrastructure_hub' elif similar_devices_count > 10: insights['connectivity_role'] = 'network_node' elif similar_devices_count > 0: insights['connectivity_role'] = 'endpoint' else: insights['connectivity_role'] = 'standalone' # Analyze zone importance if zone_peer_count > 50: insights['zone_importance'] = 'critical' elif zone_peer_count > 20: insights['zone_importance'] = 'important' elif zone_peer_count > 5: insights['zone_importance'] = 'moderate' else: insights['zone_importance'] = 'minimal' # Topology metrics insights['topology_metrics'] = { 'network_neighbor_count': neighbor_count, 'zone_peer_count': zone_peer_count, 'connection_similarity_count': similar_devices_count, 'total_relationships': neighbor_count + zone_peer_count } return insights def _calculate_relationship_metadata(self, relationships: Dict[str, Any]) -> Dict[str, Any]: """Calculate comprehensive relationship metadata.""" import time metadata = { 'analysis_timestamp': int(time.time()), 'analysis_version': '0.5.0-phase3', 'relationship_summary': {}, 'network_analysis': {}, 'zone_analysis': {}, 'connectivity_analysis': {} } # Relationship summary network_neighbors = relationships.get('networkNeighbors', []) zone_peers = relationships.get('zonePeers', []) connection_patterns = relationships.get('connectionPatterns', {}) metadata['relationship_summary'] = { 'total_network_neighbors': len(network_neighbors), 'total_zone_peers': len(zone_peers), 'total_connection_similarities': len(connection_patterns.get('similar_connection_devices', [])), 'has_relationships': len(network_neighbors) > 0 or len(zone_peers) > 0 } # Network analysis metadata if network_neighbors: subnet_types = [neighbor.get('networkRelationship', {}).get('relationship_type') for neighbor in network_neighbors] most_common_subnet = max(set(subnet_types), key=subnet_types.count) if subnet_types else None metadata['network_analysis'] = { 'most_common_subnet_relationship': most_common_subnet, 'subnet_diversity': len(set(subnet_types)), 'average_network_distance': sum( neighbor.get('networkRelationship', {}).get('network_distance', 0) for neighbor in network_neighbors ) / len(network_neighbors) if network_neighbors else 0 } # Zone analysis metadata if zone_peers: zone_vendors = [peer.get('device', {}).get('vendor') for peer in zone_peers] zone_types = [peer.get('device', {}).get('type') for peer in zone_peers] metadata['zone_analysis'] = { 'vendor_diversity': len(set(filter(None, zone_vendors))), 'type_diversity': len(set(filter(None, zone_types))), 'managed_percentage': ( sum(1 for peer in zone_peers if peer.get('device', {}).get('managed')) / len(zone_peers) * 100 ) if zone_peers else 0 } # Connectivity analysis metadata connection_stats = connection_patterns.get('connection_statistics', {}) security_analysis = connection_patterns.get('security_analysis', {}) metadata['connectivity_analysis'] = { 'connection_type_count': len(connection_stats.get('connection_type_distribution', {})), 'security_score': security_analysis.get('security_score', 'unknown'), 'has_security_recommendations': len(security_analysis.get('recommendations', [])) > 0 } return metadata @unimus_cache(ttl_category='topology') def get_network_topology_analysis( self, zone_id: Optional[str] = None, include_clusters: bool = True, include_security_analysis: bool = True ) -> Dict[str, Any]: """ Analyze network topology and device clusters across the entire infrastructure (Phase 3 feature). Provides comprehensive network-wide topology analysis including: - Device cluster identification and mapping - Network segment analysis - Zone-based topology insights - Security pattern analysis across the infrastructure Args: zone_id: Optional zone ID to limit analysis scope (default: analyze all zones) include_clusters: Whether to perform device clustering analysis include_security_analysis: Whether to include security pattern analysis Returns: Dictionary containing comprehensive topology analysis: - networkOverview: High-level network statistics and metrics - deviceClusters: Identified device clusters and groupings - networkSegments: Discovered network segments and subnets - zoneTopology: Zone-based topology analysis - securityPatterns: Network-wide security analysis - topologyMetadata: Analysis metadata and insights Raises: UnimusError: If topology analysis fails """ try: # Get all devices for analysis device_filters = {'zoneId': zone_id} if zone_id else {} all_devices = self.get_devices(device_filters) topology_analysis = { 'networkOverview': {}, 'deviceClusters': [], 'networkSegments': [], 'zoneTopology': {}, 'securityPatterns': {}, 'topologyMetadata': {} } # Generate network overview topology_analysis['networkOverview'] = self._generate_network_overview(all_devices) if include_clusters: topology_analysis['deviceClusters'] = self._identify_device_clusters(all_devices) topology_analysis['networkSegments'] = self._analyze_network_segments(all_devices) # Zone topology analysis topology_analysis['zoneTopology'] = self._analyze_zone_topology(all_devices) if include_security_analysis: topology_analysis['securityPatterns'] = self._analyze_network_security_patterns(all_devices) # Generate topology metadata topology_analysis['topologyMetadata'] = self._generate_topology_metadata(topology_analysis, all_devices) return topology_analysis except Exception as e: raise UnimusError(f"Failed to analyze network topology: {str(e)}") def _generate_network_overview(self, devices: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate high-level network overview statistics.""" overview = { 'total_devices': len(devices), 'managed_devices': 0, 'vendor_distribution': {}, 'type_distribution': {}, 'zone_distribution': {}, 'connection_distribution': {}, 'health_distribution': {} } for device in devices: # Count managed devices if device.get('managed'): overview['managed_devices'] += 1 # Vendor distribution vendor = device.get('vendor') if vendor: overview['vendor_distribution'][vendor] = overview['vendor_distribution'].get(vendor, 0) + 1 # Type distribution device_type = device.get('type') if device_type: overview['type_distribution'][device_type] = overview['type_distribution'].get(device_type, 0) + 1 # Zone distribution zone_id = device.get('zoneId') if zone_id: overview['zone_distribution'][zone_id] = overview['zone_distribution'].get(zone_id, 0) + 1 # Health distribution health_status = device.get('lastJobStatus', 'UNKNOWN') overview['health_distribution'][health_status] = overview['health_distribution'].get(health_status, 0) + 1 # Connection distribution (requires connections data) connections = device.get('connections', []) for connection in connections: conn_type = connection.get('type') if conn_type: overview['connection_distribution'][conn_type] = overview['connection_distribution'].get(conn_type, 0) + 1 # Calculate percentages total = overview['total_devices'] if total > 0: overview['managed_percentage'] = round((overview['managed_devices'] / total) * 100, 1) else: overview['managed_percentage'] = 0 return overview def _identify_device_clusters(self, devices: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Identify device clusters based on various criteria.""" import ipaddress clusters = [] # Group devices by network clusters network_clusters = {} vendor_clusters = {} zone_clusters = {} for device in devices: device_address = device.get('address') vendor = device.get('vendor') zone_id = device.get('zoneId') # Network-based clustering if device_address: try: ip = ipaddress.ip_address(device_address) # Use /24 subnet for clustering network = ipaddress.ip_network(f"{ip}/24", strict=False) network_key = str(network) if network_key not in network_clusters: network_clusters[network_key] = { 'cluster_type': 'network_subnet', 'cluster_id': network_key, 'devices': [] } network_clusters[network_key]['devices'].append({ 'id': device.get('id'), 'address': device.get('address'), 'description': device.get('description'), 'vendor': device.get('vendor'), 'type': device.get('type') }) except ValueError: # Not a valid IP address pass # Vendor-based clustering if vendor: if vendor not in vendor_clusters: vendor_clusters[vendor] = { 'cluster_type': 'vendor_group', 'cluster_id': vendor, 'devices': [] } vendor_clusters[vendor]['devices'].append({ 'id': device.get('id'), 'address': device.get('address'), 'description': device.get('description'), 'vendor': device.get('vendor'), 'type': device.get('type') }) # Zone-based clustering if zone_id: if zone_id not in zone_clusters: zone_clusters[zone_id] = { 'cluster_type': 'zone_group', 'cluster_id': zone_id, 'devices': [] } zone_clusters[zone_id]['devices'].append({ 'id': device.get('id'), 'address': device.get('address'), 'description': device.get('description'), 'vendor': device.get('vendor'), 'type': device.get('type') }) # Add clusters with meaningful device counts for cluster in network_clusters.values(): if len(cluster['devices']) >= 2: # At least 2 devices to form a cluster cluster['device_count'] = len(cluster['devices']) clusters.append(cluster) for cluster in vendor_clusters.values(): if len(cluster['devices']) >= 3: # At least 3 devices to form a vendor cluster cluster['device_count'] = len(cluster['devices']) clusters.append(cluster) for cluster in zone_clusters.values(): if len(cluster['devices']) >= 2: # At least 2 devices to form a zone cluster cluster['device_count'] = len(cluster['devices']) clusters.append(cluster) # Sort clusters by device count clusters.sort(key=lambda x: x['device_count'], reverse=True) return clusters def _analyze_network_segments(self, devices: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Analyze network segments and subnets.""" import ipaddress segments = {} for device in devices: device_address = device.get('address') if not device_address: continue try: ip = ipaddress.ip_address(device_address) # Analyze different subnet sizes for subnet_bits in [16, 20, 22, 24, 25, 26, 27, 28]: network = ipaddress.ip_network(f"{ip}/{subnet_bits}", strict=False) network_key = f"{network}_{subnet_bits}" if network_key not in segments: segments[network_key] = { 'network': str(network), 'subnet_mask': subnet_bits, 'device_count': 0, 'devices': [], 'vendor_diversity': set(), 'type_diversity': set() } segments[network_key]['device_count'] += 1 segments[network_key]['devices'].append({ 'id': device.get('id'), 'address': device.get('address'), 'description': device.get('description') }) if device.get('vendor'): segments[network_key]['vendor_diversity'].add(device.get('vendor')) if device.get('type'): segments[network_key]['type_diversity'].add(device.get('type')) except ValueError: # Not a valid IP address continue # Convert to list and filter meaningful segments segment_list = [] for segment_data in segments.values(): if segment_data['device_count'] >= 2: # At least 2 devices segment_data['vendor_diversity'] = len(segment_data['vendor_diversity']) segment_data['type_diversity'] = len(segment_data['type_diversity']) segment_list.append(segment_data) # Sort by device count segment_list.sort(key=lambda x: x['device_count'], reverse=True) return segment_list[:20] # Return top 20 segments def _analyze_zone_topology(self, devices: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze topology patterns within and across zones.""" zone_analysis = {} zones = {} for device in devices: zone_id = device.get('zoneId') if not zone_id: continue if zone_id not in zones: zones[zone_id] = { 'zone_id': zone_id, 'device_count': 0, 'managed_count': 0, 'vendors': set(), 'types': set(), 'health_status': {}, 'connections': {} } zone_data = zones[zone_id] zone_data['device_count'] += 1 if device.get('managed'): zone_data['managed_count'] += 1 if device.get('vendor'): zone_data['vendors'].add(device.get('vendor')) if device.get('type'): zone_data['types'].add(device.get('type')) # Health status distribution health = device.get('lastJobStatus', 'UNKNOWN') zone_data['health_status'][health] = zone_data['health_status'].get(health, 0) + 1 # Connection analysis connections = device.get('connections', []) for connection in connections: conn_type = connection.get('type') if conn_type: zone_data['connections'][conn_type] = zone_data['connections'].get(conn_type, 0) + 1 # Convert sets to counts and calculate percentages for zone_id, zone_data in zones.items(): zone_data['vendor_diversity'] = len(zone_data['vendors']) zone_data['type_diversity'] = len(zone_data['types']) zone_data['managed_percentage'] = round( (zone_data['managed_count'] / zone_data['device_count']) * 100, 1 ) if zone_data['device_count'] > 0 else 0 # Remove sets (not JSON serializable) del zone_data['vendors'] del zone_data['types'] zone_analysis['zones'] = zones zone_analysis['total_zones'] = len(zones) zone_analysis['total_devices'] = sum(z['device_count'] for z in zones.values()) return zone_analysis def _analyze_network_security_patterns(self, devices: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze security patterns across the network.""" security_patterns = { 'connection_security': {}, 'protocol_distribution': {}, 'security_recommendations': [], 'risk_assessment': {} } total_connections = 0 secure_connections = 0 insecure_connections = 0 protocol_counts = {} for device in devices: connections = device.get('connections', []) for connection in connections: total_connections += 1 conn_type = connection.get('type', '').upper() # Count protocols protocol_counts[conn_type] = protocol_counts.get(conn_type, 0) + 1 # Security classification if conn_type == 'SSH': secure_connections += 1 elif conn_type == 'TELNET': insecure_connections += 1 # Calculate security metrics if total_connections > 0: security_patterns['connection_security'] = { 'total_connections': total_connections, 'secure_connections': secure_connections, 'insecure_connections': insecure_connections, 'secure_percentage': round((secure_connections / total_connections) * 100, 1), 'insecure_percentage': round((insecure_connections / total_connections) * 100, 1) } security_patterns['protocol_distribution'] = protocol_counts # Generate recommendations if total_connections > 0: insecure_ratio = insecure_connections / total_connections if insecure_ratio > 0.5: security_patterns['security_recommendations'].append( "CRITICAL: More than 50% of connections use insecure protocols (TELNET). Immediate migration to SSH recommended." ) elif insecure_ratio > 0.25: security_patterns['security_recommendations'].append( "WARNING: More than 25% of connections use insecure protocols. Plan migration to SSH." ) elif insecure_ratio > 0: security_patterns['security_recommendations'].append( "INFO: Some TELNET connections detected. Consider migrating remaining devices to SSH." ) else: security_patterns['security_recommendations'].append( "EXCELLENT: All connections use secure protocols (SSH)." ) # Risk assessment if total_connections > 0: if insecure_ratio > 0.5: risk_level = 'HIGH' elif insecure_ratio > 0.25: risk_level = 'MEDIUM' elif insecure_ratio > 0: risk_level = 'LOW' else: risk_level = 'MINIMAL' security_patterns['risk_assessment'] = { 'overall_risk_level': risk_level, 'insecure_device_ratio': round(insecure_ratio * 100, 1), 'security_score': round((secure_connections / total_connections) * 100, 1) } return security_patterns def _generate_topology_metadata(self, topology_analysis: Dict[str, Any], devices: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate comprehensive topology analysis metadata.""" import time metadata = { 'analysis_timestamp': int(time.time()), 'analysis_version': '0.5.0-phase3', 'devices_analyzed': len(devices), 'analysis_scope': {}, 'cluster_insights': {}, 'network_insights': {}, 'security_insights': {} } # Analysis scope overview = topology_analysis.get('networkOverview', {}) metadata['analysis_scope'] = { 'total_devices': overview.get('total_devices', 0), 'managed_devices': overview.get('managed_devices', 0), 'zones_analyzed': len(overview.get('zone_distribution', {})), 'vendors_present': len(overview.get('vendor_distribution', {})), 'device_types_present': len(overview.get('type_distribution', {})) } # Cluster insights clusters = topology_analysis.get('deviceClusters', []) if clusters: cluster_types = [cluster.get('cluster_type') for cluster in clusters] largest_cluster = max(clusters, key=lambda x: x.get('device_count', 0)) metadata['cluster_insights'] = { 'total_clusters': len(clusters), 'cluster_types': list(set(cluster_types)), 'largest_cluster_size': largest_cluster.get('device_count', 0), 'largest_cluster_type': largest_cluster.get('cluster_type'), 'average_cluster_size': round( sum(cluster.get('device_count', 0) for cluster in clusters) / len(clusters), 1 ) if clusters else 0 } # Network insights segments = topology_analysis.get('networkSegments', []) if segments: subnet_masks = [segment.get('subnet_mask') for segment in segments] most_common_mask = max(set(subnet_masks), key=subnet_masks.count) if subnet_masks else None metadata['network_insights'] = { 'network_segments_discovered': len(segments), 'most_common_subnet_mask': most_common_mask, 'largest_segment_size': max(segment.get('device_count', 0) for segment in segments), 'average_segment_size': round( sum(segment.get('device_count', 0) for segment in segments) / len(segments), 1 ) if segments else 0 } # Security insights security = topology_analysis.get('securityPatterns', {}) if security: risk_assessment = security.get('risk_assessment', {}) metadata['security_insights'] = { 'overall_risk_level': risk_assessment.get('overall_risk_level', 'UNKNOWN'), 'security_score': risk_assessment.get('security_score', 0), 'has_security_recommendations': len(security.get('security_recommendations', [])) > 0, 'total_protocols_in_use': len(security.get('protocol_distribution', {})) } return metadata

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Deployment-Team/unimus-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server