Skip to main content
Glama

OPNsense MCP Server

opnsense-mcp-server.py350 kB
#!/usr/bin/env python3 """ OPNsense MCP Server A Model Context Protocol (MCP) server implementation for managing OPNsense firewalls. This server allows Claude and other MCP-compatible clients to interact with all features exposed by the OPNsense API. This server is designed to be run on a local machine and not exposed to the public internet. Please see the README.md file for more information. """ import os import json import logging import asyncio import base64 import hashlib from typing import Dict, List, Any, Optional, Union, Tuple, TypedDict from datetime import datetime, timedelta from contextlib import asynccontextmanager from dataclasses import dataclass import urllib.parse import httpx from mcp.server.fastmcp import FastMCP, Context from mcp import types from pydantic import BaseModel, Field, field_validator, ValidationError import keyring from aiolimiter import AsyncLimiter # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("opnsense-mcp") # ========== ENHANCED EXCEPTION HIERARCHY ========== class OPNsenseError(Exception): """Base exception for all OPNsense-related errors with enhanced context.""" def __init__(self, message: str, error_code: Optional[str] = None, context: Optional[Dict[str, Any]] = None): super().__init__(message) self.message = message self.error_code = error_code or self.__class__.__name__ self.context = context or {} self.timestamp = datetime.utcnow() # Keep backward compatibility self.details = context or {} def to_dict(self) -> Dict[str, Any]: """Convert exception to dictionary for structured logging.""" return { "error_type": self.__class__.__name__, "error_code": self.error_code, "message": self.message, "context": self.context, "timestamp": self.timestamp.isoformat() } class ConfigurationError(OPNsenseError): """Client not configured or invalid configuration.""" pass class AuthenticationError(OPNsenseError): """Authentication failed.""" pass class APIError(OPNsenseError): """API call failed.""" def __init__(self, message: str, status_code: Optional[int] = None, response_text: Optional[str] = None): super().__init__(message) self.status_code = status_code self.response_text = response_text class NetworkError(OPNsenseError): """Network communication error.""" pass class RateLimitError(OPNsenseError): """Rate limit exceeded.""" pass class ValidationError(OPNsenseError): """Input parameter validation failed.""" pass class TimeoutError(OPNsenseError): """Request timed out.""" pass class ResourceNotFoundError(OPNsenseError): """Requested resource not found.""" pass class AuthorizationError(OPNsenseError): """User doesn't have permission for the requested operation.""" pass # Pydantic Models for Configuration and Validation class OPNsenseConfig(BaseModel): """Configuration for OPNsense connection.""" url: str = Field(..., description="OPNsense base URL") api_key: str = Field(..., description="API key") api_secret: str = Field(..., description="API secret", repr=False) # Hide in logs verify_ssl: bool = Field(default=True, description="Whether to verify SSL certificates") @field_validator('url') @classmethod def validate_url(cls, v): """Validate URL format.""" if not v.startswith(('http://', 'https://')): raise ValueError('URL must start with http:// or https://') return v.rstrip('/') class Config: """Pydantic configuration.""" validate_assignment = True # Connection Pool and Rate Limiting class ConnectionPool: """Manages OPNsense client connections with pooling and rate limiting.""" def __init__(self, max_connections: int = 5, ttl_seconds: int = 300): self.max_connections = max_connections self.ttl = timedelta(seconds=ttl_seconds) self.connections: Dict[str, Tuple[OPNsenseClient, datetime]] = {} self.lock = asyncio.Lock() # Rate limiting: 10 requests per second with burst of 20 self.rate_limiter = AsyncLimiter(max_rate=10, time_period=1.0) self.burst_limiter = AsyncLimiter(max_rate=20, time_period=1.0) def _get_config_hash(self, config: OPNsenseConfig) -> str: """Generate hash for config to use as pool key.""" config_str = f"{config.url}:{config.api_key}" return hashlib.sha256(config_str.encode()).hexdigest()[:16] async def get_client(self, config: OPNsenseConfig) -> 'OPNsenseClient': """Get or create client from pool.""" config_hash = self._get_config_hash(config) async with self.lock: # Check if we have a valid existing client if config_hash in self.connections: client, created_at = self.connections[config_hash] if datetime.now() - created_at < self.ttl: return client else: # Client expired, close and remove await client.close() del self.connections[config_hash] # Create new client client = OPNsenseClient(config, self) self.connections[config_hash] = (client, datetime.now()) # Cleanup old connections if pool is full if len(self.connections) > self.max_connections: await self._cleanup_oldest() return client async def _cleanup_oldest(self): """Remove oldest connection from pool.""" if not self.connections: return oldest_key = min( self.connections.keys(), key=lambda k: self.connections[k][1] ) client, _ = self.connections[oldest_key] await client.close() del self.connections[oldest_key] async def check_rate_limit(self): """Check and enforce rate limits.""" # Try burst limit first if not self.burst_limiter.has_capacity(): raise RateLimitError("Burst rate limit exceeded. Please slow down requests.") # Apply rate limit await self.rate_limiter.acquire() await self.burst_limiter.acquire() async def close_all(self): """Close all connections in pool.""" async with self.lock: for client, _ in self.connections.values(): await client.close() self.connections.clear() # Server State Management @dataclass class ServerState: """Managed server state with proper lifecycle.""" config: Optional[OPNsenseConfig] = None pool: Optional[ConnectionPool] = None session_created: Optional[datetime] = None session_ttl: timedelta = timedelta(hours=1) # 1 hour session timeout async def initialize(self, config: OPNsenseConfig): """Initialize server state with validation.""" await self.cleanup() # Store encrypted credentials try: await self._store_credentials(config) except Exception as e: logger.warning(f"Could not store credentials securely: {e}. Using in-memory storage.") self.config = config self.pool = ConnectionPool() self.session_created = datetime.now() # Validate connection client = await self.pool.get_client(config) await client.request("GET", API_CORE_FIRMWARE_STATUS) logger.info("OPNsense connection initialized successfully") async def _store_credentials(self, config: OPNsenseConfig): """Store credentials securely using keyring.""" service_name = "opnsense-mcp-server" username = f"{config.url}-{config.api_key[:8]}" # Partial key for identification # Store as JSON for easy retrieval credentials = { "url": config.url, "api_key": config.api_key, "api_secret": config.api_secret, "verify_ssl": config.verify_ssl } keyring.set_password(service_name, username, json.dumps(credentials)) logger.debug("Credentials stored securely") async def get_client(self) -> 'OPNsenseClient': """Get OPNsense client with session validation.""" if not self.config or not self.pool: raise ConfigurationError("OPNsense client not configured. Use configure_opnsense_connection first.") # Check session expiry if self.session_created and datetime.now() - self.session_created > self.session_ttl: logger.info("Session expired, reinitializing...") await self.initialize(self.config) return await self.pool.get_client(self.config) async def cleanup(self): """Cleanup resources.""" if self.pool: await self.pool.close_all() self.pool = None self.config = None self.session_created = None # ========== ERROR RESPONSE SYSTEM ========== from enum import Enum class ErrorSeverity(str, Enum): """Enumeration for error severity levels.""" LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" class ErrorResponse: """Structured error response system with user-friendly messaging.""" def __init__(self, error: Exception, operation: str, severity: ErrorSeverity = ErrorSeverity.MEDIUM): self.error = error self.operation = operation self.severity = severity self.timestamp = datetime.utcnow() self.error_id = f"{operation}_{int(self.timestamp.timestamp())}" def get_user_message(self) -> str: """Get user-friendly error message.""" if isinstance(self.error, AuthenticationError): return "Authentication failed. Please check your API credentials." elif isinstance(self.error, AuthorizationError): return "Access denied. You don't have permission for this operation." elif isinstance(self.error, NetworkError): return "Cannot connect to OPNsense. Please check the URL and network connectivity." elif isinstance(self.error, ConfigurationError): return "OPNsense connection not configured. Please configure the connection first." elif isinstance(self.error, ValidationError): return f"Invalid input: {self.error.message}" elif isinstance(self.error, APIError): if self.error.status_code == 404: return "The requested resource was not found." elif self.error.status_code == 429: return "API rate limit exceeded. Please wait before trying again." else: return f"API error: {self.error.message}" elif isinstance(self.error, TimeoutError): return "Request timed out. The OPNsense server may be overloaded." elif isinstance(self.error, ResourceNotFoundError): return f"Resource not found: {self.error.message}" elif isinstance(self.error, RateLimitError): return "Rate limit exceeded. Please slow down your requests." else: return f"An unexpected error occurred during {self.operation}." def get_technical_details(self) -> Dict[str, Any]: """Get technical error details for logging.""" details = { "error_id": self.error_id, "operation": self.operation, "severity": self.severity.value, "timestamp": self.timestamp.isoformat(), "error_type": type(self.error).__name__, "message": str(self.error) } if isinstance(self.error, OPNsenseError): details.update(self.error.to_dict()) if isinstance(self.error, APIError): details["status_code"] = self.error.status_code details["response_text"] = self.error.response_text return details # ========== RETRY MECHANISM ========== class RetryConfig: """Configuration for retry mechanism with exponential backoff.""" def __init__(self, max_attempts: int = 3, base_delay: float = 1.0, max_delay: float = 60.0, exponential_backoff: bool = True, retryable_errors: Optional[List[type]] = None): self.max_attempts = max_attempts self.base_delay = base_delay self.max_delay = max_delay self.exponential_backoff = exponential_backoff self.retryable_errors = retryable_errors or [NetworkError, TimeoutError, APIError, RateLimitError] async def retry_with_backoff(func, *args, retry_config: RetryConfig = None, **kwargs): """Retry function with exponential backoff for transient failures.""" if retry_config is None: retry_config = RetryConfig() last_exception = None for attempt in range(retry_config.max_attempts): try: return await func(*args, **kwargs) except Exception as e: last_exception = e # Check if error is retryable if not any(isinstance(e, err_type) for err_type in retry_config.retryable_errors): raise e # Don't retry on last attempt if attempt == retry_config.max_attempts - 1: break # Calculate delay with exponential backoff if retry_config.exponential_backoff: delay = min(retry_config.base_delay * (2 ** attempt), retry_config.max_delay) else: delay = retry_config.base_delay logger.info(f"Attempt {attempt + 1} failed, retrying in {delay}s: {str(e)}") await asyncio.sleep(delay) # All attempts failed raise last_exception # ========== REQUEST/RESPONSE LOGGING ========== class RequestResponseLogger: """Framework for logging API requests and responses with sensitive data protection.""" def __init__(self, logger: logging.Logger): self.logger = logger def log_request(self, method: str, url: str, headers: Optional[Dict] = None, data: Optional[Dict] = None, operation: str = "unknown"): """Log API request details with sensitive data sanitization.""" # Sanitize sensitive headers safe_headers = {} if headers: for key, value in headers.items(): if key.lower() in ['authorization', 'x-api-key']: safe_headers[key] = '[REDACTED]' else: safe_headers[key] = value log_data = { "operation": operation, "request": { "method": method, "url": url, "headers": safe_headers, "has_data": bool(data) } } self.logger.info(f"API Request: {json.dumps(log_data)}") def log_response(self, status_code: int, response_size: Optional[int] = None, duration_ms: Optional[float] = None, operation: str = "unknown", error: Optional[Exception] = None): """Log API response details with performance metrics.""" log_data = { "operation": operation, "response": { "status_code": status_code, "response_size": response_size, "duration_ms": duration_ms, "success": 200 <= status_code < 300, "has_error": bool(error) } } if error: log_data["error"] = str(error) level = logging.INFO if log_data["response"]["success"] else logging.WARNING self.logger.log(level, f"API Response: {json.dumps(log_data)}") # Initialize request/response logger request_logger = RequestResponseLogger(logger) # API Endpoint Constants # Core API_CORE_MENU_GET_ITEMS = "/core/menu/getItems" API_CORE_FIRMWARE_STATUS = "/core/firmware/status" API_CORE_SYSTEM_INFO = "/core/system/info" API_CORE_SERVICE_SEARCH = "/core/service/search" API_CORE_SERVICE_RESTART = "/core/service/restart" # Needs /{service_name} API_CORE_BACKUP_DOWNLOAD = "/core/backup/download" API_CORE_FIRMWARE_PLUGINS = "/core/firmware/plugins" API_CORE_FIRMWARE_INSTALL = "/core/firmware/install" # Needs /{plugin_name} # Firewall API_FIREWALL_FILTER_SEARCH_RULE = "/firewall/filter/searchRule" API_FIREWALL_FILTER_ADD_RULE = "/firewall/filter/addRule" API_FIREWALL_FILTER_DEL_RULE = "/firewall/filter/delRule" # Needs /{uuid} API_FIREWALL_FILTER_TOGGLE_RULE = "/firewall/filter/toggleRule" # Needs /{uuid}/{enabled_int} API_FIREWALL_FILTER_APPLY = "/firewall/filter/apply" API_FIREWALL_ALIAS_SEARCH_ITEM = "/firewall/alias/searchItem" API_FIREWALL_ALIAS_UTIL_ADD = "/firewall/alias_util/add" # Needs /{alias_name}/{address} API_FIREWALL_ALIAS_UTIL_DELETE = "/firewall/alias_util/delete" # Needs /{alias_name}/{address} API_FIREWALL_ALIAS_RECONFIGURE = "/firewall/alias/reconfigure" # Interfaces API_INTERFACES_OVERVIEW_INFO = "/interfaces/overview/interfacesInfo" # DHCP API_DHCP_LEASES_SEARCH = "/dhcp/leases/searchLease" # ========== INTERFACE & VLAN MANAGEMENT API ENDPOINTS ========== # Interface Overview & Control API_INTERFACES_OVERVIEW_GET_INTERFACE = "/interfaces/overview/getInterface" # /{interface} API_INTERFACES_OVERVIEW_RELOAD_INTERFACE = "/interfaces/overview/reloadInterface" # /{identifier} API_INTERFACES_OVERVIEW_EXPORT = "/interfaces/overview/export" # Bridge Management API_INTERFACES_BRIDGE_SEARCH = "/interfaces/bridge_settings/search_item" API_INTERFACES_BRIDGE_GET = "/interfaces/bridge_settings/get_item" # /{uuid} API_INTERFACES_BRIDGE_ADD = "/interfaces/bridge_settings/add_item" API_INTERFACES_BRIDGE_SET = "/interfaces/bridge_settings/set_item" # /{uuid} API_INTERFACES_BRIDGE_DEL = "/interfaces/bridge_settings/del_item" # /{uuid} API_INTERFACES_BRIDGE_RECONFIGURE = "/interfaces/bridge_settings/reconfigure" # LAGG (Link Aggregation) Management API_INTERFACES_LAGG_SEARCH = "/interfaces/lagg_settings/search_item" API_INTERFACES_LAGG_GET = "/interfaces/lagg_settings/get_item" # /{uuid} API_INTERFACES_LAGG_ADD = "/interfaces/lagg_settings/add_item" API_INTERFACES_LAGG_SET = "/interfaces/lagg_settings/set_item" # /{uuid} API_INTERFACES_LAGG_DEL = "/interfaces/lagg_settings/del_item" # /{uuid} API_INTERFACES_LAGG_RECONFIGURE = "/interfaces/lagg_settings/reconfigure" # VLAN Management API_INTERFACES_VLAN_SEARCH = "/interfaces/vlan_settings/search_item" API_INTERFACES_VLAN_GET = "/interfaces/vlan_settings/get_item" # /{uuid} API_INTERFACES_VLAN_ADD = "/interfaces/vlan_settings/add_item" API_INTERFACES_VLAN_SET = "/interfaces/vlan_settings/set_item" # /{uuid} API_INTERFACES_VLAN_DEL = "/interfaces/vlan_settings/del_item" # /{uuid} API_INTERFACES_VLAN_RECONFIGURE = "/interfaces/vlan_settings/reconfigure" # VXLAN Management API_INTERFACES_VXLAN_SEARCH = "/interfaces/vxlan_settings/search_item" API_INTERFACES_VXLAN_GET = "/interfaces/vxlan_settings/get_item" # /{uuid} API_INTERFACES_VXLAN_ADD = "/interfaces/vxlan_settings/add_item" API_INTERFACES_VXLAN_SET = "/interfaces/vxlan_settings/set_item" # /{uuid} API_INTERFACES_VXLAN_DEL = "/interfaces/vxlan_settings/del_item" # /{uuid} API_INTERFACES_VXLAN_RECONFIGURE = "/interfaces/vxlan_settings/reconfigure" # Virtual IP Management API_INTERFACES_VIP_SEARCH = "/interfaces/vip_settings/search_item" API_INTERFACES_VIP_GET = "/interfaces/vip_settings/get_item" # /{uuid} API_INTERFACES_VIP_ADD = "/interfaces/vip_settings/add_item" API_INTERFACES_VIP_SET = "/interfaces/vip_settings/set_item" # /{uuid} API_INTERFACES_VIP_DEL = "/interfaces/vip_settings/del_item" # /{uuid} API_INTERFACES_VIP_GET_UNUSED_VHID = "/interfaces/vip_settings/get_unused_vhid" API_INTERFACES_VIP_RECONFIGURE = "/interfaces/vip_settings/reconfigure" # Loopback Interface Management API_INTERFACES_LOOPBACK_SEARCH = "/interfaces/loopback_settings/search_item" API_INTERFACES_LOOPBACK_GET = "/interfaces/loopback_settings/get_item" # /{uuid} API_INTERFACES_LOOPBACK_ADD = "/interfaces/loopback_settings/add_item" API_INTERFACES_LOOPBACK_SET = "/interfaces/loopback_settings/set_item" # /{uuid} API_INTERFACES_LOOPBACK_DEL = "/interfaces/loopback_settings/del_item" # /{uuid} API_INTERFACES_LOOPBACK_RECONFIGURE = "/interfaces/loopback_settings/reconfigure" # GIF Tunnel Interface Management API_INTERFACES_GIF_SEARCH = "/interfaces/gif_settings/search_item" API_INTERFACES_GIF_GET = "/interfaces/gif_settings/get_item" # /{uuid} API_INTERFACES_GIF_ADD = "/interfaces/gif_settings/add_item" API_INTERFACES_GIF_SET = "/interfaces/gif_settings/set_item" # /{uuid} API_INTERFACES_GIF_DEL = "/interfaces/gif_settings/del_item" # /{uuid} API_INTERFACES_GIF_RECONFIGURE = "/interfaces/gif_settings/reconfigure" # GRE Tunnel Interface Management API_INTERFACES_GRE_SEARCH = "/interfaces/gre_settings/search_item" API_INTERFACES_GRE_GET = "/interfaces/gre_settings/get_item" # /{uuid} API_INTERFACES_GRE_ADD = "/interfaces/gre_settings/add_item" API_INTERFACES_GRE_SET = "/interfaces/gre_settings/set_item" # /{uuid} API_INTERFACES_GRE_DEL = "/interfaces/gre_settings/del_item" # /{uuid} API_INTERFACES_GRE_RECONFIGURE = "/interfaces/gre_settings/reconfigure" # ========== CERTIFICATE MANAGEMENT API ENDPOINTS ========== # Certificate Authority Management API_CERTIFICATES_CA_SEARCH = "/certificates/ca/search" API_CERTIFICATES_CA_GET = "/certificates/ca/get" # /{uuid} API_CERTIFICATES_CA_ADD = "/certificates/ca/add" API_CERTIFICATES_CA_SET = "/certificates/ca/set" # /{uuid} API_CERTIFICATES_CA_DEL = "/certificates/ca/del" # /{uuid} API_CERTIFICATES_CA_EXPORT = "/certificates/ca/export" # /{uuid} # Certificate Management API_CERTIFICATES_CERT_SEARCH = "/certificates/cert/search" API_CERTIFICATES_CERT_GET = "/certificates/cert/get" # /{uuid} API_CERTIFICATES_CERT_ADD = "/certificates/cert/add" API_CERTIFICATES_CERT_SET = "/certificates/cert/set" # /{uuid} API_CERTIFICATES_CERT_DEL = "/certificates/cert/del" # /{uuid} API_CERTIFICATES_CERT_EXPORT = "/certificates/cert/export" # /{uuid} # Certificate Signing Request (CSR) Management API_CERTIFICATES_CSR_SEARCH = "/certificates/csr/search" API_CERTIFICATES_CSR_GET = "/certificates/csr/get" # /{uuid} API_CERTIFICATES_CSR_ADD = "/certificates/csr/add" API_CERTIFICATES_CSR_SET = "/certificates/csr/set" # /{uuid} API_CERTIFICATES_CSR_DEL = "/certificates/csr/del" # /{uuid} # Certificate Revocation List (CRL) Management API_CERTIFICATES_CRL_SEARCH = "/certificates/crl/search" API_CERTIFICATES_CRL_GET = "/certificates/crl/get" # /{uuid} API_CERTIFICATES_CRL_ADD = "/certificates/crl/add" API_CERTIFICATES_CRL_SET = "/certificates/crl/set" # /{uuid} API_CERTIFICATES_CRL_DEL = "/certificates/crl/del" # /{uuid} # ACME (Let's Encrypt) Client Management API_CERTIFICATES_ACME_ACCOUNTS_SEARCH = "/certificates/acme_accounts/search" API_CERTIFICATES_ACME_ACCOUNTS_GET = "/certificates/acme_accounts/get" # /{uuid} API_CERTIFICATES_ACME_ACCOUNTS_ADD = "/certificates/acme_accounts/add" API_CERTIFICATES_ACME_ACCOUNTS_SET = "/certificates/acme_accounts/set" # /{uuid} API_CERTIFICATES_ACME_ACCOUNTS_DEL = "/certificates/acme_accounts/del" # /{uuid} # ACME Certificate Management API_CERTIFICATES_ACME_CERTS_SEARCH = "/certificates/acme_certs/search" API_CERTIFICATES_ACME_CERTS_GET = "/certificates/acme_certs/get" # /{uuid} API_CERTIFICATES_ACME_CERTS_ADD = "/certificates/acme_certs/add" API_CERTIFICATES_ACME_CERTS_SET = "/certificates/acme_certs/set" # /{uuid} API_CERTIFICATES_ACME_CERTS_DEL = "/certificates/acme_certs/del" # /{uuid} API_CERTIFICATES_ACME_CERTS_SIGN = "/certificates/acme_certs/sign" # /{uuid} API_CERTIFICATES_ACME_CERTS_REVOKE = "/certificates/acme_certs/revoke" # /{uuid} # ACME Challenge Management API_CERTIFICATES_ACME_CHALLENGES_SEARCH = "/certificates/acme_challenges/search" API_CERTIFICATES_ACME_CHALLENGES_GET = "/certificates/acme_challenges/get" # /{uuid} API_CERTIFICATES_ACME_CHALLENGES_ADD = "/certificates/acme_challenges/add" API_CERTIFICATES_ACME_CHALLENGES_SET = "/certificates/acme_challenges/set" # /{uuid} API_CERTIFICATES_ACME_CHALLENGES_DEL = "/certificates/acme_challenges/del" # /{uuid} # ACME Validation Methods API_CERTIFICATES_ACME_VALIDATIONS_SEARCH = "/certificates/acme_validations/search" API_CERTIFICATES_ACME_VALIDATIONS_GET = "/certificates/acme_validations/get" # /{uuid} API_CERTIFICATES_ACME_VALIDATIONS_ADD = "/certificates/acme_validations/add" API_CERTIFICATES_ACME_VALIDATIONS_SET = "/certificates/acme_validations/set" # /{uuid} API_CERTIFICATES_ACME_VALIDATIONS_DEL = "/certificates/acme_validations/del" # /{uuid} # Certificate Services Integration API_CERTIFICATES_SERVICE_RECONFIGURE = "/certificates/service/reconfigure" # ========== DNS & DHCP MANAGEMENT API ENDPOINTS ========== # DHCP Server Management API_DHCP_SERVER_SEARCH = "/dhcp/server/search" API_DHCP_SERVER_GET = "/dhcp/server/get" # Optional /{uuid} API_DHCP_SERVER_ADD = "/dhcp/server/add" API_DHCP_SERVER_SET = "/dhcp/server/set" # Needs /{uuid} API_DHCP_SERVER_DEL = "/dhcp/server/del" # Needs /{uuid} API_DHCP_SERVER_TOGGLE = "/dhcp/server/toggle" # Needs /{uuid}/{enabled} # DHCP Static Mappings (Reservations) API_DHCP_STATIC_SEARCH = "/dhcp/static/search" API_DHCP_STATIC_GET = "/dhcp/static/get" # Optional /{uuid} API_DHCP_STATIC_ADD = "/dhcp/static/add" API_DHCP_STATIC_SET = "/dhcp/static/set" # Needs /{uuid} API_DHCP_STATIC_DEL = "/dhcp/static/del" # Needs /{uuid} # DHCP Service Control API_DHCP_SERVICE_STATUS = "/dhcp/service/status" API_DHCP_SERVICE_START = "/dhcp/service/start" API_DHCP_SERVICE_STOP = "/dhcp/service/stop" API_DHCP_SERVICE_RESTART = "/dhcp/service/restart" API_DHCP_SERVICE_RECONFIGURE = "/dhcp/service/reconfigure" # DNS Resolver (Unbound) API_DNS_RESOLVER_SETTINGS = "/dns/resolver/settings" API_DNS_RESOLVER_SET_SETTINGS = "/dns/resolver/setSettings" API_DNS_RESOLVER_HOST_SEARCH = "/dns/resolver/searchHost" API_DNS_RESOLVER_HOST_GET = "/dns/resolver/getHost" # Optional /{uuid} API_DNS_RESOLVER_HOST_ADD = "/dns/resolver/addHost" API_DNS_RESOLVER_HOST_SET = "/dns/resolver/setHost" # Needs /{uuid} API_DNS_RESOLVER_HOST_DEL = "/dns/resolver/delHost" # Needs /{uuid} # DNS Resolver Domain Overrides API_DNS_RESOLVER_DOMAIN_SEARCH = "/dns/resolver/searchDomain" API_DNS_RESOLVER_DOMAIN_GET = "/dns/resolver/getDomain" # Optional /{uuid} API_DNS_RESOLVER_DOMAIN_ADD = "/dns/resolver/addDomain" API_DNS_RESOLVER_DOMAIN_SET = "/dns/resolver/setDomain" # Needs /{uuid} API_DNS_RESOLVER_DOMAIN_DEL = "/dns/resolver/delDomain" # Needs /{uuid} # DNS Forwarder (dnsmasq) API_DNS_FORWARDER_SETTINGS = "/dns/forwarder/settings" API_DNS_FORWARDER_SET_SETTINGS = "/dns/forwarder/setSettings" API_DNS_FORWARDER_HOST_SEARCH = "/dns/forwarder/searchHost" API_DNS_FORWARDER_HOST_GET = "/dns/forwarder/getHost" # Optional /{uuid} API_DNS_FORWARDER_HOST_ADD = "/dns/forwarder/addHost" API_DNS_FORWARDER_HOST_SET = "/dns/forwarder/setHost" # Needs /{uuid} API_DNS_FORWARDER_HOST_DEL = "/dns/forwarder/delHost" # Needs /{uuid} # DNS Service Control API_DNS_RESOLVER_SERVICE_STATUS = "/dns/resolver/status" API_DNS_RESOLVER_SERVICE_START = "/dns/resolver/start" API_DNS_RESOLVER_SERVICE_STOP = "/dns/resolver/stop" API_DNS_RESOLVER_SERVICE_RESTART = "/dns/resolver/restart" API_DNS_RESOLVER_SERVICE_RECONFIGURE = "/dns/resolver/reconfigure" API_DNS_FORWARDER_SERVICE_STATUS = "/dns/forwarder/status" API_DNS_FORWARDER_SERVICE_START = "/dns/forwarder/start" API_DNS_FORWARDER_SERVICE_STOP = "/dns/forwarder/stop" API_DNS_FORWARDER_SERVICE_RESTART = "/dns/forwarder/restart" API_DNS_FORWARDER_SERVICE_RECONFIGURE = "/dns/forwarder/reconfigure" # Diagnostics API_DIAGNOSTICS_LOG_FIREWALL = "/diagnostics/log/firewall" API_DIAGNOSTICS_SYSTEM_PROCESSOR = "/diagnostics/system/processor" API_DIAGNOSTICS_SYSTEM_MEMORY = "/diagnostics/system/memory" API_DIAGNOSTICS_SYSTEM_STORAGE = "/diagnostics/system/storage" API_DIAGNOSTICS_SYSTEM_TEMPERATURE = "/diagnostics/system/temperature" # ========== LOGGING & LOG MANAGEMENT API ENDPOINTS ========== # Core Logging API_DIAGNOSTICS_LOG_SYSTEM = "/diagnostics/log/system" API_DIAGNOSTICS_LOG_SYSTEM_SEARCH = "/diagnostics/log/system/search" API_DIAGNOSTICS_LOG_ACCESS = "/diagnostics/log/access" API_DIAGNOSTICS_LOG_AUTHENTICATION = "/diagnostics/log/authentication" API_DIAGNOSTICS_LOG_DHCP = "/diagnostics/log/dhcp" API_DIAGNOSTICS_LOG_DNS = "/diagnostics/log/dns" API_DIAGNOSTICS_LOG_OPENVPN = "/diagnostics/log/openvpn" API_DIAGNOSTICS_LOG_IPSEC = "/diagnostics/log/ipsec" API_DIAGNOSTICS_LOG_SQUID = "/diagnostics/log/squid" API_DIAGNOSTICS_LOG_HAPROXY = "/diagnostics/log/haproxy" # Log Management API_DIAGNOSTICS_LOG_CLEAR = "/diagnostics/log/clear" # Needs /{log_type} API_DIAGNOSTICS_LOG_EXPORT = "/diagnostics/log/export" # Needs /{log_type} API_DIAGNOSTICS_LOG_STATS = "/diagnostics/log/stats" # Needs /{log_type} API_DIAGNOSTICS_LOG_TAIL = "/diagnostics/log/tail" # Needs /{log_type} API_DIAGNOSTICS_LOG_SETTINGS = "/diagnostics/log/settings" API_DIAGNOSTICS_LOG_SET_SETTINGS = "/diagnostics/log/setSettings" # Log Streaming and Real-time API_DIAGNOSTICS_LOG_STREAM = "/diagnostics/log/stream" # Needs /{log_type} API_DIAGNOSTICS_LOG_FIREWALL_STREAM = "/diagnostics/log/firewall/stream" # Advanced Log Analysis API_DIAGNOSTICS_LOG_PATTERNS = "/diagnostics/log/patterns" # Needs /{log_type} API_DIAGNOSTICS_LOG_SUMMARY = "/diagnostics/log/summary" # Needs /{log_type} API_DIAGNOSTICS_LOG_SEARCH_ALL = "/diagnostics/log/search" # Routes API_ROUTES_GET = "/routes/routes/get" # VPN API_OPENVPN_SERVICE_STATUS = "/openvpn/service/getStatus" API_IPSEC_SERVICE_STATUS = "/ipsec/service/status" API_WIREGUARD_SERVICE_SHOW = "/wireguard/service/show" # NAT (Network Address Translation) # Source NAT (Outbound NAT) API_FIREWALL_SOURCE_NAT_SEARCH_RULE = "/firewall/source_nat/search_rule" API_FIREWALL_SOURCE_NAT_GET_RULE = "/firewall/source_nat/get_rule" # Needs /{uuid} API_FIREWALL_SOURCE_NAT_ADD_RULE = "/firewall/source_nat/add_rule" API_FIREWALL_SOURCE_NAT_SET_RULE = "/firewall/source_nat/set_rule" # Needs /{uuid} API_FIREWALL_SOURCE_NAT_DEL_RULE = "/firewall/source_nat/del_rule" # Needs /{uuid} API_FIREWALL_SOURCE_NAT_TOGGLE_RULE = "/firewall/source_nat/toggle_rule" # Needs /{uuid}/{enabled} # One-to-One NAT API_FIREWALL_ONE_TO_ONE_SEARCH_RULE = "/firewall/one_to_one/search_rule" API_FIREWALL_ONE_TO_ONE_GET_RULE = "/firewall/one_to_one/get_rule" # Needs /{uuid} API_FIREWALL_ONE_TO_ONE_ADD_RULE = "/firewall/one_to_one/add_rule" API_FIREWALL_ONE_TO_ONE_SET_RULE = "/firewall/one_to_one/set_rule" # Needs /{uuid} API_FIREWALL_ONE_TO_ONE_DEL_RULE = "/firewall/one_to_one/del_rule" # Needs /{uuid} API_FIREWALL_ONE_TO_ONE_TOGGLE_RULE = "/firewall/one_to_one/toggle_rule" # Needs /{uuid}/{enabled} # Firewall Configuration Management API_FIREWALL_FILTER_BASE_APPLY = "/firewall/filter_base/apply" API_FIREWALL_FILTER_BASE_SAVEPOINT = "/firewall/filter_base/savepoint" API_FIREWALL_FILTER_BASE_REVERT = "/firewall/filter_base/revert" # ========== USER & GROUP MANAGEMENT API ENDPOINTS ========== # Core User Management API_CORE_USER_SEARCH = "/core/user/searchUser" API_CORE_USER_GET = "/core/user/getUser" # Optional /{uuid} API_CORE_USER_ADD = "/core/user/addUser" API_CORE_USER_SET = "/core/user/setUser" # Needs /{uuid} API_CORE_USER_DEL = "/core/user/delUser" # Needs /{uuid} API_CORE_USER_TOGGLE = "/core/user/toggleUser" # Needs /{uuid}/{enabled} # Core Group Management API_CORE_GROUP_SEARCH = "/core/group/searchGroup" API_CORE_GROUP_GET = "/core/group/getGroup" # Optional /{uuid} API_CORE_GROUP_ADD = "/core/group/addGroup" API_CORE_GROUP_SET = "/core/group/setGroup" # Needs /{uuid} API_CORE_GROUP_DEL = "/core/group/delGroup" # Needs /{uuid} # Authentication & Privileges API_CORE_AUTH_PRIVILEGES = "/core/auth/privileges" API_CORE_AUTH_SERVERS = "/core/auth/authServers" API_CORE_AUTH_TEST = "/core/auth/testAuthentication" # Configuration Management API_CORE_CONFIG_RELOAD = "/core/config/reload" # ========== TRAFFIC SHAPER API ENDPOINTS ========== # Traffic Shaper Service Controller API_TRAFFICSHAPER_SERVICE_FLUSHRELOAD = "/trafficshaper/service/flushreload" API_TRAFFICSHAPER_SERVICE_RECONFIGURE = "/trafficshaper/service/reconfigure" API_TRAFFICSHAPER_SERVICE_STATISTICS = "/trafficshaper/service/statistics" # Traffic Shaper Settings - Pipes API_TRAFFICSHAPER_SETTINGS_ADD_PIPE = "/trafficshaper/settings/add_pipe" API_TRAFFICSHAPER_SETTINGS_DEL_PIPE = "/trafficshaper/settings/del_pipe" # Needs /{uuid} API_TRAFFICSHAPER_SETTINGS_GET_PIPE = "/trafficshaper/settings/get_pipe" # Optional /{uuid} API_TRAFFICSHAPER_SETTINGS_SET_PIPE = "/trafficshaper/settings/set_pipe" # Needs /{uuid} API_TRAFFICSHAPER_SETTINGS_TOGGLE_PIPE = "/trafficshaper/settings/toggle_pipe" # Needs /{uuid}/{enabled} API_TRAFFICSHAPER_SETTINGS_SEARCH_PIPES = "/trafficshaper/settings/search_pipes" # Traffic Shaper Settings - Queues API_TRAFFICSHAPER_SETTINGS_ADD_QUEUE = "/trafficshaper/settings/add_queue" API_TRAFFICSHAPER_SETTINGS_DEL_QUEUE = "/trafficshaper/settings/del_queue" # Needs /{uuid} API_TRAFFICSHAPER_SETTINGS_GET_QUEUE = "/trafficshaper/settings/get_queue" # Optional /{uuid} API_TRAFFICSHAPER_SETTINGS_SET_QUEUE = "/trafficshaper/settings/set_queue" # Needs /{uuid} API_TRAFFICSHAPER_SETTINGS_TOGGLE_QUEUE = "/trafficshaper/settings/toggle_queue" # Needs /{uuid}/{enabled} API_TRAFFICSHAPER_SETTINGS_SEARCH_QUEUES = "/trafficshaper/settings/search_queues" # Traffic Shaper Settings - Rules API_TRAFFICSHAPER_SETTINGS_ADD_RULE = "/trafficshaper/settings/add_rule" API_TRAFFICSHAPER_SETTINGS_DEL_RULE = "/trafficshaper/settings/del_rule" # Needs /{uuid} API_TRAFFICSHAPER_SETTINGS_GET_RULE = "/trafficshaper/settings/get_rule" # Optional /{uuid} API_TRAFFICSHAPER_SETTINGS_SET_RULE = "/trafficshaper/settings/set_rule" # Needs /{uuid} API_TRAFFICSHAPER_SETTINGS_TOGGLE_RULE = "/trafficshaper/settings/toggle_rule" # Needs /{uuid}/{enabled} API_TRAFFICSHAPER_SETTINGS_SEARCH_RULES = "/trafficshaper/settings/search_rules" # Traffic Shaper Settings - General API_TRAFFICSHAPER_SETTINGS_GET = "/trafficshaper/settings/get" API_TRAFFICSHAPER_SETTINGS_SET = "/trafficshaper/settings/set" class OPNsenseClient: """Client for interacting with OPNsense API.""" def __init__(self, config: OPNsenseConfig, pool: Optional['ConnectionPool'] = None): """Initialize OPNsense API client. Args: config: Configuration for OPNsense connection pool: Connection pool for rate limiting """ self.base_url = config.url.rstrip("/") self.api_key = config.api_key self.api_secret = config.api_secret self.verify_ssl = config.verify_ssl self.pool = pool # Enhanced client configuration self.client = httpx.AsyncClient( verify=self.verify_ssl, timeout=httpx.Timeout(30.0, pool=5.0), limits=httpx.Limits( max_keepalive_connections=5, max_connections=10, keepalive_expiry=30.0 ) ) # Set up Basic Auth auth_str = f"{self.api_key}:{self.api_secret}" self.auth_header = base64.b64encode(auth_str.encode()).decode() logger.info(f"Initialized OPNsense client for {self.base_url}") async def close(self): """Close the httpx client.""" await self.client.aclose() async def request( self, method: str, endpoint: str, data: Optional[Dict[str, Any]] = None, params: Optional[Dict[str, Any]] = None, operation: str = "api_request", timeout: float = 30.0, retry_config: Optional[RetryConfig] = None ) -> Dict[str, Any]: """Make a request to the OPNsense API with comprehensive error handling and logging. Args: method: HTTP method (GET, POST, etc.) endpoint: API endpoint (e.g., "/core/firmware/status") data: Request payload for POST requests params: Query parameters for GET requests operation: Name of operation for logging/error context timeout: Request timeout in seconds retry_config: Configuration for retry mechanism Returns: Response from the API as a dictionary Raises: ValidationError: For invalid input parameters AuthenticationError: If authentication fails (401) AuthorizationError: If authorization fails (403) ResourceNotFoundError: For not found errors (404) RateLimitError: If rate limit is exceeded (429) APIError: For other HTTP errors NetworkError: For network connection issues TimeoutError: For request timeouts """ # Validate inputs if not method or not endpoint: raise ValidationError("Method and endpoint are required", context={"method": method, "endpoint": endpoint}) if method.upper() not in ["GET", "POST", "PUT", "DELETE", "PATCH"]: raise ValidationError(f"Unsupported HTTP method: {method}", context={"method": method}) # Apply rate limiting if pool is available if self.pool: await self.pool.check_rate_limit() url = f"{self.base_url}/api{endpoint}" headers = { "Authorization": f"Basic {self.auth_header}", "Accept": "application/json", "User-Agent": "OPNsense-MCP-Server/1.0" } # Log the request request_logger.log_request(method, url, headers, data, operation) start_time = datetime.utcnow() async def _make_request(): """Internal request function for retry mechanism.""" try: if method.upper() == "GET": response = await self.client.get(url, headers=headers, params=params, timeout=timeout) elif method.upper() == "POST": response = await self.client.post(url, headers=headers, json=data, timeout=timeout) elif method.upper() == "PUT": response = await self.client.put(url, headers=headers, json=data, timeout=timeout) elif method.upper() == "DELETE": response = await self.client.delete(url, headers=headers, params=params, timeout=timeout) elif method.upper() == "PATCH": response = await self.client.patch(url, headers=headers, json=data, timeout=timeout) # Calculate response time duration_ms = (datetime.utcnow() - start_time).total_seconds() * 1000 response_size = len(response.content) if response.content else 0 # Handle HTTP errors with proper exception mapping if response.status_code == 401: request_logger.log_response(response.status_code, response_size, duration_ms, operation) raise AuthenticationError("Authentication failed - invalid API credentials", context={"status_code": 401, "endpoint": endpoint}) elif response.status_code == 403: request_logger.log_response(response.status_code, response_size, duration_ms, operation) raise AuthorizationError("Access denied - insufficient permissions", context={"status_code": 403, "endpoint": endpoint}) elif response.status_code == 404: request_logger.log_response(response.status_code, response_size, duration_ms, operation) raise ResourceNotFoundError(f"Resource not found: {endpoint}", context={"status_code": 404, "endpoint": endpoint}) elif response.status_code == 429: request_logger.log_response(response.status_code, response_size, duration_ms, operation) raise RateLimitError("API rate limit exceeded", context={"status_code": 429, "endpoint": endpoint, "retry_after": response.headers.get("Retry-After")}) elif not (200 <= response.status_code < 300): request_logger.log_response(response.status_code, response_size, duration_ms, operation) try: error_data = response.json() except: error_data = {"error": response.text} raise APIError(f"API error: {response.status_code}", status_code=response.status_code, response_text=response.text) # Parse JSON response try: result = response.json() request_logger.log_response(response.status_code, response_size, duration_ms, operation) return result except json.JSONDecodeError as e: request_logger.log_response(response.status_code, response_size, duration_ms, operation, e) raise APIError(f"Invalid JSON response from OPNsense API: {str(e)}", status_code=response.status_code, response_text=response.text) except httpx.TimeoutException as e: duration_ms = (datetime.utcnow() - start_time).total_seconds() * 1000 request_logger.log_response(0, 0, duration_ms, operation, e) raise TimeoutError(f"Request timed out after {timeout}s", context={"timeout": timeout, "endpoint": endpoint}) except httpx.ConnectError as e: duration_ms = (datetime.utcnow() - start_time).total_seconds() * 1000 request_logger.log_response(0, 0, duration_ms, operation, e) raise NetworkError(f"Cannot connect to OPNsense at {self.base_url}", context={"base_url": self.base_url, "endpoint": endpoint, "error": str(e)}) except httpx.RequestError as e: duration_ms = (datetime.utcnow() - start_time).total_seconds() * 1000 request_logger.log_response(0, 0, duration_ms, operation, e) raise NetworkError(f"Network error: {str(e)}", context={"endpoint": endpoint, "error": str(e)}) # Use retry mechanism if configured if retry_config: return await retry_with_backoff(_make_request, retry_config=retry_config) else: return await _make_request() # Initialize MCP server mcp = FastMCP("OPNsense MCP Server", description="Manage OPNsense firewalls via MCP") # Initialize server state management server_state = ServerState() # ========== ERROR HANDLING HELPERS ========== async def handle_tool_error(ctx: Context, operation: str, error: Exception, severity: ErrorSeverity = ErrorSeverity.MEDIUM) -> str: """Centralized error handling for MCP tools. Args: ctx: MCP context for error reporting operation: Name of the operation that failed error: The exception that occurred severity: Severity level of the error Returns: User-friendly error message """ error_response = ErrorResponse(error, operation, severity) # Log technical details technical_details = error_response.get_technical_details() logger.error(f"Tool error in {operation}: {json.dumps(technical_details, indent=2)}") # Report error to MCP context user_message = error_response.get_user_message() await ctx.error(user_message) return f"Error: {user_message}" def validate_uuid(uuid: str, operation: str) -> None: """Validate UUID format. Args: uuid: UUID string to validate operation: Operation name for error context Raises: ValidationError: If UUID format is invalid """ import re uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.IGNORECASE) if not uuid_pattern.match(uuid): raise ValidationError( f"Invalid UUID format: {uuid}", context={"uuid": uuid, "operation": operation, "expected_format": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"} ) def validate_firewall_parameters(action: str, direction: str, ipprotocol: str, protocol: str, operation: str) -> None: """Validate common firewall rule parameters. Args: action: Rule action (pass, block, reject) direction: Traffic direction (in, out) ipprotocol: IP protocol (inet, inet6) protocol: Transport protocol (tcp, udp, icmp, any) operation: Operation name for error context Raises: ValidationError: If any parameter is invalid """ valid_actions = ["pass", "block", "reject"] valid_directions = ["in", "out"] valid_ipprotocols = ["inet", "inet6"] valid_protocols = ["tcp", "udp", "icmp", "any"] if action not in valid_actions: raise ValidationError(f"Invalid action '{action}'. Must be one of: {valid_actions}", context={"operation": operation, "parameter": "action", "value": action}) if direction not in valid_directions: raise ValidationError(f"Invalid direction '{direction}'. Must be one of: {valid_directions}", context={"operation": operation, "parameter": "direction", "value": direction}) if ipprotocol not in valid_ipprotocols: raise ValidationError(f"Invalid IP protocol '{ipprotocol}'. Must be one of: {valid_ipprotocols}", context={"operation": operation, "parameter": "ipprotocol", "value": ipprotocol}) if protocol not in valid_protocols: raise ValidationError(f"Invalid protocol '{protocol}'. Must be one of: {valid_protocols}", context={"operation": operation, "parameter": "protocol", "value": protocol}) async def get_opnsense_client() -> OPNsenseClient: """Get OPNsense client from server state with validation.""" return await server_state.get_client() @mcp.tool(name="get_api_endpoints", description="List available API endpoints from OPNsense") async def get_api_endpoints( ctx: Context, module: Optional[str] = None ) -> str: """List available API endpoints from OPNsense. Args: ctx: MCP context module: Optional module name to filter endpoints Returns: JSON string of available endpoints """ try: client = await get_opnsense_client() # Get all available modules first response = await client.request("GET", API_CORE_MENU_GET_ITEMS) if module: # Filter endpoints by module if specified if module in response: return json.dumps(response[module], indent=2) else: available_modules = list(response.keys()) return f"Module '{module}' not found. Available modules: {available_modules}" else: # Return all modules and endpoints return json.dumps(response, indent=2) except ConfigurationError as e: await ctx.error(str(e)) return f"Configuration Error: {str(e)}" except (AuthenticationError, NetworkError, APIError) as e: logger.error(f"Error in get_api_endpoints: {str(e)}", exc_info=True) await ctx.error(f"Error fetching API endpoints: {str(e)}") return f"Error: {str(e)}" except Exception as e: logger.error(f"Unexpected error in get_api_endpoints: {str(e)}", exc_info=True) await ctx.error(f"Unexpected error: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="get_system_status", description="Get OPNsense system status") async def get_system_status(ctx: Context) -> str: """Get OPNsense system status. Args: ctx: MCP context Returns: Formatted system status information """ try: client = await get_opnsense_client() # Get firmware status firmware = await client.request("GET", API_CORE_FIRMWARE_STATUS) # Get system information system_info = await client.request("GET", API_CORE_SYSTEM_INFO) # Get service status services = await client.request( "POST", API_CORE_SERVICE_SEARCH, data={"current": 1, "rowCount": -1, "searchPhrase": ""} ) # Format and return the combined status status = { "firmware": firmware, "system": system_info, "services": services.get("rows", []) } return json.dumps(status, indent=2) except ConfigurationError as e: await ctx.error(str(e)) return f"Configuration Error: {str(e)}" except (AuthenticationError, NetworkError, APIError) as e: logger.error(f"Error in get_system_status: {str(e)}", exc_info=True) await ctx.error(f"Error fetching system status: {str(e)}") return f"Error: {str(e)}" except Exception as e: logger.error(f"Unexpected error in get_system_status: {str(e)}", exc_info=True) await ctx.error(f"Unexpected error: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="firewall_get_rules", description="Get OPNsense firewall rules") async def firewall_get_rules( ctx: Context, search_phrase: str = "", page: int = 1, rows_per_page: int = 20 ) -> str: """Get OPNsense firewall rules. Args: ctx: MCP context search_phrase: Optional search phrase to filter rules page: Page number for pagination rows_per_page: Number of rows per page Returns: JSON string of firewall rules """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "POST", API_FIREWALL_FILTER_SEARCH_RULE, data={ "current": page, "rowCount": rows_per_page, "searchPhrase": search_phrase } ) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in firewall_get_rules: {str(e)}", exc_info=True) await ctx.error(f"Error fetching firewall rules: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="firewall_add_rule", description="Add a new firewall rule") async def firewall_add_rule( ctx: Context, description: str, action: str = "pass", interface: str = "lan", direction: str = "in", ipprotocol: str = "inet", protocol: str = "any", source_net: str = "any", destination_net: str = "any", destination_port: str = "", enabled: bool = True ) -> str: """Add a new firewall rule with comprehensive validation and error handling. Args: ctx: MCP context description: Rule description action: Rule action (pass, block, reject) interface: Network interface direction: Traffic direction (in, out) ipprotocol: IP protocol (inet for IPv4, inet6 for IPv6) protocol: Transport protocol (tcp, udp, icmp, any) source_net: Source network/host destination_net: Destination network/host destination_port: Destination port(s) enabled: Whether the rule is enabled Returns: JSON string with the result """ try: client = await get_opnsense_client() # Validate firewall rule parameters validate_firewall_parameters(action, direction, ipprotocol, protocol, "firewall_add_rule") # Validate description if not description or len(description.strip()) == 0: raise ValidationError("Rule description is required", context={"operation": "firewall_add_rule", "parameter": "description"}) # Additional validation for specific protocols and ports if protocol in ["tcp", "udp"] and destination_port and not destination_port.replace("-", "").replace(",", "").replace(" ", "").isdigit(): # Simple port validation - could be enhanced if not all(part.strip().isdigit() or "-" in part for part in destination_port.split(",")): raise ValidationError(f"Invalid port format: {destination_port}", context={"operation": "firewall_add_rule", "parameter": "destination_port", "value": destination_port}) # Prepare rule data rule_data = { "rule": { "description": description, "action": action, "interface": interface, "direction": direction, "ipprotocol": ipprotocol, "protocol": protocol, "source_net": source_net, "destination_net": destination_net, "destination_port": destination_port, "enabled": "1" if enabled else "0" } } # Add the rule add_result = await opnsense_client.request( "POST", API_FIREWALL_FILTER_ADD_RULE, data=rule_data ) # Apply changes await ctx.info("Rule added, applying changes...") apply_result = await opnsense_client.request( "POST", API_FIREWALL_FILTER_APPLY ) return json.dumps({ "add_result": add_result, "apply_result": apply_result }, indent=2) except Exception as e: return await handle_tool_error(ctx, "firewall_add_rule", e, ErrorSeverity.HIGH) @mcp.tool(name="firewall_delete_rule", description="Delete a firewall rule by UUID") async def firewall_delete_rule(ctx: Context, uuid: str) -> str: """Delete a firewall rule by UUID with enhanced validation and error handling. Args: ctx: MCP context uuid: UUID of the rule to delete Returns: JSON string with the result """ try: client = await get_opnsense_client() # Validate UUID format validate_uuid(uuid, "firewall_delete_rule") # Delete the rule delete_result = await client.request( "POST", f"{API_FIREWALL_FILTER_DEL_RULE}/{uuid}", operation="delete_firewall_rule" ) # Apply changes with retry for reliability await ctx.info("Rule deleted, applying changes...") retry_config = RetryConfig(max_attempts=2, base_delay=1.0) apply_result = await client.request( "POST", API_FIREWALL_FILTER_APPLY, operation="apply_firewall_changes", retry_config=retry_config ) return json.dumps({ "delete_result": delete_result, "apply_result": apply_result }, indent=2) except Exception as e: return await handle_tool_error(ctx, "firewall_delete_rule", e, ErrorSeverity.HIGH) @mcp.tool(name="firewall_toggle_rule", description="Enable or disable a firewall rule") async def firewall_toggle_rule(ctx: Context, uuid: str, enabled: bool) -> str: """Enable or disable a firewall rule. Args: ctx: MCP context uuid: UUID of the rule to toggle enabled: Whether to enable or disable the rule Returns: JSON string with the result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: # Toggle the rule toggle_result = await opnsense_client.request( "POST", f"{API_FIREWALL_FILTER_TOGGLE_RULE}/{uuid}/{1 if enabled else 0}" ) # Apply changes await ctx.info(f"Rule {'enabled' if enabled else 'disabled'}, applying changes...") apply_result = await opnsense_client.request( "POST", API_FIREWALL_FILTER_APPLY ) return json.dumps({ "toggle_result": toggle_result, "apply_result": apply_result }, indent=2) except Exception as e: logger.error(f"Error in firewall_toggle_rule (uuid: {uuid}, enabled: {enabled}): {str(e)}", exc_info=True) await ctx.error(f"Error toggling firewall rule: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="get_interfaces", description="Get network interfaces") async def get_interfaces(ctx: Context) -> str: """Get network interfaces. Args: ctx: MCP context Returns: JSON string of network interfaces """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request("GET", API_INTERFACES_OVERVIEW_INFO) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in get_interfaces: {str(e)}", exc_info=True) await ctx.error(f"Error fetching interfaces: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="get_dhcp_leases", description="Get DHCP leases") async def get_dhcp_leases(ctx: Context) -> str: """Get DHCP leases. Args: ctx: MCP context Returns: JSON string of DHCP leases """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request("GET", API_DHCP_LEASES_SEARCH) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in get_dhcp_leases: {str(e)}", exc_info=True) await ctx.error(f"Error fetching DHCP leases: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="get_firewall_aliases", description="Get firewall aliases") async def get_firewall_aliases( ctx: Context, search_phrase: str = "", page: int = 1, rows_per_page: int = 20 ) -> str: """Get firewall aliases. Args: ctx: MCP context search_phrase: Optional search phrase to filter aliases page: Page number for pagination rows_per_page: Number of rows per page Returns: JSON string of firewall aliases """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "POST", API_FIREWALL_ALIAS_SEARCH_ITEM, data={ "current": page, "rowCount": rows_per_page, "searchPhrase": search_phrase } ) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in get_firewall_aliases: {str(e)}", exc_info=True) await ctx.error(f"Error fetching firewall aliases: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="add_to_alias", description="Add an entry to a firewall alias") async def add_to_alias(ctx: Context, alias_name: str, address: str) -> str: """Add an entry to a firewall alias. Args: ctx: MCP context alias_name: Name of the alias address: IP address, network, or hostname to add Returns: JSON string with the result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: # Add to alias add_result = await opnsense_client.request( "POST", f"{API_FIREWALL_ALIAS_UTIL_ADD}/{alias_name}/{urllib.parse.quote_plus(address)}" ) # Reconfigure aliases await ctx.info("Entry added, applying changes...") reconfigure_result = await opnsense_client.request( "POST", API_FIREWALL_ALIAS_RECONFIGURE ) return json.dumps({ "add_result": add_result, "reconfigure_result": reconfigure_result }, indent=2) except Exception as e: logger.error(f"Error in add_to_alias (alias: {alias_name}, address: {address}): {str(e)}", exc_info=True) await ctx.error(f"Error adding to alias: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="delete_from_alias", description="Delete an entry from a firewall alias") async def delete_from_alias(ctx: Context, alias_name: str, address: str) -> str: """Delete an entry from a firewall alias. Args: ctx: MCP context alias_name: Name of the alias address: IP address, network, or hostname to delete Returns: JSON string with the result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: # Delete from alias delete_result = await opnsense_client.request( "POST", f"{API_FIREWALL_ALIAS_UTIL_DELETE}/{alias_name}/{urllib.parse.quote_plus(address)}" ) # Reconfigure aliases await ctx.info("Entry deleted, applying changes...") reconfigure_result = await opnsense_client.request( "POST", API_FIREWALL_ALIAS_RECONFIGURE ) return json.dumps({ "delete_result": delete_result, "reconfigure_result": reconfigure_result }, indent=2) except Exception as e: logger.error(f"Error in delete_from_alias (alias: {alias_name}, address: {address}): {str(e)}", exc_info=True) await ctx.error(f"Error deleting from alias: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="exec_api_call", description="Execute a custom API call to OPNsense") async def exec_api_call( ctx: Context, method: str, endpoint: str, data: Optional[str] = None, params: Optional[str] = None ) -> str: """Execute a custom API call to OPNsense. Args: ctx: MCP context method: HTTP method (GET, POST) endpoint: API endpoint (e.g., "/core/firmware/status") data: JSON string of POST data (optional) params: JSON string of query parameters for GET (optional) Returns: JSON string with the API response """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: data_dict = json.loads(data) if data else None params_dict = json.loads(params) if params else None response = await opnsense_client.request( method, endpoint, data=data_dict, params=params_dict ) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in exec_api_call (method: {method}, endpoint: {endpoint}): {str(e)}", exc_info=True) await ctx.error(f"Error executing API call: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="configure_opnsense_connection", description="Configure the OPNsense connection with enhanced security") async def configure_opnsense_connection( ctx: Context, url: str, api_key: str, api_secret: str, verify_ssl: bool = True ) -> str: """Configure the OPNsense connection with enhanced security and validation. Args: ctx: MCP context url: OPNsense base URL (e.g., "https://192.168.1.1") api_key: API key api_secret: API secret verify_ssl: Whether to verify SSL certificates Returns: Success message """ try: # Validate configuration using Pydantic config = OPNsenseConfig( url=url, api_key=api_key, api_secret=api_secret, verify_ssl=verify_ssl ) # Initialize server state with new configuration await server_state.initialize(config) await ctx.info("OPNsense connection configured and validated successfully") return "OPNsense connection configured successfully with enhanced security" except AuthenticationError as e: error_msg = f"Authentication failed: {str(e)}" logger.error(error_msg, exc_info=True) await ctx.error(error_msg) return f"Authentication Error: {str(e)}" except NetworkError as e: error_msg = f"Network error: {str(e)}" logger.error(error_msg, exc_info=True) await ctx.error(error_msg) return f"Network Error: {str(e)}" except ValidationError as e: error_msg = f"Configuration validation failed: {str(e)}" logger.error(error_msg, exc_info=True) await ctx.error(error_msg) return f"Configuration Error: {str(e)}" except Exception as e: error_msg = f"Error configuring OPNsense connection: {str(e)}" logger.error(f"Unexpected error in configure_opnsense_connection (url: {url}): {str(e)}", exc_info=True) await ctx.error(error_msg) return f"Error: {str(e)}" # More tools for other OPNsense modules can be added here @mcp.tool(name="get_firewall_logs", description="Get firewall log entries") async def get_firewall_logs( ctx: Context, count: int = 100, filter_text: str = "" ) -> str: """Get firewall log entries. Args: ctx: MCP context count: Number of log entries to retrieve filter_text: Optional text to filter log entries Returns: JSON string of log entries """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "GET", API_DIAGNOSTICS_LOG_FIREWALL, params={"limit": count, "filter": filter_text} ) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in get_firewall_logs: {str(e)}", exc_info=True) await ctx.error(f"Error fetching firewall logs: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="restart_service", description="Restart an OPNsense service") async def restart_service(ctx: Context, service_name: str) -> str: """Restart an OPNsense service. Args: ctx: MCP context service_name: Name of the service to restart Returns: JSON string with the result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "POST", f"{API_CORE_SERVICE_RESTART}/{service_name}" ) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in restart_service (service: {service_name}): {str(e)}", exc_info=True) await ctx.error(f"Error restarting service: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="backup_config", description="Create a backup of the OPNsense configuration") async def backup_config(ctx: Context) -> str: """Create a backup of the OPNsense configuration. Args: ctx: MCP context Returns: JSON string with the result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request("POST", API_CORE_BACKUP_DOWNLOAD) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in backup_config: {str(e)}", exc_info=True) await ctx.error(f"Error creating backup: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="get_system_routes", description="Get system routing table") async def get_system_routes(ctx: Context) -> str: """Get system routing table. Args: ctx: MCP context Returns: JSON string of system routes """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request("GET", API_ROUTES_GET) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in get_system_routes: {str(e)}", exc_info=True) await ctx.error(f"Error fetching system routes: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="get_system_health", description="Get system health metrics") async def get_system_health(ctx: Context) -> str: """Get system health metrics. Args: ctx: MCP context Returns: JSON string of system health metrics """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: # Get multiple health metrics cpu = await opnsense_client.request("GET", API_DIAGNOSTICS_SYSTEM_PROCESSOR) memory = await opnsense_client.request("GET", API_DIAGNOSTICS_SYSTEM_MEMORY) disk = await opnsense_client.request("GET", API_DIAGNOSTICS_SYSTEM_STORAGE) temperature = await opnsense_client.request("GET", API_DIAGNOSTICS_SYSTEM_TEMPERATURE) # Combine results return json.dumps({ "cpu": cpu, "memory": memory, "disk": disk, "temperature": temperature }, indent=2) except Exception as e: logger.error(f"Error in get_system_health: {str(e)}", exc_info=True) await ctx.error(f"Error fetching system health: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="list_plugins", description="List installed plugins") async def list_plugins(ctx: Context) -> str: """List installed plugins. Args: ctx: MCP context Returns: JSON string of installed plugins """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request("GET", API_CORE_FIRMWARE_PLUGINS) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in list_plugins: {str(e)}", exc_info=True) await ctx.error(f"Error listing plugins: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="install_plugin", description="Install a plugin") async def install_plugin(ctx: Context, plugin_name: str) -> str: """Install a plugin. Args: ctx: MCP context plugin_name: Name of the plugin to install Returns: JSON string with the result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "POST", f"{API_CORE_FIRMWARE_INSTALL}/{plugin_name}" ) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in install_plugin (plugin: {plugin_name}): {str(e)}", exc_info=True) await ctx.error(f"Error installing plugin: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="get_vpn_connections", description="Get VPN connection status") async def get_vpn_connections(ctx: Context, vpn_type: str = "OpenVPN") -> str: """Get VPN connection status. Args: ctx: MCP context vpn_type: Type of VPN (OpenVPN, IPsec, WireGuard) Returns: JSON string of VPN connections """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: if vpn_type.lower() == "openvpn": response = await opnsense_client.request("GET", API_OPENVPN_SERVICE_STATUS) elif vpn_type.lower() == "ipsec": response = await opnsense_client.request("GET", API_IPSEC_SERVICE_STATUS) elif vpn_type.lower() == "wireguard": response = await opnsense_client.request("GET", API_WIREGUARD_SERVICE_SHOW) else: return f"Unsupported VPN type: {vpn_type}" return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in get_vpn_connections (type: {vpn_type}): {str(e)}", exc_info=True) await ctx.error(f"Error fetching VPN connections: {str(e)}") return f"Error: {str(e)}" # --- Firewall Audit Feature --- async def _get_all_rules(client: OPNsenseClient) -> List[Dict[str, Any]]: """Helper to fetch all firewall rules using pagination.""" all_rules = [] current_page = 1 rows_per_page = 500 # Fetch in larger batches while True: try: response = await client.request( "POST", API_FIREWALL_FILTER_SEARCH_RULE, data={ "current": current_page, "rowCount": rows_per_page, "searchPhrase": "" } ) rules = response.get("rows", []) if not rules: break all_rules.extend(rules) if len(rules) < rows_per_page: break # Last page current_page += 1 except Exception as e: logger.error(f"Error fetching page {current_page} of firewall rules: {e}", exc_info=True) # Return what we have so far, audit can proceed with partial data break return all_rules async def _get_wan_interfaces(client: OPNsenseClient) -> List[str]: """Helper to identify WAN interfaces.""" wan_interfaces = [] try: interfaces_info = await client.request("GET", API_INTERFACES_OVERVIEW_INFO) for if_name, if_data in interfaces_info.items(): # Heuristic: Interface is likely WAN if it has a gateway and isn't loopback/internal # OPNsense often names the default WAN 'wan' but users can rename it. # Checking for a non-empty gateway field is a common indicator. if if_data.get("gateway") and if_data.get("gateway") != "none": wan_interfaces.append(if_name) # Fallback: Explicitly check for common WAN names if gateway check fails elif if_name.lower() == 'wan' and not wan_interfaces: wan_interfaces.append(if_name) except Exception as e: logger.error(f"Error fetching interfaces info for audit: {e}", exc_info=True) # If still no WAN identified, maybe return a default guess? For now, return empty. if not wan_interfaces: logger.warning("Could not reliably identify WAN interfaces for audit.") return wan_interfaces @mcp.tool(name="perform_firewall_audit", description="Performs a basic security audit of the OPNsense configuration.") async def perform_firewall_audit(ctx: Context) -> str: """Performs a basic security audit of the OPNsense configuration. Checks for common potential security issues like outdated firmware/plugins, management access from WAN, overly permissive rules, etc. Args: ctx: MCP context Returns: JSON string containing a list of audit findings. """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." findings = [] await ctx.info("Starting OPNsense firewall audit...") try: # --- Fetch Data --- await ctx.info("Fetching required data (firmware, rules, interfaces, services)...") firmware_status = await opnsense_client.request("GET", API_CORE_FIRMWARE_STATUS) all_rules = await _get_all_rules(opnsense_client) wan_interfaces = await _get_wan_interfaces(opnsense_client) services_response = await opnsense_client.request( "POST", API_CORE_SERVICE_SEARCH, data={"current": 1, "rowCount": -1, "searchPhrase": ""} # Fetch all services ) running_services = {svc['name']: svc for svc in services_response.get("rows", []) if svc.get('running') == 1} await ctx.info(f"Identified WAN interfaces: {wan_interfaces or 'None'}") await ctx.info(f"Fetched {len(all_rules)} firewall rules.") # --- Perform Checks --- # 1. Firmware Update Check if firmware_status.get("status") == "update_available": findings.append({ "check": "Firmware Update", "severity": "Medium", "description": f"Firmware update available. Current: {firmware_status.get('product_version', 'N/A')}, New: {firmware_status.get('product_new_version', 'N/A')}", "recommendation": "Consider updating OPNsense firmware via the GUI (System -> Firmware -> Updates)." }) else: findings.append({ "check": "Firmware Update", "severity": "Info", "description": "Firmware appears to be up-to-date.", "recommendation": None }) # 2. Plugin Update Check plugin_updates = firmware_status.get("upgrade_packages", []) if plugin_updates: plugin_names = [p.get('name', 'N/A') for p in plugin_updates] findings.append({ "check": "Plugin Updates", "severity": "Medium", "description": f"Updates available for {len(plugin_updates)} plugins: {', '.join(plugin_names)}", "recommendation": "Consider updating plugins via the GUI (System -> Firmware -> Updates)." }) else: findings.append({ "check": "Plugin Updates", "severity": "Info", "description": "Installed plugins appear to be up-to-date.", "recommendation": None }) # 3. WAN Management Access Check management_ports = {'80', '443', '22'} # HTTP, HTTPS, SSH insecure_protocols = {'21', '23'} # FTP, Telnet wan_mgmt_rules = [] wan_insecure_proto_rules = [] wan_any_any_rules = [] block_rules_no_log = [] for rule in all_rules: # Skip disabled rules if not rule.get('enabled', '0') == '1': continue interface = rule.get('interface') is_wan_rule = interface in wan_interfaces # Check logging on block/reject rules if rule.get('action') in ['block', 'reject'] and not rule.get('log', '0') == '1': block_rules_no_log.append(rule.get("descr", rule.get("uuid", "N/A"))) if not is_wan_rule: continue # Only check WAN rules for the following # Basic parsing - assumes 'any' if specific fields are missing/empty src_net = rule.get("source_net", "any") dst_net = rule.get("destination_net", "any") dst_port = rule.get("destination_port", "any") protocol = rule.get("protocol", "any").lower() action = rule.get('action') # Check Any-Any rule if action == 'pass' and src_net == 'any' and dst_net == 'any' and dst_port == 'any': wan_any_any_rules.append(rule.get("descr", rule.get("uuid", "N/A"))) # Check Management Access # Simplified: Checks if dest port is one of the management ports # Doesn't check destination address (assumes firewall itself) if action == 'pass' and dst_port in management_ports: wan_mgmt_rules.append(rule.get("descr", rule.get("uuid", "N/A"))) # Check Insecure Protocols if action == 'pass' and dst_port in insecure_protocols: wan_insecure_proto_rules.append(rule.get("descr", rule.get("uuid", "N/A"))) if wan_mgmt_rules: findings.append({ "check": "WAN Management Access", "severity": "High", "description": f"Potential firewall rules allowing management access (HTTP/HTTPS/SSH) from WAN found: {', '.join(wan_mgmt_rules)}", "recommendation": "Review these rules. Exposing management interfaces to the WAN is highly discouraged. Use VPNs for remote access." }) if wan_any_any_rules: findings.append({ "check": "WAN Allow Any-Any", "severity": "High", "description": f"Potential 'allow any source to any destination' rules found on WAN interface(s): {', '.join(wan_any_any_rules)}", "recommendation": "Review these rules. 'Allow any-any' rules on WAN are extremely dangerous and likely misconfigured." }) if wan_insecure_proto_rules: findings.append({ "check": "WAN Insecure Protocols", "severity": "High", "description": f"Potential rules allowing insecure protocols (e.g., Telnet, FTP) from WAN found: {', '.join(wan_insecure_proto_rules)}", "recommendation": "Review these rules. Avoid using insecure protocols, especially over the WAN." }) if block_rules_no_log: findings.append({ "check": "Firewall Log Settings", "severity": "Low", "description": f"{len(block_rules_no_log)} firewall rule(s) that block or reject traffic do not have logging enabled (Examples: {', '.join(block_rules_no_log[:3])}{'...' if len(block_rules_no_log) > 3 else ''}).", "recommendation": "Consider enabling logging on block/reject rules (especially the default deny, if applicable) to monitor potential malicious activity." }) else: findings.append({ "check": "Firewall Log Settings", "severity": "Info", "description": "Block/reject rules checked appear to have logging enabled.", "recommendation": None }) # 4. Check for enabled UPnP service if "miniupnpd" in running_services: findings.append({ "check": "UPnP Service", "severity": "Low", "description": "The UPnP (Universal Plug and Play) service is enabled and running.", "recommendation": "Ensure UPnP is intentionally enabled and configured securely if needed. Disable it if unused, as it can potentially open ports automatically." }) await ctx.info("Firewall audit checks complete.") except Exception as e: logger.error(f"Error during firewall audit: {str(e)}", exc_info=True) await ctx.error(f"Error performing firewall audit: {str(e)}") # Return partial findings if any were collected before the error if findings: findings.append({ "check": "Audit Error", "severity": "Critical", "description": f"An error occurred during the audit: {str(e)}. Results may be incomplete.", "recommendation": "Check server logs for details." }) return json.dumps({"audit_findings": findings}, indent=2) else: return json.dumps({"error": f"Failed to perform audit: {str(e)}"}, indent=2) return json.dumps({"audit_findings": findings}, indent=2) # --- End Firewall Audit Feature --- # ========== LOGGING & LOG MANAGEMENT ========== @mcp.tool() async def get_system_logs(ctx: RequestContext, log_type: str = "system", count: int = 100, filter_text: str = "", severity: str = "all") -> str: """ Retrieve system logs from OPNsense with filtering capabilities. Args: log_type: Type of log to retrieve (system, access, authentication, dhcp, dns) count: Number of log entries to retrieve (default: 100, max: 1000) filter_text: Optional text to filter log entries severity: Log severity filter (all, emergency, alert, critical, error, warning, notice, info, debug) Returns: JSON response with log entries """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # Validate parameters valid_log_types = ["system", "access", "authentication", "dhcp", "dns", "openvpn", "ipsec"] if log_type not in valid_log_types: return json.dumps({ "error": f"Invalid log type '{log_type}'. Valid types: {valid_log_types}" }, indent=2) valid_severities = ["all", "emergency", "alert", "critical", "error", "warning", "notice", "info", "debug"] if severity not in valid_severities: return json.dumps({ "error": f"Invalid severity '{severity}'. Valid severities: {valid_severities}" }, indent=2) # Limit count to reasonable maximum if count > 1000: count = 1000 # Map log type to API endpoint endpoint_map = { "system": API_DIAGNOSTICS_LOG_SYSTEM, "access": API_DIAGNOSTICS_LOG_ACCESS, "authentication": API_DIAGNOSTICS_LOG_AUTHENTICATION, "dhcp": API_DIAGNOSTICS_LOG_DHCP, "dns": API_DIAGNOSTICS_LOG_DNS, "openvpn": API_DIAGNOSTICS_LOG_OPENVPN, "ipsec": API_DIAGNOSTICS_LOG_IPSEC } endpoint = endpoint_map.get(log_type, API_DIAGNOSTICS_LOG_SYSTEM) # Build parameters params = {"limit": count} if filter_text: params["filter"] = filter_text if severity != "all": params["severity"] = severity response = await client.request("GET", endpoint, params=params, operation="get_system_logs") return json.dumps({ "log_type": log_type, "count": count, "filter_applied": filter_text, "severity_filter": severity, "entries": response }, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_system_logs", e) @mcp.tool() async def get_service_logs(ctx: RequestContext, service_name: str, count: int = 100, filter_text: str = "") -> str: """ Retrieve logs for specific OPNsense services. Args: service_name: Name of the service (squid, haproxy, openvpn, ipsec, dhcp, dns) count: Number of log entries to retrieve (default: 100) filter_text: Optional text to filter log entries Returns: JSON response with service log entries """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # Map service names to endpoints service_endpoints = { "squid": API_DIAGNOSTICS_LOG_SQUID, "haproxy": API_DIAGNOSTICS_LOG_HAPROXY, "openvpn": API_DIAGNOSTICS_LOG_OPENVPN, "ipsec": API_DIAGNOSTICS_LOG_IPSEC, "dhcp": API_DIAGNOSTICS_LOG_DHCP, "dns": API_DIAGNOSTICS_LOG_DNS } if service_name not in service_endpoints: return json.dumps({ "error": f"Service '{service_name}' not supported. Available services: {list(service_endpoints.keys())}" }, indent=2) endpoint = service_endpoints[service_name] params = {"limit": count} if filter_text: params["filter"] = filter_text response = await client.request("GET", endpoint, params=params, operation="get_service_logs") return json.dumps({ "service": service_name, "count": count, "filter_applied": filter_text, "entries": response }, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_service_logs", e) @mcp.tool() async def search_logs(ctx: RequestContext, search_query: str, log_types: str = "system,firewall", max_results: int = 200, case_sensitive: bool = False) -> str: """ Search across multiple log types for specific patterns or text. Args: search_query: Text or pattern to search for log_types: Comma-separated list of log types to search (system,firewall,access,authentication,dhcp,dns) max_results: Maximum number of results to return per log type case_sensitive: Whether to perform case-sensitive search Returns: JSON response with search results from all specified log types """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." if not search_query or len(search_query.strip()) < 2: return json.dumps({ "error": "Search query must be at least 2 characters long" }, indent=2) # Parse log types requested_types = [t.strip().lower() for t in log_types.split(",")] available_types = ["system", "firewall", "access", "authentication", "dhcp", "dns", "openvpn", "ipsec"] invalid_types = [t for t in requested_types if t not in available_types] if invalid_types: return json.dumps({ "error": f"Invalid log types: {invalid_types}. Available: {available_types}" }, indent=2) search_results = {} for log_type in requested_types: try: # Use the appropriate endpoint for each log type if log_type == "firewall": endpoint = API_DIAGNOSTICS_LOG_FIREWALL else: endpoint_map = { "system": API_DIAGNOSTICS_LOG_SYSTEM, "access": API_DIAGNOSTICS_LOG_ACCESS, "authentication": API_DIAGNOSTICS_LOG_AUTHENTICATION, "dhcp": API_DIAGNOSTICS_LOG_DHCP, "dns": API_DIAGNOSTICS_LOG_DNS, "openvpn": API_DIAGNOSTICS_LOG_OPENVPN, "ipsec": API_DIAGNOSTICS_LOG_IPSEC } endpoint = endpoint_map.get(log_type) if not endpoint: continue params = { "limit": max_results, "filter": search_query } if not case_sensitive: params["case_insensitive"] = "true" response = await client.request("GET", endpoint, params=params, operation=f"search_{log_type}_logs") # Extract relevant data and count matches if isinstance(response, dict) and "rows" in response: entries = response["rows"] elif isinstance(response, list): entries = response else: entries = [response] if response else [] search_results[log_type] = { "matches_found": len(entries), "entries": entries[:max_results] # Ensure we don't exceed limit } except Exception as log_error: search_results[log_type] = { "error": f"Failed to search {log_type} logs: {str(log_error)}", "matches_found": 0, "entries": [] } # Calculate total matches total_matches = sum(result.get("matches_found", 0) for result in search_results.values()) return json.dumps({ "search_query": search_query, "log_types_searched": requested_types, "case_sensitive": case_sensitive, "total_matches": total_matches, "results_by_log_type": search_results }, indent=2) except Exception as e: return await handle_tool_error(ctx, "search_logs", e) @mcp.tool() async def export_logs(ctx: RequestContext, log_type: str, export_format: str = "json", date_range: str = "today", include_filters: str = "") -> str: """ Export logs in various formats for analysis or archival. Args: log_type: Type of log to export (system, firewall, access, authentication, dhcp, dns) export_format: Export format (json, csv, text) date_range: Date range for export (today, yesterday, week, month, custom) include_filters: Optional filters to apply during export Returns: JSON response with export information and download details """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # Validate parameters valid_formats = ["json", "csv", "text"] if export_format not in valid_formats: return json.dumps({ "error": f"Invalid export format '{export_format}'. Valid formats: {valid_formats}" }, indent=2) valid_ranges = ["today", "yesterday", "week", "month", "custom"] if date_range not in valid_ranges: return json.dumps({ "error": f"Invalid date range '{date_range}'. Valid ranges: {valid_ranges}" }, indent=2) # Try using export API endpoint first params = { "format": export_format, "range": date_range } if include_filters: params["filters"] = include_filters try: export_response = await client.request("GET", f"{API_DIAGNOSTICS_LOG_EXPORT}/{log_type}", params=params, operation="export_logs") return json.dumps({ "export_status": "completed", "log_type": log_type, "format": export_format, "date_range": date_range, "filters_applied": include_filters, "export_data": export_response }, indent=2) except (APIError, ResourceNotFoundError): # If export endpoint doesn't exist, fall back to retrieving logs and formatting endpoint_map = { "system": API_DIAGNOSTICS_LOG_SYSTEM, "firewall": API_DIAGNOSTICS_LOG_FIREWALL, "access": API_DIAGNOSTICS_LOG_ACCESS, "authentication": API_DIAGNOSTICS_LOG_AUTHENTICATION, "dhcp": API_DIAGNOSTICS_LOG_DHCP, "dns": API_DIAGNOSTICS_LOG_DNS, "openvpn": API_DIAGNOSTICS_LOG_OPENVPN, "ipsec": API_DIAGNOSTICS_LOG_IPSEC } endpoint = endpoint_map.get(log_type) if not endpoint: return json.dumps({ "error": f"Unsupported log type for export: {log_type}" }, indent=2) # Retrieve logs with larger limit for export retrieve_params = {"limit": 10000} if include_filters: retrieve_params["filter"] = include_filters logs_response = await client.request("GET", endpoint, params=retrieve_params, operation=f"retrieve_logs_for_export") return json.dumps({ "export_status": "completed_via_retrieval", "log_type": log_type, "format": export_format, "date_range": date_range, "filters_applied": include_filters, "note": "Export completed by retrieving logs (export API not available)", "entry_count": len(logs_response) if isinstance(logs_response, list) else 1, "export_data": logs_response }, indent=2) except Exception as e: return await handle_tool_error(ctx, "export_logs", e) @mcp.tool() async def get_log_statistics(ctx: RequestContext, log_type: str = "all", time_period: str = "24h") -> str: """ Get statistical analysis of log entries including counts, patterns, and trends. Args: log_type: Type of log to analyze (all, system, firewall, access, authentication) time_period: Time period for analysis (1h, 6h, 24h, 7d, 30d) Returns: JSON response with log statistics and analysis """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # Try using statistics API endpoint try: params = {"period": time_period} stats_response = await client.request("GET", f"{API_DIAGNOSTICS_LOG_STATS}/{log_type}", params=params, operation="get_log_statistics") return json.dumps({ "statistics_source": "api_endpoint", "log_type": log_type, "time_period": time_period, "statistics": stats_response }, indent=2) except (APIError, ResourceNotFoundError): # If stats endpoint doesn't exist, generate basic statistics if log_type == "all": log_types_to_check = ["system", "firewall", "access", "authentication"] else: log_types_to_check = [log_type] statistics = {} for check_type in log_types_to_check: try: # Get recent logs for analysis endpoint_map = { "system": API_DIAGNOSTICS_LOG_SYSTEM, "firewall": API_DIAGNOSTICS_LOG_FIREWALL, "access": API_DIAGNOSTICS_LOG_ACCESS, "authentication": API_DIAGNOSTICS_LOG_AUTHENTICATION } endpoint = endpoint_map.get(check_type) if not endpoint: continue response = await client.request("GET", endpoint, params={"limit": 1000}, operation=f"get_{check_type}_stats") # Generate basic statistics if isinstance(response, list): entries = response elif isinstance(response, dict) and "rows" in response: entries = response["rows"] else: entries = [] statistics[check_type] = { "total_entries": len(entries), "sample_period": time_period, "entries_per_hour": round(len(entries) / 24, 2) if time_period == "24h" else "N/A" } except Exception as type_error: statistics[check_type] = { "error": f"Failed to get statistics: {str(type_error)}", "total_entries": 0 } return json.dumps({ "statistics_source": "calculated_from_logs", "log_type": log_type, "time_period": time_period, "statistics": statistics, "note": "Statistics calculated from log retrieval (stats API not available)" }, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_log_statistics", e) @mcp.tool() async def clear_logs(ctx: RequestContext, log_type: str, confirmation: str = "") -> str: """ Clear specific log files with confirmation requirement. Args: log_type: Type of log to clear (system, firewall, access, authentication, dhcp, dns) confirmation: Must be "CONFIRM_CLEAR" to proceed with clearing Returns: JSON response with clear operation status """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # Require explicit confirmation if confirmation != "CONFIRM_CLEAR": return json.dumps({ "error": "Log clearing requires explicit confirmation", "instruction": "Set confirmation parameter to 'CONFIRM_CLEAR' to proceed", "warning": "This action will permanently delete log entries and cannot be undone" }, indent=2) # Validate log type valid_log_types = ["system", "firewall", "access", "authentication", "dhcp", "dns", "openvpn", "ipsec"] if log_type not in valid_log_types: return json.dumps({ "error": f"Invalid log type '{log_type}'. Valid types: {valid_log_types}" }, indent=2) try: # Try using dedicated clear API clear_response = await client.request("POST", f"{API_DIAGNOSTICS_LOG_CLEAR}/{log_type}", operation="clear_logs") return json.dumps({ "clear_status": "completed", "log_type": log_type, "message": f"Successfully cleared {log_type} logs", "response": clear_response }, indent=2) except (APIError, ResourceNotFoundError): return json.dumps({ "clear_status": "api_unavailable", "log_type": log_type, "message": f"Clear API not available for {log_type} logs", "recommendation": "Use OPNsense web interface: Firewall > Log Files > Clear Logs" }, indent=2) except Exception as e: return await handle_tool_error(ctx, "clear_logs", e) @mcp.tool() async def configure_logging(ctx: RequestContext, log_level: str = "info", remote_logging: bool = False, remote_server: str = "", log_rotation: str = "daily") -> str: """ Configure logging settings for OPNsense system. Args: log_level: Logging level (emergency, alert, critical, error, warning, notice, info, debug) remote_logging: Whether to enable remote logging remote_server: Remote syslog server (required if remote_logging is True) log_rotation: Log rotation schedule (daily, weekly, monthly) Returns: JSON response with configuration status """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # Validate parameters valid_levels = ["emergency", "alert", "critical", "error", "warning", "notice", "info", "debug"] if log_level not in valid_levels: return json.dumps({ "error": f"Invalid log level '{log_level}'. Valid levels: {valid_levels}" }, indent=2) valid_rotations = ["daily", "weekly", "monthly"] if log_rotation not in valid_rotations: return json.dumps({ "error": f"Invalid log rotation '{log_rotation}'. Valid options: {valid_rotations}" }, indent=2) if remote_logging and not remote_server: return json.dumps({ "error": "Remote server must be specified when remote logging is enabled" }, indent=2) # Get current settings first try: current_settings = await client.request("GET", API_DIAGNOSTICS_LOG_SETTINGS, operation="get_current_log_settings") except (APIError, ResourceNotFoundError): current_settings = {} # Prepare configuration data config_data = { "log_level": log_level, "remote_logging": "1" if remote_logging else "0", "log_rotation": log_rotation } if remote_logging and remote_server: config_data["remote_server"] = remote_server try: # Try to apply settings via API set_response = await client.request("POST", API_DIAGNOSTICS_LOG_SET_SETTINGS, data=config_data, operation="configure_logging") return json.dumps({ "configuration_status": "completed", "previous_settings": current_settings, "new_settings": config_data, "response": set_response }, indent=2) except (APIError, ResourceNotFoundError): return json.dumps({ "configuration_status": "api_unavailable", "message": "Logging configuration API not available", "intended_settings": config_data, "recommendation": "Use OPNsense web interface: System > Settings > Logging" }, indent=2) except Exception as e: return await handle_tool_error(ctx, "configure_logging", e) @mcp.tool() async def analyze_security_events(ctx: RequestContext, time_window: str = "24h", event_types: str = "all") -> str: """ Analyze logs for security-related events and potential threats. Args: time_window: Time window for analysis (1h, 6h, 24h, 7d) event_types: Types of events to analyze (all, authentication, firewall, intrusion) Returns: JSON response with security event analysis """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." analysis_results = {} # Define security event patterns to search for security_patterns = { "failed_authentication": ["authentication failure", "login failed", "invalid user", "auth fail"], "firewall_blocks": ["blocked", "denied", "drop"], "brute_force": ["multiple failed", "repeated attempts", "too many"], "port_scans": ["port scan", "probe", "reconnaissance"], "suspicious_ips": ["suspicious", "malicious", "blacklist"] } # Get logs from relevant sources log_sources = ["system", "firewall", "authentication", "access"] if event_types != "all": requested_types = [t.strip() for t in event_types.split(",")] log_sources = [t for t in log_sources if t in requested_types] for log_source in log_sources: try: # Get recent logs for analysis endpoint_map = { "system": API_DIAGNOSTICS_LOG_SYSTEM, "firewall": API_DIAGNOSTICS_LOG_FIREWALL, "authentication": API_DIAGNOSTICS_LOG_AUTHENTICATION, "access": API_DIAGNOSTICS_LOG_ACCESS } endpoint = endpoint_map.get(log_source) if not endpoint: continue # Retrieve logs (larger sample for analysis) logs_response = await client.request("GET", endpoint, params={"limit": 5000}, operation=f"analyze_{log_source}_security") # Extract log entries if isinstance(logs_response, dict) and "rows" in logs_response: log_entries = logs_response["rows"] elif isinstance(logs_response, list): log_entries = logs_response else: log_entries = [] # Analyze for security patterns source_analysis = {"total_entries": len(log_entries)} for pattern_name, pattern_keywords in security_patterns.items(): matching_entries = [] for entry in log_entries: # Convert entry to searchable text entry_text = str(entry).lower() if entry else "" # Check if any pattern keywords match if any(keyword.lower() in entry_text for keyword in pattern_keywords): matching_entries.append(entry) source_analysis[pattern_name] = { "count": len(matching_entries), "percentage": round((len(matching_entries) / max(len(log_entries), 1)) * 100, 2), "sample_entries": matching_entries[:5] # First 5 matches as samples } analysis_results[log_source] = source_analysis except Exception as source_error: analysis_results[log_source] = { "error": f"Failed to analyze {log_source}: {str(source_error)}", "total_entries": 0 } # Generate security summary total_events = sum(source.get("total_entries", 0) for source in analysis_results.values()) high_risk_indicators = [] # Check for high-risk patterns for source, data in analysis_results.items(): if isinstance(data, dict): for pattern, details in data.items(): if isinstance(details, dict) and details.get("count", 0) > 10: high_risk_indicators.append(f"{source}: {pattern} ({details['count']} events)") return json.dumps({ "analysis_period": time_window, "event_types_analyzed": log_sources, "total_log_entries": total_events, "high_risk_indicators": high_risk_indicators, "detailed_analysis": analysis_results, "recommendation": "Review high-count security events and consider implementing additional security measures" if high_risk_indicators else "No significant security events detected" }, indent=2) except Exception as e: return await handle_tool_error(ctx, "analyze_security_events", e) @mcp.tool() async def generate_log_report(ctx: RequestContext, report_type: str = "summary", time_period: str = "24h", include_details: bool = False) -> str: """ Generate comprehensive log reports for analysis and compliance. Args: report_type: Type of report (summary, detailed, security, compliance) time_period: Time period for report (1h, 6h, 24h, 7d, 30d) include_details: Whether to include detailed log entries in report Returns: JSON response with generated log report """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # Validate parameters valid_report_types = ["summary", "detailed", "security", "compliance"] if report_type not in valid_report_types: return json.dumps({ "error": f"Invalid report type '{report_type}'. Valid types: {valid_report_types}" }, indent=2) report_data = { "report_type": report_type, "time_period": time_period, "generated_at": datetime.utcnow().isoformat(), "sections": {} } # Get data from multiple log sources log_sources = ["system", "firewall", "authentication", "access"] for source in log_sources: try: endpoint_map = { "system": API_DIAGNOSTICS_LOG_SYSTEM, "firewall": API_DIAGNOSTICS_LOG_FIREWALL, "authentication": API_DIAGNOSTICS_LOG_AUTHENTICATION, "access": API_DIAGNOSTICS_LOG_ACCESS } endpoint = endpoint_map.get(source) if not endpoint: continue # Get logs for report limit = 10000 if include_details else 1000 response = await client.request("GET", endpoint, params={"limit": limit}, operation=f"report_{source}_logs") # Extract entries if isinstance(response, dict) and "rows" in response: entries = response["rows"] elif isinstance(response, list): entries = response else: entries = [] # Create section data based on report type section_data = { "entry_count": len(entries), "source": source } if report_type == "summary": section_data["summary"] = f"{len(entries)} entries in {time_period}" elif report_type == "detailed" and include_details: section_data["entries"] = entries[:100] # Limit detailed entries elif report_type == "security": # Focus on security-relevant entries security_keywords = ["fail", "error", "block", "deny", "suspicious", "attack"] security_entries = [] for entry in entries: entry_text = str(entry).lower() if any(keyword in entry_text for keyword in security_keywords): security_entries.append(entry) section_data["security_events"] = len(security_entries) if include_details: section_data["security_entries"] = security_entries[:50] elif report_type == "compliance": # Focus on compliance-relevant information section_data["compliance_summary"] = { "logging_active": len(entries) > 0, "entry_count": len(entries), "time_coverage": time_period } report_data["sections"][source] = section_data except Exception as source_error: report_data["sections"][source] = { "error": f"Failed to generate report for {source}: {str(source_error)}", "entry_count": 0 } # Add report summary total_entries = sum(section.get("entry_count", 0) for section in report_data["sections"].values()) report_data["report_summary"] = { "total_entries": total_entries, "sources_included": len([s for s in report_data["sections"] if not s.get("error")]), "sources_with_errors": len([s for s in report_data["sections"] if s.get("error")]) } return json.dumps(report_data, indent=2) except Exception as e: return await handle_tool_error(ctx, "generate_log_report", e) # --- NAT (Network Address Translation) Management --- @mcp.tool(name="nat_list_outbound_rules", description="List outbound NAT (source NAT) rules") async def nat_list_outbound_rules( ctx: Context, search_phrase: str = "", page: int = 1, rows_per_page: int = 20 ) -> str: """List outbound NAT (source NAT) rules. Args: ctx: MCP context search_phrase: Optional search phrase to filter rules page: Page number for pagination rows_per_page: Number of rows per page Returns: JSON string of outbound NAT rules """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "POST", API_FIREWALL_SOURCE_NAT_SEARCH_RULE, data={ "current": page, "rowCount": rows_per_page, "searchPhrase": search_phrase } ) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in nat_list_outbound_rules: {str(e)}", exc_info=True) await ctx.error(f"Error fetching outbound NAT rules: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="nat_add_outbound_rule", description="Add an outbound NAT (source NAT) rule") async def nat_add_outbound_rule( ctx: Context, description: str, interface: str, source: str = "any", destination: str = "any", target: str = "", enabled: bool = True ) -> str: """Add an outbound NAT (source NAT) rule. Args: ctx: MCP context description: Description of the NAT rule interface: Outgoing interface (e.g., "wan", "opt1") source: Source network/host (default: "any") destination: Destination network/host (default: "any") target: NAT target (blank for interface address) enabled: Whether the rule is enabled Returns: JSON string with the result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: # Prepare rule data rule_data = { "rule": { "description": description, "interface": interface, "source": source, "destination": destination, "target": target, "enabled": "1" if enabled else "0" } } # Add the rule add_result = await opnsense_client.request( "POST", API_FIREWALL_SOURCE_NAT_ADD_RULE, data=rule_data ) # Apply changes await ctx.info("Outbound NAT rule added, applying changes...") apply_result = await opnsense_client.request( "POST", API_FIREWALL_FILTER_BASE_APPLY ) return json.dumps({ "add_result": add_result, "apply_result": apply_result }, indent=2) except Exception as e: logger.error(f"Error in nat_add_outbound_rule: {str(e)}", exc_info=True) await ctx.error(f"Error adding outbound NAT rule: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="nat_delete_outbound_rule", description="Delete an outbound NAT rule") async def nat_delete_outbound_rule(ctx: Context, uuid: str) -> str: """Delete an outbound NAT rule. Args: ctx: MCP context uuid: UUID of the rule to delete Returns: JSON string with the result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: # Delete the rule delete_result = await opnsense_client.request( "POST", f"{API_FIREWALL_SOURCE_NAT_DEL_RULE}/{uuid}" ) # Apply changes await ctx.info("Outbound NAT rule deleted, applying changes...") apply_result = await opnsense_client.request( "POST", API_FIREWALL_FILTER_BASE_APPLY ) return json.dumps({ "delete_result": delete_result, "apply_result": apply_result }, indent=2) except Exception as e: logger.error(f"Error in nat_delete_outbound_rule: {str(e)}", exc_info=True) await ctx.error(f"Error deleting outbound NAT rule: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="nat_toggle_outbound_rule", description="Enable or disable an outbound NAT rule") async def nat_toggle_outbound_rule(ctx: Context, uuid: str, enabled: bool) -> str: """Enable or disable an outbound NAT rule. Args: ctx: MCP context uuid: UUID of the rule to toggle enabled: Whether to enable or disable the rule Returns: JSON string with the result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: # Toggle the rule toggle_result = await opnsense_client.request( "POST", f"{API_FIREWALL_SOURCE_NAT_TOGGLE_RULE}/{uuid}/{1 if enabled else 0}" ) # Apply changes await ctx.info(f"Outbound NAT rule {'enabled' if enabled else 'disabled'}, applying changes...") apply_result = await opnsense_client.request( "POST", API_FIREWALL_FILTER_BASE_APPLY ) return json.dumps({ "toggle_result": toggle_result, "apply_result": apply_result }, indent=2) except Exception as e: logger.error(f"Error in nat_toggle_outbound_rule: {str(e)}", exc_info=True) await ctx.error(f"Error toggling outbound NAT rule: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="nat_list_one_to_one_rules", description="List one-to-one NAT rules") async def nat_list_one_to_one_rules( ctx: Context, search_phrase: str = "", page: int = 1, rows_per_page: int = 20 ) -> str: """List one-to-one NAT rules. Args: ctx: MCP context search_phrase: Optional search phrase to filter rules page: Page number for pagination rows_per_page: Number of rows per page Returns: JSON string of one-to-one NAT rules """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "POST", API_FIREWALL_ONE_TO_ONE_SEARCH_RULE, data={ "current": page, "rowCount": rows_per_page, "searchPhrase": search_phrase } ) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in nat_list_one_to_one_rules: {str(e)}", exc_info=True) await ctx.error(f"Error fetching one-to-one NAT rules: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="nat_add_one_to_one_rule", description="Add a one-to-one NAT rule") async def nat_add_one_to_one_rule( ctx: Context, description: str, interface: str, external_ip: str, internal_ip: str, enabled: bool = True ) -> str: """Add a one-to-one NAT rule. Args: ctx: MCP context description: Description of the NAT rule interface: Interface (e.g., "wan", "opt1") external_ip: External IP address internal_ip: Internal IP address enabled: Whether the rule is enabled Returns: JSON string with the result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: # Prepare rule data rule_data = { "rule": { "description": description, "interface": interface, "external": external_ip, "internal": internal_ip, "enabled": "1" if enabled else "0" } } # Add the rule add_result = await opnsense_client.request( "POST", API_FIREWALL_ONE_TO_ONE_ADD_RULE, data=rule_data ) # Apply changes await ctx.info("One-to-one NAT rule added, applying changes...") apply_result = await opnsense_client.request( "POST", API_FIREWALL_FILTER_BASE_APPLY ) return json.dumps({ "add_result": add_result, "apply_result": apply_result }, indent=2) except Exception as e: logger.error(f"Error in nat_add_one_to_one_rule: {str(e)}", exc_info=True) await ctx.error(f"Error adding one-to-one NAT rule: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="nat_delete_one_to_one_rule", description="Delete a one-to-one NAT rule") async def nat_delete_one_to_one_rule(ctx: Context, uuid: str) -> str: """Delete a one-to-one NAT rule. Args: ctx: MCP context uuid: UUID of the rule to delete Returns: JSON string with the result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: # Delete the rule delete_result = await opnsense_client.request( "POST", f"{API_FIREWALL_ONE_TO_ONE_DEL_RULE}/{uuid}" ) # Apply changes await ctx.info("One-to-one NAT rule deleted, applying changes...") apply_result = await opnsense_client.request( "POST", API_FIREWALL_FILTER_BASE_APPLY ) return json.dumps({ "delete_result": delete_result, "apply_result": apply_result }, indent=2) except Exception as e: logger.error(f"Error in nat_delete_one_to_one_rule: {str(e)}", exc_info=True) await ctx.error(f"Error deleting one-to-one NAT rule: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="nat_get_port_forward_info", description="Information about port forwarding API availability") async def nat_get_port_forward_info(ctx: Context) -> str: """Get information about port forwarding API availability. Args: ctx: MCP context Returns: Information about port forwarding limitations and workarounds """ info = { "status": "Not Available", "message": "Dedicated port forwarding (destination NAT) API endpoints are not yet available in current OPNsense versions.", "expected_version": "26.1 (January 2026)", "github_issue": "https://github.com/opnsense/core/issues/8401", "current_alternatives": [ { "method": "Web Interface", "description": "Use OPNsense web interface at Firewall → NAT → Port Forward", "pros": ["Full functionality", "User-friendly"], "cons": ["Manual process", "Not scriptable"] }, { "method": "Browser Automation", "description": "Use browser automation tools to interact with web interface", "pros": ["Scriptable", "Uses existing interface"], "cons": ["Complex", "Fragile", "Requires browser"] }, { "method": "Config File Management", "description": "Direct XML configuration file manipulation", "pros": ["Complete control"], "cons": ["Complex", "Risk of corruption", "Requires deep knowledge"] } ], "available_nat_features": { "outbound_nat": "✅ Available via API (source NAT)", "one_to_one_nat": "✅ Available via API", "port_forwarding": "❌ Not available via API (destination NAT)", "nat_reflection": "❌ Not available via API" }, "recommendation": "Use available outbound NAT and one-to-one NAT APIs. For port forwarding, wait for OPNsense 26.1 or use web interface." } return json.dumps(info, indent=2) # --- End NAT Management --- # ========== DNS & DHCP MANAGEMENT ========== # --- DHCP Server Management --- @mcp.tool(name="dhcp_list_servers", description="List all DHCP server configurations") async def dhcp_list_servers(ctx: RequestContext) -> str: """List all DHCP server configurations. Returns: JSON string of DHCP server configurations """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "GET", API_DHCP_SERVER_SEARCH, operation="list_dhcp_servers" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_list_servers", e) @mcp.tool(name="dhcp_get_server", description="Get a specific DHCP server configuration") async def dhcp_get_server( ctx: RequestContext, interface: str ) -> str: """Get a specific DHCP server configuration by interface. Args: ctx: MCP context interface: Interface name (e.g., 'lan', 'opt1') Returns: JSON string of DHCP server configuration """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not interface: return json.dumps({"error": "Interface name is required"}, indent=2) try: response = await opnsense_client.request( "GET", f"{API_DHCP_SERVER_GET}/{interface}", operation=f"get_dhcp_server_{interface}" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_get_server", e) @mcp.tool(name="dhcp_set_server", description="Configure DHCP server settings for an interface") async def dhcp_set_server( ctx: RequestContext, interface: str, enabled: bool = True, range_from: str = "", range_to: str = "", gateway: str = "", dns_servers: str = "", domain_name: str = "", lease_time: int = 7200, description: str = "" ) -> str: """Configure DHCP server settings for a specific interface. Args: ctx: MCP context interface: Interface name (e.g., 'lan', 'opt1') enabled: Whether DHCP server is enabled range_from: Start of DHCP range (e.g., '192.168.1.100') range_to: End of DHCP range (e.g., '192.168.1.200') gateway: Gateway IP address dns_servers: DNS servers (comma-separated) domain_name: Domain name for DHCP clients lease_time: Lease time in seconds (default: 7200) description: Description of this DHCP server Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not interface: return json.dumps({"error": "Interface name is required"}, indent=2) try: # Prepare configuration data config_data = { "enabled": "1" if enabled else "0", "description": description } if range_from: config_data["range_from"] = range_from if range_to: config_data["range_to"] = range_to if gateway: config_data["gateway"] = gateway if dns_servers: config_data["dns_servers"] = dns_servers if domain_name: config_data["domain_name"] = domain_name if lease_time: config_data["lease_time"] = str(lease_time) # Set DHCP server configuration response = await opnsense_client.request( "POST", f"{API_DHCP_SERVER_SET}/{interface}", data=config_data, operation=f"set_dhcp_server_{interface}" ) # Apply configuration if response and not response.get("error"): await opnsense_client.request( "POST", API_DHCP_RECONFIGURE, operation="apply_dhcp_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_set_server", e) @mcp.tool(name="dhcp_list_static_mappings", description="List DHCP static mappings (reservations)") async def dhcp_list_static_mappings( ctx: RequestContext, search_phrase: str = "" ) -> str: """List all DHCP static mappings (reservations). Args: ctx: MCP context search_phrase: Optional search phrase to filter mappings Returns: JSON string of DHCP static mappings """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: params = {} if search_phrase: params["searchPhrase"] = search_phrase response = await opnsense_client.request( "POST", API_DHCP_STATIC_MAPPING_SEARCH, data=params, operation="list_dhcp_static_mappings" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_list_static_mappings", e) @mcp.tool(name="dhcp_get_static_mapping", description="Get a specific DHCP static mapping") async def dhcp_get_static_mapping( ctx: RequestContext, uuid: str ) -> str: """Get a specific DHCP static mapping by UUID. Args: ctx: MCP context uuid: UUID of the static mapping Returns: JSON string of static mapping details """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "GET", f"{API_DHCP_STATIC_MAPPING_GET}/{uuid}", operation=f"get_dhcp_static_mapping_{uuid[:8]}" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_get_static_mapping", e) @mcp.tool(name="dhcp_add_static_mapping", description="Add a new DHCP static mapping (reservation)") async def dhcp_add_static_mapping( ctx: RequestContext, interface: str, mac_address: str, ip_address: str, hostname: str = "", description: str = "" ) -> str: """Add a new DHCP static mapping (reservation). Args: ctx: MCP context interface: Interface name (e.g., 'lan', 'opt1') mac_address: MAC address of the device ip_address: IP address to assign hostname: Hostname for the device description: Description of this mapping Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not all([interface, mac_address, ip_address]): return json.dumps({ "error": "Interface, MAC address, and IP address are required" }, indent=2) try: mapping_data = { "interface": interface, "mac": mac_address, "ip": ip_address } if hostname: mapping_data["hostname"] = hostname if description: mapping_data["description"] = description response = await opnsense_client.request( "POST", API_DHCP_STATIC_MAPPING_ADD, data={"static": mapping_data}, operation="add_dhcp_static_mapping" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_DHCP_RECONFIGURE, operation="apply_dhcp_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_add_static_mapping", e) @mcp.tool(name="dhcp_update_static_mapping", description="Update an existing DHCP static mapping") async def dhcp_update_static_mapping( ctx: RequestContext, uuid: str, interface: str = "", mac_address: str = "", ip_address: str = "", hostname: str = "", description: str = "" ) -> str: """Update an existing DHCP static mapping. Args: ctx: MCP context uuid: UUID of the static mapping to update interface: Interface name (e.g., 'lan', 'opt1') mac_address: MAC address of the device ip_address: IP address to assign hostname: Hostname for the device description: Description of this mapping Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: # Get current mapping current = await opnsense_client.request( "GET", f"{API_DHCP_STATIC_MAPPING_GET}/{uuid}", operation=f"get_current_static_mapping_{uuid[:8]}" ) if not current or "static" not in current: return json.dumps({"error": "Static mapping not found"}, indent=2) # Update fields mapping_data = current["static"] if interface: mapping_data["interface"] = interface if mac_address: mapping_data["mac"] = mac_address if ip_address: mapping_data["ip"] = ip_address if hostname: mapping_data["hostname"] = hostname if description: mapping_data["description"] = description response = await opnsense_client.request( "POST", f"{API_DHCP_STATIC_MAPPING_SET}/{uuid}", data={"static": mapping_data}, operation=f"update_dhcp_static_mapping_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_DHCP_RECONFIGURE, operation="apply_dhcp_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_update_static_mapping", e) @mcp.tool(name="dhcp_delete_static_mapping", description="Delete a DHCP static mapping") async def dhcp_delete_static_mapping( ctx: RequestContext, uuid: str ) -> str: """Delete a DHCP static mapping by UUID. Args: ctx: MCP context uuid: UUID of the static mapping to delete Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "POST", f"{API_DHCP_STATIC_MAPPING_DELETE}/{uuid}", operation=f"delete_dhcp_static_mapping_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_DHCP_RECONFIGURE, operation="apply_dhcp_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_delete_static_mapping", e) @mcp.tool(name="dhcp_restart_service", description="Restart the DHCP service") async def dhcp_restart_service(ctx: RequestContext) -> str: """Restart the DHCP service. Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "POST", API_DHCP_SERVICE_RESTART, operation="restart_dhcp_service" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_restart_service", e) # --- DNS Resolver (Unbound) Management --- @mcp.tool(name="dns_resolver_get_settings", description="Get DNS resolver (Unbound) settings") async def dns_resolver_get_settings(ctx: RequestContext) -> str: """Get DNS resolver (Unbound) configuration settings. Returns: JSON string of DNS resolver settings """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "GET", API_UNBOUND_SETTINGS_GET, operation="get_dns_resolver_settings" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_resolver_get_settings", e) @mcp.tool(name="dns_resolver_set_settings", description="Configure DNS resolver (Unbound) settings") async def dns_resolver_set_settings( ctx: RequestContext, enabled: bool = True, port: int = 53, dnssec: bool = True, forwarding: bool = False, forward_tls_upstream: bool = False, cache_size: int = 4, cache_min_ttl: int = 0, cache_max_ttl: int = 86400, outgoing_interfaces: str = "", incoming_interfaces: str = "" ) -> str: """Configure DNS resolver (Unbound) settings. Args: ctx: MCP context enabled: Enable DNS resolver port: Port number (default: 53) dnssec: Enable DNSSEC validation forwarding: Enable forwarding mode forward_tls_upstream: Use TLS for upstream queries cache_size: Cache size in MB (default: 4) cache_min_ttl: Minimum TTL in seconds (default: 0) cache_max_ttl: Maximum TTL in seconds (default: 86400) outgoing_interfaces: Outgoing interfaces (comma-separated) incoming_interfaces: Incoming interfaces (comma-separated) Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: settings_data = { "general": { "enabled": "1" if enabled else "0", "port": str(port), "dnssec": "1" if dnssec else "0", "forwarding": "1" if forwarding else "0", "forward_tls_upstream": "1" if forward_tls_upstream else "0", "cache_size": str(cache_size), "cache_min_ttl": str(cache_min_ttl), "cache_max_ttl": str(cache_max_ttl) } } if outgoing_interfaces: settings_data["general"]["outgoing_interfaces"] = outgoing_interfaces if incoming_interfaces: settings_data["general"]["incoming_interfaces"] = incoming_interfaces response = await opnsense_client.request( "POST", API_UNBOUND_SETTINGS_SET, data=settings_data, operation="set_dns_resolver_settings" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_UNBOUND_SERVICE_RECONFIGURE, operation="apply_dns_resolver_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_resolver_set_settings", e) @mcp.tool(name="dns_resolver_list_host_overrides", description="List DNS resolver host overrides") async def dns_resolver_list_host_overrides( ctx: RequestContext, search_phrase: str = "" ) -> str: """List all DNS resolver host overrides. Args: ctx: MCP context search_phrase: Optional search phrase to filter overrides Returns: JSON string of host overrides """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: params = {} if search_phrase: params["searchPhrase"] = search_phrase response = await opnsense_client.request( "POST", API_UNBOUND_HOST_OVERRIDES_SEARCH, data=params, operation="list_dns_resolver_host_overrides" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_resolver_list_host_overrides", e) @mcp.tool(name="dns_resolver_get_host_override", description="Get a specific DNS resolver host override") async def dns_resolver_get_host_override( ctx: RequestContext, uuid: str ) -> str: """Get a specific DNS resolver host override by UUID. Args: ctx: MCP context uuid: UUID of the host override Returns: JSON string of host override details """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "GET", f"{API_UNBOUND_HOST_OVERRIDES_GET}/{uuid}", operation=f"get_dns_resolver_host_override_{uuid[:8]}" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_resolver_get_host_override", e) @mcp.tool(name="dns_resolver_add_host_override", description="Add a new DNS resolver host override") async def dns_resolver_add_host_override( ctx: RequestContext, hostname: str, domain: str, ip_address: str, description: str = "" ) -> str: """Add a new DNS resolver host override. Args: ctx: MCP context hostname: Hostname to override domain: Domain name ip_address: IP address to resolve to description: Optional description Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not all([hostname, domain, ip_address]): return json.dumps({ "error": "Hostname, domain, and IP address are required" }, indent=2) try: override_data = { "host": hostname, "domain": domain, "server": ip_address, "description": description } response = await opnsense_client.request( "POST", API_UNBOUND_HOST_OVERRIDES_ADD, data={"host": override_data}, operation="add_dns_resolver_host_override" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_UNBOUND_SERVICE_RECONFIGURE, operation="apply_dns_resolver_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_resolver_add_host_override", e) @mcp.tool(name="dns_resolver_update_host_override", description="Update an existing DNS resolver host override") async def dns_resolver_update_host_override( ctx: RequestContext, uuid: str, hostname: str = "", domain: str = "", ip_address: str = "", description: str = "" ) -> str: """Update an existing DNS resolver host override. Args: ctx: MCP context uuid: UUID of the host override to update hostname: Hostname to override domain: Domain name ip_address: IP address to resolve to description: Optional description Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: # Get current override current = await opnsense_client.request( "GET", f"{API_UNBOUND_HOST_OVERRIDES_GET}/{uuid}", operation=f"get_current_host_override_{uuid[:8]}" ) if not current or "host" not in current: return json.dumps({"error": "Host override not found"}, indent=2) # Update fields override_data = current["host"] if hostname: override_data["host"] = hostname if domain: override_data["domain"] = domain if ip_address: override_data["server"] = ip_address if description: override_data["description"] = description response = await opnsense_client.request( "POST", f"{API_UNBOUND_HOST_OVERRIDES_SET}/{uuid}", data={"host": override_data}, operation=f"update_dns_resolver_host_override_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_UNBOUND_SERVICE_RECONFIGURE, operation="apply_dns_resolver_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_resolver_update_host_override", e) @mcp.tool(name="dns_resolver_delete_host_override", description="Delete a DNS resolver host override") async def dns_resolver_delete_host_override( ctx: RequestContext, uuid: str ) -> str: """Delete a DNS resolver host override by UUID. Args: ctx: MCP context uuid: UUID of the host override to delete Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "POST", f"{API_UNBOUND_HOST_OVERRIDES_DELETE}/{uuid}", operation=f"delete_dns_resolver_host_override_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_UNBOUND_SERVICE_RECONFIGURE, operation="apply_dns_resolver_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_resolver_delete_host_override", e) @mcp.tool(name="dns_resolver_list_domain_overrides", description="List DNS resolver domain overrides") async def dns_resolver_list_domain_overrides( ctx: RequestContext, search_phrase: str = "" ) -> str: """List all DNS resolver domain overrides. Args: ctx: MCP context search_phrase: Optional search phrase to filter overrides Returns: JSON string of domain overrides """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: params = {} if search_phrase: params["searchPhrase"] = search_phrase response = await opnsense_client.request( "POST", API_UNBOUND_DOMAIN_OVERRIDES_SEARCH, data=params, operation="list_dns_resolver_domain_overrides" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_resolver_list_domain_overrides", e) @mcp.tool(name="dns_resolver_add_domain_override", description="Add a new DNS resolver domain override") async def dns_resolver_add_domain_override( ctx: RequestContext, domain: str, server: str, description: str = "" ) -> str: """Add a new DNS resolver domain override. Args: ctx: MCP context domain: Domain to override (e.g., 'example.com') server: DNS server to forward queries to description: Optional description Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not all([domain, server]): return json.dumps({ "error": "Domain and server are required" }, indent=2) try: override_data = { "domain": domain, "server": server, "description": description } response = await opnsense_client.request( "POST", API_UNBOUND_DOMAIN_OVERRIDES_ADD, data={"domain": override_data}, operation="add_dns_resolver_domain_override" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_UNBOUND_SERVICE_RECONFIGURE, operation="apply_dns_resolver_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_resolver_add_domain_override", e) @mcp.tool(name="dns_resolver_restart_service", description="Restart the DNS resolver service") async def dns_resolver_restart_service(ctx: RequestContext) -> str: """Restart the DNS resolver (Unbound) service. Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "POST", API_UNBOUND_SERVICE_RESTART, operation="restart_dns_resolver_service" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_resolver_restart_service", e) # --- DHCP Lease Management --- @mcp.tool(name="dhcp_get_leases", description="Get current DHCP leases") async def dhcp_get_leases( ctx: RequestContext, interface: str = "" ) -> str: """Get current DHCP leases from the server. Args: ctx: MCP context interface: Optional interface filter (e.g., 'lan', 'opt1') Returns: JSON string of current DHCP leases """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: # Use the general DHCP leases endpoint endpoint = API_DHCP_LEASES_SEARCH if not interface else f"{API_DHCP_LEASES_SEARCH}?interface={interface}" response = await opnsense_client.request( "GET", endpoint, operation=f"get_dhcp_leases{'_' + interface if interface else ''}" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_get_leases", e) @mcp.tool(name="dhcp_search_leases", description="Search DHCP leases with filters") async def dhcp_search_leases( ctx: RequestContext, search_phrase: str = "", interface: str = "", state: str = "" ) -> str: """Search DHCP leases with various filters. Args: ctx: MCP context search_phrase: Search phrase to filter leases interface: Interface filter (e.g., 'lan', 'opt1') state: Lease state filter (e.g., 'active', 'expired') Returns: JSON string of filtered DHCP leases """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: params = {} if search_phrase: params["searchPhrase"] = search_phrase if interface: params["interface"] = interface if state: params["state"] = state response = await opnsense_client.request( "POST", API_DHCP_LEASES_SEARCH, data=params, operation="search_dhcp_leases" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_search_leases", e) @mcp.tool(name="dhcp_get_lease_statistics", description="Get DHCP lease statistics") async def dhcp_get_lease_statistics(ctx: RequestContext) -> str: """Get statistics about DHCP leases across all interfaces. Returns: JSON string of DHCP lease statistics """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: # Get all leases response = await opnsense_client.request( "GET", API_DHCP_LEASES_SEARCH, operation="get_dhcp_lease_statistics" ) if not response: return json.dumps({"error": "Unable to retrieve lease data"}, indent=2) # Process statistics stats = { "total_leases": 0, "active_leases": 0, "expired_leases": 0, "static_mappings": 0, "interfaces": {} } # If response contains lease data, process it if isinstance(response, dict) and "rows" in response: leases = response["rows"] stats["total_leases"] = len(leases) for lease in leases: # Count by state lease_state = lease.get("state", "unknown").lower() if "active" in lease_state: stats["active_leases"] += 1 elif "expired" in lease_state: stats["expired_leases"] += 1 # Count by interface interface = lease.get("interface", "unknown") if interface not in stats["interfaces"]: stats["interfaces"][interface] = 0 stats["interfaces"][interface] += 1 # Count static mappings if lease.get("type") == "static": stats["static_mappings"] += 1 elif isinstance(response, list): stats["total_leases"] = len(response) # Basic counting for list format for lease in response: interface = lease.get("interface", "unknown") if interface not in stats["interfaces"]: stats["interfaces"][interface] = 0 stats["interfaces"][interface] += 1 return json.dumps(stats, indent=2) except Exception as e: return await handle_tool_error(ctx, "dhcp_get_lease_statistics", e) # --- DNS Forwarder (dnsmasq) Management --- @mcp.tool(name="dns_forwarder_get_settings", description="Get DNS forwarder (dnsmasq) settings") async def dns_forwarder_get_settings(ctx: RequestContext) -> str: """Get DNS forwarder (dnsmasq) configuration settings. Returns: JSON string of DNS forwarder settings """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "GET", API_DNSMASQ_SETTINGS_GET, operation="get_dns_forwarder_settings" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_forwarder_get_settings", e) @mcp.tool(name="dns_forwarder_set_settings", description="Configure DNS forwarder (dnsmasq) settings") async def dns_forwarder_set_settings( ctx: RequestContext, enabled: bool = True, port: int = 53, domain: str = "", no_hosts: bool = False, strict_order: bool = False, no_dhcp_interface: str = "" ) -> str: """Configure DNS forwarder (dnsmasq) settings. Args: ctx: MCP context enabled: Enable DNS forwarder port: Port number (default: 53) domain: Local domain name no_hosts: Don't read /etc/hosts strict_order: Strict order of DNS servers no_dhcp_interface: Interfaces to exclude from DHCP Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: settings_data = { "general": { "enabled": "1" if enabled else "0", "port": str(port), "no_hosts": "1" if no_hosts else "0", "strict_order": "1" if strict_order else "0" } } if domain: settings_data["general"]["domain"] = domain if no_dhcp_interface: settings_data["general"]["no_dhcp_interface"] = no_dhcp_interface response = await opnsense_client.request( "POST", API_DNSMASQ_SETTINGS_SET, data=settings_data, operation="set_dns_forwarder_settings" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_DNSMASQ_SERVICE_RECONFIGURE, operation="apply_dns_forwarder_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_forwarder_set_settings", e) @mcp.tool(name="dns_forwarder_list_hosts", description="List DNS forwarder host overrides") async def dns_forwarder_list_hosts( ctx: RequestContext, search_phrase: str = "" ) -> str: """List all DNS forwarder host overrides. Args: ctx: MCP context search_phrase: Optional search phrase to filter hosts Returns: JSON string of host overrides """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: params = {} if search_phrase: params["searchPhrase"] = search_phrase response = await opnsense_client.request( "POST", API_DNSMASQ_HOSTS_SEARCH, data=params, operation="list_dns_forwarder_hosts" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_forwarder_list_hosts", e) @mcp.tool(name="dns_forwarder_add_host", description="Add a new DNS forwarder host override") async def dns_forwarder_add_host( ctx: RequestContext, hostname: str, domain: str, ip_address: str, description: str = "" ) -> str: """Add a new DNS forwarder host override. Args: ctx: MCP context hostname: Hostname to override domain: Domain name ip_address: IP address to resolve to description: Optional description Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not all([hostname, domain, ip_address]): return json.dumps({ "error": "Hostname, domain, and IP address are required" }, indent=2) try: host_data = { "host": hostname, "domain": domain, "ip": ip_address, "description": description } response = await opnsense_client.request( "POST", API_DNSMASQ_HOSTS_ADD, data={"host": host_data}, operation="add_dns_forwarder_host" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_DNSMASQ_SERVICE_RECONFIGURE, operation="apply_dns_forwarder_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_forwarder_add_host", e) @mcp.tool(name="dns_forwarder_restart_service", description="Restart the DNS forwarder service") async def dns_forwarder_restart_service(ctx: RequestContext) -> str: """Restart the DNS forwarder (dnsmasq) service. Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "POST", API_DNSMASQ_SERVICE_RESTART, operation="restart_dns_forwarder_service" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "dns_forwarder_restart_service", e) # ========== INTERFACE & VLAN MANAGEMENT ========== # --- Basic Interface Management --- @mcp.tool(name="get_interface_details", description="Get detailed information for a specific interface") async def get_interface_details( ctx: RequestContext, interface: str ) -> str: """Get detailed information for a specific interface. Args: ctx: MCP context interface: Interface identifier (e.g., 'lan', 'wan', 'opt1') Returns: JSON string of interface details """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not interface: return json.dumps({"error": "Interface identifier is required"}, indent=2) try: response = await opnsense_client.request( "GET", f"{API_INTERFACES_OVERVIEW_GET_INTERFACE}/{interface}", operation=f"get_interface_details_{interface}" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_interface_details", e) @mcp.tool(name="reload_interface", description="Reload configuration for a specific interface") async def reload_interface( ctx: RequestContext, interface: str ) -> str: """Reload configuration for a specific interface. Args: ctx: MCP context interface: Interface identifier to reload Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not interface: return json.dumps({"error": "Interface identifier is required"}, indent=2) try: response = await opnsense_client.request( "GET", f"{API_INTERFACES_OVERVIEW_RELOAD_INTERFACE}/{interface}", operation=f"reload_interface_{interface}" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "reload_interface", e) @mcp.tool(name="export_interface_config", description="Export interface configuration") async def export_interface_config(ctx: RequestContext) -> str: """Export current interface configuration. Returns: JSON string of interface configuration export """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "GET", API_INTERFACES_OVERVIEW_EXPORT, operation="export_interface_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "export_interface_config", e) # --- VLAN Management --- @mcp.tool(name="list_vlan_interfaces", description="List all VLAN interfaces") async def list_vlan_interfaces( ctx: RequestContext, search_phrase: str = "" ) -> str: """List all VLAN interfaces with optional search filtering. Args: ctx: MCP context search_phrase: Optional search phrase to filter VLANs Returns: JSON string of VLAN interfaces """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: params = {} if search_phrase: params["searchPhrase"] = search_phrase response = await opnsense_client.request( "POST", API_INTERFACES_VLAN_SEARCH, data=params, operation="list_vlan_interfaces" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_vlan_interfaces", e) @mcp.tool(name="get_vlan_interface", description="Get VLAN interface configuration") async def get_vlan_interface( ctx: RequestContext, uuid: str ) -> str: """Get specific VLAN interface configuration by UUID. Args: ctx: MCP context uuid: UUID of the VLAN interface Returns: JSON string of VLAN interface configuration """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "GET", f"{API_INTERFACES_VLAN_GET}/{uuid}", operation=f"get_vlan_interface_{uuid[:8]}" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_vlan_interface", e) @mcp.tool(name="create_vlan_interface", description="Create a new VLAN interface") async def create_vlan_interface( ctx: RequestContext, parent_interface: str, vlan_tag: int, description: str = "" ) -> str: """Create a new VLAN interface. Args: ctx: MCP context parent_interface: Parent interface for the VLAN (e.g., 'igb0', 'em0') vlan_tag: VLAN tag (1-4094) description: Optional description for the VLAN Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not all([parent_interface, vlan_tag]): return json.dumps({ "error": "Parent interface and VLAN tag are required" }, indent=2) # Validate VLAN tag range if not (1 <= vlan_tag <= 4094): return json.dumps({ "error": "VLAN tag must be between 1 and 4094" }, indent=2) try: vlan_data = { "if": parent_interface, "tag": str(vlan_tag), "descr": description } response = await opnsense_client.request( "POST", API_INTERFACES_VLAN_ADD, data={"vlan": vlan_data}, operation="create_vlan_interface" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_VLAN_RECONFIGURE, operation="apply_vlan_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_vlan_interface", e) @mcp.tool(name="update_vlan_interface", description="Update VLAN interface configuration") async def update_vlan_interface( ctx: RequestContext, uuid: str, parent_interface: str = "", vlan_tag: int = 0, description: str = "" ) -> str: """Update existing VLAN interface configuration. Args: ctx: MCP context uuid: UUID of the VLAN interface to update parent_interface: Parent interface for the VLAN vlan_tag: VLAN tag (1-4094) description: Description for the VLAN Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) # Validate VLAN tag if provided if vlan_tag > 0 and not (1 <= vlan_tag <= 4094): return json.dumps({ "error": "VLAN tag must be between 1 and 4094" }, indent=2) try: # Get current VLAN configuration current = await opnsense_client.request( "GET", f"{API_INTERFACES_VLAN_GET}/{uuid}", operation=f"get_current_vlan_{uuid[:8]}" ) if not current or "vlan" not in current: return json.dumps({"error": "VLAN interface not found"}, indent=2) # Update fields vlan_data = current["vlan"] if parent_interface: vlan_data["if"] = parent_interface if vlan_tag > 0: vlan_data["tag"] = str(vlan_tag) if description is not None: # Allow empty string to clear description vlan_data["descr"] = description response = await opnsense_client.request( "POST", f"{API_INTERFACES_VLAN_SET}/{uuid}", data={"vlan": vlan_data}, operation=f"update_vlan_interface_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_VLAN_RECONFIGURE, operation="apply_vlan_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "update_vlan_interface", e) @mcp.tool(name="delete_vlan_interface", description="Delete a VLAN interface") async def delete_vlan_interface( ctx: RequestContext, uuid: str ) -> str: """Delete a VLAN interface by UUID. Args: ctx: MCP context uuid: UUID of the VLAN interface to delete Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "POST", f"{API_INTERFACES_VLAN_DEL}/{uuid}", operation=f"delete_vlan_interface_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_VLAN_RECONFIGURE, operation="apply_vlan_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_vlan_interface", e) # --- Bridge Management --- @mcp.tool(name="list_bridge_interfaces", description="List all bridge interfaces") async def list_bridge_interfaces( ctx: RequestContext, search_phrase: str = "" ) -> str: """List all bridge interfaces with optional search filtering. Args: ctx: MCP context search_phrase: Optional search phrase to filter bridges Returns: JSON string of bridge interfaces """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: params = {} if search_phrase: params["searchPhrase"] = search_phrase response = await opnsense_client.request( "POST", API_INTERFACES_BRIDGE_SEARCH, data=params, operation="list_bridge_interfaces" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_bridge_interfaces", e) @mcp.tool(name="get_bridge_interface", description="Get bridge interface configuration") async def get_bridge_interface( ctx: RequestContext, uuid: str ) -> str: """Get specific bridge interface configuration by UUID. Args: ctx: MCP context uuid: UUID of the bridge interface Returns: JSON string of bridge interface configuration """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "GET", f"{API_INTERFACES_BRIDGE_GET}/{uuid}", operation=f"get_bridge_interface_{uuid[:8]}" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_bridge_interface", e) @mcp.tool(name="create_bridge_interface", description="Create a new bridge interface") async def create_bridge_interface( ctx: RequestContext, description: str, member_interfaces: str = "", stp_enabled: bool = False ) -> str: """Create a new bridge interface. Args: ctx: MCP context description: Description for the bridge member_interfaces: Comma-separated list of member interfaces stp_enabled: Enable Spanning Tree Protocol Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not description: return json.dumps({ "error": "Description is required" }, indent=2) try: bridge_data = { "descr": description, "stp": "1" if stp_enabled else "0" } if member_interfaces: bridge_data["members"] = member_interfaces response = await opnsense_client.request( "POST", API_INTERFACES_BRIDGE_ADD, data={"bridge": bridge_data}, operation="create_bridge_interface" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_BRIDGE_RECONFIGURE, operation="apply_bridge_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_bridge_interface", e) @mcp.tool(name="update_bridge_interface", description="Update bridge interface configuration") async def update_bridge_interface( ctx: RequestContext, uuid: str, description: str = "", member_interfaces: str = "", stp_enabled: bool = None ) -> str: """Update existing bridge interface configuration. Args: ctx: MCP context uuid: UUID of the bridge interface to update description: Description for the bridge member_interfaces: Comma-separated list of member interfaces stp_enabled: Enable/disable Spanning Tree Protocol Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: # Get current bridge configuration current = await opnsense_client.request( "GET", f"{API_INTERFACES_BRIDGE_GET}/{uuid}", operation=f"get_current_bridge_{uuid[:8]}" ) if not current or "bridge" not in current: return json.dumps({"error": "Bridge interface not found"}, indent=2) # Update fields bridge_data = current["bridge"] if description: bridge_data["descr"] = description if member_interfaces is not None: # Allow empty string to clear members bridge_data["members"] = member_interfaces if stp_enabled is not None: bridge_data["stp"] = "1" if stp_enabled else "0" response = await opnsense_client.request( "POST", f"{API_INTERFACES_BRIDGE_SET}/{uuid}", data={"bridge": bridge_data}, operation=f"update_bridge_interface_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_BRIDGE_RECONFIGURE, operation="apply_bridge_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "update_bridge_interface", e) @mcp.tool(name="delete_bridge_interface", description="Delete a bridge interface") async def delete_bridge_interface( ctx: RequestContext, uuid: str ) -> str: """Delete a bridge interface by UUID. Args: ctx: MCP context uuid: UUID of the bridge interface to delete Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "POST", f"{API_INTERFACES_BRIDGE_DEL}/{uuid}", operation=f"delete_bridge_interface_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_BRIDGE_RECONFIGURE, operation="apply_bridge_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_bridge_interface", e) # --- LAGG (Link Aggregation) Management --- @mcp.tool(name="list_lagg_interfaces", description="List all LAGG (Link Aggregation) interfaces") async def list_lagg_interfaces( ctx: RequestContext, search_phrase: str = "" ) -> str: """List all LAGG interfaces with optional search filtering. Args: ctx: MCP context search_phrase: Optional search phrase to filter LAGG interfaces Returns: JSON string of LAGG interfaces """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: params = {} if search_phrase: params["searchPhrase"] = search_phrase response = await opnsense_client.request( "POST", API_INTERFACES_LAGG_SEARCH, data=params, operation="list_lagg_interfaces" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_lagg_interfaces", e) @mcp.tool(name="get_lagg_interface", description="Get LAGG interface configuration") async def get_lagg_interface( ctx: RequestContext, uuid: str ) -> str: """Get specific LAGG interface configuration by UUID. Args: ctx: MCP context uuid: UUID of the LAGG interface Returns: JSON string of LAGG interface configuration """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "GET", f"{API_INTERFACES_LAGG_GET}/{uuid}", operation=f"get_lagg_interface_{uuid[:8]}" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_lagg_interface", e) @mcp.tool(name="create_lagg_interface", description="Create a new LAGG (Link Aggregation) interface") async def create_lagg_interface( ctx: RequestContext, description: str, parent_interfaces: str, protocol: str = "lacp" ) -> str: """Create a new LAGG interface. Args: ctx: MCP context description: Description for the LAGG interface parent_interfaces: Comma-separated list of parent interfaces protocol: LAGG protocol (lacp, failover, loadbalance, roundrobin) Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not all([description, parent_interfaces]): return json.dumps({ "error": "Description and parent interfaces are required" }, indent=2) # Validate protocol valid_protocols = ["lacp", "failover", "loadbalance", "roundrobin"] if protocol not in valid_protocols: return json.dumps({ "error": f"Protocol must be one of: {', '.join(valid_protocols)}" }, indent=2) try: lagg_data = { "descr": description, "members": parent_interfaces, "proto": protocol } response = await opnsense_client.request( "POST", API_INTERFACES_LAGG_ADD, data={"lagg": lagg_data}, operation="create_lagg_interface" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_LAGG_RECONFIGURE, operation="apply_lagg_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_lagg_interface", e) @mcp.tool(name="update_lagg_interface", description="Update LAGG interface configuration") async def update_lagg_interface( ctx: RequestContext, uuid: str, description: str = "", parent_interfaces: str = "", protocol: str = "" ) -> str: """Update existing LAGG interface configuration. Args: ctx: MCP context uuid: UUID of the LAGG interface to update description: Description for the LAGG interface parent_interfaces: Comma-separated list of parent interfaces protocol: LAGG protocol (lacp, failover, loadbalance, roundrobin) Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) # Validate protocol if provided if protocol: valid_protocols = ["lacp", "failover", "loadbalance", "roundrobin"] if protocol not in valid_protocols: return json.dumps({ "error": f"Protocol must be one of: {', '.join(valid_protocols)}" }, indent=2) try: # Get current LAGG configuration current = await opnsense_client.request( "GET", f"{API_INTERFACES_LAGG_GET}/{uuid}", operation=f"get_current_lagg_{uuid[:8]}" ) if not current or "lagg" not in current: return json.dumps({"error": "LAGG interface not found"}, indent=2) # Update fields lagg_data = current["lagg"] if description: lagg_data["descr"] = description if parent_interfaces is not None: # Allow empty string to clear members lagg_data["members"] = parent_interfaces if protocol: lagg_data["proto"] = protocol response = await opnsense_client.request( "POST", f"{API_INTERFACES_LAGG_SET}/{uuid}", data={"lagg": lagg_data}, operation=f"update_lagg_interface_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_LAGG_RECONFIGURE, operation="apply_lagg_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "update_lagg_interface", e) @mcp.tool(name="delete_lagg_interface", description="Delete a LAGG interface") async def delete_lagg_interface( ctx: RequestContext, uuid: str ) -> str: """Delete a LAGG interface by UUID. Args: ctx: MCP context uuid: UUID of the LAGG interface to delete Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "POST", f"{API_INTERFACES_LAGG_DEL}/{uuid}", operation=f"delete_lagg_interface_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_LAGG_RECONFIGURE, operation="apply_lagg_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_lagg_interface", e) # --- Virtual IP Management --- @mcp.tool(name="list_virtual_ips", description="List all virtual IP addresses") async def list_virtual_ips( ctx: RequestContext, search_phrase: str = "" ) -> str: """List all virtual IP addresses with optional search filtering. Args: ctx: MCP context search_phrase: Optional search phrase to filter virtual IPs Returns: JSON string of virtual IP addresses """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: params = {} if search_phrase: params["searchPhrase"] = search_phrase response = await opnsense_client.request( "POST", API_INTERFACES_VIP_SEARCH, data=params, operation="list_virtual_ips" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_virtual_ips", e) @mcp.tool(name="get_virtual_ip", description="Get virtual IP configuration") async def get_virtual_ip( ctx: RequestContext, uuid: str ) -> str: """Get specific virtual IP configuration by UUID. Args: ctx: MCP context uuid: UUID of the virtual IP Returns: JSON string of virtual IP configuration """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "GET", f"{API_INTERFACES_VIP_GET}/{uuid}", operation=f"get_virtual_ip_{uuid[:8]}" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_virtual_ip", e) @mcp.tool(name="create_virtual_ip", description="Create a new virtual IP address") async def create_virtual_ip( ctx: RequestContext, interface: str, subnet: str, vip_type: str = "single", description: str = "", vhid: int = 0 ) -> str: """Create a new virtual IP address. Args: ctx: MCP context interface: Interface to assign the virtual IP subnet: IP subnet (e.g., '192.168.1.100/24') vip_type: Virtual IP type (single, carp, proxyarp, other) description: Optional description vhid: VHID for CARP (if vip_type is 'carp') Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not all([interface, subnet]): return json.dumps({ "error": "Interface and subnet are required" }, indent=2) # Validate VIP type valid_types = ["single", "carp", "proxyarp", "other"] if vip_type not in valid_types: return json.dumps({ "error": f"VIP type must be one of: {', '.join(valid_types)}" }, indent=2) try: vip_data = { "interface": interface, "subnet": subnet, "type": vip_type, "descr": description } # Add VHID for CARP type if vip_type == "carp": if vhid <= 0: # Get an unused VHID automatically try: vhid_response = await opnsense_client.request( "GET", API_INTERFACES_VIP_GET_UNUSED_VHID, operation="get_unused_vhid" ) if vhid_response and "vhid" in vhid_response: vhid = vhid_response["vhid"] else: vhid = 1 # Default fallback except: vhid = 1 # Default fallback vip_data["vhid"] = str(vhid) response = await opnsense_client.request( "POST", API_INTERFACES_VIP_ADD, data={"vip": vip_data}, operation="create_virtual_ip" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_VIP_RECONFIGURE, operation="apply_vip_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_virtual_ip", e) @mcp.tool(name="update_virtual_ip", description="Update virtual IP configuration") async def update_virtual_ip( ctx: RequestContext, uuid: str, interface: str = "", subnet: str = "", vip_type: str = "", description: str = "", vhid: int = 0 ) -> str: """Update existing virtual IP configuration. Args: ctx: MCP context uuid: UUID of the virtual IP to update interface: Interface to assign the virtual IP subnet: IP subnet (e.g., '192.168.1.100/24') vip_type: Virtual IP type (single, carp, proxyarp, other) description: Description vhid: VHID for CARP (if vip_type is 'carp') Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) # Validate VIP type if provided if vip_type: valid_types = ["single", "carp", "proxyarp", "other"] if vip_type not in valid_types: return json.dumps({ "error": f"VIP type must be one of: {', '.join(valid_types)}" }, indent=2) try: # Get current virtual IP configuration current = await opnsense_client.request( "GET", f"{API_INTERFACES_VIP_GET}/{uuid}", operation=f"get_current_vip_{uuid[:8]}" ) if not current or "vip" not in current: return json.dumps({"error": "Virtual IP not found"}, indent=2) # Update fields vip_data = current["vip"] if interface: vip_data["interface"] = interface if subnet: vip_data["subnet"] = subnet if vip_type: vip_data["type"] = vip_type if description is not None: # Allow empty string to clear description vip_data["descr"] = description if vhid > 0: vip_data["vhid"] = str(vhid) response = await opnsense_client.request( "POST", f"{API_INTERFACES_VIP_SET}/{uuid}", data={"vip": vip_data}, operation=f"update_virtual_ip_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_VIP_RECONFIGURE, operation="apply_vip_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "update_virtual_ip", e) @mcp.tool(name="delete_virtual_ip", description="Delete a virtual IP address") async def delete_virtual_ip( ctx: RequestContext, uuid: str ) -> str: """Delete a virtual IP address by UUID. Args: ctx: MCP context uuid: UUID of the virtual IP to delete Returns: JSON string of operation result """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." if not uuid or not is_valid_uuid(uuid): return json.dumps({"error": "Valid UUID is required"}, indent=2) try: response = await opnsense_client.request( "POST", f"{API_INTERFACES_VIP_DEL}/{uuid}", operation=f"delete_virtual_ip_{uuid[:8]}" ) # Apply configuration if successful if response and not response.get("error"): await opnsense_client.request( "POST", API_INTERFACES_VIP_RECONFIGURE, operation="apply_vip_config" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_virtual_ip", e) @mcp.tool(name="get_unused_vhid", description="Get an unused VHID for CARP configuration") async def get_unused_vhid(ctx: RequestContext) -> str: """Get an unused VHID (Virtual Host ID) for CARP configuration. Returns: JSON string with unused VHID """ if not opnsense_client: return "OPNsense client not initialized. Please configure the server first." try: response = await opnsense_client.request( "GET", API_INTERFACES_VIP_GET_UNUSED_VHID, operation="get_unused_vhid" ) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_unused_vhid", e) # ========== CERTIFICATE MANAGEMENT ========== @mcp.tool(name="list_certificate_authorities", description="List all Certificate Authorities (CAs)") async def list_certificate_authorities(ctx: RequestContext) -> str: """List all Certificate Authorities configured in OPNsense. Args: ctx: Request context Returns: JSON string with list of Certificate Authorities """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") response = await opnsense_client.request("POST", API_CERTIFICATES_CA_SEARCH) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_certificate_authorities", e) @mcp.tool(name="get_certificate_authority", description="Get detailed information about a specific Certificate Authority") async def get_certificate_authority(ctx: RequestContext, ca_uuid: str) -> str: """Get detailed information about a specific Certificate Authority. Args: ctx: Request context ca_uuid: UUID of the Certificate Authority Returns: JSON string with Certificate Authority details """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not ca_uuid: raise ValueError("Certificate Authority UUID is required") response = await opnsense_client.request("GET", f"{API_CERTIFICATES_CA_GET}/{ca_uuid}") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_certificate_authority", e) @mcp.tool(name="create_certificate_authority", description="Create a new Certificate Authority") async def create_certificate_authority(ctx: RequestContext, descr: str, country: str, state: str, city: str, organization: str, organizational_unit: str = "", common_name: str = "", digest_alg: str = "sha256", key_length: int = 2048, lifetime: int = 3650, dn_email: str = "") -> str: """Create a new Certificate Authority. Args: ctx: Request context descr: Description/name for the Certificate Authority country: Two-letter country code (e.g., 'US') state: State or Province city: City or Locality organization: Organization name organizational_unit: Organizational Unit (optional) common_name: Common Name (optional, defaults to descr if not provided) digest_alg: Digest algorithm (sha256, sha384, sha512) key_length: RSA key length (2048, 4096) lifetime: Certificate lifetime in days (default: 3650 = 10 years) dn_email: Email address (optional) Returns: JSON string with creation result and new CA UUID """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") # Validation if not descr: raise ValueError("Description is required") if not country or len(country) != 2: raise ValueError("Country must be a 2-letter code (e.g., 'US')") if not state: raise ValueError("State/Province is required") if not city: raise ValueError("City/Locality is required") if not organization: raise ValueError("Organization is required") if digest_alg not in ["sha256", "sha384", "sha512"]: raise ValueError("Digest algorithm must be one of: sha256, sha384, sha512") if key_length not in [2048, 4096]: raise ValueError("Key length must be 2048 or 4096") if lifetime <= 0: raise ValueError("Lifetime must be positive") # Use description as common name if not provided if not common_name: common_name = descr ca_data = { "ca": { "descr": descr, "caref": "", # Will be generated "refid": "", # Will be generated "crt": "", # Will be generated "prv": "", # Will be generated "serial": "", # Will be generated "dn": { "countryName": country, "stateOrProvinceName": state, "localityName": city, "organizationName": organization, "organizationalUnitName": organizational_unit, "commonName": common_name, "emailAddress": dn_email }, "digest_alg": digest_alg, "keylen": str(key_length), "lifetime": str(lifetime) } } response = await opnsense_client.request("POST", API_CERTIFICATES_CA_ADD, json=ca_data) # Apply configuration if successful if response.get("result") == "saved": await opnsense_client.request("POST", API_CERTIFICATES_SERVICE_RECONFIGURE) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_certificate_authority", e) @mcp.tool(name="delete_certificate_authority", description="Delete a Certificate Authority") async def delete_certificate_authority(ctx: RequestContext, ca_uuid: str) -> str: """Delete a Certificate Authority. Args: ctx: Request context ca_uuid: UUID of the Certificate Authority to delete Returns: JSON string with deletion result """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not ca_uuid: raise ValueError("Certificate Authority UUID is required") response = await opnsense_client.request("POST", f"{API_CERTIFICATES_CA_DEL}/{ca_uuid}") # Apply configuration if successful if response.get("result") == "deleted": await opnsense_client.request("POST", API_CERTIFICATES_SERVICE_RECONFIGURE) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_certificate_authority", e) @mcp.tool(name="export_certificate_authority", description="Export a Certificate Authority certificate") async def export_certificate_authority(ctx: RequestContext, ca_uuid: str) -> str: """Export a Certificate Authority certificate in PEM format. Args: ctx: Request context ca_uuid: UUID of the Certificate Authority to export Returns: JSON string with certificate data """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not ca_uuid: raise ValueError("Certificate Authority UUID is required") response = await opnsense_client.request("GET", f"{API_CERTIFICATES_CA_EXPORT}/{ca_uuid}") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "export_certificate_authority", e) @mcp.tool(name="list_certificates", description="List all certificates") async def list_certificates(ctx: RequestContext) -> str: """List all certificates configured in OPNsense. Args: ctx: Request context Returns: JSON string with list of certificates """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") response = await opnsense_client.request("POST", API_CERTIFICATES_CERT_SEARCH) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_certificates", e) @mcp.tool(name="get_certificate", description="Get detailed information about a specific certificate") async def get_certificate(ctx: RequestContext, cert_uuid: str) -> str: """Get detailed information about a specific certificate. Args: ctx: Request context cert_uuid: UUID of the certificate Returns: JSON string with certificate details """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not cert_uuid: raise ValueError("Certificate UUID is required") response = await opnsense_client.request("GET", f"{API_CERTIFICATES_CERT_GET}/{cert_uuid}") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_certificate", e) @mcp.tool(name="import_certificate", description="Import an existing certificate") async def import_certificate(ctx: RequestContext, descr: str, crt: str, prv: str = "") -> str: """Import an existing certificate and private key. Args: ctx: Request context descr: Description/name for the certificate crt: Certificate in PEM format prv: Private key in PEM format (optional for certificate-only import) Returns: JSON string with import result and certificate UUID """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not descr: raise ValueError("Description is required") if not crt: raise ValueError("Certificate data is required") # Basic PEM format validation if not crt.strip().startswith("-----BEGIN CERTIFICATE-----"): raise ValueError("Certificate must be in PEM format starting with -----BEGIN CERTIFICATE-----") if prv and not prv.strip().startswith("-----BEGIN"): raise ValueError("Private key must be in PEM format") cert_data = { "cert": { "descr": descr, "crt": crt.strip(), "prv": prv.strip() if prv else "" } } response = await opnsense_client.request("POST", API_CERTIFICATES_CERT_ADD, json=cert_data) # Apply configuration if successful if response.get("result") == "saved": await opnsense_client.request("POST", API_CERTIFICATES_SERVICE_RECONFIGURE) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "import_certificate", e) @mcp.tool(name="delete_certificate", description="Delete a certificate") async def delete_certificate(ctx: RequestContext, cert_uuid: str) -> str: """Delete a certificate. Args: ctx: Request context cert_uuid: UUID of the certificate to delete Returns: JSON string with deletion result """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not cert_uuid: raise ValueError("Certificate UUID is required") response = await opnsense_client.request("POST", f"{API_CERTIFICATES_CERT_DEL}/{cert_uuid}") # Apply configuration if successful if response.get("result") == "deleted": await opnsense_client.request("POST", API_CERTIFICATES_SERVICE_RECONFIGURE) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_certificate", e) @mcp.tool(name="export_certificate", description="Export a certificate") async def export_certificate(ctx: RequestContext, cert_uuid: str) -> str: """Export a certificate in PEM format. Args: ctx: Request context cert_uuid: UUID of the certificate to export Returns: JSON string with certificate data """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not cert_uuid: raise ValueError("Certificate UUID is required") response = await opnsense_client.request("GET", f"{API_CERTIFICATES_CERT_EXPORT}/{cert_uuid}") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "export_certificate", e) @mcp.tool(name="list_certificate_signing_requests", description="List all Certificate Signing Requests (CSRs)") async def list_certificate_signing_requests(ctx: RequestContext) -> str: """List all Certificate Signing Requests configured in OPNsense. Args: ctx: Request context Returns: JSON string with list of Certificate Signing Requests """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") response = await opnsense_client.request("POST", API_CERTIFICATES_CSR_SEARCH) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_certificate_signing_requests", e) @mcp.tool(name="get_certificate_signing_request", description="Get detailed information about a specific CSR") async def get_certificate_signing_request(ctx: RequestContext, csr_uuid: str) -> str: """Get detailed information about a specific Certificate Signing Request. Args: ctx: Request context csr_uuid: UUID of the Certificate Signing Request Returns: JSON string with CSR details """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not csr_uuid: raise ValueError("CSR UUID is required") response = await opnsense_client.request("GET", f"{API_CERTIFICATES_CSR_GET}/{csr_uuid}") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_certificate_signing_request", e) @mcp.tool(name="create_certificate_signing_request", description="Create a new Certificate Signing Request") async def create_certificate_signing_request(ctx: RequestContext, descr: str, country: str, state: str, city: str, organization: str, common_name: str, organizational_unit: str = "", digest_alg: str = "sha256", key_length: int = 2048, dn_email: str = "") -> str: """Create a new Certificate Signing Request. Args: ctx: Request context descr: Description/name for the CSR country: Two-letter country code (e.g., 'US') state: State or Province city: City or Locality organization: Organization name common_name: Common Name (FQDN for server certificates) organizational_unit: Organizational Unit (optional) digest_alg: Digest algorithm (sha256, sha384, sha512) key_length: RSA key length (2048, 4096) dn_email: Email address (optional) Returns: JSON string with creation result and new CSR UUID """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") # Validation if not descr: raise ValueError("Description is required") if not country or len(country) != 2: raise ValueError("Country must be a 2-letter code (e.g., 'US')") if not state: raise ValueError("State/Province is required") if not city: raise ValueError("City/Locality is required") if not organization: raise ValueError("Organization is required") if not common_name: raise ValueError("Common Name is required") if digest_alg not in ["sha256", "sha384", "sha512"]: raise ValueError("Digest algorithm must be one of: sha256, sha384, sha512") if key_length not in [2048, 4096]: raise ValueError("Key length must be 2048 or 4096") csr_data = { "csr": { "descr": descr, "dn": { "countryName": country, "stateOrProvinceName": state, "localityName": city, "organizationName": organization, "organizationalUnitName": organizational_unit, "commonName": common_name, "emailAddress": dn_email }, "digest_alg": digest_alg, "keylen": str(key_length) } } response = await opnsense_client.request("POST", API_CERTIFICATES_CSR_ADD, json=csr_data) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_certificate_signing_request", e) @mcp.tool(name="delete_certificate_signing_request", description="Delete a Certificate Signing Request") async def delete_certificate_signing_request(ctx: RequestContext, csr_uuid: str) -> str: """Delete a Certificate Signing Request. Args: ctx: Request context csr_uuid: UUID of the CSR to delete Returns: JSON string with deletion result """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not csr_uuid: raise ValueError("CSR UUID is required") response = await opnsense_client.request("POST", f"{API_CERTIFICATES_CSR_DEL}/{csr_uuid}") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_certificate_signing_request", e) @mcp.tool(name="list_acme_accounts", description="List all ACME (Let's Encrypt) accounts") async def list_acme_accounts(ctx: RequestContext) -> str: """List all ACME (Let's Encrypt) accounts configured in OPNsense. Args: ctx: Request context Returns: JSON string with list of ACME accounts """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") response = await opnsense_client.request("POST", API_CERTIFICATES_ACME_ACCOUNTS_SEARCH) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_acme_accounts", e) @mcp.tool(name="get_acme_account", description="Get detailed information about a specific ACME account") async def get_acme_account(ctx: RequestContext, account_uuid: str) -> str: """Get detailed information about a specific ACME account. Args: ctx: Request context account_uuid: UUID of the ACME account Returns: JSON string with ACME account details """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not account_uuid: raise ValueError("ACME account UUID is required") response = await opnsense_client.request("GET", f"{API_CERTIFICATES_ACME_ACCOUNTS_GET}/{account_uuid}") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_acme_account", e) @mcp.tool(name="create_acme_account", description="Create a new ACME (Let's Encrypt) account") async def create_acme_account(ctx: RequestContext, name: str, email: str, ca_url: str = "https://acme-v02.api.letsencrypt.org/directory", key_length: int = 2048) -> str: """Create a new ACME (Let's Encrypt) account. Args: ctx: Request context name: Descriptive name for the ACME account email: Email address for the ACME account ca_url: ACME CA URL (default: Let's Encrypt production) key_length: RSA key length (2048, 4096) Returns: JSON string with creation result and new account UUID """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") # Validation if not name: raise ValueError("Account name is required") if not email: raise ValueError("Email address is required") if not ca_url: raise ValueError("CA URL is required") if key_length not in [2048, 4096]: raise ValueError("Key length must be 2048 or 4096") # Basic email validation if "@" not in email or "." not in email.split("@")[1]: raise ValueError("Invalid email address format") account_data = { "account": { "name": name, "email": email, "ca_url": ca_url, "key_length": str(key_length) } } response = await opnsense_client.request("POST", API_CERTIFICATES_ACME_ACCOUNTS_ADD, json=account_data) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_acme_account", e) @mcp.tool(name="delete_acme_account", description="Delete an ACME (Let's Encrypt) account") async def delete_acme_account(ctx: RequestContext, account_uuid: str) -> str: """Delete an ACME (Let's Encrypt) account. Args: ctx: Request context account_uuid: UUID of the ACME account to delete Returns: JSON string with deletion result """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not account_uuid: raise ValueError("ACME account UUID is required") response = await opnsense_client.request("POST", f"{API_CERTIFICATES_ACME_ACCOUNTS_DEL}/{account_uuid}") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_acme_account", e) @mcp.tool(name="list_acme_certificates", description="List all ACME (Let's Encrypt) certificates") async def list_acme_certificates(ctx: RequestContext) -> str: """List all ACME (Let's Encrypt) certificates configured in OPNsense. Args: ctx: Request context Returns: JSON string with list of ACME certificates """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") response = await opnsense_client.request("POST", API_CERTIFICATES_ACME_CERTS_SEARCH) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_acme_certificates", e) @mcp.tool(name="get_acme_certificate", description="Get detailed information about a specific ACME certificate") async def get_acme_certificate(ctx: RequestContext, cert_uuid: str) -> str: """Get detailed information about a specific ACME certificate. Args: ctx: Request context cert_uuid: UUID of the ACME certificate Returns: JSON string with ACME certificate details """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not cert_uuid: raise ValueError("ACME certificate UUID is required") response = await opnsense_client.request("GET", f"{API_CERTIFICATES_ACME_CERTS_GET}/{cert_uuid}") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_acme_certificate", e) @mcp.tool(name="create_acme_certificate", description="Create a new ACME (Let's Encrypt) certificate") async def create_acme_certificate(ctx: RequestContext, name: str, account_uuid: str, common_name: str, alternative_names: str = "", key_length: int = 2048, auto_renewal: bool = True) -> str: """Create a new ACME (Let's Encrypt) certificate. Args: ctx: Request context name: Descriptive name for the certificate account_uuid: UUID of the ACME account to use common_name: Primary domain name (e.g., example.com) alternative_names: Additional domain names (comma-separated, e.g., www.example.com,api.example.com) key_length: RSA key length (2048, 4096) auto_renewal: Enable automatic renewal (default: True) Returns: JSON string with creation result and new certificate UUID """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") # Validation if not name: raise ValueError("Certificate name is required") if not account_uuid: raise ValueError("ACME account UUID is required") if not common_name: raise ValueError("Common name (domain) is required") if key_length not in [2048, 4096]: raise ValueError("Key length must be 2048 or 4096") cert_data = { "cert": { "name": name, "account": account_uuid, "common_name": common_name, "alternative_names": alternative_names.strip(), "key_length": str(key_length), "auto_renewal": "1" if auto_renewal else "0" } } response = await opnsense_client.request("POST", API_CERTIFICATES_ACME_CERTS_ADD, json=cert_data) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_acme_certificate", e) @mcp.tool(name="sign_acme_certificate", description="Sign/issue an ACME (Let's Encrypt) certificate") async def sign_acme_certificate(ctx: RequestContext, cert_uuid: str) -> str: """Sign/issue an ACME (Let's Encrypt) certificate. Args: ctx: Request context cert_uuid: UUID of the ACME certificate to sign Returns: JSON string with signing result """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not cert_uuid: raise ValueError("ACME certificate UUID is required") response = await opnsense_client.request("POST", f"{API_CERTIFICATES_ACME_CERTS_SIGN}/{cert_uuid}") # Apply configuration if successful if response.get("result") == "ok": await opnsense_client.request("POST", API_CERTIFICATES_SERVICE_RECONFIGURE) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "sign_acme_certificate", e) @mcp.tool(name="revoke_acme_certificate", description="Revoke an ACME (Let's Encrypt) certificate") async def revoke_acme_certificate(ctx: RequestContext, cert_uuid: str) -> str: """Revoke an ACME (Let's Encrypt) certificate. Args: ctx: Request context cert_uuid: UUID of the ACME certificate to revoke Returns: JSON string with revocation result """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not cert_uuid: raise ValueError("ACME certificate UUID is required") response = await opnsense_client.request("POST", f"{API_CERTIFICATES_ACME_CERTS_REVOKE}/{cert_uuid}") # Apply configuration if successful if response.get("result") == "ok": await opnsense_client.request("POST", API_CERTIFICATES_SERVICE_RECONFIGURE) return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "revoke_acme_certificate", e) @mcp.tool(name="delete_acme_certificate", description="Delete an ACME (Let's Encrypt) certificate") async def delete_acme_certificate(ctx: RequestContext, cert_uuid: str) -> str: """Delete an ACME (Let's Encrypt) certificate. Args: ctx: Request context cert_uuid: UUID of the ACME certificate to delete Returns: JSON string with deletion result """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not cert_uuid: raise ValueError("ACME certificate UUID is required") response = await opnsense_client.request("POST", f"{API_CERTIFICATES_ACME_CERTS_DEL}/{cert_uuid}") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_acme_certificate", e) @mcp.tool(name="analyze_certificate_expiration", description="Analyze certificate expiration dates and provide alerts") async def analyze_certificate_expiration(ctx: RequestContext, warning_days: int = 30) -> str: """Analyze certificate expiration dates and provide expiration alerts. Args: ctx: Request context warning_days: Number of days before expiration to warn (default: 30) Returns: JSON string with certificate expiration analysis """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if warning_days < 0: raise ValueError("Warning days must be non-negative") # Get all certificates cert_response = await opnsense_client.request("POST", API_CERTIFICATES_CERT_SEARCH) ca_response = await opnsense_client.request("POST", API_CERTIFICATES_CA_SEARCH) acme_response = await opnsense_client.request("POST", API_CERTIFICATES_ACME_CERTS_SEARCH) analysis = { "analysis_date": "current_date", "warning_threshold_days": warning_days, "certificates": { "total": 0, "expired": [], "expiring_soon": [], "valid": [], "errors": [] }, "certificate_authorities": { "total": 0, "expired": [], "expiring_soon": [], "valid": [], "errors": [] }, "acme_certificates": { "total": 0, "expired": [], "expiring_soon": [], "valid": [], "errors": [], "auto_renewal_enabled": 0 }, "recommendations": [] } # Analyze certificates if cert_response.get("rows"): analysis["certificates"]["total"] = len(cert_response["rows"]) for cert in cert_response["rows"]: cert_info = { "uuid": cert.get("uuid"), "description": cert.get("descr", "Unknown"), "common_name": cert.get("CN", "Unknown"), "issuer": cert.get("issuer", "Unknown"), "not_after": cert.get("not_after", "Unknown") } if cert.get("not_after") == "Unknown" or not cert.get("not_after"): analysis["certificates"]["errors"].append({ **cert_info, "error": "Cannot determine expiration date" }) else: analysis["certificates"]["valid"].append(cert_info) # Analyze CAs if ca_response.get("rows"): analysis["certificate_authorities"]["total"] = len(ca_response["rows"]) for ca in ca_response["rows"]: ca_info = { "uuid": ca.get("uuid"), "description": ca.get("descr", "Unknown"), "common_name": ca.get("CN", "Unknown"), "not_after": ca.get("not_after", "Unknown") } if ca.get("not_after") == "Unknown" or not ca.get("not_after"): analysis["certificate_authorities"]["errors"].append({ **ca_info, "error": "Cannot determine expiration date" }) else: analysis["certificate_authorities"]["valid"].append(ca_info) # Analyze ACME certificates if acme_response.get("rows"): analysis["acme_certificates"]["total"] = len(acme_response["rows"]) for acme in acme_response["rows"]: acme_info = { "uuid": acme.get("uuid"), "name": acme.get("name", "Unknown"), "common_name": acme.get("common_name", "Unknown"), "status": acme.get("status", "Unknown"), "auto_renewal": acme.get("auto_renewal", "0") } if acme.get("auto_renewal") == "1": analysis["acme_certificates"]["auto_renewal_enabled"] += 1 analysis["acme_certificates"]["valid"].append(acme_info) # Generate recommendations recommendations = [] if analysis["certificates"]["errors"]: recommendations.append("Some certificates have invalid expiration dates. Review certificate configurations.") if analysis["certificate_authorities"]["errors"]: recommendations.append("Some Certificate Authorities have invalid expiration dates. Review CA configurations.") if analysis["acme_certificates"]["total"] > 0: auto_renewal_ratio = analysis["acme_certificates"]["auto_renewal_enabled"] / analysis["acme_certificates"]["total"] if auto_renewal_ratio < 1.0: recommendations.append(f"Only {analysis['acme_certificates']['auto_renewal_enabled']}/{analysis['acme_certificates']['total']} ACME certificates have auto-renewal enabled. Consider enabling auto-renewal for all Let's Encrypt certificates.") if not recommendations: recommendations.append("Certificate configuration appears healthy. Continue monitoring expiration dates.") analysis["recommendations"] = recommendations return json.dumps(analysis, indent=2) except Exception as e: return await handle_tool_error(ctx, "analyze_certificate_expiration", e) @mcp.tool(name="validate_certificate_chain", description="Validate certificate chain and trust relationships") async def validate_certificate_chain(ctx: RequestContext, cert_uuid: str) -> str: """Validate certificate chain and trust relationships for a specific certificate. Args: ctx: Request context cert_uuid: UUID of the certificate to validate Returns: JSON string with certificate chain validation results """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") if not cert_uuid: raise ValueError("Certificate UUID is required") # Get certificate details cert_response = await opnsense_client.request("GET", f"{API_CERTIFICATES_CERT_GET}/{cert_uuid}") if not cert_response.get("cert"): raise ValueError("Certificate not found") cert_data = cert_response["cert"] validation_result = { "certificate_uuid": cert_uuid, "certificate_info": { "description": cert_data.get("descr", "Unknown"), "common_name": cert_data.get("CN", "Unknown"), "issuer": cert_data.get("issuer", "Unknown"), "not_before": cert_data.get("not_before", "Unknown"), "not_after": cert_data.get("not_after", "Unknown"), "serial": cert_data.get("serial", "Unknown") }, "validation_checks": { "has_private_key": bool(cert_data.get("prv")), "has_certificate": bool(cert_data.get("crt")), "format_valid": True, # Basic assumption "chain_complete": False, "self_signed": False, "expired": False, "not_yet_valid": False }, "issues": [], "recommendations": [] } # Basic validation checks issues = [] recommendations = [] if not cert_data.get("crt"): issues.append("Certificate data is missing") if not cert_data.get("prv"): issues.append("Private key is missing - certificate cannot be used for SSL/TLS services") recommendations.append("Import the private key for this certificate to enable SSL/TLS usage") # Check if certificate appears to be self-signed if cert_data.get("issuer") == cert_data.get("CN"): validation_result["validation_checks"]["self_signed"] = True recommendations.append("Self-signed certificate detected. Consider using CA-signed certificates for production") # Get all CAs to check chain ca_response = await opnsense_client.request("POST", API_CERTIFICATES_CA_SEARCH) if ca_response.get("rows"): for ca in ca_response["rows"]: if ca.get("CN") == cert_data.get("issuer"): validation_result["validation_checks"]["chain_complete"] = True break if not validation_result["validation_checks"]["chain_complete"] and not validation_result["validation_checks"]["self_signed"]: issues.append("Certificate Authority for this certificate is not found in OPNsense") recommendations.append("Import the issuing Certificate Authority to complete the certificate chain") if not issues: recommendations.append("Certificate appears to be properly configured") validation_result["issues"] = issues validation_result["recommendations"] = recommendations return json.dumps(validation_result, indent=2) except Exception as e: return await handle_tool_error(ctx, "validate_certificate_chain", e) @mcp.tool(name="get_certificate_usage", description="Get information about where certificates are used in OPNsense") async def get_certificate_usage(ctx: RequestContext) -> str: """Get information about where certificates are currently used in OPNsense configuration. Args: ctx: Request context Returns: JSON string with certificate usage information """ try: if not opnsense_client: raise ValueError("OPNsense client not configured. Please run configure_opnsense_connection first.") # Get certificates and CAs cert_response = await opnsense_client.request("POST", API_CERTIFICATES_CERT_SEARCH) ca_response = await opnsense_client.request("POST", API_CERTIFICATES_CA_SEARCH) usage_info = { "certificates": [], "certificate_authorities": [], "usage_summary": { "total_certificates": 0, "total_cas": 0, "potentially_unused_certs": 0, "potentially_unused_cas": 0 }, "recommendations": [] } # Process certificates if cert_response.get("rows"): usage_info["usage_summary"]["total_certificates"] = len(cert_response["rows"]) for cert in cert_response["rows"]: cert_info = { "uuid": cert.get("uuid"), "description": cert.get("descr", "Unknown"), "common_name": cert.get("CN", "Unknown"), "has_private_key": bool(cert.get("prv")), "potential_uses": [] } # Determine potential uses based on certificate properties if cert.get("prv"): cert_info["potential_uses"].extend([ "Web GUI HTTPS", "OpenVPN Server", "IPsec VPN", "HAProxy SSL", "Captive Portal HTTPS" ]) else: cert_info["potential_uses"].append("Client authentication only (no private key)") # Simple heuristic for unused certificates if not cert.get("prv"): usage_info["usage_summary"]["potentially_unused_certs"] += 1 usage_info["certificates"].append(cert_info) # Process Certificate Authorities if ca_response.get("rows"): usage_info["usage_summary"]["total_cas"] = len(ca_response["rows"]) for ca in ca_response["rows"]: ca_info = { "uuid": ca.get("uuid"), "description": ca.get("descr", "Unknown"), "common_name": ca.get("CN", "Unknown"), "potential_uses": [ "Certificate chain validation", "Client certificate authority", "OpenVPN CA", "IPsec Certificate Authority" ] } usage_info["certificate_authorities"].append(ca_info) # Generate recommendations recommendations = [] if usage_info["usage_summary"]["potentially_unused_certs"] > 0: recommendations.append(f"{usage_info['usage_summary']['potentially_unused_certs']} certificates lack private keys and may be unused. Consider cleaning up unused certificates.") if usage_info["usage_summary"]["total_certificates"] == 0: recommendations.append("No certificates configured. Consider setting up SSL/TLS certificates for secure services.") if usage_info["usage_summary"]["total_cas"] == 0: recommendations.append("No Certificate Authorities configured. Consider setting up a CA for internal certificate management.") if not recommendations: recommendations.append("Certificate inventory appears reasonable. Review individual certificate usage as needed.") usage_info["recommendations"] = recommendations return json.dumps(usage_info, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_certificate_usage", e) # ========== USER MANAGEMENT ========== @mcp.tool(name="list_users", description="List all users in OPNsense") async def list_users(ctx: Context) -> str: """List all users configured in OPNsense. Args: ctx: MCP context Returns: JSON string with list of all users """ try: client = await get_opnsense_client() response = await client.request("POST", API_CORE_USER_SEARCH, operation="list_users") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_users", e) @mcp.tool(name="get_user", description="Get details of a specific user") async def get_user(ctx: Context, user_uuid: Optional[str] = None) -> str: """Get details of a specific user or all users. Args: ctx: MCP context user_uuid: UUID of specific user to retrieve (optional - if not provided, returns all users) Returns: JSON string with user details """ try: client = await get_opnsense_client() # Validate UUID if provided if user_uuid: validate_uuid(user_uuid, "user_uuid") endpoint = f"{API_CORE_USER_GET}/{user_uuid}" else: endpoint = API_CORE_USER_GET response = await client.request("GET", endpoint, operation="get_user") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_user", e) @mcp.tool(name="create_user", description="Create a new user account") async def create_user( ctx: Context, username: str, password: str, full_name: str = "", email: str = "", groups: Optional[str] = None, privileges: Optional[str] = None, enabled: bool = True, expires: Optional[str] = None, comment: str = "" ) -> str: """Create a new user account in OPNsense. Args: ctx: MCP context username: Unique username for the account password: Password for the user (will be hashed) full_name: Full name of the user (optional) email: Email address of the user (optional) groups: Comma-separated list of group names (optional) privileges: Comma-separated list of privilege names (optional) enabled: Whether the account should be enabled (default: True) expires: Expiration date in YYYY-MM-DD format (optional) comment: Additional comments about the user (optional) Returns: JSON string with creation result and new user UUID """ try: client = await get_opnsense_client() # Validate required parameters if not username or not password: raise ValidationError("Username and password are required", context={"username": username, "has_password": bool(password)}) if len(username) < 3: raise ValidationError("Username must be at least 3 characters long", context={"username": username}) if len(password) < 6: raise ValidationError("Password must be at least 6 characters long") # Prepare user data user_data = { "user": { "enabled": "1" if enabled else "0", "name": username, "password": password, "full_name": full_name, "email": email, "comment": comment } } # Add groups if specified if groups: # Convert comma-separated string to list and validate group names group_list = [g.strip() for g in groups.split(",") if g.strip()] user_data["user"]["groups"] = ",".join(group_list) # Add privileges if specified if privileges: # Convert comma-separated string to list priv_list = [p.strip() for p in privileges.split(",") if p.strip()] user_data["user"]["priv"] = ",".join(priv_list) # Add expiration if specified if expires: # Basic date format validation import re if not re.match(r'^\d{4}-\d{2}-\d{2}$', expires): raise ValidationError("Expires must be in YYYY-MM-DD format", context={"expires": expires}) user_data["user"]["expires"] = expires # Create the user response = await client.request("POST", API_CORE_USER_ADD, data=user_data, operation="create_user") # Reload configuration if creation was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_user_create") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_user", e) @mcp.tool(name="update_user", description="Update an existing user account") async def update_user( ctx: Context, user_uuid: str, username: Optional[str] = None, password: Optional[str] = None, full_name: Optional[str] = None, email: Optional[str] = None, groups: Optional[str] = None, privileges: Optional[str] = None, enabled: Optional[bool] = None, expires: Optional[str] = None, comment: Optional[str] = None ) -> str: """Update an existing user account in OPNsense. Args: ctx: MCP context user_uuid: UUID of the user to update username: New username (optional) password: New password (optional) full_name: New full name (optional) email: New email address (optional) groups: Comma-separated list of group names (optional) privileges: Comma-separated list of privilege names (optional) enabled: Whether the account should be enabled (optional) expires: Expiration date in YYYY-MM-DD format (optional) comment: Additional comments about the user (optional) Returns: JSON string with update result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(user_uuid, "user_uuid") # Get current user configuration current_user_response = await client.request("GET", f"{API_CORE_USER_GET}/{user_uuid}", operation="get_user_for_update") if "user" not in current_user_response: raise ResourceNotFoundError(f"User with UUID {user_uuid} not found") current_user = current_user_response["user"] # Update only provided fields if username is not None: if len(username) < 3: raise ValidationError("Username must be at least 3 characters long", context={"username": username}) current_user["name"] = username if password is not None: if len(password) < 6: raise ValidationError("Password must be at least 6 characters long") current_user["password"] = password if full_name is not None: current_user["full_name"] = full_name if email is not None: current_user["email"] = email if groups is not None: # Convert comma-separated string to list and validate group names group_list = [g.strip() for g in groups.split(",") if g.strip()] current_user["groups"] = ",".join(group_list) if privileges is not None: # Convert comma-separated string to list priv_list = [p.strip() for p in privileges.split(",") if p.strip()] current_user["priv"] = ",".join(priv_list) if enabled is not None: current_user["enabled"] = "1" if enabled else "0" if expires is not None: if expires: # Only validate if not empty import re if not re.match(r'^\d{4}-\d{2}-\d{2}$', expires): raise ValidationError("Expires must be in YYYY-MM-DD format or empty", context={"expires": expires}) current_user["expires"] = expires if comment is not None: current_user["comment"] = comment # Prepare update data user_data = {"user": current_user} # Update the user response = await client.request("POST", f"{API_CORE_USER_SET}/{user_uuid}", data=user_data, operation="update_user") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_user_update") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "update_user", e) @mcp.tool(name="delete_user", description="Delete a user account") async def delete_user(ctx: Context, user_uuid: str) -> str: """Delete a user account from OPNsense. This will remove the user account and all associated data including API keys. Args: ctx: MCP context user_uuid: UUID of the user to delete Returns: JSON string with deletion result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(user_uuid, "user_uuid") # Delete the user response = await client.request("POST", f"{API_CORE_USER_DEL}/{user_uuid}", operation="delete_user") # Reload configuration if deletion was successful if response.get("result") == "deleted": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_user_delete") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_user", e) @mcp.tool(name="toggle_user", description="Enable or disable a user account") async def toggle_user(ctx: Context, user_uuid: str, enabled: bool) -> str: """Enable or disable a user account. Args: ctx: MCP context user_uuid: UUID of the user to toggle enabled: True to enable, False to disable Returns: JSON string with toggle result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(user_uuid, "user_uuid") # Toggle the user enabled_int = 1 if enabled else 0 response = await client.request("POST", f"{API_CORE_USER_TOGGLE}/{user_uuid}/{enabled_int}", operation="toggle_user") # Reload configuration if toggle was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_user_toggle") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "toggle_user", e) # ========== GROUP MANAGEMENT ========== @mcp.tool(name="list_groups", description="List all groups in OPNsense") async def list_groups(ctx: Context) -> str: """List all groups configured in OPNsense. Args: ctx: MCP context Returns: JSON string with list of all groups """ try: client = await get_opnsense_client() response = await client.request("POST", API_CORE_GROUP_SEARCH, operation="list_groups") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_groups", e) @mcp.tool(name="get_group", description="Get details of a specific group") async def get_group(ctx: Context, group_uuid: Optional[str] = None) -> str: """Get details of a specific group or all groups. Args: ctx: MCP context group_uuid: UUID of specific group to retrieve (optional - if not provided, returns all groups) Returns: JSON string with group details """ try: client = await get_opnsense_client() # Validate UUID if provided if group_uuid: validate_uuid(group_uuid, "group_uuid") endpoint = f"{API_CORE_GROUP_GET}/{group_uuid}" else: endpoint = API_CORE_GROUP_GET response = await client.request("GET", endpoint, operation="get_group") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_group", e) @mcp.tool(name="create_group", description="Create a new group") async def create_group( ctx: Context, name: str, description: str = "", privileges: Optional[str] = None, members: Optional[str] = None ) -> str: """Create a new group in OPNsense. Args: ctx: MCP context name: Name of the group (must be unique) description: Description of the group (optional) privileges: Comma-separated list of privilege names (optional) members: Comma-separated list of usernames to add to group (optional) Returns: JSON string with creation result and new group UUID """ try: client = await get_opnsense_client() # Validate required parameters if not name: raise ValidationError("Group name is required", context={"name": name}) if len(name) < 2: raise ValidationError("Group name must be at least 2 characters long", context={"name": name}) # Prepare group data group_data = { "group": { "name": name, "description": description } } # Add privileges if specified if privileges: # Convert comma-separated string to list priv_list = [p.strip() for p in privileges.split(",") if p.strip()] group_data["group"]["priv"] = ",".join(priv_list) # Add members if specified if members: # Convert comma-separated string to list and validate usernames member_list = [m.strip() for m in members.split(",") if m.strip()] group_data["group"]["member"] = ",".join(member_list) # Create the group response = await client.request("POST", API_CORE_GROUP_ADD, data=group_data, operation="create_group") # Reload configuration if creation was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_group_create") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_group", e) @mcp.tool(name="update_group", description="Update an existing group") async def update_group( ctx: Context, group_uuid: str, name: Optional[str] = None, description: Optional[str] = None, privileges: Optional[str] = None, members: Optional[str] = None ) -> str: """Update an existing group in OPNsense. Args: ctx: MCP context group_uuid: UUID of the group to update name: New name for the group (optional) description: New description for the group (optional) privileges: Comma-separated list of privilege names (optional) members: Comma-separated list of usernames in group (optional) Returns: JSON string with update result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(group_uuid, "group_uuid") # Get current group configuration current_group_response = await client.request("GET", f"{API_CORE_GROUP_GET}/{group_uuid}", operation="get_group_for_update") if "group" not in current_group_response: raise ResourceNotFoundError(f"Group with UUID {group_uuid} not found") current_group = current_group_response["group"] # Update only provided fields if name is not None: if len(name) < 2: raise ValidationError("Group name must be at least 2 characters long", context={"name": name}) current_group["name"] = name if description is not None: current_group["description"] = description if privileges is not None: # Convert comma-separated string to list priv_list = [p.strip() for p in privileges.split(",") if p.strip()] current_group["priv"] = ",".join(priv_list) if members is not None: # Convert comma-separated string to list and validate usernames member_list = [m.strip() for m in members.split(",") if m.strip()] current_group["member"] = ",".join(member_list) # Prepare update data group_data = {"group": current_group} # Update the group response = await client.request("POST", f"{API_CORE_GROUP_SET}/{group_uuid}", data=group_data, operation="update_group") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_group_update") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "update_group", e) @mcp.tool(name="delete_group", description="Delete a group") async def delete_group(ctx: Context, group_uuid: str) -> str: """Delete a group from OPNsense. This will remove the group and update all users who were members of this group. Args: ctx: MCP context group_uuid: UUID of the group to delete Returns: JSON string with deletion result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(group_uuid, "group_uuid") # Delete the group response = await client.request("POST", f"{API_CORE_GROUP_DEL}/{group_uuid}", operation="delete_group") # Reload configuration if deletion was successful if response.get("result") == "deleted": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_group_delete") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_group", e) @mcp.tool(name="add_user_to_group", description="Add a user to a group") async def add_user_to_group(ctx: Context, group_uuid: str, username: str) -> str: """Add a user to an existing group. Args: ctx: MCP context group_uuid: UUID of the group to modify username: Username to add to the group Returns: JSON string with update result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(group_uuid, "group_uuid") if not username: raise ValidationError("Username is required", context={"username": username}) # Get current group configuration current_group_response = await client.request("GET", f"{API_CORE_GROUP_GET}/{group_uuid}", operation="get_group_for_member_add") if "group" not in current_group_response: raise ResourceNotFoundError(f"Group with UUID {group_uuid} not found") current_group = current_group_response["group"] # Get current members current_members = [] if "member" in current_group and current_group["member"]: current_members = [m.strip() for m in current_group["member"].split(",") if m.strip()] # Check if user is already a member if username in current_members: return json.dumps({"result": "no_change", "message": f"User '{username}' is already a member of the group"}, indent=2) # Add the new member current_members.append(username) current_group["member"] = ",".join(current_members) # Prepare update data group_data = {"group": current_group} # Update the group response = await client.request("POST", f"{API_CORE_GROUP_SET}/{group_uuid}", data=group_data, operation="add_user_to_group") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_group_member_add") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "add_user_to_group", e) @mcp.tool(name="remove_user_from_group", description="Remove a user from a group") async def remove_user_from_group(ctx: Context, group_uuid: str, username: str) -> str: """Remove a user from an existing group. Args: ctx: MCP context group_uuid: UUID of the group to modify username: Username to remove from the group Returns: JSON string with update result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(group_uuid, "group_uuid") if not username: raise ValidationError("Username is required", context={"username": username}) # Get current group configuration current_group_response = await client.request("GET", f"{API_CORE_GROUP_GET}/{group_uuid}", operation="get_group_for_member_remove") if "group" not in current_group_response: raise ResourceNotFoundError(f"Group with UUID {group_uuid} not found") current_group = current_group_response["group"] # Get current members current_members = [] if "member" in current_group and current_group["member"]: current_members = [m.strip() for m in current_group["member"].split(",") if m.strip()] # Check if user is actually a member if username not in current_members: return json.dumps({"result": "no_change", "message": f"User '{username}' is not a member of the group"}, indent=2) # Remove the member current_members.remove(username) current_group["member"] = ",".join(current_members) # Prepare update data group_data = {"group": current_group} # Update the group response = await client.request("POST", f"{API_CORE_GROUP_SET}/{group_uuid}", data=group_data, operation="remove_user_from_group") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_group_member_remove") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "remove_user_from_group", e) # ========== AUTHENTICATION & PRIVILEGE MANAGEMENT ========== @mcp.tool(name="list_privileges", description="List all available privileges in OPNsense") async def list_privileges(ctx: Context) -> str: """List all available privileges and their descriptions in OPNsense. Args: ctx: MCP context Returns: JSON string with list of all available privileges """ try: client = await get_opnsense_client() response = await client.request("GET", API_CORE_AUTH_PRIVILEGES, operation="list_privileges") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_privileges", e) @mcp.tool(name="get_user_effective_privileges", description="Get effective privileges for a user") async def get_user_effective_privileges(ctx: Context, username: str) -> str: """Get the effective privileges for a specific user (combines user and group privileges). Args: ctx: MCP context username: Username to get privileges for Returns: JSON string with user's effective privileges """ try: client = await get_opnsense_client() if not username: raise ValidationError("Username is required", context={"username": username}) # First get the user details to find their UUID users_response = await client.request("POST", API_CORE_USER_SEARCH, operation="search_user_for_privileges") user_uuid = None if "rows" in users_response: for user in users_response["rows"]: if user.get("name") == username: user_uuid = user.get("uuid") break if not user_uuid: raise ResourceNotFoundError(f"User '{username}' not found") # Get detailed user information including groups and privileges user_details_response = await client.request("GET", f"{API_CORE_USER_GET}/{user_uuid}", operation="get_user_privileges") if "user" not in user_details_response: raise ResourceNotFoundError(f"User details for '{username}' not found") user_details = user_details_response["user"] # Collect all privileges effective_privileges = set() # Add direct user privileges if "priv" in user_details and user_details["priv"]: user_privs = [p.strip() for p in user_details["priv"].split(",") if p.strip()] effective_privileges.update(user_privs) # Add group privileges if "groups" in user_details and user_details["groups"]: group_names = [g.strip() for g in user_details["groups"].split(",") if g.strip()] # Get all groups to find UUIDs and privileges groups_response = await client.request("POST", API_CORE_GROUP_SEARCH, operation="search_groups_for_user_privileges") if "rows" in groups_response: for group in groups_response["rows"]: if group.get("name") in group_names: # Get detailed group information group_uuid = group.get("uuid") if group_uuid: group_details_response = await client.request("GET", f"{API_CORE_GROUP_GET}/{group_uuid}", operation="get_group_privileges") if "group" in group_details_response: group_details = group_details_response["group"] if "priv" in group_details and group_details["priv"]: group_privs = [p.strip() for p in group_details["priv"].split(",") if p.strip()] effective_privileges.update(group_privs) # Format result result = { "username": username, "user_uuid": user_uuid, "direct_privileges": [p.strip() for p in user_details.get("priv", "").split(",") if p.strip()], "group_memberships": [g.strip() for g in user_details.get("groups", "").split(",") if g.strip()], "effective_privileges": sorted(list(effective_privileges)), "privilege_count": len(effective_privileges) } return json.dumps(result, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_user_effective_privileges", e) @mcp.tool(name="assign_privilege_to_user", description="Assign a privilege directly to a user") async def assign_privilege_to_user(ctx: Context, username: str, privilege: str) -> str: """Assign a specific privilege directly to a user. Args: ctx: MCP context username: Username to assign privilege to privilege: Privilege name to assign Returns: JSON string with assignment result """ try: client = await get_opnsense_client() if not username or not privilege: raise ValidationError("Username and privilege are required", context={"username": username, "privilege": privilege}) # Find the user users_response = await client.request("POST", API_CORE_USER_SEARCH, operation="search_user_for_privilege_assignment") user_uuid = None if "rows" in users_response: for user in users_response["rows"]: if user.get("name") == username: user_uuid = user.get("uuid") break if not user_uuid: raise ResourceNotFoundError(f"User '{username}' not found") # Get current user details user_details_response = await client.request("GET", f"{API_CORE_USER_GET}/{user_uuid}", operation="get_user_for_privilege_assignment") if "user" not in user_details_response: raise ResourceNotFoundError(f"User details for '{username}' not found") current_user = user_details_response["user"] # Get current privileges current_privileges = [] if "priv" in current_user and current_user["priv"]: current_privileges = [p.strip() for p in current_user["priv"].split(",") if p.strip()] # Check if privilege is already assigned if privilege in current_privileges: return json.dumps({"result": "no_change", "message": f"Privilege '{privilege}' is already assigned to user '{username}'"}, indent=2) # Add the new privilege current_privileges.append(privilege) current_user["priv"] = ",".join(current_privileges) # Update the user user_data = {"user": current_user} response = await client.request("POST", f"{API_CORE_USER_SET}/{user_uuid}", data=user_data, operation="assign_privilege_to_user") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_privilege_assignment") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "assign_privilege_to_user", e) @mcp.tool(name="revoke_privilege_from_user", description="Revoke a privilege directly from a user") async def revoke_privilege_from_user(ctx: Context, username: str, privilege: str) -> str: """Revoke a specific privilege directly from a user. Args: ctx: MCP context username: Username to revoke privilege from privilege: Privilege name to revoke Returns: JSON string with revocation result """ try: client = await get_opnsense_client() if not username or not privilege: raise ValidationError("Username and privilege are required", context={"username": username, "privilege": privilege}) # Find the user users_response = await client.request("POST", API_CORE_USER_SEARCH, operation="search_user_for_privilege_revocation") user_uuid = None if "rows" in users_response: for user in users_response["rows"]: if user.get("name") == username: user_uuid = user.get("uuid") break if not user_uuid: raise ResourceNotFoundError(f"User '{username}' not found") # Get current user details user_details_response = await client.request("GET", f"{API_CORE_USER_GET}/{user_uuid}", operation="get_user_for_privilege_revocation") if "user" not in user_details_response: raise ResourceNotFoundError(f"User details for '{username}' not found") current_user = user_details_response["user"] # Get current privileges current_privileges = [] if "priv" in current_user and current_user["priv"]: current_privileges = [p.strip() for p in current_user["priv"].split(",") if p.strip()] # Check if privilege is actually assigned if privilege not in current_privileges: return json.dumps({"result": "no_change", "message": f"Privilege '{privilege}' is not assigned to user '{username}'"}, indent=2) # Remove the privilege current_privileges.remove(privilege) current_user["priv"] = ",".join(current_privileges) # Update the user user_data = {"user": current_user} response = await client.request("POST", f"{API_CORE_USER_SET}/{user_uuid}", data=user_data, operation="revoke_privilege_from_user") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_privilege_revocation") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "revoke_privilege_from_user", e) @mcp.tool(name="list_auth_servers", description="List configured authentication servers") async def list_auth_servers(ctx: Context) -> str: """List all configured authentication servers (LDAP, RADIUS, etc.). Args: ctx: MCP context Returns: JSON string with list of authentication servers """ try: client = await get_opnsense_client() response = await client.request("GET", API_CORE_AUTH_SERVERS, operation="list_auth_servers") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_auth_servers", e) @mcp.tool(name="test_user_authentication", description="Test user authentication against configured servers") async def test_user_authentication(ctx: Context, username: str, auth_server: Optional[str] = None) -> str: """Test user authentication against a specific authentication server or all servers. Args: ctx: MCP context username: Username to test authentication for auth_server: Specific authentication server to test against (optional) Returns: JSON string with authentication test results """ try: client = await get_opnsense_client() if not username: raise ValidationError("Username is required", context={"username": username}) # Prepare test data test_data = { "username": username } if auth_server: test_data["auth_server"] = auth_server response = await client.request("POST", API_CORE_AUTH_TEST, data=test_data, operation="test_user_authentication") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "test_user_authentication", e) # ========== USER MANAGEMENT HELPER TOOLS ========== @mcp.tool() async def create_admin_user(ctx: RequestContext, username: str, password: str, full_name: str = "", email: str = "") -> str: """ Create a new administrative user with full system privileges. Args: username: Username for the new admin user password: Password for the user full_name: Full name of the user (optional) email: Email address of the user (optional) Returns: JSON response with creation status and user details """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # Create the user first user_data = { "user": { "name": username, "password": password, "full_name": full_name or username, "email": email, "disabled": "0", "expires": "", "comment": "Administrative user created via MCP" } } response = await client.request("POST", API_CORE_USER_ADD, data=user_data, operation="create_admin_user") if response.get("result") != "saved": return json.dumps({"error": "Failed to create user", "response": response}, indent=2) user_uuid = response.get("uuid") if not user_uuid: return json.dumps({"error": "User created but UUID not returned", "response": response}, indent=2) # Get all available privileges privileges_response = await client.request("GET", API_CORE_AUTH_PRIVILEGES, operation="get_privileges_for_admin") if "privileges" not in privileges_response: return json.dumps({ "user_created": True, "uuid": user_uuid, "warning": "User created but could not retrieve privileges for assignment" }, indent=2) # Assign all privileges to make this a full admin all_privileges = list(privileges_response["privileges"].keys()) privilege_string = ",".join(all_privileges) # Update user with all privileges update_data = { "user": { "name": username, "password": password, "full_name": full_name or username, "email": email, "disabled": "0", "expires": "", "comment": "Administrative user created via MCP", "priv": privilege_string } } await client.request("POST", f"{API_CORE_USER_SET}/{user_uuid}", data=update_data, operation="assign_admin_privileges") # Reload configuration await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_admin_creation") return json.dumps({ "result": "success", "message": f"Administrative user '{username}' created successfully", "uuid": user_uuid, "privileges_assigned": len(all_privileges), "full_admin": True }, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_admin_user", e) @mcp.tool() async def create_readonly_user(ctx: RequestContext, username: str, password: str, full_name: str = "", email: str = "") -> str: """ Create a new read-only user with limited system access. Args: username: Username for the new read-only user password: Password for the user full_name: Full name of the user (optional) email: Email address of the user (optional) Returns: JSON response with creation status and user details """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # Define read-only privileges (common monitoring/viewing privileges) readonly_privileges = [ "page-all", # Basic page access "page-status-system", # System status "page-status-interfaces", # Interface status "page-status-logs", # Log viewing "page-diagnostics-all", # Diagnostic tools "page-status-dashboard", # Dashboard access "page-firewall-rules", # Firewall rule viewing (read-only) "page-interfaces-overview" # Interface overview ] # Create user with read-only privileges user_data = { "user": { "name": username, "password": password, "full_name": full_name or username, "email": email, "disabled": "0", "expires": "", "comment": "Read-only user created via MCP", "priv": ",".join(readonly_privileges) } } response = await client.request("POST", API_CORE_USER_ADD, data=user_data, operation="create_readonly_user") if response.get("result") != "saved": return json.dumps({"error": "Failed to create read-only user", "response": response}, indent=2) # Reload configuration await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_readonly_creation") return json.dumps({ "result": "success", "message": f"Read-only user '{username}' created successfully", "uuid": response.get("uuid"), "privileges_assigned": readonly_privileges, "access_level": "read-only" }, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_readonly_user", e) @mcp.tool() async def reset_user_password(ctx: RequestContext, username: str, new_password: str) -> str: """ Reset a user's password by username. Args: username: Username of the user whose password to reset new_password: New password to set Returns: JSON response with password reset status """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # First, find the user by username search_response = await client.request("GET", API_CORE_USER_SEARCH, operation="search_user_for_password_reset") if "rows" not in search_response: return json.dumps({"error": "Failed to retrieve user list"}, indent=2) user_uuid = None user_data = None for user in search_response["rows"]: if user.get("name") == username: user_uuid = user.get("uuid") user_data = user break if not user_uuid: return json.dumps({"error": f"User '{username}' not found"}, indent=2) # Get full user details user_detail_response = await client.request("GET", f"{API_CORE_USER_GET}/{user_uuid}", operation="get_user_details_for_password_reset") if "user" not in user_detail_response: return json.dumps({"error": "Failed to retrieve user details"}, indent=2) current_user = user_detail_response["user"] # Update user with new password (preserve all other settings) update_data = { "user": { "name": current_user.get("name", username), "password": new_password, "full_name": current_user.get("full_name", ""), "email": current_user.get("email", ""), "disabled": current_user.get("disabled", "0"), "expires": current_user.get("expires", ""), "comment": current_user.get("comment", ""), "priv": current_user.get("priv", ""), "groups": current_user.get("groups", "") } } response = await client.request("POST", f"{API_CORE_USER_SET}/{user_uuid}", data=update_data, operation="reset_user_password") if response.get("result") != "saved": return json.dumps({"error": "Failed to reset password", "response": response}, indent=2) # Reload configuration await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_password_reset") return json.dumps({ "result": "success", "message": f"Password successfully reset for user '{username}'", "uuid": user_uuid }, indent=2) except Exception as e: return await handle_tool_error(ctx, "reset_user_password", e) @mcp.tool() async def bulk_user_creation(ctx: RequestContext, user_template: str) -> str: """ Create multiple users from a template specification. Args: user_template: JSON string containing user template and list of users to create. Format: { "template": { "password": "default_password", "privileges": ["priv1", "priv2"], "groups": ["group1"], "expires": "", "disabled": "0" }, "users": [ {"username": "user1", "full_name": "User One", "email": "user1@example.com"}, {"username": "user2", "full_name": "User Two", "email": "user2@example.com"} ] } Returns: JSON response with bulk creation results """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # Parse the template try: template_data = json.loads(user_template) except json.JSONDecodeError as e: return json.dumps({"error": f"Invalid JSON template: {str(e)}"}, indent=2) if "template" not in template_data or "users" not in template_data: return json.dumps({"error": "Template must contain 'template' and 'users' sections"}, indent=2) template = template_data["template"] users_to_create = template_data["users"] results = [] successful_creations = 0 for user_spec in users_to_create: try: username = user_spec.get("username") if not username: results.append({"error": "Username required for each user", "user_spec": user_spec}) continue # Build user data from template and user-specific overrides user_data = { "user": { "name": username, "password": user_spec.get("password", template.get("password", "")), "full_name": user_spec.get("full_name", template.get("full_name", username)), "email": user_spec.get("email", template.get("email", "")), "disabled": user_spec.get("disabled", template.get("disabled", "0")), "expires": user_spec.get("expires", template.get("expires", "")), "comment": user_spec.get("comment", template.get("comment", "Bulk created via MCP")), "priv": ",".join(user_spec.get("privileges", template.get("privileges", []))), "groups": ",".join(user_spec.get("groups", template.get("groups", []))) } } response = await client.request("POST", API_CORE_USER_ADD, data=user_data, operation=f"bulk_create_user_{username}") if response.get("result") == "saved": results.append({ "username": username, "status": "success", "uuid": response.get("uuid") }) successful_creations += 1 else: results.append({ "username": username, "status": "failed", "error": response.get("validations", "Unknown error") }) except Exception as user_error: results.append({ "username": user_spec.get("username", "unknown"), "status": "failed", "error": str(user_error) }) # Reload configuration if any users were created if successful_creations > 0: await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_bulk_creation") return json.dumps({ "result": "completed", "total_users": len(users_to_create), "successful_creations": successful_creations, "failed_creations": len(users_to_create) - successful_creations, "details": results }, indent=2) except Exception as e: return await handle_tool_error(ctx, "bulk_user_creation", e) @mcp.tool() async def setup_user_group_template(ctx: RequestContext, template_name: str, privileges: list, description: str = "") -> str: """ Create a user group template with predefined privileges for common roles. Args: template_name: Name for the group template privileges: List of privilege names to assign to the group description: Description of the group's purpose Returns: JSON response with group creation status """ try: client = get_opnsense_client() if not client: return "Error: OPNsense connection not configured. Use configure_opnsense_connection first." # Validate privileges exist privileges_response = await client.request("GET", API_CORE_AUTH_PRIVILEGES, operation="validate_privileges_for_template") if "privileges" not in privileges_response: return json.dumps({"error": "Could not retrieve available privileges"}, indent=2) available_privileges = set(privileges_response["privileges"].keys()) invalid_privileges = [p for p in privileges if p not in available_privileges] if invalid_privileges: return json.dumps({ "error": "Invalid privileges specified", "invalid_privileges": invalid_privileges, "available_privileges": list(available_privileges) }, indent=2) # Create the group group_data = { "group": { "name": template_name, "description": description or f"Template group: {template_name}", "priv": ",".join(privileges) } } response = await client.request("POST", API_CORE_GROUP_ADD, data=group_data, operation="create_group_template") if response.get("result") != "saved": return json.dumps({"error": "Failed to create group template", "response": response}, indent=2) # Reload configuration await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_template_creation") return json.dumps({ "result": "success", "message": f"Group template '{template_name}' created successfully", "uuid": response.get("uuid"), "privileges_assigned": privileges, "privilege_count": len(privileges) }, indent=2) except Exception as e: return await handle_tool_error(ctx, "setup_user_group_template", e) # ========== TRAFFIC SHAPING AND QoS MANAGEMENT ========== @mcp.tool(name="traffic_shaper_get_status", description="Get traffic shaper service status and statistics") async def traffic_shaper_get_status(ctx: Context) -> str: """Get traffic shaper service status and detailed statistics. Args: ctx: MCP context Returns: JSON string containing traffic shaper status and statistics """ try: client = await get_opnsense_client() # Get service status and statistics statistics_response = await client.request("GET", API_TRAFFICSHAPER_SERVICE_STATISTICS, operation="get_traffic_shaper_statistics") return json.dumps(statistics_response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_get_status", e) @mcp.tool(name="traffic_shaper_reconfigure", description="Apply traffic shaper configuration changes") async def traffic_shaper_reconfigure(ctx: Context) -> str: """Reconfigure and apply all traffic shaper changes. This should be called after making configuration changes to pipes, queues, or rules to ensure the changes take effect. Args: ctx: MCP context Returns: JSON string with reconfiguration status """ try: client = await get_opnsense_client() # Reconfigure the traffic shaper service response = await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_traffic_shaper") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_reconfigure", e) @mcp.tool(name="traffic_shaper_get_settings", description="Get general traffic shaper settings and configuration") async def traffic_shaper_get_settings(ctx: Context) -> str: """Get general traffic shaper settings and configuration. Args: ctx: MCP context Returns: JSON string with general traffic shaper settings """ try: client = await get_opnsense_client() response = await client.request("GET", API_TRAFFICSHAPER_SETTINGS_GET, operation="get_traffic_shaper_settings") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_get_settings", e) # ========== PIPE MANAGEMENT ========== @mcp.tool(name="traffic_shaper_list_pipes", description="List all traffic shaper pipes with optional filtering") async def traffic_shaper_list_pipes(ctx: Context) -> str: """List all traffic shaper pipes with their configurations. Args: ctx: MCP context Returns: JSON string with list of all pipes """ try: client = await get_opnsense_client() response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_SEARCH_PIPES, operation="search_traffic_shaper_pipes") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_list_pipes", e) @mcp.tool(name="traffic_shaper_get_pipe", description="Get details of a specific traffic shaper pipe") async def traffic_shaper_get_pipe(ctx: Context, pipe_uuid: Optional[str] = None) -> str: """Get details of a specific traffic shaper pipe or all pipes. Args: ctx: MCP context pipe_uuid: UUID of specific pipe to retrieve (optional - if not provided, returns all pipes) Returns: JSON string with pipe details """ try: client = await get_opnsense_client() # Validate UUID if provided if pipe_uuid: validate_uuid(pipe_uuid, "pipe_uuid") endpoint = f"{API_TRAFFICSHAPER_SETTINGS_GET_PIPE}/{pipe_uuid}" else: endpoint = API_TRAFFICSHAPER_SETTINGS_GET_PIPE response = await client.request("GET", endpoint, operation="get_traffic_shaper_pipe") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_get_pipe", e) @mcp.tool(name="traffic_shaper_create_pipe", description="Create a new traffic shaper pipe for bandwidth limiting") async def traffic_shaper_create_pipe( ctx: Context, bandwidth: int, bandwidth_metric: str = "Mbit/s", queue_size: int = 50, scheduler: str = "FIFO", description: str = "", enabled: bool = True ) -> str: """Create a new traffic shaper pipe with specified bandwidth limits. Args: ctx: MCP context bandwidth: Bandwidth limit (positive integer) bandwidth_metric: Bandwidth unit (bit/s, Kbit/s, Mbit/s, Gbit/s) queue_size: Queue size in slots (2-100) scheduler: Scheduler algorithm (FIFO, DRR, QFQ, FQ-CoDel, FQ-PIE) description: Description for the pipe enabled: Whether the pipe should be enabled Returns: JSON string with creation result and new pipe UUID """ try: client = await get_opnsense_client() # Validate parameters if bandwidth <= 0: raise ValidationError("Bandwidth must be a positive integer", context={"bandwidth": bandwidth}) if bandwidth_metric not in ["bit/s", "Kbit/s", "Mbit/s", "Gbit/s"]: raise ValidationError("Invalid bandwidth metric", context={"bandwidth_metric": bandwidth_metric, "valid_options": ["bit/s", "Kbit/s", "Mbit/s", "Gbit/s"]}) if not (2 <= queue_size <= 100): raise ValidationError("Queue size must be between 2 and 100", context={"queue_size": queue_size}) if scheduler not in ["FIFO", "DRR", "QFQ", "FQ-CoDel", "FQ-PIE"]: raise ValidationError("Invalid scheduler", context={"scheduler": scheduler, "valid_options": ["FIFO", "DRR", "QFQ", "FQ-CoDel", "FQ-PIE"]}) # Prepare pipe data pipe_data = { "pipe": { "enabled": "1" if enabled else "0", "bandwidth": str(bandwidth), "bandwidthMetric": bandwidth_metric, "queue": str(queue_size), "scheduler": scheduler, "description": description } } # Create the pipe response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_PIPE, data=pipe_data, operation="create_traffic_shaper_pipe") # Apply configuration if creation was successful if response.get("result") == "saved": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_pipe_create") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_create_pipe", e) @mcp.tool(name="traffic_shaper_update_pipe", description="Update an existing traffic shaper pipe configuration") async def traffic_shaper_update_pipe( ctx: Context, pipe_uuid: str, bandwidth: Optional[int] = None, bandwidth_metric: Optional[str] = None, queue_size: Optional[int] = None, scheduler: Optional[str] = None, description: Optional[str] = None, enabled: Optional[bool] = None ) -> str: """Update an existing traffic shaper pipe configuration. Args: ctx: MCP context pipe_uuid: UUID of the pipe to update bandwidth: Bandwidth limit (positive integer, optional) bandwidth_metric: Bandwidth unit (bit/s, Kbit/s, Mbit/s, Gbit/s, optional) queue_size: Queue size in slots (2-100, optional) scheduler: Scheduler algorithm (FIFO, DRR, QFQ, FQ-CoDel, FQ-PIE, optional) description: Description for the pipe (optional) enabled: Whether the pipe should be enabled (optional) Returns: JSON string with update result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(pipe_uuid, "pipe_uuid") # Get current pipe configuration current_pipe_response = await client.request("GET", f"{API_TRAFFICSHAPER_SETTINGS_GET_PIPE}/{pipe_uuid}", operation="get_pipe_for_update") if "pipe" not in current_pipe_response: raise ResourceNotFoundError(f"Pipe with UUID {pipe_uuid} not found") current_pipe = current_pipe_response["pipe"] # Update only provided fields if bandwidth is not None: if bandwidth <= 0: raise ValidationError("Bandwidth must be a positive integer", context={"bandwidth": bandwidth}) current_pipe["bandwidth"] = str(bandwidth) if bandwidth_metric is not None: if bandwidth_metric not in ["bit/s", "Kbit/s", "Mbit/s", "Gbit/s"]: raise ValidationError("Invalid bandwidth metric", context={"bandwidth_metric": bandwidth_metric}) current_pipe["bandwidthMetric"] = bandwidth_metric if queue_size is not None: if not (2 <= queue_size <= 100): raise ValidationError("Queue size must be between 2 and 100", context={"queue_size": queue_size}) current_pipe["queue"] = str(queue_size) if scheduler is not None: if scheduler not in ["FIFO", "DRR", "QFQ", "FQ-CoDel", "FQ-PIE"]: raise ValidationError("Invalid scheduler", context={"scheduler": scheduler}) current_pipe["scheduler"] = scheduler if description is not None: current_pipe["description"] = description if enabled is not None: current_pipe["enabled"] = "1" if enabled else "0" # Prepare update data pipe_data = {"pipe": current_pipe} # Update the pipe response = await client.request("POST", f"{API_TRAFFICSHAPER_SETTINGS_SET_PIPE}/{pipe_uuid}", data=pipe_data, operation="update_traffic_shaper_pipe") # Apply configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_pipe_update") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_update_pipe", e) @mcp.tool(name="traffic_shaper_delete_pipe", description="Delete a traffic shaper pipe") async def traffic_shaper_delete_pipe(ctx: Context, pipe_uuid: str) -> str: """Delete a traffic shaper pipe. Note: This will also delete any queues and rules that reference this pipe. Args: ctx: MCP context pipe_uuid: UUID of the pipe to delete Returns: JSON string with deletion result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(pipe_uuid, "pipe_uuid") # Delete the pipe response = await client.request("POST", f"{API_TRAFFICSHAPER_SETTINGS_DEL_PIPE}/{pipe_uuid}", operation="delete_traffic_shaper_pipe") # Apply configuration if deletion was successful if response.get("result") == "deleted": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_pipe_delete") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_delete_pipe", e) @mcp.tool(name="traffic_shaper_toggle_pipe", description="Enable or disable a traffic shaper pipe") async def traffic_shaper_toggle_pipe(ctx: Context, pipe_uuid: str, enabled: bool) -> str: """Enable or disable a traffic shaper pipe. Args: ctx: MCP context pipe_uuid: UUID of the pipe to toggle enabled: True to enable, False to disable Returns: JSON string with toggle result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(pipe_uuid, "pipe_uuid") # Toggle the pipe enabled_int = 1 if enabled else 0 response = await client.request("POST", f"{API_TRAFFICSHAPER_SETTINGS_TOGGLE_PIPE}/{pipe_uuid}/{enabled_int}", operation="toggle_traffic_shaper_pipe") # Apply configuration if toggle was successful if response.get("result") == "saved": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_pipe_toggle") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_toggle_pipe", e) # ========== QUEUE MANAGEMENT ========== @mcp.tool(name="traffic_shaper_list_queues", description="List all traffic shaper queues with optional filtering") async def traffic_shaper_list_queues(ctx: Context) -> str: """List all traffic shaper queues with their configurations. Args: ctx: MCP context Returns: JSON string with list of all queues """ try: client = await get_opnsense_client() response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_SEARCH_QUEUES, operation="search_traffic_shaper_queues") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_list_queues", e) @mcp.tool(name="traffic_shaper_get_queue", description="Get details of a specific traffic shaper queue") async def traffic_shaper_get_queue(ctx: Context, queue_uuid: Optional[str] = None) -> str: """Get details of a specific traffic shaper queue or all queues. Args: ctx: MCP context queue_uuid: UUID of specific queue to retrieve (optional - if not provided, returns all queues) Returns: JSON string with queue details """ try: client = await get_opnsense_client() # Validate UUID if provided if queue_uuid: validate_uuid(queue_uuid, "queue_uuid") endpoint = f"{API_TRAFFICSHAPER_SETTINGS_GET_QUEUE}/{queue_uuid}" else: endpoint = API_TRAFFICSHAPER_SETTINGS_GET_QUEUE response = await client.request("GET", endpoint, operation="get_traffic_shaper_queue") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_get_queue", e) @mcp.tool(name="traffic_shaper_create_queue", description="Create a new traffic shaper queue for weighted bandwidth sharing") async def traffic_shaper_create_queue( ctx: Context, pipe_uuid: str, weight: int = 10, description: str = "", enabled: bool = True ) -> str: """Create a new traffic shaper queue for weighted bandwidth sharing within a pipe. Args: ctx: MCP context pipe_uuid: UUID of the parent pipe that this queue belongs to weight: Weight for bandwidth allocation within pipe (1-100) description: Description for the queue enabled: Whether the queue should be enabled Returns: JSON string with creation result and new queue UUID """ try: client = await get_opnsense_client() # Validate parameters validate_uuid(pipe_uuid, "pipe_uuid") if not (1 <= weight <= 100): raise ValidationError("Weight must be between 1 and 100", context={"weight": weight}) # Verify the pipe exists pipe_response = await client.request("GET", f"{API_TRAFFICSHAPER_SETTINGS_GET_PIPE}/{pipe_uuid}", operation="verify_pipe_exists") if "pipe" not in pipe_response: raise ResourceNotFoundError(f"Parent pipe with UUID {pipe_uuid} not found") # Prepare queue data queue_data = { "queue": { "enabled": "1" if enabled else "0", "pipe": pipe_uuid, "weight": str(weight), "description": description } } # Create the queue response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_QUEUE, data=queue_data, operation="create_traffic_shaper_queue") # Apply configuration if creation was successful if response.get("result") == "saved": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_queue_create") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_create_queue", e) @mcp.tool(name="traffic_shaper_update_queue", description="Update an existing traffic shaper queue configuration") async def traffic_shaper_update_queue( ctx: Context, queue_uuid: str, pipe_uuid: Optional[str] = None, weight: Optional[int] = None, description: Optional[str] = None, enabled: Optional[bool] = None ) -> str: """Update an existing traffic shaper queue configuration. Args: ctx: MCP context queue_uuid: UUID of the queue to update pipe_uuid: UUID of the parent pipe (optional) weight: Weight for bandwidth allocation within pipe (1-100, optional) description: Description for the queue (optional) enabled: Whether the queue should be enabled (optional) Returns: JSON string with update result """ try: client = await get_opnsense_client() # Validate UUIDs validate_uuid(queue_uuid, "queue_uuid") if pipe_uuid: validate_uuid(pipe_uuid, "pipe_uuid") # Get current queue configuration current_queue_response = await client.request("GET", f"{API_TRAFFICSHAPER_SETTINGS_GET_QUEUE}/{queue_uuid}", operation="get_queue_for_update") if "queue" not in current_queue_response: raise ResourceNotFoundError(f"Queue with UUID {queue_uuid} not found") current_queue = current_queue_response["queue"] # Update only provided fields if pipe_uuid is not None: # Verify the new pipe exists pipe_response = await client.request("GET", f"{API_TRAFFICSHAPER_SETTINGS_GET_PIPE}/{pipe_uuid}", operation="verify_new_pipe_exists") if "pipe" not in pipe_response: raise ResourceNotFoundError(f"Parent pipe with UUID {pipe_uuid} not found") current_queue["pipe"] = pipe_uuid if weight is not None: if not (1 <= weight <= 100): raise ValidationError("Weight must be between 1 and 100", context={"weight": weight}) current_queue["weight"] = str(weight) if description is not None: current_queue["description"] = description if enabled is not None: current_queue["enabled"] = "1" if enabled else "0" # Prepare update data queue_data = {"queue": current_queue} # Update the queue response = await client.request("POST", f"{API_TRAFFICSHAPER_SETTINGS_SET_QUEUE}/{queue_uuid}", data=queue_data, operation="update_traffic_shaper_queue") # Apply configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_queue_update") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_update_queue", e) @mcp.tool(name="traffic_shaper_delete_queue", description="Delete a traffic shaper queue") async def traffic_shaper_delete_queue(ctx: Context, queue_uuid: str) -> str: """Delete a traffic shaper queue. Note: This will also delete any rules that reference this queue. Args: ctx: MCP context queue_uuid: UUID of the queue to delete Returns: JSON string with deletion result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(queue_uuid, "queue_uuid") # Delete the queue response = await client.request("POST", f"{API_TRAFFICSHAPER_SETTINGS_DEL_QUEUE}/{queue_uuid}", operation="delete_traffic_shaper_queue") # Apply configuration if deletion was successful if response.get("result") == "deleted": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_queue_delete") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_delete_queue", e) @mcp.tool(name="traffic_shaper_toggle_queue", description="Enable or disable a traffic shaper queue") async def traffic_shaper_toggle_queue(ctx: Context, queue_uuid: str, enabled: bool) -> str: """Enable or disable a traffic shaper queue. Args: ctx: MCP context queue_uuid: UUID of the queue to toggle enabled: True to enable, False to disable Returns: JSON string with toggle result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(queue_uuid, "queue_uuid") # Toggle the queue enabled_int = 1 if enabled else 0 response = await client.request("POST", f"{API_TRAFFICSHAPER_SETTINGS_TOGGLE_QUEUE}/{queue_uuid}/{enabled_int}", operation="toggle_traffic_shaper_queue") # Apply configuration if toggle was successful if response.get("result") == "saved": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_queue_toggle") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_toggle_queue", e) # ========== RULE MANAGEMENT ========== @mcp.tool(name="traffic_shaper_list_rules", description="List all traffic shaper rules with optional filtering") async def traffic_shaper_list_rules(ctx: Context) -> str: """List all traffic shaper rules with their configurations. Args: ctx: MCP context Returns: JSON string with list of all rules """ try: client = await get_opnsense_client() response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_SEARCH_RULES, operation="search_traffic_shaper_rules") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_list_rules", e) @mcp.tool(name="traffic_shaper_get_rule", description="Get details of a specific traffic shaper rule") async def traffic_shaper_get_rule(ctx: Context, rule_uuid: Optional[str] = None) -> str: """Get details of a specific traffic shaper rule or all rules. Args: ctx: MCP context rule_uuid: UUID of specific rule to retrieve (optional - if not provided, returns all rules) Returns: JSON string with rule details """ try: client = await get_opnsense_client() # Validate UUID if provided if rule_uuid: validate_uuid(rule_uuid, "rule_uuid") endpoint = f"{API_TRAFFICSHAPER_SETTINGS_GET_RULE}/{rule_uuid}" else: endpoint = API_TRAFFICSHAPER_SETTINGS_GET_RULE response = await client.request("GET", endpoint, operation="get_traffic_shaper_rule") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_get_rule", e) @mcp.tool(name="traffic_shaper_create_rule", description="Create a new traffic shaper rule to apply QoS policies") async def traffic_shaper_create_rule( ctx: Context, target_uuid: str, interface: str, protocol: str = "IP", source: str = "any", destination: str = "any", sequence: int = 1, description: str = "", enabled: bool = True ) -> str: """Create a new traffic shaper rule to apply QoS policies to specific traffic flows. Args: ctx: MCP context target_uuid: UUID of the target pipe or queue to apply traffic shaping interface: Interface name where rule applies (e.g., 'wan', 'lan') protocol: Protocol to match (IP, TCP, UDP, ICMP, etc.) source: Source network/address (default: 'any') destination: Destination network/address (default: 'any') sequence: Rule evaluation order (1-1000000) description: Description for the rule enabled: Whether the rule should be enabled Returns: JSON string with creation result and new rule UUID """ try: client = await get_opnsense_client() # Validate parameters validate_uuid(target_uuid, "target_uuid") if not (1 <= sequence <= 1000000): raise ValidationError("Sequence must be between 1 and 1000000", context={"sequence": sequence}) # Verify the target (pipe or queue) exists target_exists = False try: pipe_response = await client.request("GET", f"{API_TRAFFICSHAPER_SETTINGS_GET_PIPE}/{target_uuid}", operation="verify_target_pipe") if "pipe" in pipe_response: target_exists = True except ResourceNotFoundError: pass if not target_exists: try: queue_response = await client.request("GET", f"{API_TRAFFICSHAPER_SETTINGS_GET_QUEUE}/{target_uuid}", operation="verify_target_queue") if "queue" in queue_response: target_exists = True except ResourceNotFoundError: pass if not target_exists: raise ResourceNotFoundError(f"Target pipe or queue with UUID {target_uuid} not found") # Prepare rule data rule_data = { "rule": { "enabled": "1" if enabled else "0", "sequence": str(sequence), "interface": interface, "protocol": protocol, "source": source, "destination": destination, "target": target_uuid, "description": description } } # Create the rule response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_RULE, data=rule_data, operation="create_traffic_shaper_rule") # Apply configuration if creation was successful if response.get("result") == "saved": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_rule_create") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_create_rule", e) @mcp.tool(name="traffic_shaper_update_rule", description="Update an existing traffic shaper rule configuration") async def traffic_shaper_update_rule( ctx: Context, rule_uuid: str, target_uuid: Optional[str] = None, interface: Optional[str] = None, protocol: Optional[str] = None, source: Optional[str] = None, destination: Optional[str] = None, sequence: Optional[int] = None, description: Optional[str] = None, enabled: Optional[bool] = None ) -> str: """Update an existing traffic shaper rule configuration. Args: ctx: MCP context rule_uuid: UUID of the rule to update target_uuid: UUID of the target pipe or queue (optional) interface: Interface name (optional) protocol: Protocol to match (optional) source: Source network/address (optional) destination: Destination network/address (optional) sequence: Rule evaluation order (1-1000000, optional) description: Description for the rule (optional) enabled: Whether the rule should be enabled (optional) Returns: JSON string with update result """ try: client = await get_opnsense_client() # Validate UUIDs validate_uuid(rule_uuid, "rule_uuid") if target_uuid: validate_uuid(target_uuid, "target_uuid") # Get current rule configuration current_rule_response = await client.request("GET", f"{API_TRAFFICSHAPER_SETTINGS_GET_RULE}/{rule_uuid}", operation="get_rule_for_update") if "rule" not in current_rule_response: raise ResourceNotFoundError(f"Rule with UUID {rule_uuid} not found") current_rule = current_rule_response["rule"] # Update only provided fields if target_uuid is not None: # Verify the new target exists target_exists = False try: pipe_response = await client.request("GET", f"{API_TRAFFICSHAPER_SETTINGS_GET_PIPE}/{target_uuid}", operation="verify_new_target_pipe") if "pipe" in pipe_response: target_exists = True except ResourceNotFoundError: pass if not target_exists: try: queue_response = await client.request("GET", f"{API_TRAFFICSHAPER_SETTINGS_GET_QUEUE}/{target_uuid}", operation="verify_new_target_queue") if "queue" in queue_response: target_exists = True except ResourceNotFoundError: pass if not target_exists: raise ResourceNotFoundError(f"Target pipe or queue with UUID {target_uuid} not found") current_rule["target"] = target_uuid if interface is not None: current_rule["interface"] = interface if protocol is not None: current_rule["protocol"] = protocol if source is not None: current_rule["source"] = source if destination is not None: current_rule["destination"] = destination if sequence is not None: if not (1 <= sequence <= 1000000): raise ValidationError("Sequence must be between 1 and 1000000", context={"sequence": sequence}) current_rule["sequence"] = str(sequence) if description is not None: current_rule["description"] = description if enabled is not None: current_rule["enabled"] = "1" if enabled else "0" # Prepare update data rule_data = {"rule": current_rule} # Update the rule response = await client.request("POST", f"{API_TRAFFICSHAPER_SETTINGS_SET_RULE}/{rule_uuid}", data=rule_data, operation="update_traffic_shaper_rule") # Apply configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_rule_update") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_update_rule", e) @mcp.tool(name="traffic_shaper_delete_rule", description="Delete a traffic shaper rule") async def traffic_shaper_delete_rule(ctx: Context, rule_uuid: str) -> str: """Delete a traffic shaper rule. Args: ctx: MCP context rule_uuid: UUID of the rule to delete Returns: JSON string with deletion result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(rule_uuid, "rule_uuid") # Delete the rule response = await client.request("POST", f"{API_TRAFFICSHAPER_SETTINGS_DEL_RULE}/{rule_uuid}", operation="delete_traffic_shaper_rule") # Apply configuration if deletion was successful if response.get("result") == "deleted": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_rule_delete") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_delete_rule", e) @mcp.tool(name="traffic_shaper_toggle_rule", description="Enable or disable a traffic shaper rule") async def traffic_shaper_toggle_rule(ctx: Context, rule_uuid: str, enabled: bool) -> str: """Enable or disable a traffic shaper rule. Args: ctx: MCP context rule_uuid: UUID of the rule to toggle enabled: True to enable, False to disable Returns: JSON string with toggle result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(rule_uuid, "rule_uuid") # Toggle the rule enabled_int = 1 if enabled else 0 response = await client.request("POST", f"{API_TRAFFICSHAPER_SETTINGS_TOGGLE_RULE}/{rule_uuid}/{enabled_int}", operation="toggle_traffic_shaper_rule") # Apply configuration if toggle was successful if response.get("result") == "saved": await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_rule_toggle") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_toggle_rule", e) # ========== COMMON QOS USE CASE HELPERS ========== @mcp.tool(name="traffic_shaper_limit_user_bandwidth", description="Helper to create per-user bandwidth limiting setup") async def traffic_shaper_limit_user_bandwidth( ctx: Context, user_ip: str, download_limit_mbps: int, upload_limit_mbps: int, interface: str = "lan", description: str = "" ) -> str: """Helper tool to quickly set up per-user bandwidth limiting. This creates a complete bandwidth limiting setup for a specific user: - Download pipe (for traffic TO the user) - Upload pipe (for traffic FROM the user) - Download rule (targeting user as destination) - Upload rule (targeting user as source) Args: ctx: MCP context user_ip: IP address of the user to limit download_limit_mbps: Download bandwidth limit in Mbps upload_limit_mbps: Upload bandwidth limit in Mbps interface: Interface where rules apply (default: 'lan') description: Description prefix for created objects Returns: JSON string with created objects (pipes and rules) """ try: client = await get_opnsense_client() results = { "download_pipe": None, "upload_pipe": None, "download_rule": None, "upload_rule": None } desc_prefix = description or f"User {user_ip} bandwidth limit" # Create download pipe (traffic TO user) download_pipe_data = { "pipe": { "enabled": "1", "bandwidth": str(download_limit_mbps), "bandwidthMetric": "Mbit/s", "queue": "50", "scheduler": "FQ-CoDel", "description": f"{desc_prefix} - Download" } } download_pipe_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_PIPE, data=download_pipe_data, operation="create_user_download_pipe") results["download_pipe"] = download_pipe_response # Create upload pipe (traffic FROM user) upload_pipe_data = { "pipe": { "enabled": "1", "bandwidth": str(upload_limit_mbps), "bandwidthMetric": "Mbit/s", "queue": "50", "scheduler": "FQ-CoDel", "description": f"{desc_prefix} - Upload" } } upload_pipe_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_PIPE, data=upload_pipe_data, operation="create_user_upload_pipe") results["upload_pipe"] = upload_pipe_response # Get the UUIDs of the created pipes download_pipe_uuid = download_pipe_response.get("uuid") upload_pipe_uuid = upload_pipe_response.get("uuid") if download_pipe_uuid and upload_pipe_uuid: # Create download rule (traffic TO user as destination) download_rule_data = { "rule": { "enabled": "1", "sequence": "1000", "interface": interface, "protocol": "IP", "source": "any", "destination": user_ip, "target": download_pipe_uuid, "description": f"{desc_prefix} - Download Rule" } } download_rule_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_RULE, data=download_rule_data, operation="create_user_download_rule") results["download_rule"] = download_rule_response # Create upload rule (traffic FROM user as source) upload_rule_data = { "rule": { "enabled": "1", "sequence": "1001", "interface": interface, "protocol": "IP", "source": user_ip, "destination": "any", "target": upload_pipe_uuid, "description": f"{desc_prefix} - Upload Rule" } } upload_rule_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_RULE, data=upload_rule_data, operation="create_user_upload_rule") results["upload_rule"] = upload_rule_response # Apply configuration await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_user_bandwidth_setup") return json.dumps(results, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_limit_user_bandwidth", e) @mcp.tool(name="traffic_shaper_prioritize_voip", description="Helper to set up VoIP traffic prioritization") async def traffic_shaper_prioritize_voip( ctx: Context, total_bandwidth_mbps: int, voip_bandwidth_mbps: int, voip_ports: str = "5060,10000-20000", interface: str = "wan" ) -> str: """Helper tool to set up VoIP traffic prioritization with guaranteed bandwidth. This creates a complete VoIP QoS setup: - Main bandwidth pipe for total connection - High-priority queue for VoIP traffic - Best-effort queue for other traffic - Rules to classify VoIP traffic by port Args: ctx: MCP context total_bandwidth_mbps: Total connection bandwidth in Mbps voip_bandwidth_mbps: Guaranteed bandwidth for VoIP in Mbps voip_ports: VoIP ports to prioritize (default: SIP + RTP range) interface: Interface where rules apply (default: 'wan') Returns: JSON string with created objects (pipe, queues, rules) """ try: client = await get_opnsense_client() results = { "main_pipe": None, "voip_queue": None, "data_queue": None, "voip_rule": None, "data_rule": None } # Create main pipe with total bandwidth main_pipe_data = { "pipe": { "enabled": "1", "bandwidth": str(total_bandwidth_mbps), "bandwidthMetric": "Mbit/s", "queue": "100", "scheduler": "FQ-CoDel", "description": "VoIP Main Bandwidth Pipe" } } main_pipe_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_PIPE, data=main_pipe_data, operation="create_voip_main_pipe") results["main_pipe"] = main_pipe_response main_pipe_uuid = main_pipe_response.get("uuid") if main_pipe_uuid: # Create high-priority VoIP queue (weight 90) voip_queue_data = { "queue": { "enabled": "1", "pipe": main_pipe_uuid, "weight": "90", "description": f"VoIP Priority Queue ({voip_bandwidth_mbps} Mbps guaranteed)" } } voip_queue_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_QUEUE, data=voip_queue_data, operation="create_voip_priority_queue") results["voip_queue"] = voip_queue_response voip_queue_uuid = voip_queue_response.get("uuid") # Create best-effort data queue (weight 10) data_queue_data = { "queue": { "enabled": "1", "pipe": main_pipe_uuid, "weight": "10", "description": "Best Effort Data Queue" } } data_queue_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_QUEUE, data=data_queue_data, operation="create_data_best_effort_queue") results["data_queue"] = data_queue_response data_queue_uuid = data_queue_response.get("uuid") if voip_queue_uuid and data_queue_uuid: # Create VoIP rule (high priority - sequence 100) voip_rule_data = { "rule": { "enabled": "1", "sequence": "100", "interface": interface, "protocol": "UDP", "source": "any", "destination": f"any:{voip_ports}", "target": voip_queue_uuid, "description": f"VoIP Traffic Priority (ports {voip_ports})" } } voip_rule_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_RULE, data=voip_rule_data, operation="create_voip_priority_rule") results["voip_rule"] = voip_rule_response # Create catch-all data rule (low priority - sequence 9999) data_rule_data = { "rule": { "enabled": "1", "sequence": "9999", "interface": interface, "protocol": "IP", "source": "any", "destination": "any", "target": data_queue_uuid, "description": "Default Data Traffic (Best Effort)" } } data_rule_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_RULE, data=data_rule_data, operation="create_data_best_effort_rule") results["data_rule"] = data_rule_response # Apply configuration await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_voip_setup") return json.dumps(results, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_prioritize_voip", e) @mcp.tool(name="traffic_shaper_setup_gaming_priority", description="Helper to optimize traffic for gaming") async def traffic_shaper_setup_gaming_priority( ctx: Context, total_bandwidth_mbps: int, gaming_ports: str = "3478-3480,27000-27050", interface: str = "wan" ) -> str: """Helper tool to set up gaming traffic optimization with low latency priority. This creates a gaming-optimized QoS setup: - Main bandwidth pipe with optimized scheduler - High-priority queue for gaming traffic (low latency) - Medium-priority queue for interactive traffic - Low-priority queue for bulk downloads - Rules to classify traffic appropriately Args: ctx: MCP context total_bandwidth_mbps: Total connection bandwidth in Mbps gaming_ports: Gaming ports to prioritize (default: Steam, Xbox Live, etc.) interface: Interface where rules apply (default: 'wan') Returns: JSON string with created objects (pipe, queues, rules) """ try: client = await get_opnsense_client() results = { "main_pipe": None, "gaming_queue": None, "interactive_queue": None, "bulk_queue": None, "gaming_rule": None, "interactive_rule": None, "bulk_rule": None } # Create main pipe optimized for gaming (low latency scheduler) main_pipe_data = { "pipe": { "enabled": "1", "bandwidth": str(total_bandwidth_mbps), "bandwidthMetric": "Mbit/s", "queue": "25", # Smaller queue for lower latency "scheduler": "FQ-CoDel", # Best for gaming latency "description": "Gaming Optimized Main Pipe" } } main_pipe_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_PIPE, data=main_pipe_data, operation="create_gaming_main_pipe") results["main_pipe"] = main_pipe_response main_pipe_uuid = main_pipe_response.get("uuid") if main_pipe_uuid: # Create gaming queue (weight 70 - highest priority) gaming_queue_data = { "queue": { "enabled": "1", "pipe": main_pipe_uuid, "weight": "70", "description": "Gaming Traffic - Highest Priority" } } gaming_queue_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_QUEUE, data=gaming_queue_data, operation="create_gaming_priority_queue") results["gaming_queue"] = gaming_queue_response gaming_queue_uuid = gaming_queue_response.get("uuid") # Create interactive queue (weight 25 - medium priority for web, SSH, etc.) interactive_queue_data = { "queue": { "enabled": "1", "pipe": main_pipe_uuid, "weight": "25", "description": "Interactive Traffic - Medium Priority" } } interactive_queue_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_QUEUE, data=interactive_queue_data, operation="create_interactive_priority_queue") results["interactive_queue"] = interactive_queue_response interactive_queue_uuid = interactive_queue_response.get("uuid") # Create bulk download queue (weight 5 - lowest priority) bulk_queue_data = { "queue": { "enabled": "1", "pipe": main_pipe_uuid, "weight": "5", "description": "Bulk Downloads - Lowest Priority" } } bulk_queue_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_QUEUE, data=bulk_queue_data, operation="create_bulk_download_queue") results["bulk_queue"] = bulk_queue_response bulk_queue_uuid = bulk_queue_response.get("uuid") if gaming_queue_uuid and interactive_queue_uuid and bulk_queue_uuid: # Create gaming rule (highest priority - sequence 50) gaming_rule_data = { "rule": { "enabled": "1", "sequence": "50", "interface": interface, "protocol": "UDP", "source": "any", "destination": f"any:{gaming_ports}", "target": gaming_queue_uuid, "description": f"Gaming Traffic Priority (ports {gaming_ports})" } } gaming_rule_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_RULE, data=gaming_rule_data, operation="create_gaming_priority_rule") results["gaming_rule"] = gaming_rule_response # Create interactive rule (medium priority - sequence 500) interactive_rule_data = { "rule": { "enabled": "1", "sequence": "500", "interface": interface, "protocol": "TCP", "source": "any", "destination": "any:22,53,80,443", "target": interactive_queue_uuid, "description": "Interactive Traffic (SSH, DNS, HTTP, HTTPS)" } } interactive_rule_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_RULE, data=interactive_rule_data, operation="create_interactive_priority_rule") results["interactive_rule"] = interactive_rule_response # Create bulk download rule (lowest priority - sequence 9999) bulk_rule_data = { "rule": { "enabled": "1", "sequence": "9999", "interface": interface, "protocol": "IP", "source": "any", "destination": "any", "target": bulk_queue_uuid, "description": "Bulk Downloads and Default Traffic" } } bulk_rule_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_RULE, data=bulk_rule_data, operation="create_bulk_download_rule") results["bulk_rule"] = bulk_rule_response # Apply configuration await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_gaming_setup") return json.dumps(results, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_setup_gaming_priority", e) @mcp.tool(name="traffic_shaper_create_guest_limits", description="Helper to set up guest network bandwidth limitations") async def traffic_shaper_create_guest_limits( ctx: Context, guest_network: str, max_bandwidth_mbps: int, per_user_limit_mbps: Optional[int] = None, interface: str = "lan" ) -> str: """Helper tool to set up bandwidth limitations for guest networks. This creates guest network QoS setup: - Total bandwidth pipe for the entire guest network - Per-user limitation queue (if specified) - Rules to apply limits to guest network traffic Args: ctx: MCP context guest_network: Guest network CIDR (e.g., "192.168.100.0/24") max_bandwidth_mbps: Maximum total bandwidth for guest network in Mbps per_user_limit_mbps: Optional per-user bandwidth limit in Mbps interface: Interface where rules apply (default: 'lan') Returns: JSON string with created objects (pipes, queues, rules) """ try: client = await get_opnsense_client() results = { "guest_pipe": None, "guest_queue": None, "guest_download_rule": None, "guest_upload_rule": None } # Create main guest network pipe guest_pipe_data = { "pipe": { "enabled": "1", "bandwidth": str(max_bandwidth_mbps), "bandwidthMetric": "Mbit/s", "queue": "50", "scheduler": "FQ-CoDel", "description": f"Guest Network Bandwidth Limit ({guest_network})" } } guest_pipe_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_PIPE, data=guest_pipe_data, operation="create_guest_network_pipe") results["guest_pipe"] = guest_pipe_response guest_pipe_uuid = guest_pipe_response.get("uuid") target_uuid = guest_pipe_uuid # If per-user limits are specified, create a queue if per_user_limit_mbps and guest_pipe_uuid: guest_queue_data = { "queue": { "enabled": "1", "pipe": guest_pipe_uuid, "weight": "50", "description": f"Guest Per-User Queue ({per_user_limit_mbps} Mbps limit)" } } guest_queue_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_QUEUE, data=guest_queue_data, operation="create_guest_user_queue") results["guest_queue"] = guest_queue_response target_uuid = guest_queue_response.get("uuid") or guest_pipe_uuid if target_uuid: # Create guest download rule (traffic TO guest network) guest_download_rule_data = { "rule": { "enabled": "1", "sequence": "200", "interface": interface, "protocol": "IP", "source": "any", "destination": guest_network, "target": target_uuid, "description": f"Guest Network Download Limit ({guest_network})" } } guest_download_rule_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_RULE, data=guest_download_rule_data, operation="create_guest_download_rule") results["guest_download_rule"] = guest_download_rule_response # Create guest upload rule (traffic FROM guest network) guest_upload_rule_data = { "rule": { "enabled": "1", "sequence": "201", "interface": interface, "protocol": "IP", "source": guest_network, "destination": "any", "target": target_uuid, "description": f"Guest Network Upload Limit ({guest_network})" } } guest_upload_rule_response = await client.request("POST", API_TRAFFICSHAPER_SETTINGS_ADD_RULE, data=guest_upload_rule_data, operation="create_guest_upload_rule") results["guest_upload_rule"] = guest_upload_rule_response # Apply configuration await client.request("POST", API_TRAFFICSHAPER_SERVICE_RECONFIGURE, operation="reconfigure_after_guest_setup") return json.dumps(results, indent=2) except Exception as e: return await handle_tool_error(ctx, "traffic_shaper_create_guest_limits", e) # --- End Traffic Shaping and QoS Management --- # Entry point if __name__ == "__main__": mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/floriangrousset/opnsense-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server