# server.py (42.7 kB)
#!/usr/bin/env python3
"""
Unimus MCP Server
A Model Context Protocol server for read-only access to Unimus network configuration management.
Provides tools for querying devices, backups, and system health.
Version: 1.0.0
"""
from mcp.server.fastmcp import FastMCP
from unimus_client import UnimusRestClient, UnimusError, UnimusAuthenticationError, UnimusNotFoundError, UnimusValidationError
from config import load_config, UnimusConfig
import os
import logging
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from functools import partial
from typing import Dict, List, Optional, Any
# Global configuration and client.
# Both start as None and are assigned by the ``__main__`` block at the bottom of
# this file; the tool functions below read ``unimus_client`` at call time.
config: Optional[UnimusConfig] = None
unimus_client: Optional[UnimusRestClient] = None
# Configure logging (will be updated from config): INFO is the initial level,
# load_configuration() may later change the root logger level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastMCP server; the @mcp.tool() decorators below register tools on it.
mcp = FastMCP("Unimus", description="Read-only MCP server for Unimus network configuration management")
@mcp.tool()
def unimus_get_health() -> Dict[str, str]:
    """
    Get Unimus system health status.

    Returns:
        Health status information containing:
        - status: 'OK', 'LICENSING_UNREACHABLE', or 'ERROR'

        The status meanings:
        - OK: Unimus is ready to handle all requests
        - LICENSING_UNREACHABLE: License server unreachable, limited functionality
        - ERROR: Unimus requires user interaction, requests will be refused

    Raises:
        ValueError: If the Unimus client is not initialized or the request fails
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_health()
    except UnimusError as e:
        raise ValueError(f"Failed to get health status: {e}") from e
@mcp.tool()
def unimus_get_devices(filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """
    Get all devices from Unimus with optional filtering.

    Automatically handles pagination to return complete results.

    Args:
        filters: Optional dictionary of filters to apply. Common options include:
            - managed: Boolean, filter by managed state (true/false)
            - vendor: String, filter by vendor name (e.g., "Cisco", "MikroTik")
            - type: String, filter by device type (e.g., "IOS", "RouterOS")
            - model: String, filter by device model
            - site_id: Number, filter by site ID
            - zone_id: String, filter by zone ID
            - address: String, filter by device address
            - description: String, filter by description content

    Returns:
        List of device objects, each containing:
        - id: Device ID
        - uuid: Device UUID
        - address: IP address or hostname
        - description: Device description
        - managed: Management status
        - vendor: Device vendor
        - type: Device type
        - model: Device model
        - lastJobStatus: Last job status (SUCCESSFUL, FAILED, UNKNOWN)
        - createTime: Creation timestamp
        - zoneId: Zone identifier

    Raises:
        ValueError: If the Unimus client is not initialized or the request fails

    Examples:
        Get all Cisco devices: {"vendor": "Cisco"}
        Get managed devices only: {"managed": true}
        Get devices from specific zone: {"zone_id": "1"}
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        # A missing (or empty) filter argument is passed on as an empty dict.
        return unimus_client.get_devices(filters or {})
    except UnimusError as e:
        raise ValueError(f"Failed to get devices: {e}") from e
