"""
OPNsense MCP Server - System Domain
This module provides system management tools for OPNsense including system status monitoring,
health metrics, service management, plugin management, backups, and security auditing.
Tools included:
- get_system_status: Get OPNsense system status
- get_system_health: Get system health metrics (CPU, memory, disk, temperature)
- get_system_routes: Get system routing table
- restart_service: Restart an OPNsense service
- backup_config: Create a backup of the OPNsense configuration
- list_plugins: List installed plugins
- install_plugin: Install a plugin
- perform_firewall_audit: Perform a basic security audit of the OPNsense configuration
"""
import json
import logging
from typing import List, Dict, Any
from mcp.server.fastmcp import Context
from ..main import mcp, server_state
from ..core import OPNsenseClient
from ..core.exceptions import (
ConfigurationError,
AuthenticationError,
NetworkError,
APIError,
)
from ..shared.constants import (
API_CORE_FIRMWARE_STATUS,
API_CORE_SYSTEM_INFO,
API_CORE_SERVICE_SEARCH,
API_CORE_SERVICE_RESTART,
API_CORE_BACKUP_DOWNLOAD,
API_ROUTES_GET,
API_DIAGNOSTICS_SYSTEM_PROCESSOR,
API_DIAGNOSTICS_SYSTEM_MEMORY,
API_DIAGNOSTICS_SYSTEM_STORAGE,
API_DIAGNOSTICS_SYSTEM_TEMPERATURE,
API_CORE_FIRMWARE_PLUGINS,
API_CORE_FIRMWARE_INSTALL,
API_FIREWALL_FILTER_SEARCH_RULE,
API_INTERFACES_OVERVIEW_INFO,
)
from .configuration import get_opnsense_client
logger = logging.getLogger("opnsense-mcp")
# ========== HELPER FUNCTIONS ==========
async def _get_all_rules(client: OPNsenseClient) -> List[Dict[str, Any]]:
    """Collect every firewall rule by walking the paginated search endpoint.

    Pages of up to 500 rows are requested until an empty or short page comes
    back. If a request (or the handling of its response) raises, the error is
    logged and whatever has been gathered so far is returned, so callers may
    receive a partial list.

    Args:
        client: Client used to issue the search requests.

    Returns:
        The rule rows accumulated across all pages that were fetched.
    """
    batch_size = 500  # Fetch in larger batches
    collected: List[Dict[str, Any]] = []
    page = 1
    try:
        while True:
            payload = {
                "current": page,
                "rowCount": batch_size,
                "searchPhrase": ""
            }
            reply = await client.request(
                "POST", API_FIREWALL_FILTER_SEARCH_RULE, data=payload
            )
            batch = reply.get("rows", [])
            if batch:
                collected.extend(batch)
            # An empty or short page means there is nothing further to fetch.
            if not batch or len(batch) < batch_size:
                break
            page += 1
    except Exception as exc:
        # Best effort: the audit can proceed with the pages fetched so far.
        logger.error(f"Error fetching page {page} of firewall rules: {exc}", exc_info=True)
    return collected
async def _get_wan_interfaces(client: OPNsenseClient) -> List[str]:
    """Guess which interfaces are WAN-facing from the interface overview.

    An interface is selected when its ``gateway`` field is non-empty and not
    the string ``"none"``. An interface literally named ``wan`` (any case) is
    also selected if nothing has been selected before it in iteration order.
    Errors while fetching or reading the overview are logged, not raised.

    Args:
        client: Client used to fetch the interface overview.

    Returns:
        Names of the interfaces considered WAN; empty if none were identified.
    """
    found: List[str] = []
    try:
        overview = await client.request("GET", API_INTERFACES_OVERVIEW_INFO)
        for name, details in overview.items():
            gateway = details.get("gateway")
            # Heuristic: a configured gateway marks a likely WAN interface; the
            # conventional 'wan' name is only a fallback while nothing matched yet.
            if (gateway and gateway != "none") or (name.lower() == "wan" and not found):
                found.append(name)
    except Exception as exc:
        logger.error(f"Error fetching interfaces info for audit: {exc}", exc_info=True)

    # No default guess is made when detection fails; callers get an empty list.
    if not found:
        logger.warning("Could not reliably identify WAN interfaces for audit.")
    return found
