Skip to main content
Glama

NetBox Read/Write MCP Server

secrets.py12.2 kB
#!/usr/bin/env python3 """ Enterprise Secrets Management for NetBox MCP Provides secure handling of sensitive configuration data including: - Environment variables with prefix support - Docker secrets integration - Kubernetes secrets mounting - Encrypted configuration files - Vault integration (future) Security Features: - No secrets in logs or error messages - Memory clearing of sensitive data - Configuration validation without exposing secrets - Multiple secret source priorities """ import os import logging from pathlib import Path from typing import Dict, Any, Optional logger = logging.getLogger(__name__) class SecretMask: """Utility class to mask secrets in logs and error messages.""" @staticmethod def mask_secret(value: str, visible_chars: int = 4) -> str: """ Mask a secret value for safe logging. Args: value: The secret value to mask visible_chars: Number of characters to show at the end Returns: Masked string like "***abc123" for a token ending in "abc123" """ if not value or len(value) <= visible_chars: return "***" return "***" + value[-visible_chars:] @staticmethod def mask_url(url: str) -> str: """ Mask credentials in URLs for safe logging. Args: url: URL that might contain credentials Returns: URL with credentials masked """ if "@" in url: # URL contains credentials, mask them parts = url.split("@") if len(parts) == 2: # Keep protocol and domain, mask credentials protocol_creds = parts[0] domain_path = parts[1] if "://" in protocol_creds: protocol, creds = protocol_creds.split("://", 1) return f"{protocol}://***@{domain_path}" return url class SecretsManager: """ Enterprise secrets management with multiple source support. Source Priority (highest to lowest): 1. Environment variables (NETBOX_*) 2. Docker secrets (/run/secrets/*) 3. Kubernetes secrets (/var/secrets/*) 4. Encrypted config files 5. Plain config files (development only) """ # Docker secrets standard paths DOCKER_SECRETS_PATH = Path("/run/secrets") # Kubernetes secrets standard paths K8S_SECRETS_PATH = Path("/var/secrets") # Environment variable prefix ENV_PREFIX = "NETBOX_" def __init__(self): self._secrets_cache: Dict[str, Any] = {} self._load_sources() def _load_sources(self): """Load secrets from all available sources in priority order.""" logger.info("Loading secrets from available sources...") # Load from each source (priority order maintained by update order) sources_loaded = [] # 1. Check for Kubernetes secrets if self.K8S_SECRETS_PATH.exists(): k8s_secrets = self._load_kubernetes_secrets() if k8s_secrets: self._secrets_cache.update(k8s_secrets) sources_loaded.append("kubernetes") # 2. Check for Docker secrets if self.DOCKER_SECRETS_PATH.exists(): docker_secrets = self._load_docker_secrets() if docker_secrets: self._secrets_cache.update(docker_secrets) sources_loaded.append("docker") # 3. Load environment variables (highest priority) env_secrets = self._load_environment_secrets() if env_secrets: self._secrets_cache.update(env_secrets) sources_loaded.append("environment") logger.info(f"Secrets loaded from sources: {sources_loaded}") def _load_environment_secrets(self) -> Dict[str, Any]: """Load secrets from environment variables.""" secrets = {} # Load all NETBOX_* environment variables for key, value in os.environ.items(): if key.startswith(self.ENV_PREFIX): secrets[key] = value # Also load .env file if present (development) env_file = Path(".env") if env_file.exists(): try: secrets.update(self._load_env_file(env_file)) logger.debug("Loaded secrets from .env file") except Exception as e: logger.warning(f"Failed to load .env file: {e}") return secrets def _load_env_file(self, env_file: Path) -> Dict[str, str]: """Load environment variables from .env file.""" secrets = {} try: with open(env_file, 'r', encoding='utf-8') as f: for line_num, line in enumerate(f, 1): line = line.strip() # Skip empty lines and comments if not line or line.startswith('#'): continue # Parse KEY=VALUE format if '=' not in line: logger.warning(f"Invalid line {line_num} in {env_file}: {line}") continue key, value = line.split('=', 1) key = key.strip() value = value.strip() # Remove quotes if present if value.startswith('"') and value.endswith('"'): value = value[1:-1] elif value.startswith("'") and value.endswith("'"): value = value[1:-1] # Only load NetBox-related secrets if key.startswith(self.ENV_PREFIX): secrets[key] = value except Exception as e: logger.error(f"Error reading .env file: {e}") raise return secrets def _load_docker_secrets(self) -> Dict[str, Any]: """Load secrets from Docker secrets directory.""" secrets = {} if not self.DOCKER_SECRETS_PATH.exists(): return secrets # Map secret file names to config keys secret_mappings = { "netbox_url": "NETBOX_URL", "netbox_token": "NETBOX_TOKEN", "netbox_api_key": "NETBOX_TOKEN", # Alternative "netbox_password": "NETBOX_PASSWORD", "netbox_username": "NETBOX_USERNAME" } for secret_file, config_key in secret_mappings.items(): secret_path = self.DOCKER_SECRETS_PATH / secret_file if secret_path.exists(): try: with open(secret_path, 'r', encoding='utf-8') as f: secret_value = f.read().strip() if secret_value: secrets[config_key] = secret_value logger.debug(f"Loaded Docker secret: {secret_file}") except Exception as e: logger.warning(f"Failed to load Docker secret {secret_file}: {e}") return secrets def _load_kubernetes_secrets(self) -> Dict[str, Any]: """Load secrets from Kubernetes secrets directory.""" secrets = {} if not self.K8S_SECRETS_PATH.exists(): return secrets # Kubernetes typically mounts secrets as individual files secret_mappings = { "url": "NETBOX_URL", "token": "NETBOX_TOKEN", "api-key": "NETBOX_TOKEN", # Alternative "username": "NETBOX_USERNAME", "password": "NETBOX_PASSWORD" } for secret_file, config_key in secret_mappings.items(): secret_path = self.K8S_SECRETS_PATH / secret_file if secret_path.exists(): try: with open(secret_path, 'r', encoding='utf-8') as f: secret_value = f.read().strip() if secret_value: secrets[config_key] = secret_value logger.debug(f"Loaded Kubernetes secret: {secret_file}") except Exception as e: logger.warning(f"Failed to load Kubernetes secret {secret_file}: {e}") return secrets def get_secret(self, key: str, default: Optional[str] = None) -> Optional[str]: """ Get a secret value by key. Args: key: Secret key (e.g., "NETBOX_TOKEN") default: Default value if secret not found Returns: Secret value or default """ value = self._secrets_cache.get(key, default) return value def get_required_secret(self, key: str) -> str: """ Get a required secret value. Args: key: Secret key (e.g., "NETBOX_TOKEN") Returns: Secret value Raises: ValueError: If secret is not found """ value = self.get_secret(key) if not value: raise ValueError(f"Required secret '{key}' not found in any source") return value def mask_for_logging(self, key: str) -> str: """ Get a masked version of a secret for safe logging. Args: key: Secret key Returns: Masked secret value """ value = self.get_secret(key) if not value: return "Not Set" if key in ["NETBOX_URL"]: return SecretMask.mask_url(value) else: return SecretMask.mask_secret(value) def validate_secrets(self) -> Dict[str, bool]: """ Validate that required secrets are available. Returns: Dictionary mapping secret names to availability status """ required_secrets = ["NETBOX_URL", "NETBOX_TOKEN"] validation_results = {} for secret in required_secrets: validation_results[secret] = bool(self.get_secret(secret)) return validation_results def get_connection_info(self) -> Dict[str, Any]: """ Get connection information with masked secrets for safe logging. Returns: Dictionary with connection info suitable for logging """ return { "url": self.mask_for_logging("NETBOX_URL"), "token": self.mask_for_logging("NETBOX_TOKEN"), "has_ssl_cert": bool(self.get_secret("NETBOX_SSL_CERT_PATH")), "has_ssl_key": bool(self.get_secret("NETBOX_SSL_KEY_PATH")), "has_ca_cert": bool(self.get_secret("NETBOX_CA_CERT_PATH")) } def clear_cache(self): """Clear the secrets cache (security best practice).""" self._secrets_cache.clear() logger.debug("Secrets cache cleared") # Global secrets manager instance _secrets_manager: Optional[SecretsManager] = None def get_secrets_manager() -> SecretsManager: """Get the global secrets manager instance.""" global _secrets_manager if _secrets_manager is None: _secrets_manager = SecretsManager() return _secrets_manager def get_secret(key: str, default: Optional[str] = None) -> Optional[str]: """Convenience function to get a secret.""" return get_secrets_manager().get_secret(key, default) def get_required_secret(key: str) -> str: """Convenience function to get a required secret.""" return get_secrets_manager().get_required_secret(key) def validate_secrets() -> Dict[str, bool]: """Convenience function to validate secrets.""" return get_secrets_manager().validate_secrets() def get_connection_info() -> Dict[str, Any]: """Convenience function to get safe connection info.""" return get_secrets_manager().get_connection_info()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Deployment-Team/netbox-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server