Skip to main content
Glama
firewall.py15.6 kB
""" OPNsense MCP Server - Firewall Domain This module provides tools for managing OPNsense firewall rules and aliases. It handles firewall rule creation, modification, deletion, and alias management for IP address grouping and simplified rule management. """ import json import logging import urllib.parse from typing import Optional from mcp.server.fastmcp import Context from ..main import mcp from ..core import OPNsenseClient, ValidationError from ..shared.constants import ( API_FIREWALL_FILTER_SEARCH_RULE, API_FIREWALL_FILTER_ADD_RULE, API_FIREWALL_FILTER_DEL_RULE, API_FIREWALL_FILTER_TOGGLE_RULE, API_FIREWALL_FILTER_APPLY, API_FIREWALL_ALIAS_SEARCH_ITEM, API_FIREWALL_ALIAS_UTIL_ADD, API_FIREWALL_ALIAS_UTIL_DELETE, API_FIREWALL_ALIAS_RECONFIGURE, API_DIAGNOSTICS_LOG_FIREWALL, ) from ..shared.error_handlers import ( handle_tool_error, validate_uuid, validate_firewall_parameters, ErrorSeverity, ) from ..core.retry import RetryConfig from .configuration import get_opnsense_client logger = logging.getLogger("opnsense-mcp") def validate_port_specification(port_spec: str, operation: str) -> None: """ Validate port specification (single, range, or comma-separated list). Validates that: - Port numbers are between 1-65535 - Ranges have start < end - Format is correct (single, range, or comma-separated) Args: port_spec: Port specification string (e.g., "80", "80-443", "80,443,8080") operation: Operation name for error context Raises: ValidationError: If port specification is invalid Examples: validate_port_specification("80", "firewall_add_rule") # Single port - OK validate_port_specification("80-443", "firewall_add_rule") # Range - OK validate_port_specification("80,443,8080", "firewall_add_rule") # List - OK validate_port_specification("70000", "firewall_add_rule") # Invalid - raises validate_port_specification("443-80", "firewall_add_rule") # Invalid range - raises """ if not port_spec or not port_spec.strip(): return # Empty is valid for "any" import re # Pattern: single port, range, or comma-separated list # Allows digits, hyphens (for ranges), and commas (for lists) port_pattern = re.compile(r'^[\d,\-\s]+$') if not port_pattern.match(port_spec): raise ValidationError( f"Invalid port format: {port_spec}. Use single port (80), " f"range (80-443), or comma-separated list (80,443,8080)", context={"operation": operation, "port_spec": port_spec} ) # Validate each port/range component for part in port_spec.split(","): part = part.strip() if not part: continue if "-" in part: # Validate port range try: range_parts = part.split("-") if len(range_parts) != 2: raise ValueError("Range must have exactly two parts") start, end = map(int, range_parts) if start < 1 or end > 65535: raise ValidationError( f"Invalid port range: {part}. Ports must be 1-65535", context={"operation": operation, "range": part, "start": start, "end": end} ) if start >= end: raise ValidationError( f"Invalid port range: {part}. Start port must be less than end port", context={"operation": operation, "range": part, "start": start, "end": end} ) except ValueError as e: raise ValidationError( f"Invalid port range format: {part}. Must be START-END with valid numbers", context={"operation": operation, "range": part, "error": str(e)} ) else: # Validate single port try: port = int(part) if port < 1 or port > 65535: raise ValidationError( f"Invalid port number: {port}. Must be 1-65535", context={"operation": operation, "port": port} ) except ValueError: raise ValidationError( f"Invalid port number: {part}. Must be a valid integer", context={"operation": operation, "port": part} ) # ========== FIREWALL RULE TOOLS ========== @mcp.tool(name="firewall_get_rules", description="Get OPNsense firewall rules") async def firewall_get_rules( ctx: Context, search_phrase: str = "", page: int = 1, rows_per_page: int = 20 ) -> str: """Get OPNsense firewall rules. Args: ctx: MCP context search_phrase: Optional search phrase to filter rules page: Page number for pagination rows_per_page: Number of rows per page Returns: JSON string of firewall rules """ try: client = await get_opnsense_client() response = await client.request( "POST", API_FIREWALL_FILTER_SEARCH_RULE, data={ "current": page, "rowCount": rows_per_page, "searchPhrase": search_phrase } ) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in firewall_get_rules: {str(e)}", exc_info=True) await ctx.error(f"Error fetching firewall rules: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="firewall_add_rule", description="Add a new firewall rule") async def firewall_add_rule( ctx: Context, description: str, action: str = "pass", interface: str = "lan", direction: str = "in", ipprotocol: str = "inet", protocol: str = "any", source_net: str = "any", destination_net: str = "any", destination_port: str = "", enabled: bool = True ) -> str: """Add a new firewall rule with comprehensive validation and error handling. Args: ctx: MCP context description: Rule description action: Rule action (pass, block, reject) interface: Network interface direction: Traffic direction (in, out) ipprotocol: IP protocol (inet for IPv4, inet6 for IPv6) protocol: Transport protocol (tcp, udp, icmp, any) source_net: Source network/host destination_net: Destination network/host destination_port: Destination port(s) enabled: Whether the rule is enabled Returns: JSON string with the result """ try: client = await get_opnsense_client() # Validate firewall rule parameters validate_firewall_parameters(action, direction, ipprotocol, protocol, "firewall_add_rule") # Validate description if not description or len(description.strip()) == 0: raise ValidationError("Rule description is required", context={"operation": "firewall_add_rule", "parameter": "description"}) # Validate port specification if provided if protocol in ["tcp", "udp"] and destination_port: validate_port_specification(destination_port, "firewall_add_rule") # Prepare rule data rule_data = { "rule": { "description": description, "action": action, "interface": interface, "direction": direction, "ipprotocol": ipprotocol, "protocol": protocol, "source_net": source_net, "destination_net": destination_net, "destination_port": destination_port, "enabled": "1" if enabled else "0" } } # Add the rule add_result = await client.request( "POST", API_FIREWALL_FILTER_ADD_RULE, data=rule_data ) # Apply changes await ctx.info("Rule added, applying changes...") apply_result = await client.request( "POST", API_FIREWALL_FILTER_APPLY ) return json.dumps({ "add_result": add_result, "apply_result": apply_result }, indent=2) except Exception as e: return await handle_tool_error(ctx, "firewall_add_rule", e, ErrorSeverity.HIGH) @mcp.tool(name="firewall_delete_rule", description="Delete a firewall rule by UUID") async def firewall_delete_rule(ctx: Context, uuid: str) -> str: """Delete a firewall rule by UUID with enhanced validation and error handling. Args: ctx: MCP context uuid: UUID of the rule to delete Returns: JSON string with the result """ try: client = await get_opnsense_client() # Validate UUID format validate_uuid(uuid, "firewall_delete_rule") # Delete the rule delete_result = await client.request( "POST", f"{API_FIREWALL_FILTER_DEL_RULE}/{uuid}", operation="delete_firewall_rule" ) # Apply changes with retry for reliability await ctx.info("Rule deleted, applying changes...") retry_config = RetryConfig(max_attempts=2, base_delay=1.0) apply_result = await client.request( "POST", API_FIREWALL_FILTER_APPLY, operation="apply_firewall_changes", retry_config=retry_config ) return json.dumps({ "delete_result": delete_result, "apply_result": apply_result }, indent=2) except Exception as e: return await handle_tool_error(ctx, "firewall_delete_rule", e, ErrorSeverity.HIGH) @mcp.tool(name="firewall_toggle_rule", description="Enable or disable a firewall rule") async def firewall_toggle_rule(ctx: Context, uuid: str, enabled: bool) -> str: """Enable or disable a firewall rule. Args: ctx: MCP context uuid: UUID of the rule to toggle enabled: Whether to enable or disable the rule Returns: JSON string with the result """ try: client = await get_opnsense_client() # Toggle the rule toggle_result = await client.request( "POST", f"{API_FIREWALL_FILTER_TOGGLE_RULE}/{uuid}/{1 if enabled else 0}" ) # Apply changes await ctx.info(f"Rule {'enabled' if enabled else 'disabled'}, applying changes...") apply_result = await client.request( "POST", API_FIREWALL_FILTER_APPLY ) return json.dumps({ "toggle_result": toggle_result, "apply_result": apply_result }, indent=2) except Exception as e: logger.error(f"Error in firewall_toggle_rule (uuid: {uuid}, enabled: {enabled}): {str(e)}", exc_info=True) await ctx.error(f"Error toggling firewall rule: {str(e)}") return f"Error: {str(e)}" # ========== FIREWALL ALIAS TOOLS ========== @mcp.tool(name="get_firewall_aliases", description="Get firewall aliases") async def get_firewall_aliases( ctx: Context, search_phrase: str = "", page: int = 1, rows_per_page: int = 20 ) -> str: """Get firewall aliases. Args: ctx: MCP context search_phrase: Optional search phrase to filter aliases page: Page number for pagination rows_per_page: Number of rows per page Returns: JSON string of firewall aliases """ try: client = await get_opnsense_client() response = await client.request( "POST", API_FIREWALL_ALIAS_SEARCH_ITEM, data={ "current": page, "rowCount": rows_per_page, "searchPhrase": search_phrase } ) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in get_firewall_aliases: {str(e)}", exc_info=True) await ctx.error(f"Error fetching firewall aliases: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="add_to_alias", description="Add an entry to a firewall alias") async def add_to_alias(ctx: Context, alias_name: str, address: str) -> str: """Add an entry to a firewall alias. Args: ctx: MCP context alias_name: Name of the alias address: IP address, network, or hostname to add Returns: JSON string with the result """ try: client = await get_opnsense_client() # Add to alias add_result = await client.request( "POST", f"{API_FIREWALL_ALIAS_UTIL_ADD}/{alias_name}/{urllib.parse.quote_plus(address)}" ) # Reconfigure aliases await ctx.info("Entry added, applying changes...") reconfigure_result = await client.request( "POST", API_FIREWALL_ALIAS_RECONFIGURE ) return json.dumps({ "add_result": add_result, "reconfigure_result": reconfigure_result }, indent=2) except Exception as e: logger.error(f"Error in add_to_alias (alias: {alias_name}, address: {address}): {str(e)}", exc_info=True) await ctx.error(f"Error adding to alias: {str(e)}") return f"Error: {str(e)}" @mcp.tool(name="delete_from_alias", description="Delete an entry from a firewall alias") async def delete_from_alias(ctx: Context, alias_name: str, address: str) -> str: """Delete an entry from a firewall alias. Args: ctx: MCP context alias_name: Name of the alias address: IP address, network, or hostname to delete Returns: JSON string with the result """ try: client = await get_opnsense_client() # Delete from alias delete_result = await client.request( "POST", f"{API_FIREWALL_ALIAS_UTIL_DELETE}/{alias_name}/{urllib.parse.quote_plus(address)}" ) # Reconfigure aliases await ctx.info("Entry deleted, applying changes...") reconfigure_result = await client.request( "POST", API_FIREWALL_ALIAS_RECONFIGURE ) return json.dumps({ "delete_result": delete_result, "reconfigure_result": reconfigure_result }, indent=2) except Exception as e: logger.error(f"Error in delete_from_alias (alias: {alias_name}, address: {address}): {str(e)}", exc_info=True) await ctx.error(f"Error deleting from alias: {str(e)}") return f"Error: {str(e)}" # ========== FIREWALL LOG TOOLS ========== @mcp.tool(name="get_firewall_logs", description="Get firewall log entries") async def get_firewall_logs( ctx: Context, count: int = 100, filter_text: str = "" ) -> str: """Get firewall log entries. Args: ctx: MCP context count: Number of log entries to retrieve filter_text: Optional text to filter log entries Returns: JSON string of log entries """ try: client = await get_opnsense_client() response = await client.request( "GET", API_DIAGNOSTICS_LOG_FIREWALL, params={"limit": count, "filter": filter_text} ) return json.dumps(response, indent=2) except Exception as e: logger.error(f"Error in get_firewall_logs: {str(e)}", exc_info=True) await ctx.error(f"Error fetching firewall logs: {str(e)}") return f"Error: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/floriangrousset/opnsense-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server