# ========== SYSTEM TOOLS ==========
@mcp.tool(name="get_system_status", description="Get OPNsense system status")
async def get_system_status(ctx: Context) -> str:
    """Report firmware status, system information and the service list.

    Args:
        ctx: MCP context

    Returns:
        JSON string with ``firmware``, ``system`` and ``services`` keys, or an
        error message string if any step fails.
    """
    try:
        opn = await get_opnsense_client()
        firmware_data = await opn.request("GET", API_CORE_FIRMWARE_STATUS)
        system_data = await opn.request("GET", API_CORE_SYSTEM_INFO)
        # rowCount=-1 is sent to the service search together with an empty phrase.
        search_params = {"current": 1, "rowCount": -1, "searchPhrase": ""}
        service_data = await opn.request("POST", API_CORE_SERVICE_SEARCH, data=search_params)
        return json.dumps(
            {
                "firmware": firmware_data,
                "system": system_data,
                "services": service_data.get("rows", []),
            },
            indent=2,
        )
    except ConfigurationError as exc:
        detail = str(exc)
        await ctx.error(detail)
        return f"Configuration Error: {detail}"
    except (AuthenticationError, NetworkError, APIError) as exc:
        detail = str(exc)
        logger.exception(f"Error in get_system_status: {detail}")
        await ctx.error(f"Error fetching system status: {detail}")
        return f"Error: {detail}"
    except Exception as exc:
        detail = str(exc)
        logger.exception(f"Unexpected error in get_system_status: {detail}")
        await ctx.error(f"Unexpected error: {detail}")
        return f"Error: {detail}"
@mcp.tool(name="get_system_health", description="Get system health metrics")
async def get_system_health(ctx: Context) -> str:
    """Fetch CPU, memory, disk and temperature diagnostics.

    The four endpoints are queried one after another; a failure in any of
    them aborts the call and yields an error string instead of metrics.

    Args:
        ctx: MCP context

    Returns:
        JSON string keyed by ``cpu``, ``memory``, ``disk`` and
        ``temperature``, or an error message string on failure.
    """
    try:
        opn = await get_opnsense_client()
        sources = (
            ("cpu", API_DIAGNOSTICS_SYSTEM_PROCESSOR),
            ("memory", API_DIAGNOSTICS_SYSTEM_MEMORY),
            ("disk", API_DIAGNOSTICS_SYSTEM_STORAGE),
            ("temperature", API_DIAGNOSTICS_SYSTEM_TEMPERATURE),
        )
        metrics = {}
        for label, endpoint in sources:
            metrics[label] = await opn.request("GET", endpoint)
        return json.dumps(metrics, indent=2)
    except ConfigurationError as exc:
        detail = str(exc)
        await ctx.error(detail)
        return f"Configuration Error: {detail}"
    except Exception as exc:
        # Auth/network/API failures and unexpected errors are reported identically.
        detail = str(exc)
        logger.exception(f"Error in get_system_health: {detail}")
        await ctx.error(f"Error fetching system health: {detail}")
        return f"Error: {detail}"
@mcp.tool(name="get_system_routes", description="Get system routing table")
async def get_system_routes(ctx: Context) -> str:
    """Fetch the routing table from the routes endpoint.

    Args:
        ctx: MCP context

    Returns:
        JSON string of the API response, or an error message string on failure.
    """
    try:
        opn = await get_opnsense_client()
        routes = await opn.request("GET", API_ROUTES_GET)
        return json.dumps(routes, indent=2)
    except ConfigurationError as exc:
        detail = str(exc)
        await ctx.error(detail)
        return f"Configuration Error: {detail}"
    except Exception as exc:
        # Auth/network/API failures and unexpected errors are reported identically.
        detail = str(exc)
        logger.exception(f"Error in get_system_routes: {detail}")
        await ctx.error(f"Error fetching system routes: {detail}")
        return f"Error: {detail}"