@mcp.tool()
def unimus_get_device_by_id(
    device_id: int,
    include_connections: Optional[bool] = None,
    include_attributes: Optional[List[str]] = None,
    enrich_metadata: bool = False
) -> Dict[str, Any]:
    """
    Get detailed information about a specific device by its ID with flexible attribute selection and enhanced metadata.

    Args:
        device_id: The numeric ID of the device to retrieve
        include_connections: Whether to include connection information (backward compatibility)
            If None and include_attributes is None, defaults to True
        include_attributes: List of specific attributes to include. Options:
            - 'schedule' or 's': Include backup schedule information
            - 'connections' or 'c': Include connection details (SSH/TELNET)
            - None: Use include_connections parameter or default behavior
        enrich_metadata: Whether to add calculated metadata fields (Phase 2 feature)
            Adds backup health, timing, connectivity, and configuration analysis

    Returns:
        Device object with detailed information including:
        - Basic device info (id, address, description, vendor, type, model, etc.)
        - Schedule information (if requested via attributes or include_connections)
        - Connection details (if requested via attributes or include_connections):
            - Connection type (SSH, TELNET)
            - Port number
            - Credentials information
            - Enable password status
        - Enhanced metadata (if enrich_metadata=True):
            - backupAge, lastBackupTime, backupFreshness
            - deviceHealth, deviceLifecycle, connectionTypes
            - configurationStability, changeFrequency, hasRecentChanges

    Raises:
        ValueError: If device_id is invalid, the device is not found, the
            Unimus client is not initialized, or the request fails

    Examples:
        # Backward compatibility - include all (schedule + connections)
        device = unimus_get_device_by_id(123)
        device = unimus_get_device_by_id(123, include_connections=True)

        # New flexible selection - only schedule information
        device = unimus_get_device_by_id(123, include_attributes=['schedule'])

        # New flexible selection - only connection details
        device = unimus_get_device_by_id(123, include_attributes=['connections'])

        # New flexible selection - specific combination
        device = unimus_get_device_by_id(123, include_attributes=['schedule', 'connections'])

        # Basic device info only (no additional attributes)
        device = unimus_get_device_by_id(123, include_attributes=[])
        device = unimus_get_device_by_id(123, include_connections=False)

        # Enhanced metadata with calculated fields (Phase 2)
        device = unimus_get_device_by_id(123, enrich_metadata=True)
        # Returns additional calculated fields for advanced analysis

        # Combined: specific attributes + enhanced metadata
        device = unimus_get_device_by_id(123, include_attributes=['schedule'], enrich_metadata=True)
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_device_by_id(device_id, include_connections, include_attributes, enrich_metadata)
    except UnimusNotFoundError as e:
        raise ValueError(f"Device with ID {device_id} not found") from e
    except UnimusValidationError as e:
        raise ValueError(f"Invalid device ID: {e}") from e
    except UnimusError as e:
        raise ValueError(f"Failed to get device: {e}") from e
@mcp.tool()
def unimus_get_device_by_address(address: str, zone_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Get a device by its network address (IP address or hostname).

    Args:
        address: The device address - can be IPv4, IPv6, or hostname
        zone_id: Optional zone ID to limit search scope (default: search default zone)

    Returns:
        Device object with detailed information including schedule and connections

    Raises:
        ValueError: If device with given address is not found, the Unimus
            client is not initialized, or the request fails

    Examples:
        Find device by IP: address="192.168.1.1"
        Find in specific zone: address="router.example.com", zone_id="2"
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_device_by_address(address, zone_id)
    except UnimusNotFoundError as e:
        # Mention the zone only when the caller restricted the search to one.
        zone_info = f" in zone {zone_id}" if zone_id else ""
        raise ValueError(f"Device with address '{address}'{zone_info} not found") from e
    except UnimusError as e:
        raise ValueError(f"Failed to get device: {e}") from e
@mcp.tool()
def unimus_get_device_backups(device_id: int, limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Get all configuration backups for a specific device.

    Backups are returned in descending chronological order (newest first).

    Args:
        device_id: The numeric ID of the device
        limit: Optional limit on number of backups to return (default: all backups)

    Returns:
        List of backup objects, each containing:
        - id: Backup ID
        - validSince: Timestamp when configuration was first retrieved
        - validUntil: Timestamp when configuration was last seen (null if only retrieved once)
        - type: Backup type ('TEXT' or 'BINARY')
        - content: Decoded backup content (readable text for TEXT, base64 for BINARY)
        - content_type: Format indicator ('text' or 'base64')

        Note: TEXT backups are automatically decoded from base64 to readable text.
        BINARY backups remain in base64 format. No more manual decoding needed!

    Raises:
        ValueError: If device_id is invalid, the device is not found, the
            Unimus client is not initialized, or the request fails
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_device_backups(device_id, limit)
    except UnimusNotFoundError as e:
        raise ValueError(f"Device with ID {device_id} not found") from e
    except UnimusError as e:
        raise ValueError(f"Failed to get device backups: {e}") from e
@mcp.tool()
def unimus_get_device_latest_backup(device_id: int) -> Optional[Dict[str, Any]]:
    """
    Get the most recent configuration backup for a specific device.

    Args:
        device_id: The numeric ID of the device

    Returns:
        Latest backup object or None if no backups exist.
        Backup object contains:
        - id: Backup ID
        - validSince: Timestamp when configuration was first retrieved
        - validUntil: Timestamp when configuration was last seen
        - type: Backup type ('TEXT' or 'BINARY')
        - content: Decoded backup content (readable text for TEXT, base64 for BINARY)
        - content_type: Format indicator ('text' or 'base64')

    Raises:
        ValueError: If device_id is invalid, the device is not found, the
            Unimus client is not initialized, or the request fails

    Note: Returns None if the device exists but has no backups yet.
    TEXT backups are automatically decoded - no more base64 artifacts needed!
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_device_latest_backup(device_id)
    except UnimusNotFoundError as e:
        raise ValueError(f"Device with ID {device_id} not found") from e
    except UnimusError as e:
        raise ValueError(f"Failed to get latest backup: {e}") from e
