Skip to main content
Glama
JoshDoesIT

Network Security Control Reviews

by JoshDoesIT
aws_network_acls.py13.6 kB
""" Parser for AWS Network ACLs Configuration This module provides classes and functions to parse and query AWS Network ACLs configurations as an example of Network Security Control (NSC) implementations. AWS Network ACLs are stateless, subnet-level firewalls that act as NSCs. Network ACLs operate at the subnet level, complementing Security Groups which operate at the instance level. Both are examples of Network Security Controls (NSCs). This parser demonstrates NSC parsing using AWS Network ACLs as an example. It can be extended or used as a reference for implementing parsers for other NSC types: - Azure Network Security Groups (NSGs) - Google Cloud Platform (GCP) Firewall Rules - Oracle Cloud Infrastructure (OCI) Security Lists - Traditional on-premises firewalls (Palo Alto, Check Point, Fortinet, etc.) Classes: NetworkACLRule: Represents a single NSC rule (AWS Network ACL rule - ingress or egress) NetworkACL: Represents an NSC (AWS Network ACL) with its rules and metadata AWSNetworkACLParser: Main parser class for loading and querying NSC configurations Key Features: - Direct AWS API integration via boto3 - Support for CIDR blocks - Tag-based filtering for environment/network identification - Port and protocol matching - Read-only operations (no configuration modification) """ import json from typing import Dict, List, Any, Optional from netaddr import IPNetwork, IPAddress # Conditional import for AWS SDK try: import boto3 from botocore.exceptions import ClientError, NoCredentialsError AWS_AVAILABLE = True except ImportError: AWS_AVAILABLE = False class NetworkACLRule: """ Represents a single Network ACL rule. Network ACL rules are stateless and operate at the subnet level. They control traffic flow in and out of subnets. """ def __init__(self, rule_data: Dict[str, Any], acl_id: str, vpc_id: str, is_egress: bool = False): """ Initialize a Network ACL rule from AWS API response. Args: rule_data: Dictionary from AWS describe_network_acls API response acl_id: Network ACL ID this rule belongs to vpc_id: VPC ID this rule belongs to is_egress: True if this is an egress rule, False for ingress """ self.acl_id = acl_id self.vpc_id = vpc_id self.rule_number = rule_data.get('RuleNumber', 0) self.protocol = rule_data.get('Protocol', '-1') self.rule_action = rule_data.get('RuleAction', 'deny') # 'allow' or 'deny' self.cidr_block = rule_data.get('CidrBlock', '') self.ipv6_cidr_block = rule_data.get('Ipv6CidrBlock', '') self.port_range = rule_data.get('PortRange', {}) self.from_port = self.port_range.get('From') if self.port_range else None self.to_port = self.port_range.get('To') if self.port_range else None self.is_egress = is_egress self.icmp_type_code = rule_data.get('IcmpTypeCode', {}) def to_dict(self) -> Dict[str, Any]: """Convert rule to dictionary representation.""" return { 'acl_id': self.acl_id, 'vpc_id': self.vpc_id, 'rule_number': self.rule_number, 'protocol': self.protocol, 'rule_action': self.rule_action, 'cidr_block': self.cidr_block, 'ipv6_cidr_block': self.ipv6_cidr_block, 'from_port': self.from_port, 'to_port': self.to_port, 'is_egress': self.is_egress, } def matches_cidr(self, cidr: str) -> bool: """Check if rule matches a CIDR block.""" if not self.cidr_block: return False try: rule_network = IPNetwork(self.cidr_block) query_network = IPNetwork(cidr) return query_network in rule_network or rule_network in query_network except Exception: return False def matches_port(self, port: int) -> bool: """Check if rule matches a port number.""" if self.from_port is None or self.to_port is None: return False return self.from_port <= port <= self.to_port class NetworkACL: """ Represents an AWS Network ACL. A Network ACL is a stateless firewall that operates at the subnet level. It contains ingress (inbound) and egress (outbound) rules. """ def __init__(self, acl_data: Dict[str, Any]): """ Initialize a Network ACL from AWS API response. Args: acl_data: Dictionary from AWS describe_network_acls API response """ self.acl_id = acl_data['NetworkAclId'] self.vpc_id = acl_data.get('VpcId', '') self.is_default = acl_data.get('IsDefault', False) self.associations = acl_data.get('Associations', []) self.tags = {tag['Key']: tag['Value'] for tag in acl_data.get('Tags', [])} # Parse ingress and egress rules self.ingress_rules = [ NetworkACLRule(rule, self.acl_id, self.vpc_id, is_egress=False) for rule in acl_data.get('Entries', []) if not rule.get('Egress', False) ] self.egress_rules = [ NetworkACLRule(rule, self.acl_id, self.vpc_id, is_egress=True) for rule in acl_data.get('Entries', []) if rule.get('Egress', False) ] def to_dict(self) -> Dict[str, Any]: """Convert Network ACL to dictionary representation.""" return { 'acl_id': self.acl_id, 'vpc_id': self.vpc_id, 'is_default': self.is_default, 'tags': self.tags, 'associations': self.associations, 'ingress_rules': [rule.to_dict() for rule in self.ingress_rules], 'egress_rules': [rule.to_dict() for rule in self.egress_rules], } def get_all_rules(self) -> List[NetworkACLRule]: """Get all rules (ingress + egress).""" return self.ingress_rules + self.egress_rules class AWSNetworkACLParser: """ Parser for AWS Network ACLs configurations. Loads Network ACLs directly from AWS EC2 API and provides querying capabilities. """ def __init__(self, aws_region: Optional[str] = None, aws_profile: Optional[str] = None): """ Initialize parser with AWS credentials configuration. Args: aws_region: AWS region (defaults to AWS_DEFAULT_REGION or us-east-1) aws_profile: AWS profile name for credentials """ if not AWS_AVAILABLE: raise ImportError("boto3 is required for AWS Network ACL parsing") self.aws_region = aws_region or self._get_default_region(aws_profile) self.aws_profile = aws_profile self.network_acls: Dict[str, NetworkACL] = {} self.aws_source: Optional[str] = None def _get_default_region(self, aws_profile: Optional[str] = None) -> str: """Get default AWS region from environment or config.""" import os # Try environment variable first region = os.getenv('AWS_DEFAULT_REGION') or os.getenv('AWS_REGION') if region: return region # Try AWS config file try: import boto3 session = boto3.Session(profile_name=aws_profile) if aws_profile else boto3.Session() if session.region_name: return session.region_name except Exception: pass # Default fallback return 'us-east-1' def _get_ec2_client(self): """Get boto3 EC2 client with configured credentials.""" if not AWS_AVAILABLE: raise ImportError("boto3 is required for AWS Network ACL parsing") session_kwargs = {} if self.aws_profile: session_kwargs['profile_name'] = self.aws_profile session = boto3.Session(**session_kwargs) return session.client('ec2', region_name=self.aws_region) def load_from_aws(self, vpc_id: Optional[str] = None, network_acl_ids: Optional[List[str]] = None): """ Load Network ACLs directly from AWS EC2 API. Args: vpc_id: Load all Network ACLs from a specific VPC network_acl_ids: Load specific Network ACLs by their IDs Raises: NoCredentialsError: If AWS credentials are not configured ClientError: If AWS API call fails """ ec2_client = self._get_ec2_client() # Build filters filters = [] if vpc_id: filters.append({'Name': 'vpc-id', 'Values': [vpc_id]}) # Describe Network ACLs if network_acl_ids: response = ec2_client.describe_network_acls(NetworkAclIds=network_acl_ids) else: response = ec2_client.describe_network_acls(Filters=filters if filters else None) # Parse Network ACLs self.network_acls = {} for acl_data in response.get('NetworkAcls', []): acl = NetworkACL(acl_data) self.network_acls[acl.acl_id] = acl # Set source description if vpc_id: self.aws_source = f"AWS VPC: {vpc_id}" elif network_acl_ids: self.aws_source = f"AWS Network ACLs: {', '.join(network_acl_ids)}" else: self.aws_source = "AWS (all Network ACLs)" def get_summary(self) -> Dict[str, Any]: """ Get high-level summary of loaded Network ACL configuration. Returns: Dictionary with summary information: - total_network_acls: Count of Network ACLs - total_rules: Count of all rules - vpcs: List of VPC IDs - network_acls: List of Network ACL summaries - aws_source: Source description """ all_rules = [] vpcs = set() acl_summaries = [] for acl in self.network_acls.values(): vpcs.add(acl.vpc_id) rules = acl.get_all_rules() all_rules.extend(rules) acl_summaries.append({ 'acl_id': acl.acl_id, 'vpc_id': acl.vpc_id, 'is_default': acl.is_default, 'tags': acl.tags, 'ingress_rule_count': len(acl.ingress_rules), 'egress_rule_count': len(acl.egress_rules), 'total_rules': len(rules), }) return { 'total_network_acls': len(self.network_acls), 'total_rules': len(all_rules), 'vpcs': sorted(list(vpcs)), 'network_acls': acl_summaries, 'aws_source': self.aws_source or 'Not loaded', } def get_all_rules(self) -> List[Dict[str, Any]]: """ Get all rules from all Network ACLs. Returns: List of rule dictionaries """ all_rules = [] for acl in self.network_acls.values(): all_rules.extend([rule.to_dict() for rule in acl.get_all_rules()]) return all_rules def query_rules( self, cidr: Optional[str] = None, port: Optional[int] = None, protocol: Optional[str] = None, rule_action: Optional[str] = None, is_egress: Optional[bool] = None, tag_key: Optional[str] = None, tag_value: Optional[str] = None, ) -> List[Dict[str, Any]]: """ Query Network ACL rules by various criteria. Args: cidr: CIDR block to match port: Port number to match protocol: Protocol to match ('tcp', 'udp', 'icmp', '-1') rule_action: Rule action to match ('allow', 'deny') is_egress: Filter by egress (True) or ingress (False) tag_key: Network ACL tag key to filter by tag_value: Network ACL tag value to filter by (requires tag_key) Returns: List of matching rule dictionaries """ results = [] for acl in self.network_acls.values(): # Filter by tags if tag_key: if tag_key not in acl.tags: continue if tag_value and acl.tags[tag_key] != tag_value: continue # Get rules to check rules_to_check = [] if is_egress is None: rules_to_check = acl.get_all_rules() elif is_egress: rules_to_check = acl.egress_rules else: rules_to_check = acl.ingress_rules for rule in rules_to_check: # Filter by rule_action if rule_action and rule.rule_action != rule_action: continue # Filter by protocol if protocol and rule.protocol != protocol: continue # Filter by CIDR if cidr and not rule.matches_cidr(cidr): continue # Filter by port if port is not None and not rule.matches_port(port): continue # Rule matches all criteria rule_dict = rule.to_dict() rule_dict['acl_tags'] = acl.tags results.append(rule_dict) return results

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JoshDoesIT/Network-Security-Control-Reviews-with-MCP-and-LLMs'

If you have feedback or need assistance with the MCP directory API, please join our Discord server