@mcp.tool(name="restart_service", description="Restart an OPNsense service")
async def restart_service(ctx: Context, service_name: str) -> str:
    """Restart an OPNsense service.

    The name is validated and percent-encoded before it is appended to the
    restart endpoint, so characters such as ``?`` or ``#`` and ``.``/``..``
    segments cannot redirect the POST to a different API path.

    Args:
        ctx: MCP context
        service_name: Name of the service to restart. ``/``-separated values
            are kept as separate path segments, each encoded on its own.

    Returns:
        JSON string with the API response, or an error message string if the
        name is rejected or the request fails.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    cleaned = service_name.strip().strip("/") if isinstance(service_name, str) else ""
    segments = cleaned.split("/")
    # Empty, "." and ".." segments would alter which endpoint is addressed.
    if any(segment in ("", ".", "..") for segment in segments):
        problem = f"Invalid service name: {service_name!r}"
        await ctx.error(problem)
        return f"Error: {problem}"
    encoded_name = "/".join(quote(segment, safe="") for segment in segments)

    try:
        client = await get_opnsense_client()
        response = await client.request(
            "POST",
            f"{API_CORE_SERVICE_RESTART}/{encoded_name}"
        )
        return json.dumps(response, indent=2)
    except ConfigurationError as e:
        await ctx.error(str(e))
        return f"Configuration Error: {str(e)}"
    except Exception as e:
        # Auth/network/API failures and unexpected errors are reported identically.
        logger.error(f"Error in restart_service (service: {service_name}): {str(e)}", exc_info=True)
        await ctx.error(f"Error restarting service: {str(e)}")
        return f"Error: {str(e)}"
@mcp.tool(name="backup_config", description="Create a backup of the OPNsense configuration")
async def backup_config(ctx: Context) -> str:
    """Request a configuration backup from the backup download endpoint.

    Args:
        ctx: MCP context

    Returns:
        JSON string of whatever the endpoint returned, or an error message
        string on failure.
    """
    try:
        opn = await get_opnsense_client()
        backup = await opn.request("POST", API_CORE_BACKUP_DOWNLOAD)
        return json.dumps(backup, indent=2)
    except ConfigurationError as exc:
        detail = str(exc)
        await ctx.error(detail)
        return f"Configuration Error: {detail}"
    except Exception as exc:
        # Auth/network/API failures and unexpected errors are reported identically.
        detail = str(exc)
        logger.exception(f"Error in backup_config: {detail}")
        await ctx.error(f"Error creating backup: {detail}")
        return f"Error: {detail}"
@mcp.tool(name="list_plugins", description="List installed plugins")
async def list_plugins(ctx: Context) -> str:
    """Query the firmware plugins endpoint.

    Args:
        ctx: MCP context

    Returns:
        JSON string of the API response, or an error message string on failure.
    """
    try:
        opn = await get_opnsense_client()
        plugins = await opn.request("GET", API_CORE_FIRMWARE_PLUGINS)
        return json.dumps(plugins, indent=2)
    except ConfigurationError as exc:
        detail = str(exc)
        await ctx.error(detail)
        return f"Configuration Error: {detail}"
    except Exception as exc:
        # Auth/network/API failures and unexpected errors are reported identically.
        detail = str(exc)
        logger.exception(f"Error in list_plugins: {detail}")
        await ctx.error(f"Error listing plugins: {detail}")
        return f"Error: {detail}"
@mcp.tool(name="install_plugin", description="Install a plugin")
async def install_plugin(ctx: Context, plugin_name: str) -> str:
    """Install a plugin.

    The name is validated and percent-encoded before it is appended to the
    install endpoint, so it always stays a single path segment and cannot
    redirect the POST to a different API path.

    Args:
        ctx: MCP context
        plugin_name: Name of the plugin to install. Must be non-empty, must
            not contain ``/`` and must not be ``.`` or ``..``.

    Returns:
        JSON string with the API response, or an error message string if the
        name is rejected or the request fails.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    cleaned = plugin_name.strip() if isinstance(plugin_name, str) else ""
    if not cleaned or cleaned in (".", "..") or "/" in cleaned:
        problem = f"Invalid plugin name: {plugin_name!r}"
        await ctx.error(problem)
        return f"Error: {problem}"
    encoded_name = quote(cleaned, safe="")

    try:
        client = await get_opnsense_client()
        response = await client.request(
            "POST",
            f"{API_CORE_FIRMWARE_INSTALL}/{encoded_name}"
        )
        return json.dumps(response, indent=2)
    except ConfigurationError as e:
        await ctx.error(str(e))
        return f"Configuration Error: {str(e)}"
    except Exception as e:
        # Auth/network/API failures and unexpected errors are reported identically.
        logger.error(f"Error in install_plugin (plugin: {plugin_name}): {str(e)}", exc_info=True)
        await ctx.error(f"Error installing plugin: {str(e)}")
        return f"Error: {str(e)}"
