Skip to main content
Glama

Unimus MCP Server

server.py42.7 kB
#!/usr/bin/env python3 """ Unimus MCP Server A Model Context Protocol server for read-only access to Unimus network configuration management. Provides tools for querying devices, backups, and system health. Version: 1.0.0 """ from mcp.server.fastmcp import FastMCP from unimus_client import UnimusRestClient, UnimusError, UnimusAuthenticationError, UnimusNotFoundError, UnimusValidationError from config import load_config, UnimusConfig import os import logging import threading import time from http.server import BaseHTTPRequestHandler, HTTPServer import json from functools import partial from typing import Dict, List, Optional, Any # Global configuration and client config: Optional[UnimusConfig] = None unimus_client: Optional[UnimusRestClient] = None # Configure logging (will be updated from config) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize FastMCP server mcp = FastMCP("Unimus", description="Read-only MCP server for Unimus network configuration management") @mcp.tool() def unimus_get_health() -> Dict[str, str]: """ Get Unimus system health status. Returns: Health status information containing: - status: 'OK', 'LICENSING_UNREACHABLE', or 'ERROR' The status meanings: - OK: Unimus is ready to handle all requests - LICENSING_UNREACHABLE: License server unreachable, limited functionality - ERROR: Unimus requires user interaction, requests will be refused """ try: return unimus_client.get_health() except UnimusError as e: raise ValueError(f"Failed to get health status: {str(e)}") @mcp.tool() def unimus_get_devices(filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: """ Get all devices from Unimus with optional filtering. Automatically handles pagination to return complete results. Args: filters: Optional dictionary of filters to apply. Common options include: - managed: Boolean, filter by managed state (true/false) - vendor: String, filter by vendor name (e.g., "Cisco", "MikroTik") - type: String, filter by device type (e.g., "IOS", "RouterOS") - model: String, filter by device model - site_id: Number, filter by site ID - zone_id: String, filter by zone ID - address: String, filter by device address - description: String, filter by description content Returns: List of device objects, each containing: - id: Device ID - uuid: Device UUID - address: IP address or hostname - description: Device description - managed: Management status - vendor: Device vendor - type: Device type - model: Device model - lastJobStatus: Last job status (SUCCESSFUL, FAILED, UNKNOWN) - createTime: Creation timestamp - zoneId: Zone identifier Examples: Get all Cisco devices: {"vendor": "Cisco"} Get managed devices only: {"managed": true} Get devices from specific zone: {"zone_id": "1"} """ try: filters = filters or {} return unimus_client.get_devices(filters) except UnimusError as e: raise ValueError(f"Failed to get devices: {str(e)}") @mcp.tool() def unimus_get_device_by_id( device_id: int, include_connections: Optional[bool] = None, include_attributes: Optional[List[str]] = None, enrich_metadata: bool = False ) -> Dict[str, Any]: """ Get detailed information about a specific device by its ID with flexible attribute selection and enhanced metadata. Args: device_id: The numeric ID of the device to retrieve include_connections: Whether to include connection information (backward compatibility) If None and include_attributes is None, defaults to True include_attributes: List of specific attributes to include. Options: - 'schedule' or 's': Include backup schedule information - 'connections' or 'c': Include connection details (SSH/TELNET) - None: Use include_connections parameter or default behavior enrich_metadata: Whether to add calculated metadata fields (Phase 2 feature) Adds backup health, timing, connectivity, and configuration analysis Returns: Device object with detailed information including: - Basic device info (id, address, description, vendor, type, model, etc.) - Schedule information (if requested via attributes or include_connections) - Connection details (if requested via attributes or include_connections): - Connection type (SSH, TELNET) - Port number - Credentials information - Enable password status - Enhanced metadata (if enrich_metadata=True): - backupAge, lastBackupTime, backupFreshness - deviceHealth, deviceLifecycle, connectionTypes - configurationStability, changeFrequency, hasRecentChanges Raises: ValueError: If device_id is invalid or device not found Examples: # Backward compatibility - include all (schedule + connections) device = unimus_get_device_by_id(123) device = unimus_get_device_by_id(123, include_connections=True) # New flexible selection - only schedule information device = unimus_get_device_by_id(123, include_attributes=['schedule']) # New flexible selection - only connection details device = unimus_get_device_by_id(123, include_attributes=['connections']) # New flexible selection - specific combination device = unimus_get_device_by_id(123, include_attributes=['schedule', 'connections']) # Basic device info only (no additional attributes) device = unimus_get_device_by_id(123, include_attributes=[]) device = unimus_get_device_by_id(123, include_connections=False) # Enhanced metadata with calculated fields (Phase 2) device = unimus_get_device_by_id(123, enrich_metadata=True) # Returns additional calculated fields for advanced analysis # Combined: specific attributes + enhanced metadata device = unimus_get_device_by_id(123, include_attributes=['schedule'], enrich_metadata=True) """ try: return unimus_client.get_device_by_id(device_id, include_connections, include_attributes, enrich_metadata) except UnimusNotFoundError: raise ValueError(f"Device with ID {device_id} not found") except UnimusValidationError as e: raise ValueError(f"Invalid device ID: {str(e)}") except UnimusError as e: raise ValueError(f"Failed to get device: {str(e)}") @mcp.tool() def unimus_get_device_by_address(address: str, zone_id: Optional[str] = None) -> Dict[str, Any]: """ Get a device by its network address (IP address or hostname). Args: address: The device address - can be IPv4, IPv6, or hostname zone_id: Optional zone ID to limit search scope (default: search default zone) Returns: Device object with detailed information including schedule and connections Raises: ValueError: If device with given address is not found Examples: Find device by IP: address="192.168.1.1" Find in specific zone: address="router.example.com", zone_id="2" """ try: return unimus_client.get_device_by_address(address, zone_id) except UnimusNotFoundError: zone_info = f" in zone {zone_id}" if zone_id else "" raise ValueError(f"Device with address '{address}'{zone_info} not found") except UnimusError as e: raise ValueError(f"Failed to get device: {str(e)}") @mcp.tool() def unimus_get_device_backups(device_id: int, limit: Optional[int] = None) -> List[Dict[str, Any]]: """ Get all configuration backups for a specific device. Backups are returned in descending chronological order (newest first). Args: device_id: The numeric ID of the device limit: Optional limit on number of backups to return (default: all backups) Returns: List of backup objects, each containing: - id: Backup ID - validSince: Timestamp when configuration was first retrieved - validUntil: Timestamp when configuration was last seen (null if only retrieved once) - type: Backup type ('TEXT' or 'BINARY') - content: Decoded backup content (readable text for TEXT, base64 for BINARY) - content_type: Format indicator ('text' or 'base64') Note: TEXT backups are automatically decoded from base64 to readable text. BINARY backups remain in base64 format. No more manual decoding needed! Raises: ValueError: If device_id is invalid or device not found """ try: return unimus_client.get_device_backups(device_id, limit) except UnimusNotFoundError: raise ValueError(f"Device with ID {device_id} not found") except UnimusError as e: raise ValueError(f"Failed to get device backups: {str(e)}") @mcp.tool() def unimus_get_device_latest_backup(device_id: int) -> Optional[Dict[str, Any]]: """ Get the most recent configuration backup for a specific device. Args: device_id: The numeric ID of the device Returns: Latest backup object or None if no backups exist. Backup object contains: - id: Backup ID - validSince: Timestamp when configuration was first retrieved - validUntil: Timestamp when configuration was last seen - type: Backup type ('TEXT' or 'BINARY') - content: Decoded backup content (readable text for TEXT, base64 for BINARY) - content_type: Format indicator ('text' or 'base64') Raises: ValueError: If device_id is invalid or device not found Note: Returns None if the device exists but has no backups yet. TEXT backups are automatically decoded - no more base64 artifacts needed! """ try: return unimus_client.get_device_latest_backup(device_id) except UnimusNotFoundError: raise ValueError(f"Device with ID {device_id} not found") except UnimusError as e: raise ValueError(f"Failed to get latest backup: {str(e)}") @mcp.tool() def unimus_get_backup_by_id(backup_id: int) -> Dict[str, Any]: """ Get a specific backup by its ID. Args: backup_id: The numeric ID of the backup Returns: Backup object containing: - id: Backup ID - validSince: Timestamp when configuration was first retrieved - validUntil: Timestamp when configuration was last seen - type: Backup type ('TEXT' or 'BINARY') - content: Decoded backup content (readable text for TEXT, base64 for BINARY) - content_type: Format indicator ('text' or 'base64') Raises: ValueError: If backup_id is invalid or backup not found Note: TEXT backups are automatically decoded to readable text format. """ try: return unimus_client.get_backup_by_id(backup_id) except UnimusNotFoundError: raise ValueError(f"Backup with ID {backup_id} not found") except UnimusError as e: raise ValueError(f"Failed to get backup: {str(e)}") @mcp.tool() def unimus_get_devices_by_description(description: str, exact_match: bool = False, limit: Optional[int] = None) -> List[Dict[str, Any]]: """ Get devices by description content with exact or partial matching. **Performance Note**: When exact_match=True, this retrieves ALL devices that partially match the description and filters them in Python. For large device inventories, always use the limit parameter to improve performance. Args: description: Description text to search for exact_match: If True, match description exactly. If False, partial match (default) **Warning: May be slow with large inventories without limit** limit: Optional limit on number of results. **Strongly recommended** when exact_match=True Returns: List of device objects matching the description criteria, each containing: - id: Device ID - uuid: Device UUID - address: IP address or hostname - description: Device description - managed: Management status - vendor: Device vendor - type: Device type - model: Device model - lastJobStatus: Last job status (SUCCESSFUL, FAILED, UNKNOWN) - createTime: Creation timestamp - zoneId: Zone identifier Examples: Fast partial match: description="router", exact_match=False Optimized exact match: description="Main Core Router", exact_match=True, limit=10 Performance-friendly: description="switch", limit=10 """ try: return unimus_client.get_devices_by_description(description, exact_match, limit) except UnimusError as e: raise ValueError(f"Failed to get devices by description: {str(e)}") @mcp.tool() def unimus_get_backup_diff(orig_id: int, rev_id: int) -> Dict[str, Any]: """ Get differences between two backup configurations. Args: orig_id: ID of the backup that will be considered as original rev_id: ID of the backup that will be considered as revised Returns: Dictionary containing diff information with: - original: Information about the original backup - revised: Information about the revised backup - added: Lines that were added in the revised backup - removed: Lines that were removed from the original backup - changed: Lines that were changed between backups Raises: ValueError: If backup IDs are invalid or backups not found Examples: Compare two backups: orig_id=100, rev_id=105 Compare device config changes over time Analyze configuration differences between devices Note: This function can compare backups from different devices. """ try: return unimus_client.get_backup_diff(orig_id, rev_id) except UnimusValidationError as e: raise ValueError(f"Invalid backup ID parameter: {str(e)}") except UnimusNotFoundError as e: raise ValueError(f"Backup not found: {str(e)}") except UnimusError as e: raise ValueError(f"Failed to get backup diff: {str(e)}") @mcp.tool() def unimus_get_devices_with_changed_backups(since: Optional[int] = None, until: Optional[int] = None) -> List[Dict[str, Any]]: """ Get devices that had backup changes within a specific time range. Args: since: Start of time range in seconds (unix timestamp). If None, defaults to 0 until: End of time range in seconds (unix timestamp). If None, defaults to current time Returns: List of device objects that had backup changes, each containing: - id: Device ID - createTime: Device creation time in seconds - address: Hostname, IPv4 or IPv6 address - description: Device description Raises: ValueError: If time range parameters are invalid Examples: All changes since epoch: since=None, until=None Last 24 hours: since=current_time-86400, until=current_time Specific period: since=1640995200, until=1641081600 Note: Returns empty list if no devices had backup changes in the time range. """ try: return unimus_client.get_devices_with_changed_backups(since, until) except UnimusValidationError as e: raise ValueError(f"Invalid time range parameter: {str(e)}") except UnimusError as e: raise ValueError(f"Failed to get devices with changed backups: {str(e)}") @mcp.tool() def unimus_get_schedules() -> List[Dict[str, Any]]: """ Get a list of all schedules in Unimus. Returns: List of schedule objects containing schedule information including: - Schedule ID and name - Configuration and timing details - Associated devices and rules - Status and execution information Raises: ValueError: If the request fails Examples: List all backup schedules View schedule configuration details Check schedule status and effectiveness """ try: return unimus_client.get_schedules() except UnimusError as e: raise ValueError(f"Failed to get schedules: {str(e)}") @mcp.tool() def unimus_get_schedule_by_id(schedule_id: int) -> Dict[str, Any]: """ Get detailed information about a specific schedule. Args: schedule_id: The schedule ID to retrieve Returns: Schedule object containing detailed information including: - Complete schedule configuration - Timing and recurrence settings - Associated devices and filters - Execution history and status Raises: ValueError: If schedule_id is invalid or schedule not found Examples: Get specific schedule: schedule_id=5 View schedule configuration details Check schedule execution status """ try: return unimus_client.get_schedule_by_id(schedule_id) except UnimusValidationError as e: raise ValueError(f"Invalid schedule ID parameter: {str(e)}") except UnimusNotFoundError as e: raise ValueError(f"Schedule not found: {str(e)}") except UnimusError as e: raise ValueError(f"Failed to get schedule: {str(e)}") @mcp.tool() def unimus_search_backup_content( pattern: str, device_filters: Optional[Dict[str, Any]] = None, context_lines: int = 2, limit: Optional[int] = None, since: Optional[int] = None, until: Optional[int] = None ) -> List[Dict[str, Any]]: """ Search through backup configurations for specific patterns using regular expressions. **Performance Warning**: This function retrieves backup content from devices and searches through them client-side. For large device inventories, always use device_filters and/or limit parameters to optimize performance and avoid timeouts. Args: pattern: Regular expression pattern to search for (case-insensitive, multiline) device_filters: Optional filters to limit device search scope: - vendor: String, filter by vendor name (e.g., "Cisco", "MikroTik", "Juniper") - type: String, filter by device type (e.g., "IOS", "RouterOS", "JunOS") - managed: Boolean, filter by managed devices only (true/false) - site_id: Number, filter by site ID - zone_id: String, filter by zone ID - address: String, filter by device address pattern context_lines: Number of context lines to include around matches (default: 2) limit: Optional limit on number of devices to search (**strongly recommended**) since: Start time range for backup filtering (unix timestamp, optional) until: End time range for backup filtering (unix timestamp, optional) Returns: List of search result objects, each containing: - device: Device information (id, address, description, vendor, type, model) - backup: Backup information (id, validSince, validUntil, type) - matches: List of pattern matches with: - line_number: Line number where pattern was found - line_content: Content of the matching line - context_before: Lines before the match for context - context_after: Lines after the match for context - match_groups: Regex capture groups if any Raises: ValueError: If pattern is invalid regex, parameters are invalid, or search fails Performance Tips: - **Always use device_filters** to reduce search scope (vendor, type, managed) - **Set a reasonable limit** for large inventories (e.g., 50-100 devices) - Use time filters to search recent backups only - Test regex patterns on small device sets first - Prefer specific patterns over broad wildcards Examples: # Search for interface configurations on Cisco devices results = unimus_search_backup_content( pattern=r"interface GigabitEthernet\d+/\d+", device_filters={"vendor": "Cisco", "managed": True}, limit=50 ) # Find VLAN configurations with extended context results = unimus_search_backup_content( pattern=r"vlan (\d+)", device_filters={"type": "IOS"}, context_lines=5, limit=25 ) # Search for recent configuration changes import time week_ago = int(time.time()) - (7 * 24 * 60 * 60) results = unimus_search_backup_content( pattern=r"ip route", since=week_ago, device_filters={"vendor": "Cisco"}, limit=30 ) Use Cases: - Find specific configuration patterns across devices - Audit security configurations (ACLs, users, passwords) - Locate interface or VLAN configurations - Search for routing or switching protocols - Identify configuration inconsistencies - Compliance checking across network infrastructure Note: Only searches TEXT-type backups with decoded content. Binary backups are skipped. Search is performed on the latest backup for each device. """ try: return unimus_client.search_backup_content( pattern=pattern, device_filters=device_filters, context_lines=context_lines, limit=limit, since=since, until=until ) except UnimusValidationError as e: raise ValueError(f"Invalid search parameter: {str(e)}") except UnimusError as e: raise ValueError(f"Failed to search backup content: {str(e)}") @mcp.tool() def unimus_get_device_relationships( device_id: int, include_network_neighbors: bool = True, include_zone_peers: bool = True, include_connection_analysis: bool = True ) -> Dict[str, Any]: """ Analyze and discover device relationships and topology connections (NEW v0.5.0 Phase 3). Provides comprehensive relationship mapping including network topology analysis, zone-based device groupings, and connection pattern analysis. Args: device_id: The device ID to analyze relationships for include_network_neighbors: Whether to discover network neighbors via IP subnet analysis include_zone_peers: Whether to include devices in the same zone include_connection_analysis: Whether to analyze connection patterns and security Returns: Comprehensive relationship analysis containing: - device: Base device information with enhanced metadata - networkNeighbors: Devices in same network segments with subnet analysis - zonePeers: Devices in same zone with grouping information - connectionPatterns: Connection method analysis and security assessment - topologyInsights: High-level network topology analysis - relationshipMetadata: Calculated relationship metrics and statistics Network Analysis Features: - Intelligent subnet detection (supports /16, /20, /22, /24, /25, /26, /27, /28) - Network distance calculation between devices - IP address relationship mapping Zone Analysis Features: - Zone peer discovery and grouping - Vendor and device type diversity analysis - Management status distribution within zones Connection Analysis Features: - Connection method similarity scoring - Port usage pattern analysis - Security assessment (SSH vs TELNET usage) - Network-wide security recommendations Topology Insights: - Network position classification (isolated, lightly_connected, moderately_connected, highly_connected) - Connectivity role analysis (standalone, endpoint, network_node, infrastructure_hub) - Zone importance assessment (minimal, moderate, important, critical) Raises: ValueError: If device_id is invalid or device not found Examples: # Full relationship analysis relationships = unimus_get_device_relationships(123) # Network topology focus relationships = unimus_get_device_relationships( device_id=123, include_network_neighbors=True, include_zone_peers=False, include_connection_analysis=False ) # Security and connection analysis focus relationships = unimus_get_device_relationships( device_id=123, include_network_neighbors=False, include_zone_peers=False, include_connection_analysis=True ) Performance Note: This function analyzes relationships against all devices in the system. For very large device inventories (>1000 devices), analysis may take several seconds. Results include comprehensive metrics and insights. Use Cases: - Network topology visualization and mapping - Security assessment and audit (connection methods) - Zone organization and device grouping analysis - Infrastructure planning and network segmentation - Device relationship discovery for troubleshooting - Compliance checking across network zones """ try: return unimus_client.get_device_relationships( device_id=device_id, include_network_neighbors=include_network_neighbors, include_zone_peers=include_zone_peers, include_connection_analysis=include_connection_analysis ) except UnimusNotFoundError: raise ValueError(f"Device with ID {device_id} not found") except UnimusValidationError as e: raise ValueError(f"Invalid device ID: {str(e)}") except UnimusError as e: raise ValueError(f"Failed to analyze device relationships: {str(e)}") @mcp.tool() def unimus_get_network_topology_analysis( zone_id: Optional[str] = None, include_clusters: bool = True, include_security_analysis: bool = True ) -> Dict[str, Any]: """ Analyze network topology and device clusters across infrastructure (NEW v0.5.0 Phase 3). Provides comprehensive network-wide topology analysis including device clustering, network segmentation, zone analysis, and security pattern assessment. Args: zone_id: Optional zone ID to limit analysis scope (default: analyze all zones) include_clusters: Whether to perform device clustering and network segmentation analysis include_security_analysis: Whether to include network-wide security pattern analysis Returns: Comprehensive network topology analysis containing: - networkOverview: High-level network statistics and device distribution - deviceClusters: Identified device clusters (network, vendor, zone-based) - networkSegments: Discovered network segments and subnet analysis - zoneTopology: Zone-based topology insights and device groupings - securityPatterns: Network-wide security analysis and recommendations - topologyMetadata: Analysis metadata, insights, and performance metrics Network Overview Features: - Total device count and management status distribution - Vendor and device type distribution across infrastructure - Zone distribution and connection method statistics - Health status distribution and managed device percentages Device Clustering Features: - Network subnet-based clustering (/24 subnets) - Vendor-based device groupings (minimum 3 devices) - Zone-based device clusters (minimum 2 devices) - Cluster size analysis and device count metrics Network Segmentation Features: - Multi-subnet analysis (supports /16, /20, /22, /24, /25, /26, /27, /28) - Network segment device count and diversity analysis - Vendor and device type diversity within segments - Top 20 largest network segments identification Zone Topology Features: - Per-zone device statistics and management percentages - Vendor and device type diversity within zones - Health status distribution per zone - Connection method analysis per zone Security Analysis Features: - Protocol distribution (SSH vs TELNET usage) - Security score calculation and risk assessment - Network-wide security recommendations - Connection security percentage analysis Raises: ValueError: If topology analysis fails or invalid parameters provided Examples: # Full network topology analysis topology = unimus_get_network_topology_analysis() # Zone-specific analysis topology = unimus_get_network_topology_analysis(zone_id="1") # Clustering focus (no security analysis) topology = unimus_get_network_topology_analysis( include_clusters=True, include_security_analysis=False ) # Security assessment focus topology = unimus_get_network_topology_analysis( include_clusters=False, include_security_analysis=True ) Performance Note: This function analyzes all devices in the specified scope (all zones or specific zone). For very large infrastructures (>1000 devices), analysis may take several seconds. Consider using zone_id parameter to limit scope for faster analysis. Use Cases: - Infrastructure overview and visualization - Network segmentation planning and optimization - Security audit and compliance assessment - Device clustering and organization analysis - Zone-based infrastructure management - Vendor diversity and standardization analysis - Connection method security assessment """ try: return unimus_client.get_network_topology_analysis( zone_id=zone_id, include_clusters=include_clusters, include_security_analysis=include_security_analysis ) except UnimusError as e: raise ValueError(f"Failed to analyze network topology: {str(e)}") @mcp.tool() def unimus_get_latest_backups(device_ids: List[int]) -> List[Dict[str, Any]]: """ Get the latest backups for multiple specific devices. Args: device_ids: List of device IDs to retrieve latest backups for Returns: List of objects, each containing: - deviceId: The device ID - address: Device IP address or hostname - backup: Backup object with: - id: Backup ID - validSince: Timestamp when configuration was first retrieved - validUntil: Timestamp when configuration was last seen - type: Backup type ('TEXT' or 'BINARY') - content: Decoded backup content (readable text for TEXT, base64 for BINARY) - content_type: Format indicator ('text' or 'base64') Raises: ValueError: If device_ids list is empty, contains invalid IDs, or devices not found Examples: Single device: device_ids=[42] Multiple devices: device_ids=[1, 15, 23, 45] Note: Only returns data for devices that exist and have backups. TEXT backups are automatically decoded - massive performance improvement! """ try: return unimus_client.get_latest_backups(device_ids) except UnimusValidationError as e: raise ValueError(f"Invalid device_ids parameter: {str(e)}") except UnimusError as e: raise ValueError(f"Failed to get latest backups: {str(e)}") def load_configuration(config_path: Optional[str] = None) -> UnimusConfig: """ Load configuration from file and environment variables. Args: config_path: Optional explicit path to configuration file Returns: Loaded and validated configuration Raises: ValueError: If configuration is invalid or missing required values """ try: config = load_config(config_path) # Update logging level based on configuration numeric_level = getattr(logging, config.log_level.upper(), None) if numeric_level is not None: logging.getLogger().setLevel(numeric_level) logger.info(f"Log level set to {config.log_level}") return config except Exception as e: # Fallback to legacy environment variable validation for backward compatibility logger.warning(f"Failed to load configuration file: {e}") logger.info("Falling back to environment variable configuration") unimus_url = os.getenv("UNIMUS_URL") unimus_token = os.getenv("UNIMUS_TOKEN") if not unimus_url: raise ValueError("UNIMUS_URL environment variable must be set (or provide configuration file)") if not unimus_token: raise ValueError("UNIMUS_TOKEN environment variable must be set (or provide configuration file)") # Create minimal configuration from environment variables return UnimusConfig(url=unimus_url, token=unimus_token) def validate_environment() -> tuple[str, str]: """ Legacy function for backward compatibility. Returns: Tuple of (unimus_url, unimus_token) Raises: ValueError: If required environment variables are missing """ logger.warning("validate_environment() is deprecated. Use load_configuration() instead.") unimus_url = os.getenv("UNIMUS_URL") unimus_token = os.getenv("UNIMUS_TOKEN") if not unimus_url: raise ValueError("UNIMUS_URL environment variable must be set") if not unimus_token: raise ValueError("UNIMUS_TOKEN environment variable must be set") return unimus_url, unimus_token class HealthCheckHandler(BaseHTTPRequestHandler): """ HTTP handler that implements both liveness (/healthz) and readiness (/readyz) checks. Follows Kubernetes health check patterns for production-ready containers. """ def __init__(self, client: Optional[UnimusRestClient], *args, **kwargs): self.client = client super().__init__(*args, **kwargs) def do_GET(self) -> None: if self.path == '/healthz': # Liveness: Is the server process alive? Yes, because this code is running. self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() response_body = json.dumps({ 'status': 'alive', 'server_version': '1.0.0', 'check_type': 'liveness' }) self.wfile.write(response_body.encode('utf-8')) return if self.path == '/readyz': # Readiness: Can the server do its work? Check Unimus connection. try: if self.client and self.client.validate_connection(): # Get actual Unimus health status health_info = self.client.get_health() unimus_status = health_info.get('status', 'UNKNOWN') self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() response_body = json.dumps({ 'status': 'ready', 'unimus_status': unimus_status, 'server_version': '1.0.0', 'check_type': 'readiness' }) self.wfile.write(response_body.encode('utf-8')) else: self.send_response(503) # Service Unavailable self.send_header('Content-type', 'application/json') self.end_headers() response_body = json.dumps({ 'status': 'not_ready', 'reason': 'Unimus connection failed or client not initialized', 'server_version': '1.0.0', 'check_type': 'readiness' }) self.wfile.write(response_body.encode('utf-8')) except Exception as e: self.send_response(503) self.send_header('Content-type', 'application/json') self.end_headers() response_body = json.dumps({ 'status': 'not_ready', 'reason': f'Readiness check error: {str(e)}', 'server_version': '1.0.0', 'check_type': 'readiness' }) self.wfile.write(response_body.encode('utf-8')) return # Legacy /health endpoint for backward compatibility - maps to readiness if self.path == '/health': # Redirect to readiness check self.send_response(301) # Moved Permanently self.send_header('Location', '/readyz') self.end_headers() return # Fallback for other paths self.send_response(404) self.end_headers() self.wfile.write(b'Not Found') def log_message(self, format: str, *args: Any) -> None: # Suppress HTTP server logging to keep console clean return def start_health_check_server(client: Optional[UnimusRestClient], host: str = '0.0.0.0', port: int = 8080) -> None: """Start HTTP health check server in a separate daemon thread.""" try: # Use partial to pass the client instance to the handler handler = partial(HealthCheckHandler, client) server_address = (host, port) httpd = HTTPServer(server_address, handler) health_thread = threading.Thread(target=httpd.serve_forever) health_thread.daemon = True # Critical: ensures thread stops when main thread stops health_thread.start() logger.info(f"Health check server started. Liveness: http://{host}:{port}/healthz, Readiness: http://{host}:{port}/readyz") except Exception as e: logger.warning(f"Failed to start health check server: {e}") def setup_unimus_client(config_path: Optional[str] = None) -> tuple[Optional[UnimusRestClient], Optional[UnimusConfig]]: """ Initialize Unimus client and configuration without crashing the application on failure. Args: config_path: Optional explicit path to configuration file Returns: Tuple of (client instance or None, configuration or None) """ try: config = load_configuration(config_path) # Create client with configuration settings including cache config client = UnimusRestClient( url=config.url, token=config.token, timeout=config.timeout, verify_ssl=config.verify_ssl, config=config # Pass complete config for cache initialization ) # Note: Custom headers support could be added to UnimusRestClient in the future logger.info(f"Unimus client initialized with URL: {config.url}") return client, config except ValueError as e: logger.error(f"Unimus client configuration failed: {e}. Server starting in degraded mode.") return None, None def initialize_client() -> UnimusRestClient: """ Initialize and validate the Unimus client (legacy function for backward compatibility). Returns: Configured and validated UnimusRestClient Raises: ValueError: If client initialization or validation fails """ logger.warning("initialize_client() is deprecated. Use setup_unimus_client() instead.") unimus_url, unimus_token = validate_environment() # Initialize client client = UnimusRestClient(url=unimus_url, token=unimus_token) # Validate connection if not client.validate_connection(): raise ValueError( f"Failed to connect to Unimus at {unimus_url}. " "Please check URL, token, and network connectivity." ) logger.info(f"Successfully connected to Unimus at {unimus_url}") return client if __name__ == "__main__": # 1. Initialize the client and configuration unimus_client, config = setup_unimus_client() # 2. Start the health check server (if enabled) if config is None or config.enable_health_server: health_port = config.health_check_port if config else 8080 start_health_check_server(unimus_client, port=health_port) else: logger.info("Health check server disabled by configuration") # 3. Define the MCP server task to run in a thread def run_mcp_server(): try: logger.info("Starting Unimus MCP server on a dedicated thread...") mcp.run(transport="stdio") except Exception as e: logger.error(f"MCP server thread encountered an error: {e}", exc_info=True) # 4. Start the MCP server in a daemon thread mcp_thread = threading.Thread(target=run_mcp_server) mcp_thread.daemon = True mcp_thread.start() # 5. Perform initial connection validation if unimus_client: try: if unimus_client.validate_connection(): health = unimus_client.get_health() logger.info(f"Successfully connected to Unimus. System status: {health.get('status', 'UNKNOWN')}") else: logger.warning("Could not validate Unimus connection. Server is in degraded mode.") except Exception as e: logger.warning(f"Could not retrieve system health at startup: {e}. Server is in degraded mode.") else: logger.warning("Unimus client not initialized. Server is running in degraded mode.") # 6. Keep the main thread alive to allow daemon threads to run logger.info("Health endpoints: /healthz (liveness), /readyz (readiness), /health (legacy->readiness)") try: while True: time.sleep(3600) # Sleep for a long time except KeyboardInterrupt: logger.info("Shutting down MCP server.")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Deployment-Team/unimus-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server