Skip to main content
Glama
dkruyt

Hetzner Cloud MCP Server

by dkruyt

create_firewall

Create a firewall on Hetzner Cloud to control network traffic by defining rules and applying them to servers or resources.

Instructions

Create a new firewall.

Creates a new firewall with the specified name, rules, and resources.

Examples:
- Basic firewall: {"name": "web-firewall"}
- With rules: {"name": "web-firewall", "rules": [{"direction": "in", "protocol": "tcp", "port": "80", "source_ips": ["0.0.0.0/0"]}]}
- With resources: {"name": "web-firewall", "rules": [...], "resources": [{"type": "server", "server_id": 123}]}

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
paramsYes

Implementation Reference

  • The primary handler function for the 'create_firewall' MCP tool. It processes input parameters, constructs Hetzner Cloud FirewallRule and FirewallResource objects, calls the hcloud client to create the firewall, and returns a formatted response with firewall details and associated actions.
    @mcp.tool()
    def create_firewall(params: CreateFirewallParams) -> Dict[str, Any]:
        """
        Create a new firewall.
        
        Creates a new firewall with the specified name, rules, and resources.
        
        Examples:
        - Basic firewall: {"name": "web-firewall"}
        - With rules: {"name": "web-firewall", "rules": [{"direction": "in", "protocol": "tcp", "port": "80", "source_ips": ["0.0.0.0/0"]}]}
        - With resources: {"name": "web-firewall", "rules": [...], "resources": [{"type": "server", "server_id": 123}]}
        """
        try:
            # Prepare rules if provided
            rules = None
            if params.rules:
                rules = []
                for rule_param in params.rules:
                    rule = FirewallRule(
                        direction=rule_param.direction,
                        protocol=rule_param.protocol,
                        source_ips=rule_param.source_ips,
                        port=rule_param.port,
                        destination_ips=rule_param.destination_ips,
                        description=rule_param.description
                    )
                    rules.append(rule)
            
            # Prepare resources if provided
            resources = None
            if params.resources:
                resources = []
                for resource_param in params.resources:
                    if resource_param.type == "server":
                        if not resource_param.server_id:
                            return {"error": "Server ID is required when resource type is 'server'"}
                        server = client.servers.get_by_id(resource_param.server_id)
                        if not server:
                            return {"error": f"Server with ID {resource_param.server_id} not found"}
                        resource = FirewallResource(type=resource_param.type, server=server)
                    elif resource_param.type == "label_selector":
                        if not resource_param.label_selector:
                            return {"error": "Label selector is required when resource type is 'label_selector'"}
                        label_selector = FirewallResourceLabelSelector(selector=resource_param.label_selector)
                        resource = FirewallResource(type=resource_param.type, label_selector=label_selector)
                    else:
                        return {"error": f"Invalid resource type: {resource_param.type}. Must be 'server' or 'label_selector'"}
                    resources.append(resource)
            
            # Create the firewall
            response = client.firewalls.create(
                name=params.name,
                rules=rules,
                labels=params.labels,
                resources=resources
            )
            
            # Extract firewall and action information
            firewall = response.firewall
            actions = response.actions
            
            # Format the response
            return {
                "firewall": firewall_to_dict(firewall),
                "actions": [
                    {
                        "id": action.id,
                        "status": action.status,
                        "command": action.command,
                        "progress": action.progress,
                        "error": action.error,
                        "started": action.started.isoformat() if action.started else None,
                        "finished": action.finished.isoformat() if action.finished else None,
                    }
                    for action in actions
                ] if actions else None,
            }
        except Exception as e:
            return {"error": f"Failed to create firewall: {str(e)}"}
  • Pydantic BaseModel schemas defining the structured input for the create_firewall tool, including individual rule (FirewallRuleParam), resource (FirewallResourceParam), and top-level parameters (CreateFirewallParams).
    class FirewallRuleParam(BaseModel):
        direction: str = Field(..., description="Direction of the rule (in or out)")
        protocol: str = Field(..., description="Protocol (tcp, udp, icmp, esp, or gre)")
        source_ips: List[str] = Field(..., description="List of source IPs in CIDR notation")
        port: Optional[str] = Field(None, description="Port or port range (e.g., '80' or '80-85'), only for TCP/UDP")
        destination_ips: Optional[List[str]] = Field(None, description="List of destination IPs in CIDR notation")
        description: Optional[str] = Field(None, description="Description of the rule")
    
    # Firewall Resource Parameter Model
    class FirewallResourceParam(BaseModel):
        type: str = Field(..., description="Type of resource ('server' or 'label_selector')")
        server_id: Optional[int] = Field(None, description="Server ID (required when type is 'server')")
        label_selector: Optional[str] = Field(None, description="Label selector (required when type is 'label_selector')")
    
    # Create Firewall Parameter Model
    class CreateFirewallParams(BaseModel):
        name: str = Field(..., description="Name of the firewall")
        rules: Optional[List[FirewallRuleParam]] = Field(None, description="List of firewall rules")
        resources: Optional[List[FirewallResourceParam]] = Field(None, description="List of resources to apply the firewall to")
        labels: Optional[Dict[str, str]] = Field(None, description="User-defined labels (key-value pairs)")
  • Utility helper function to serialize Hetzner Cloud Firewall domain objects into plain dictionaries for inclusion in the tool's JSON response.
    # Helper function to convert Firewall object to dict
    def firewall_to_dict(firewall: Firewall) -> Dict[str, Any]:
        """Convert a Firewall object to a dictionary with relevant information."""
        # Convert rules to dict
        rules = []
        if firewall.rules:
            for rule in firewall.rules:
                rule_dict = {
                    "direction": rule.direction,
                    "protocol": rule.protocol,
                    "source_ips": rule.source_ips,
                }
                if rule.port:
                    rule_dict["port"] = rule.port
                if rule.destination_ips:
                    rule_dict["destination_ips"] = rule.destination_ips
                if rule.description:
                    rule_dict["description"] = rule.description
                rules.append(rule_dict)
        
        # Convert applied_to resources to dict
        applied_to = []
        if firewall.applied_to:
            for resource in firewall.applied_to:
                resource_dict = {"type": resource.type}
                if resource.server:
                    resource_dict["server"] = {"id": resource.server.id, "name": resource.server.name}
                if resource.label_selector:
                    resource_dict["label_selector"] = {"selector": resource.label_selector.selector}
                if getattr(resource, 'applied_to_resources', None):
                    applied_resources = []
                    for applied_resource in resource.applied_to_resources:
                        applied_resource_dict = {"type": applied_resource.type}
                        if applied_resource.server:
                            applied_resource_dict["server"] = {"id": applied_resource.server.id, "name": applied_resource.server.name}
                        applied_resources.append(applied_resource_dict)
                    resource_dict["applied_to_resources"] = applied_resources
                applied_to.append(resource_dict)
        
        return {
            "id": firewall.id,
            "name": firewall.name,
            "rules": rules,
            "applied_to": applied_to,
            "labels": firewall.labels,
            "created": firewall.created.isoformat() if firewall.created else None,
        }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dkruyt/mcp-hetzner'

If you have feedback or need assistance with the MCP directory API, please join our Discord server