def _audit_rule_label(rule: Dict[str, Any]) -> str:
    """Return a printable identifier for a rule: its description, else its UUID, else "N/A"."""
    # `or` (rather than a .get default) also covers keys that are present but
    # empty/None; str() keeps ', '.join() from failing on non-string values.
    return str(rule.get("descr") or rule.get("uuid") or "N/A")


def _audit_scan_rules(rules: List[Dict[str, Any]], wan_interfaces: List[str]) -> Dict[str, List[str]]:
    """Sort enabled firewall rules into the buckets reported by the audit.

    Returns a dict of rule labels with the keys ``wan_mgmt``, ``wan_insecure``,
    ``wan_any_any`` and ``block_no_log``.
    """
    management_ports = {"80", "443", "22"}  # HTTP, HTTPS, SSH
    insecure_ports = {"21", "23"}  # FTP, Telnet
    buckets: Dict[str, List[str]] = {
        "wan_mgmt": [],
        "wan_insecure": [],
        "wan_any_any": [],
        "block_no_log": [],
    }
    for rule in rules:
        # Skip disabled rules
        if rule.get("enabled", "0") != "1":
            continue
        label = _audit_rule_label(rule)
        action = rule.get("action")

        # Logging is checked on block/reject rules of every interface.
        if action in ("block", "reject") and rule.get("log", "0") != "1":
            buckets["block_no_log"].append(label)

        # The remaining checks only concern rules that pass traffic on WAN.
        if rule.get("interface") not in wan_interfaces or action != "pass":
            continue

        # Missing *and* empty fields are treated as 'any'.
        src_net = rule.get("source_net") or "any"
        dst_net = rule.get("destination_net") or "any"
        dst_port = str(rule.get("destination_port") or "any")

        if src_net == "any" and dst_net == "any" and dst_port == "any":
            buckets["wan_any_any"].append(label)
        # Simplified: only exact single-port matches are detected, and the
        # destination address is not checked (assumes the firewall itself).
        if dst_port in management_ports:
            buckets["wan_mgmt"].append(label)
        if dst_port in insecure_ports:
            buckets["wan_insecure"].append(label)
    return buckets