@mcp.tool()
def unimus_get_backup_by_id(backup_id: int) -> Dict[str, Any]:
    """
    Get a specific backup by its ID.

    Args:
        backup_id: The numeric ID of the backup

    Returns:
        Backup object containing:
        - id: Backup ID
        - validSince: Timestamp when configuration was first retrieved
        - validUntil: Timestamp when configuration was last seen
        - type: Backup type ('TEXT' or 'BINARY')
        - content: Decoded backup content (readable text for TEXT, base64 for BINARY)
        - content_type: Format indicator ('text' or 'base64')

    Raises:
        ValueError: If backup_id is invalid, the backup is not found, the
            Unimus client is not initialized, or the request fails

    Note: TEXT backups are automatically decoded to readable text format.
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_backup_by_id(backup_id)
    except UnimusNotFoundError as e:
        raise ValueError(f"Backup with ID {backup_id} not found") from e
    except UnimusError as e:
        raise ValueError(f"Failed to get backup: {e}") from e
@mcp.tool()
def unimus_get_devices_by_description(description: str, exact_match: bool = False, limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Get devices by description content with exact or partial matching.

    **Performance Note**: When exact_match=True, this retrieves ALL devices that partially
    match the description and filters them in Python. For large device inventories, always
    use the limit parameter to improve performance.

    Args:
        description: Description text to search for
        exact_match: If True, match description exactly. If False, partial match (default)
            **Warning: May be slow with large inventories without limit**
        limit: Optional limit on number of results. **Strongly recommended** when exact_match=True

    Returns:
        List of device objects matching the description criteria, each containing:
        - id: Device ID
        - uuid: Device UUID
        - address: IP address or hostname
        - description: Device description
        - managed: Management status
        - vendor: Device vendor
        - type: Device type
        - model: Device model
        - lastJobStatus: Last job status (SUCCESSFUL, FAILED, UNKNOWN)
        - createTime: Creation timestamp
        - zoneId: Zone identifier

    Raises:
        ValueError: If the Unimus client is not initialized or the request fails

    Examples:
        Fast partial match: description="router", exact_match=False
        Optimized exact match: description="Main Core Router", exact_match=True, limit=10
        Performance-friendly: description="switch", limit=10
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_devices_by_description(description, exact_match, limit)
    except UnimusError as e:
        raise ValueError(f"Failed to get devices by description: {e}") from e
@mcp.tool()
def unimus_get_backup_diff(orig_id: int, rev_id: int) -> Dict[str, Any]:
    """
    Get differences between two backup configurations.

    Args:
        orig_id: ID of the backup that will be considered as original
        rev_id: ID of the backup that will be considered as revised

    Returns:
        Dictionary containing diff information with:
        - original: Information about the original backup
        - revised: Information about the revised backup
        - added: Lines that were added in the revised backup
        - removed: Lines that were removed from the original backup
        - changed: Lines that were changed between backups

    Raises:
        ValueError: If backup IDs are invalid, backups are not found, the
            Unimus client is not initialized, or the request fails

    Examples:
        Compare two backups: orig_id=100, rev_id=105
        Compare device config changes over time
        Analyze configuration differences between devices

    Note: This function can compare backups from different devices.
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_backup_diff(orig_id, rev_id)
    except UnimusValidationError as e:
        raise ValueError(f"Invalid backup ID parameter: {e}") from e
    except UnimusNotFoundError as e:
        raise ValueError(f"Backup not found: {e}") from e
    except UnimusError as e:
        raise ValueError(f"Failed to get backup diff: {e}") from e
