Skip to main content
Glama
firewall_nat.py14 kB
from typing import Optional from ..connector import execute_mikrotik_command from ..logger import app_logger def mikrotik_create_nat_rule( chain: str, action: str, src_address: Optional[str] = None, dst_address: Optional[str] = None, src_port: Optional[str] = None, dst_port: Optional[str] = None, protocol: Optional[str] = None, in_interface: Optional[str] = None, out_interface: Optional[str] = None, to_addresses: Optional[str] = None, to_ports: Optional[str] = None, comment: Optional[str] = None, disabled: bool = False, log: bool = False, log_prefix: Optional[str] = None, place_before: Optional[str] = None ) -> str: """ Creates a NAT rule on MikroTik device. Args: chain: NAT chain (srcnat, dstnat) action: Action to take (accept, drop, masquerade, dst-nat, src-nat, etc.) src_address: Source IP address or range dst_address: Destination IP address or range src_port: Source port or range dst_port: Destination port or range protocol: Protocol (tcp, udp, icmp, etc.) in_interface: Input interface out_interface: Output interface to_addresses: Target address for translation to_ports: Target port for translation comment: Optional comment for the rule disabled: Whether to disable the rule after creation log: Whether to log packets matching this rule log_prefix: Prefix for log entries place_before: Place this rule before another rule (by number) Returns: Command output or error message """ app_logger.info(f"Creating NAT rule: chain={chain}, action={action}") # Validate chain valid_chains = ["srcnat", "dstnat"] if chain not in valid_chains: return f"Error: Invalid chain '{chain}'. Must be one of: {', '.join(valid_chains)}" # Validate action based on chain srcnat_actions = ["accept", "drop", "masquerade", "src-nat", "same", "netmap", "jump", "return", "log", "passthrough"] dstnat_actions = ["accept", "drop", "dst-nat", "jump", "return", "log", "passthrough", "redirect", "netmap", "same"] if chain == "srcnat" and action not in srcnat_actions: return f"Error: Invalid action '{action}' for srcnat. Must be one of: {', '.join(srcnat_actions)}" elif chain == "dstnat" and action not in dstnat_actions: return f"Error: Invalid action '{action}' for dstnat. Must be one of: {', '.join(dstnat_actions)}" # Build the command cmd = f"/ip firewall nat add chain={chain} action={action}" # Add optional parameters if src_address: cmd += f" src-address={src_address}" if dst_address: cmd += f" dst-address={dst_address}" if src_port: cmd += f" src-port={src_port}" if dst_port: cmd += f" dst-port={dst_port}" if protocol: cmd += f" protocol={protocol}" if in_interface: cmd += f' in-interface="{in_interface}"' if out_interface: cmd += f' out-interface="{out_interface}"' if to_addresses: cmd += f" to-addresses={to_addresses}" if to_ports: cmd += f" to-ports={to_ports}" if comment: cmd += f' comment="{comment}"' if disabled: cmd += " disabled=yes" if log: cmd += " log=yes" if log_prefix: cmd += f' log-prefix="{log_prefix}"' if place_before: cmd += f" place-before={place_before}" result = execute_mikrotik_command(cmd) # Check if creation was successful if result.strip(): # MikroTik returns the ID of created item on success if "*" in result or result.strip().isdigit(): # Success - get the details rule_id = result.strip() details_cmd = f"/ip firewall nat print detail where .id={rule_id}" details = execute_mikrotik_command(details_cmd) if details.strip(): return f"NAT rule created successfully:\n\n{details}" else: return f"NAT rule created with ID: {result}" else: # Error occurred return f"Failed to create NAT rule: {result}" else: # No output might mean success, let's check by getting the last rule details_cmd = "/ip firewall nat print detail count-only" count = execute_mikrotik_command(details_cmd) if count.strip().isdigit() and int(count.strip()) > 0: # Get the last rule last_rule_cmd = f"/ip firewall nat print detail from={int(count.strip())-1}" details = execute_mikrotik_command(last_rule_cmd) return f"NAT rule created successfully:\n\n{details}" else: return "NAT rule creation completed but unable to verify." def mikrotik_list_nat_rules( chain_filter: Optional[str] = None, action_filter: Optional[str] = None, src_address_filter: Optional[str] = None, dst_address_filter: Optional[str] = None, protocol_filter: Optional[str] = None, interface_filter: Optional[str] = None, disabled_only: bool = False, invalid_only: bool = False ) -> str: """ Lists NAT rules on MikroTik device. Args: chain_filter: Filter by chain (srcnat, dstnat) action_filter: Filter by action src_address_filter: Filter by source address dst_address_filter: Filter by destination address protocol_filter: Filter by protocol interface_filter: Filter by interface (in or out) disabled_only: Show only disabled rules invalid_only: Show only invalid rules Returns: List of NAT rules """ app_logger.info(f"Listing NAT rules with filters: chain={chain_filter}, action={action_filter}") # Build the command cmd = "/ip firewall nat print" # Add filters filters = [] if chain_filter: filters.append(f"chain={chain_filter}") if action_filter: filters.append(f"action={action_filter}") if src_address_filter: filters.append(f'src-address~"{src_address_filter}"') if dst_address_filter: filters.append(f'dst-address~"{dst_address_filter}"') if protocol_filter: filters.append(f"protocol={protocol_filter}") if interface_filter: filters.append(f'(in-interface~"{interface_filter}" or out-interface~"{interface_filter}")') if disabled_only: filters.append("disabled=yes") if invalid_only: filters.append("invalid=yes") if filters: cmd += " where " + " ".join(filters) result = execute_mikrotik_command(cmd) # Check for empty result if not result or result.strip() == "" or result.strip() == "no such item": return "No NAT rules found matching the criteria." return f"NAT RULES:\n\n{result}" def mikrotik_get_nat_rule(rule_id: str) -> str: """ Gets detailed information about a specific NAT rule. Args: rule_id: ID of the NAT rule (can be number or *number format) Returns: Detailed information about the NAT rule """ app_logger.info(f"Getting NAT rule details: rule_id={rule_id}") cmd = f"/ip firewall nat print detail where .id={rule_id}" result = execute_mikrotik_command(cmd) if not result or result.strip() == "": return f"NAT rule with ID '{rule_id}' not found." return f"NAT RULE DETAILS:\n\n{result}" def mikrotik_update_nat_rule( rule_id: str, chain: Optional[str] = None, action: Optional[str] = None, src_address: Optional[str] = None, dst_address: Optional[str] = None, src_port: Optional[str] = None, dst_port: Optional[str] = None, protocol: Optional[str] = None, in_interface: Optional[str] = None, out_interface: Optional[str] = None, to_addresses: Optional[str] = None, to_ports: Optional[str] = None, comment: Optional[str] = None, disabled: Optional[bool] = None, log: Optional[bool] = None, log_prefix: Optional[str] = None ) -> str: """ Updates an existing NAT rule on MikroTik device. Args: rule_id: ID of the NAT rule to update chain: New chain action: New action src_address: New source address dst_address: New destination address src_port: New source port dst_port: New destination port protocol: New protocol in_interface: New input interface out_interface: New output interface to_addresses: New target address to_ports: New target port comment: New comment disabled: Enable/disable the rule log: Enable/disable logging log_prefix: New log prefix Returns: Command output or error message """ app_logger.info(f"Updating NAT rule: rule_id={rule_id}") # Build the command cmd = f"/ip firewall nat set {rule_id}" # Add parameters to update updates = [] if chain: updates.append(f"chain={chain}") if action: updates.append(f"action={action}") if src_address is not None: if src_address == "": updates.append("!src-address") else: updates.append(f"src-address={src_address}") if dst_address is not None: if dst_address == "": updates.append("!dst-address") else: updates.append(f"dst-address={dst_address}") if src_port is not None: if src_port == "": updates.append("!src-port") else: updates.append(f"src-port={src_port}") if dst_port is not None: if dst_port == "": updates.append("!dst-port") else: updates.append(f"dst-port={dst_port}") if protocol is not None: if protocol == "": updates.append("!protocol") else: updates.append(f"protocol={protocol}") if in_interface is not None: if in_interface == "": updates.append("!in-interface") else: updates.append(f'in-interface="{in_interface}"') if out_interface is not None: if out_interface == "": updates.append("!out-interface") else: updates.append(f'out-interface="{out_interface}"') if to_addresses is not None: if to_addresses == "": updates.append("!to-addresses") else: updates.append(f"to-addresses={to_addresses}") if to_ports is not None: if to_ports == "": updates.append("!to-ports") else: updates.append(f"to-ports={to_ports}") if comment is not None: updates.append(f'comment="{comment}"') if disabled is not None: updates.append(f'disabled={"yes" if disabled else "no"}') if log is not None: updates.append(f'log={"yes" if log else "no"}') if log and log_prefix: updates.append(f'log-prefix="{log_prefix}"') if not updates: return "No updates specified." cmd += " " + " ".join(updates) result = execute_mikrotik_command(cmd) # Check if update was successful if "failure:" in result.lower() or "error" in result.lower(): return f"Failed to update NAT rule: {result}" # Get the updated rule details details_cmd = f"/ip firewall nat print detail where .id={rule_id}" details = execute_mikrotik_command(details_cmd) return f"NAT rule updated successfully:\n\n{details}" def mikrotik_remove_nat_rule(rule_id: str) -> str: """ Removes a NAT rule from MikroTik device. Args: rule_id: ID of the NAT rule to remove Returns: Command output or error message """ app_logger.info(f"Removing NAT rule: rule_id={rule_id}") # First check if the rule exists check_cmd = f"/ip firewall nat print count-only where .id={rule_id}" count = execute_mikrotik_command(check_cmd) if count.strip() == "0": return f"NAT rule with ID '{rule_id}' not found." # Remove the rule cmd = f"/ip firewall nat remove {rule_id}" result = execute_mikrotik_command(cmd) if "failure:" in result.lower() or "error" in result.lower(): return f"Failed to remove NAT rule: {result}" return f"NAT rule with ID '{rule_id}' removed successfully." def mikrotik_move_nat_rule(rule_id: str, destination: int) -> str: """ Moves a NAT rule to a different position in the chain. Args: rule_id: ID of the NAT rule to move destination: Destination position (0-based index) Returns: Command output or error message """ app_logger.info(f"Moving NAT rule: rule_id={rule_id} to position {destination}") # Check if the rule exists check_cmd = f"/ip firewall nat print count-only where .id={rule_id}" count = execute_mikrotik_command(check_cmd) if count.strip() == "0": return f"NAT rule with ID '{rule_id}' not found." # Move the rule cmd = f"/ip firewall nat move {rule_id} destination={destination}" result = execute_mikrotik_command(cmd) if "failure:" in result.lower() or "error" in result.lower(): return f"Failed to move NAT rule: {result}" return f"NAT rule with ID '{rule_id}' moved to position {destination}." def mikrotik_enable_nat_rule(rule_id: str) -> str: """ Enables a NAT rule. Args: rule_id: ID of the NAT rule to enable Returns: Command output or error message """ return mikrotik_update_nat_rule(rule_id, disabled=False) def mikrotik_disable_nat_rule(rule_id: str) -> str: """ Disables a NAT rule. Args: rule_id: ID of the NAT rule to disable Returns: Command output or error message """ return mikrotik_update_nat_rule(rule_id, disabled=True)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tarcisiodier/mcp-mikrotik'

If you have feedback or need assistance with the MCP directory API, please join our Discord server