@mcp.tool(name="perform_firewall_audit", description="Performs a basic security audit of the OPNsense configuration.")
async def perform_firewall_audit(ctx: Context) -> str:
    """Performs a basic security audit of the OPNsense configuration.

    Checks for common potential security issues: pending firmware/plugin
    updates, WAN rules that pass management or insecure ports, WAN
    allow-any-any rules, block/reject rules without logging, and a running
    UPnP service.

    Args:
        ctx: MCP context

    Returns:
        JSON string with an ``audit_findings`` list. If an unexpected error
        occurs, the findings gathered so far are returned with an extra
        "Audit Error" entry, or an ``error`` object when nothing was gathered.
        A configuration error is returned as a plain message string.
    """
    # Bound before the try block so the generic error handler can always
    # inspect it, even when the failure happens while obtaining the client.
    findings: List[Dict[str, Any]] = []
    try:
        client = await get_opnsense_client()
        await ctx.info("Starting OPNsense firewall audit...")

        # --- Fetch Data ---
        await ctx.info("Fetching required data (firmware, rules, interfaces, services)...")
        firmware_status = await client.request("GET", API_CORE_FIRMWARE_STATUS)
        all_rules = await _get_all_rules(client)
        wan_interfaces = await _get_wan_interfaces(client)
        services_response = await client.request(
            "POST",
            API_CORE_SERVICE_SEARCH,
            data={"current": 1, "rowCount": -1, "searchPhrase": ""}  # Fetch all services
        )
        # Only the names are needed; rows without a name are tolerated.
        running_services = {
            svc.get("name")
            for svc in (services_response.get("rows") or [])
            if svc.get("running") == 1
        }
        await ctx.info(f"Identified WAN interfaces: {wan_interfaces or 'None'}")
        await ctx.info(f"Fetched {len(all_rules)} firewall rules.")

        # --- Perform Checks ---
        # 1. Firmware Update Check
        if firmware_status.get("status") == "update_available":
            findings.append({
                "check": "Firmware Update",
                "severity": "Medium",
                "description": f"Firmware update available. Current: {firmware_status.get('product_version', 'N/A')}, New: {firmware_status.get('product_new_version', 'N/A')}",
                "recommendation": "Consider updating OPNsense firmware via the GUI (System -> Firmware -> Updates)."
            })
        else:
            findings.append({
                "check": "Firmware Update",
                "severity": "Info",
                "description": "Firmware appears to be up-to-date.",
                "recommendation": None
            })

        # 2. Plugin Update Check
        plugin_updates = firmware_status.get("upgrade_packages") or []
        if plugin_updates:
            plugin_names = [str(p.get("name") or "N/A") for p in plugin_updates]
            findings.append({
                "check": "Plugin Updates",
                "severity": "Medium",
                "description": f"Updates available for {len(plugin_updates)} plugins: {', '.join(plugin_names)}",
                "recommendation": "Consider updating plugins via the GUI (System -> Firmware -> Updates)."
            })
        else:
            findings.append({
                "check": "Plugin Updates",
                "severity": "Info",
                "description": "Installed plugins appear to be up-to-date.",
                "recommendation": None
            })

        # 3. Firewall rule checks (WAN exposure and logging)
        buckets = _audit_scan_rules(all_rules, wan_interfaces)
        wan_mgmt_rules = buckets["wan_mgmt"]
        wan_any_any_rules = buckets["wan_any_any"]
        wan_insecure_proto_rules = buckets["wan_insecure"]
        block_rules_no_log = buckets["block_no_log"]

        if wan_mgmt_rules:
            findings.append({
                "check": "WAN Management Access",
                "severity": "High",
                "description": f"Potential firewall rules allowing management access (HTTP/HTTPS/SSH) from WAN found: {', '.join(wan_mgmt_rules)}",
                "recommendation": "Review these rules. Exposing management interfaces to the WAN is highly discouraged. Use VPNs for remote access."
            })
        if wan_any_any_rules:
            findings.append({
                "check": "WAN Allow Any-Any",
                "severity": "High",
                "description": f"Potential 'allow any source to any destination' rules found on WAN interface(s): {', '.join(wan_any_any_rules)}",
                "recommendation": "Review these rules. 'Allow any-any' rules on WAN are extremely dangerous and likely misconfigured."
            })
        if wan_insecure_proto_rules:
            findings.append({
                "check": "WAN Insecure Protocols",
                "severity": "High",
                "description": f"Potential rules allowing insecure protocols (e.g., Telnet, FTP) from WAN found: {', '.join(wan_insecure_proto_rules)}",
                "recommendation": "Review these rules. Avoid using insecure protocols, especially over the WAN."
            })
        if block_rules_no_log:
            findings.append({
                "check": "Firewall Log Settings",
                "severity": "Low",
                "description": f"{len(block_rules_no_log)} firewall rule(s) that block or reject traffic do not have logging enabled (Examples: {', '.join(block_rules_no_log[:3])}{'...' if len(block_rules_no_log) > 3 else ''}).",
                "recommendation": "Consider enabling logging on block/reject rules (especially the default deny, if applicable) to monitor potential malicious activity."
            })
        else:
            findings.append({
                "check": "Firewall Log Settings",
                "severity": "Info",
                "description": "Block/reject rules checked appear to have logging enabled.",
                "recommendation": None
            })

        # 4. Check for enabled UPnP service
        if "miniupnpd" in running_services:
            findings.append({
                "check": "UPnP Service",
                "severity": "Low",
                "description": "The UPnP (Universal Plug and Play) service is enabled and running.",
                "recommendation": "Ensure UPnP is intentionally enabled and configured securely if needed. Disable it if unused, as it can potentially open ports automatically."
            })
        await ctx.info("Firewall audit checks complete.")
    except ConfigurationError as e:
        await ctx.error(str(e))
        return f"Configuration Error: {str(e)}"
    except Exception as e:
        logger.error(f"Error during firewall audit: {str(e)}", exc_info=True)
        await ctx.error(f"Error performing firewall audit: {str(e)}")
        # Return partial findings if any were collected before the error
        if findings:
            findings.append({
                "check": "Audit Error",
                "severity": "Critical",
                "description": f"An error occurred during the audit: {str(e)}. Results may be incomplete.",
                "recommendation": "Check server logs for details."
            })
            return json.dumps({"audit_findings": findings}, indent=2)
        return json.dumps({"error": f"Failed to perform audit: {str(e)}"}, indent=2)
    return json.dumps({"audit_findings": findings}, indent=2)