@mcp.tool()
def unimus_get_devices_with_changed_backups(since: Optional[int] = None, until: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Get devices that had backup changes within a specific time range.

    Args:
        since: Start of time range in seconds (unix timestamp). If None, defaults to 0
        until: End of time range in seconds (unix timestamp). If None, defaults to current time

    Returns:
        List of device objects that had backup changes, each containing:
        - id: Device ID
        - createTime: Device creation time in seconds
        - address: Hostname, IPv4 or IPv6 address
        - description: Device description

    Raises:
        ValueError: If time range parameters are invalid, the Unimus client
            is not initialized, or the request fails

    Examples:
        All changes since epoch: since=None, until=None
        Last 24 hours: since=current_time-86400, until=current_time
        Specific period: since=1640995200, until=1641081600

    Note: Returns empty list if no devices had backup changes in the time range.
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_devices_with_changed_backups(since, until)
    except UnimusValidationError as e:
        raise ValueError(f"Invalid time range parameter: {e}") from e
    except UnimusError as e:
        raise ValueError(f"Failed to get devices with changed backups: {e}") from e
@mcp.tool()
def unimus_get_schedules() -> List[Dict[str, Any]]:
    """
    Get a list of all schedules in Unimus.

    Returns:
        List of schedule objects containing schedule information including:
        - Schedule ID and name
        - Configuration and timing details
        - Associated devices and rules
        - Status and execution information

    Raises:
        ValueError: If the Unimus client is not initialized or the request fails

    Examples:
        List all backup schedules
        View schedule configuration details
        Check schedule status and effectiveness
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_schedules()
    except UnimusError as e:
        raise ValueError(f"Failed to get schedules: {e}") from e
@mcp.tool()
def unimus_get_schedule_by_id(schedule_id: int) -> Dict[str, Any]:
    """
    Get detailed information about a specific schedule.

    Args:
        schedule_id: The schedule ID to retrieve

    Returns:
        Schedule object containing detailed information including:
        - Complete schedule configuration
        - Timing and recurrence settings
        - Associated devices and filters
        - Execution history and status

    Raises:
        ValueError: If schedule_id is invalid, the schedule is not found, the
            Unimus client is not initialized, or the request fails

    Examples:
        Get specific schedule: schedule_id=5
        View schedule configuration details
        Check schedule execution status
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_schedule_by_id(schedule_id)
    except UnimusValidationError as e:
        raise ValueError(f"Invalid schedule ID parameter: {e}") from e
    except UnimusNotFoundError as e:
        raise ValueError(f"Schedule not found: {e}") from e
    except UnimusError as e:
        raise ValueError(f"Failed to get schedule: {e}") from e
@mcp.tool()
def unimus_search_backup_content(
    pattern: str,
    device_filters: Optional[Dict[str, Any]] = None,
    context_lines: int = 2,
    limit: Optional[int] = None,
    since: Optional[int] = None,
    until: Optional[int] = None
) -> List[Dict[str, Any]]:
    # Raw docstring: the regex examples below contain backslashes such as ``\d``,
    # which would be invalid escape sequences in a normal string literal.
    r"""
    Search through backup configurations for specific patterns using regular expressions.

    **Performance Warning**: This function retrieves backup content from devices and searches
    through them client-side. For large device inventories, always use device_filters and/or
    limit parameters to optimize performance and avoid timeouts.

    Args:
        pattern: Regular expression pattern to search for (case-insensitive, multiline)
        device_filters: Optional filters to limit device search scope:
            - vendor: String, filter by vendor name (e.g., "Cisco", "MikroTik", "Juniper")
            - type: String, filter by device type (e.g., "IOS", "RouterOS", "JunOS")
            - managed: Boolean, filter by managed devices only (true/false)
            - site_id: Number, filter by site ID
            - zone_id: String, filter by zone ID
            - address: String, filter by device address pattern
        context_lines: Number of context lines to include around matches (default: 2)
        limit: Optional limit on number of devices to search (**strongly recommended**)
        since: Start time range for backup filtering (unix timestamp, optional)
        until: End time range for backup filtering (unix timestamp, optional)

    Returns:
        List of search result objects, each containing:
        - device: Device information (id, address, description, vendor, type, model)
        - backup: Backup information (id, validSince, validUntil, type)
        - matches: List of pattern matches with:
            - line_number: Line number where pattern was found
            - line_content: Content of the matching line
            - context_before: Lines before the match for context
            - context_after: Lines after the match for context
            - match_groups: Regex capture groups if any

    Raises:
        ValueError: If pattern is invalid regex, parameters are invalid, the
            Unimus client is not initialized, or search fails

    Performance Tips:
        - **Always use device_filters** to reduce search scope (vendor, type, managed)
        - **Set a reasonable limit** for large inventories (e.g., 50-100 devices)
        - Use time filters to search recent backups only
        - Test regex patterns on small device sets first
        - Prefer specific patterns over broad wildcards

    Examples:
        # Search for interface configurations on Cisco devices
        results = unimus_search_backup_content(
            pattern=r"interface GigabitEthernet\d+/\d+",
            device_filters={"vendor": "Cisco", "managed": True},
            limit=50
        )

        # Find VLAN configurations with extended context
        results = unimus_search_backup_content(
            pattern=r"vlan (\d+)",
            device_filters={"type": "IOS"},
            context_lines=5,
            limit=25
        )

        # Search for recent configuration changes
        import time
        week_ago = int(time.time()) - (7 * 24 * 60 * 60)
        results = unimus_search_backup_content(
            pattern=r"ip route",
            since=week_ago,
            device_filters={"vendor": "Cisco"},
            limit=30
        )

    Use Cases:
        - Find specific configuration patterns across devices
        - Audit security configurations (ACLs, users, passwords)
        - Locate interface or VLAN configurations
        - Search for routing or switching protocols
        - Identify configuration inconsistencies
        - Compliance checking across network infrastructure

    Note: Only searches TEXT-type backups with decoded content. Binary backups are skipped.
    Search is performed on the latest backup for each device.
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.search_backup_content(
            pattern=pattern,
            device_filters=device_filters,
            context_lines=context_lines,
            limit=limit,
            since=since,
            until=until
        )
    except UnimusValidationError as e:
        raise ValueError(f"Invalid search parameter: {e}") from e
    except UnimusError as e:
        raise ValueError(f"Failed to search backup content: {e}") from e
@mcp.tool()
def unimus_get_device_relationships(
    device_id: int,
    include_network_neighbors: bool = True,
    include_zone_peers: bool = True,
    include_connection_analysis: bool = True
) -> Dict[str, Any]:
    """
    Analyze and discover device relationships and topology connections (NEW v0.5.0 Phase 3).

    Provides comprehensive relationship mapping including network topology analysis,
    zone-based device groupings, and connection pattern analysis.

    Args:
        device_id: The device ID to analyze relationships for
        include_network_neighbors: Whether to discover network neighbors via IP subnet analysis
        include_zone_peers: Whether to include devices in the same zone
        include_connection_analysis: Whether to analyze connection patterns and security

    Returns:
        Comprehensive relationship analysis containing:
        - device: Base device information with enhanced metadata
        - networkNeighbors: Devices in same network segments with subnet analysis
        - zonePeers: Devices in same zone with grouping information
        - connectionPatterns: Connection method analysis and security assessment
        - topologyInsights: High-level network topology analysis
        - relationshipMetadata: Calculated relationship metrics and statistics

    Network Analysis Features:
        - Intelligent subnet detection (supports /16, /20, /22, /24, /25, /26, /27, /28)
        - Network distance calculation between devices
        - IP address relationship mapping

    Zone Analysis Features:
        - Zone peer discovery and grouping
        - Vendor and device type diversity analysis
        - Management status distribution within zones

    Connection Analysis Features:
        - Connection method similarity scoring
        - Port usage pattern analysis
        - Security assessment (SSH vs TELNET usage)
        - Network-wide security recommendations

    Topology Insights:
        - Network position classification (isolated, lightly_connected, moderately_connected, highly_connected)
        - Connectivity role analysis (standalone, endpoint, network_node, infrastructure_hub)
        - Zone importance assessment (minimal, moderate, important, critical)

    Raises:
        ValueError: If device_id is invalid, the device is not found, the
            Unimus client is not initialized, or the analysis fails

    Examples:
        # Full relationship analysis
        relationships = unimus_get_device_relationships(123)

        # Network topology focus
        relationships = unimus_get_device_relationships(
            device_id=123,
            include_network_neighbors=True,
            include_zone_peers=False,
            include_connection_analysis=False
        )

        # Security and connection analysis focus
        relationships = unimus_get_device_relationships(
            device_id=123,
            include_network_neighbors=False,
            include_zone_peers=False,
            include_connection_analysis=True
        )

    Performance Note:
        This function analyzes relationships against all devices in the system.
        For very large device inventories (>1000 devices), analysis may take
        several seconds. Results include comprehensive metrics and insights.

    Use Cases:
        - Network topology visualization and mapping
        - Security assessment and audit (connection methods)
        - Zone organization and device grouping analysis
        - Infrastructure planning and network segmentation
        - Device relationship discovery for troubleshooting
        - Compliance checking across network zones
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_device_relationships(
            device_id=device_id,
            include_network_neighbors=include_network_neighbors,
            include_zone_peers=include_zone_peers,
            include_connection_analysis=include_connection_analysis
        )
    except UnimusNotFoundError as e:
        raise ValueError(f"Device with ID {device_id} not found") from e
    except UnimusValidationError as e:
        raise ValueError(f"Invalid device ID: {e}") from e
    except UnimusError as e:
        raise ValueError(f"Failed to analyze device relationships: {e}") from e
@mcp.tool()
def unimus_get_network_topology_analysis(
    zone_id: Optional[str] = None,
    include_clusters: bool = True,
    include_security_analysis: bool = True
) -> Dict[str, Any]:
    """
    Analyze network topology and device clusters across infrastructure (NEW v0.5.0 Phase 3).

    Provides comprehensive network-wide topology analysis including device clustering,
    network segmentation, zone analysis, and security pattern assessment.

    Args:
        zone_id: Optional zone ID to limit analysis scope (default: analyze all zones)
        include_clusters: Whether to perform device clustering and network segmentation analysis
        include_security_analysis: Whether to include network-wide security pattern analysis

    Returns:
        Comprehensive network topology analysis containing:
        - networkOverview: High-level network statistics and device distribution
        - deviceClusters: Identified device clusters (network, vendor, zone-based)
        - networkSegments: Discovered network segments and subnet analysis
        - zoneTopology: Zone-based topology insights and device groupings
        - securityPatterns: Network-wide security analysis and recommendations
        - topologyMetadata: Analysis metadata, insights, and performance metrics

    Network Overview Features:
        - Total device count and management status distribution
        - Vendor and device type distribution across infrastructure
        - Zone distribution and connection method statistics
        - Health status distribution and managed device percentages

    Device Clustering Features:
        - Network subnet-based clustering (/24 subnets)
        - Vendor-based device groupings (minimum 3 devices)
        - Zone-based device clusters (minimum 2 devices)
        - Cluster size analysis and device count metrics

    Network Segmentation Features:
        - Multi-subnet analysis (supports /16, /20, /22, /24, /25, /26, /27, /28)
        - Network segment device count and diversity analysis
        - Vendor and device type diversity within segments
        - Top 20 largest network segments identification

    Zone Topology Features:
        - Per-zone device statistics and management percentages
        - Vendor and device type diversity within zones
        - Health status distribution per zone
        - Connection method analysis per zone

    Security Analysis Features:
        - Protocol distribution (SSH vs TELNET usage)
        - Security score calculation and risk assessment
        - Network-wide security recommendations
        - Connection security percentage analysis

    Raises:
        ValueError: If topology analysis fails, invalid parameters are
            provided, or the Unimus client is not initialized

    Examples:
        # Full network topology analysis
        topology = unimus_get_network_topology_analysis()

        # Zone-specific analysis
        topology = unimus_get_network_topology_analysis(zone_id="1")

        # Clustering focus (no security analysis)
        topology = unimus_get_network_topology_analysis(
            include_clusters=True,
            include_security_analysis=False
        )

        # Security assessment focus
        topology = unimus_get_network_topology_analysis(
            include_clusters=False,
            include_security_analysis=True
        )

    Performance Note:
        This function analyzes all devices in the specified scope (all zones or specific zone).
        For very large infrastructures (>1000 devices), analysis may take several seconds.
        Consider using zone_id parameter to limit scope for faster analysis.

    Use Cases:
        - Infrastructure overview and visualization
        - Network segmentation planning and optimization
        - Security audit and compliance assessment
        - Device clustering and organization analysis
        - Zone-based infrastructure management
        - Vendor diversity and standardization analysis
        - Connection method security assessment
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_network_topology_analysis(
            zone_id=zone_id,
            include_clusters=include_clusters,
            include_security_analysis=include_security_analysis
        )
    except UnimusError as e:
        raise ValueError(f"Failed to analyze network topology: {e}") from e
@mcp.tool()
def unimus_get_latest_backups(device_ids: List[int]) -> List[Dict[str, Any]]:
    """
    Get the latest backups for multiple specific devices.

    Args:
        device_ids: List of device IDs to retrieve latest backups for

    Returns:
        List of objects, each containing:
        - deviceId: The device ID
        - address: Device IP address or hostname
        - backup: Backup object with:
            - id: Backup ID
            - validSince: Timestamp when configuration was first retrieved
            - validUntil: Timestamp when configuration was last seen
            - type: Backup type ('TEXT' or 'BINARY')
            - content: Decoded backup content (readable text for TEXT, base64 for BINARY)
            - content_type: Format indicator ('text' or 'base64')

    Raises:
        ValueError: If device_ids list is empty, contains invalid IDs, devices
            are not found, or the Unimus client is not initialized

    Examples:
        Single device: device_ids=[42]
        Multiple devices: device_ids=[1, 15, 23, 45]

    Note: Only returns data for devices that exist and have backups.
    TEXT backups are automatically decoded - massive performance improvement!
    """
    if unimus_client is None:
        # The server may run in degraded mode without a client; report that
        # clearly instead of failing with an AttributeError on None.
        raise ValueError("Unimus client is not initialized; check the server configuration")
    try:
        return unimus_client.get_latest_backups(device_ids)
    except UnimusValidationError as e:
        raise ValueError(f"Invalid device_ids parameter: {e}") from e
    except UnimusError as e:
        raise ValueError(f"Failed to get latest backups: {e}") from e
def load_configuration(config_path: Optional[str] = None) -> UnimusConfig:
    """
    Load configuration from file and environment variables.

    If ``load_config`` fails for any reason, falls back to building a minimal
    configuration from the UNIMUS_URL and UNIMUS_TOKEN environment variables.
    On success, the root logger level is updated from ``log_level`` when that
    value names a valid logging level; otherwise the level is left unchanged.

    Args:
        config_path: Optional explicit path to configuration file

    Returns:
        Loaded and validated configuration

    Raises:
        ValueError: If configuration is invalid or missing required values
    """
    try:
        loaded = load_config(config_path)
    except Exception as e:
        # Deliberately broad: any loader failure triggers the legacy
        # environment-variable fallback for backward compatibility.
        logger.warning(f"Failed to load configuration file: {e}")
        logger.info("Falling back to environment variable configuration")
        unimus_url = os.getenv("UNIMUS_URL")
        unimus_token = os.getenv("UNIMUS_TOKEN")
        if not unimus_url:
            raise ValueError("UNIMUS_URL environment variable must be set (or provide configuration file)") from e
        if not unimus_token:
            raise ValueError("UNIMUS_TOKEN environment variable must be set (or provide configuration file)") from e
        # Create minimal configuration from environment variables
        return UnimusConfig(url=unimus_url, token=unimus_token)

    # Update logging level based on configuration. This happens outside the
    # try block so a bad log level cannot discard a valid configuration.
    level_name = loaded.log_level
    numeric_level = getattr(logging, level_name.upper(), None) if isinstance(level_name, str) else None
    # Only real level constants are ints; other attributes of the logging
    # module (functions, format strings) must not be passed to setLevel().
    if isinstance(numeric_level, int):
        logging.getLogger().setLevel(numeric_level)
        logger.info(f"Log level set to {level_name}")
    else:
        logger.warning(f"Unknown log level {level_name!r}; keeping current log level")
    return loaded
def validate_environment() -> tuple[str, str]:
    """
    Legacy helper kept for backward compatibility.

    Logs a deprecation warning, then reads UNIMUS_URL and UNIMUS_TOKEN from
    the environment.

    Returns:
        Tuple of (unimus_url, unimus_token)

    Raises:
        ValueError: If either required environment variable is missing or empty
    """
    logger.warning("validate_environment() is deprecated. Use load_configuration() instead.")

    resolved = []
    # Checked in this order so a missing URL is reported before a missing token.
    for variable in ("UNIMUS_URL", "UNIMUS_TOKEN"):
        value = os.getenv(variable)
        if not value:
            raise ValueError(f"{variable} environment variable must be set")
        resolved.append(value)

    url_value, token_value = resolved
    return url_value, token_value
class HealthCheckHandler(BaseHTTPRequestHandler):
    """
    HTTP handler that implements both liveness (/healthz) and readiness (/readyz) checks.

    Follows Kubernetes health check patterns for production-ready containers.

    Routes (any query string on the request path is ignored):
        - /healthz: always 200 with a liveness payload
        - /readyz: 200 when the Unimus client validates its connection, else 503
        - /health: legacy endpoint, 301 redirect to /readyz
        - anything else: 404
    """

    # Application version reported in every JSON payload.
    _APP_VERSION = '1.0.0'

    def __init__(self, client: Optional["UnimusRestClient"], *args, **kwargs):
        # The client must be stored before calling the base constructor,
        # because the base constructor processes the request immediately.
        self.client = client
        super().__init__(*args, **kwargs)

    def _send_json(self, status_code: int, payload: Dict[str, Any]) -> None:
        """Serialize *payload* and send it as a complete JSON response."""
        # Encode first so a serialization failure cannot leave a half-sent response.
        body = json.dumps(payload).encode('utf-8')
        self.send_response(status_code)
        self.send_header('Content-type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _check_readiness(self) -> tuple[int, Dict[str, Any]]:
        """Return (HTTP status, payload) for the readiness probe without sending anything."""
        try:
            if self.client and self.client.validate_connection():
                # Get actual Unimus health status
                health_info = self.client.get_health()
                return 200, {
                    'status': 'ready',
                    'unimus_status': health_info.get('status', 'UNKNOWN'),
                    'server_version': self._APP_VERSION,
                    'check_type': 'readiness'
                }
            return 503, {  # Service Unavailable
                'status': 'not_ready',
                'reason': 'Unimus connection failed or client not initialized',
                'server_version': self._APP_VERSION,
                'check_type': 'readiness'
            }
        except Exception as e:
            # Any failure while probing Unimus means "not ready", never a crash.
            return 503, {
                'status': 'not_ready',
                'reason': f'Readiness check error: {str(e)}',
                'server_version': self._APP_VERSION,
                'check_type': 'readiness'
            }

    def do_GET(self) -> None:
        """Dispatch a GET request to the liveness, readiness, legacy or 404 response."""
        # Ignore any query string so probes such as "/healthz?verbose" still match.
        route = self.path.split('?', 1)[0]

        if route == '/healthz':
            # Liveness: Is the server process alive? Yes, because this code is running.
            self._send_json(200, {
                'status': 'alive',
                'server_version': self._APP_VERSION,
                'check_type': 'liveness'
            })
            return

        if route == '/readyz':
            # Readiness: Can the server do its work? Check Unimus connection.
            # The outcome is computed before anything is written, so exactly
            # one response is sent even when the check raises.
            status_code, payload = self._check_readiness()
            self._send_json(status_code, payload)
            return

        # Legacy /health endpoint for backward compatibility - maps to readiness
        if route == '/health':
            self.send_response(301)  # Moved Permanently
            self.send_header('Location', '/readyz')
            self.end_headers()
            return

        # Fallback for other paths
        not_found = b'Not Found'
        self.send_response(404)
        self.send_header('Content-type', 'text/plain')
        self.send_header('Content-Length', str(len(not_found)))
        self.end_headers()
        self.wfile.write(not_found)

    def log_message(self, format: str, *args: Any) -> None:
        # Suppress HTTP server logging to keep console clean
        return
def start_health_check_server(client: Optional[UnimusRestClient], host: str = '0.0.0.0', port: int = 8080) -> None:
    """
    Start the HTTP health check server on a background daemon thread.

    Any failure while creating or starting the server is logged as a warning
    and otherwise ignored, so the caller continues without health endpoints.

    Args:
        client: Unimus client handed to every HealthCheckHandler (may be None)
        host: Interface to bind to
        port: TCP port to listen on
    """
    try:
        # Bind the client as the handler's first constructor argument.
        bound_handler = partial(HealthCheckHandler, client)
        http_server = HTTPServer((host, port), bound_handler)
        # Daemon thread: it must not keep the process alive once the main thread exits.
        threading.Thread(target=http_server.serve_forever, daemon=True).start()
        logger.info(
            f"Health check server started. Liveness: http://{host}:{port}/healthz, "
            f"Readiness: http://{host}:{port}/readyz"
        )
    except Exception as e:
        logger.warning(f"Failed to start health check server: {e}")
def setup_unimus_client(config_path: Optional[str] = None) -> tuple[Optional[UnimusRestClient], Optional[UnimusConfig]]:
    """
    Build the Unimus client and its configuration, tolerating configuration errors.

    A ValueError raised while loading the configuration or constructing the
    client is logged and turned into a ``(None, None)`` result.

    Args:
        config_path: Optional explicit path to configuration file

    Returns:
        Tuple of (client instance or None, configuration or None)
    """
    try:
        settings = load_configuration(config_path)
        # The complete config object is passed as well, for cache initialization.
        client_options = {
            "url": settings.url,
            "token": settings.token,
            "timeout": settings.timeout,
            "verify_ssl": settings.verify_ssl,
            "config": settings,
        }
        rest_client = UnimusRestClient(**client_options)
        # Note: Custom headers support could be added to UnimusRestClient in the future
        logger.info(f"Unimus client initialized with URL: {settings.url}")
        return rest_client, settings
    except ValueError as e:
        logger.error(f"Unimus client configuration failed: {e}. Server starting in degraded mode.")
        return None, None
def initialize_client() -> UnimusRestClient:
    """
    Create a Unimus client from environment variables and verify connectivity.

    Legacy entry point kept for backward compatibility; it logs a deprecation
    warning on every call.

    Returns:
        Configured and validated UnimusRestClient

    Raises:
        ValueError: If the environment variables are missing or the connection
            check fails
    """
    logger.warning("initialize_client() is deprecated. Use setup_unimus_client() instead.")
    unimus_url, unimus_token = validate_environment()

    rest_client = UnimusRestClient(url=unimus_url, token=unimus_token)

    # Success path first; anything else is reported as a connection failure.
    if rest_client.validate_connection():
        logger.info(f"Successfully connected to Unimus at {unimus_url}")
        return rest_client

    raise ValueError(
        f"Failed to connect to Unimus at {unimus_url}. "
        "Please check URL, token, and network connectivity."
    )
if __name__ == "__main__":
    # Step 1: build the client and configuration. Assigned at module level so
    # the tool functions see them through the module globals.
    unimus_client, config = setup_unimus_client()

    # Step 2: health check server, enabled unless the configuration disables it.
    # Without a configuration it is started on the default port.
    health_server_enabled = config is None or config.enable_health_server
    if not health_server_enabled:
        logger.info("Health check server disabled by configuration")
    else:
        health_port = config.health_check_port if config else 8080
        start_health_check_server(unimus_client, port=health_port)

    # Step 3: the MCP server body, executed on its own thread.
    def run_mcp_server():
        try:
            logger.info("Starting Unimus MCP server on a dedicated thread...")
            mcp.run(transport="stdio")
        except Exception as e:
            logger.error(f"MCP server thread encountered an error: {e}", exc_info=True)

    # Step 4: launch it as a daemon thread.
    mcp_thread = threading.Thread(target=run_mcp_server, daemon=True)
    mcp_thread.start()

    # Step 5: one-off connection validation; failures are only logged.
    if not unimus_client:
        logger.warning("Unimus client not initialized. Server is running in degraded mode.")
    else:
        try:
            if unimus_client.validate_connection():
                health = unimus_client.get_health()
                logger.info(f"Successfully connected to Unimus. System status: {health.get('status', 'UNKNOWN')}")
            else:
                logger.warning("Could not validate Unimus connection. Server is in degraded mode.")
        except Exception as e:
            logger.warning(f"Could not retrieve system health at startup: {e}. Server is in degraded mode.")

    # Step 6: park the main thread so the daemon threads keep running until Ctrl+C.
    logger.info("Health endpoints: /healthz (liveness), /readyz (readiness), /health (legacy->readiness)")
    try:
        while True:
            time.sleep(3600)  # Sleep for a long time
    except KeyboardInterrupt:
        logger.info("Shutting down MCP server.")