Skip to main content
Glama

AWS Security MCP

ec2.py39.1 kB
"""AWS EC2 service client module. This module provides functions for interacting with AWS EC2 services. """ import logging from typing import Any, Dict, List, Optional, Union, Iterator import boto3 from botocore.exceptions import ClientError from botocore.paginate import PageIterator from aws_security_mcp.services.base import get_client, handle_aws_error, handle_pagination # Configure logging logger = logging.getLogger(__name__) def get_ec2_client(session_context: Optional[str] = None, **kwargs: Any) -> boto3.client: """Get AWS EC2 client. Args: session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the boto3 client constructor Returns: boto3.client: An initialized EC2 client """ return get_client('ec2', session_context=session_context, **kwargs) def get_paginator(operation_name: str, session_context: Optional[str] = None, **kwargs: Any) -> PageIterator: """Get a paginator for the specified EC2 operation. Args: operation_name: Name of the EC2 operation to paginate session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the paginator Returns: PageIterator: A boto3 paginator for the operation """ client = get_ec2_client(session_context=session_context, **kwargs) return client.get_paginator(operation_name) def create_instance_filters(states: Optional[List[str]] = None, vpc_ids: Optional[List[str]] = None, subnet_ids: Optional[List[str]] = None, additional_filters: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]: """Create properly formatted filters for EC2 instance API calls. Args: states: Optional list of instance states to filter by (e.g., ['running', 'stopped']) vpc_ids: Optional list of VPC IDs to filter by subnet_ids: Optional list of subnet IDs to filter by additional_filters: Optional list of additional filters to include Returns: List[Dict[str, Any]]: Properly formatted filters list for EC2 API calls """ filters = [] # Add instance state filter if provided if states: filters.append({ 'Name': 'instance-state-name', 'Values': states }) # Add VPC filter if provided if vpc_ids: filters.append({ 'Name': 'vpc-id', 'Values': vpc_ids }) # Add subnet filter if provided if subnet_ids: filters.append({ 'Name': 'subnet-id', 'Values': subnet_ids }) # Add any additional filters if additional_filters: filters.extend(additional_filters) return filters def describe_instances( filters: Optional[List[Dict[str, Any]]] = None, instance_ids: Optional[List[str]] = None, max_results: Optional[int] = None, next_token: Optional[str] = None, session_context: Optional[str] = None, **kwargs: Any ) -> Dict[str, Any]: """Describe EC2 instances with filtering options. Args: filters: List of filters to apply instance_ids: List of instance IDs to describe max_results: Maximum number of instances to return per page (None for all, API limits: 5-1000) next_token: Token for pagination session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_instances API call Returns: Dict[str, Any]: Response containing instances grouped by reservation """ client = get_ec2_client(session_context=session_context) params = { **kwargs } if filters: params['Filters'] = filters if instance_ids: params['InstanceIds'] = instance_ids if next_token: params['NextToken'] = next_token try: return client.describe_instances(**params) except ClientError as e: logger.error(f"Error describing EC2 instances: {e}") raise def get_all_instances( filters: Optional[List[Dict[str, Any]]] = None, instance_ids: Optional[List[str]] = None, max_items: Optional[int] = None, states: Optional[List[str]] = None, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """Get all EC2 instances with pagination handling using boto3 paginators. Args: filters: List of filters to apply instance_ids: List of instance IDs to describe max_items: Maximum number of instances to return (None for all) states: Optional list of instance states to filter by (e.g., ['running', 'stopped']) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_instances API call Returns: List[Dict[str, Any]]: List of EC2 instances """ # Create filters including state filters if provided if states: if filters is None: filters = [] filters = create_instance_filters(states=states, additional_filters=filters) # Set up parameters params = {} if filters: params['Filters'] = filters if instance_ids: params['InstanceIds'] = instance_ids # Add additional parameters params.update(kwargs) # Get paginator paginator = get_paginator('describe_instances', session_context=session_context) # Configure pagination page_config = {} # Get all instances all_instances = [] try: page_iterator = paginator.paginate(**params, PaginationConfig=page_config) for page in page_iterator: # Process instances reservations = page.get('Reservations', []) for reservation in reservations: all_instances.extend(reservation.get('Instances', [])) except ClientError as e: logger.error(f"Error getting EC2 instances: {e}") raise return all_instances def filter_instances_by_text(instances: List[Dict[str, Any]], search_term: str) -> List[Dict[str, Any]]: """Filter instances based on text search. Args: instances: List of instances to filter search_term: Text to search for in instance attributes Returns: List[Dict[str, Any]]: Filtered list of instances """ search_term = search_term.lower() # Check if we're specifically searching for state state_search = None if search_term in ['running', 'stopped', 'stopping', 'pending', 'terminated', 'shutting-down']: state_search = search_term elif search_term.startswith('state:'): state_search = search_term.split(':', 1)[1].strip() # If we're only searching for state, use a direct filter if state_search and search_term == state_search or search_term == f"state:{state_search}": return [ instance for instance in instances if instance.get('State', {}).get('Name', '').lower() == state_search ] filtered_instances = [] for instance in instances: # Get relevant fields to search in instance_id = instance.get('InstanceId', '').lower() instance_type = instance.get('InstanceType', '').lower() private_ip = instance.get('PrivateIpAddress', '').lower() public_ip = instance.get('PublicIpAddress', '').lower() vpc_id = instance.get('VpcId', '').lower() subnet_id = instance.get('SubnetId', '').lower() image_id = instance.get('ImageId', '').lower() state = instance.get('State', {}).get('Name', '').lower() # Check for name tag name_tag = "" for tag in instance.get('Tags', []): if tag.get('Key') == 'Name': name_tag = tag.get('Value', '').lower() break # If state search is active, filter out non-matching instances if state_search and state != state_search: continue # Check if any field matches the search term if (search_term in instance_id or search_term in instance_type or search_term in private_ip or search_term in public_ip or search_term in vpc_id or search_term in subnet_id or search_term in image_id or search_term in name_tag or search_term in state): filtered_instances.append(instance) return filtered_instances def describe_security_groups( group_ids: Optional[List[str]] = None, group_names: Optional[List[str]] = None, filters: Optional[List[Dict[str, Any]]] = None, next_token: Optional[str] = None, max_results: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> Dict[str, Any]: """Describe EC2 security groups. Args: group_ids: List of security group IDs to describe group_names: List of security group names to describe filters: List of filters to apply next_token: Token for pagination max_results: Maximum number of results to return per page (None for all) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_security_groups API call Returns: Dict containing security groups and pagination information """ client = get_ec2_client(session_context=session_context) params = { **kwargs } if group_ids: params['GroupIds'] = group_ids if group_names: params['GroupNames'] = group_names if filters: params['Filters'] = filters if next_token: params['NextToken'] = next_token try: response = client.describe_security_groups(**params) security_groups = response.get('SecurityGroups', []) next_token_value = response.get('NextToken') return { 'SecurityGroups': security_groups, 'NextToken': next_token_value } except ClientError as e: logger.error(f"Error describing security groups: {e}") raise def get_all_security_groups( filters: Optional[List[Dict[str, Any]]] = None, group_ids: Optional[List[str]] = None, group_names: Optional[List[str]] = None, max_items: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """Get all security groups with pagination handling using boto3 paginators. Args: filters: List of filters to apply group_ids: List of security group IDs to describe group_names: List of security group names to describe max_items: Maximum number of security groups to return (None for all) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_security_groups API call Returns: List[Dict[str, Any]]: List of security groups """ # Set up parameters params = {} if filters: params['Filters'] = filters if group_ids: params['GroupIds'] = group_ids if group_names: params['GroupNames'] = group_names # Add additional parameters params.update(kwargs) # Get paginator paginator = get_paginator('describe_security_groups', session_context=session_context) # Configure pagination page_config = {} # Get all security groups all_security_groups = [] try: page_iterator = paginator.paginate(**params, PaginationConfig=page_config) for page in page_iterator: all_security_groups.extend(page.get('SecurityGroups', [])) except ClientError as e: logger.error(f"Error getting security groups: {e}") raise return all_security_groups def filter_security_groups_by_text( search_term: str, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """Filter security groups by matching text in name, description, or tags. Args: search_term: Text to search for in security group name, description or tags Special syntax: - port:XX - Find security groups with specific port open - protocol:XX - Find security groups allowing specific protocol - public:true - Find security groups open to the internet - cidr:X.X.X.X/X - Find security groups allowing specific CIDR range session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the get_all_security_groups function Returns: List[Dict[str, Any]]: List of matching security groups """ # Get all security groups without any limit security_groups = get_all_security_groups(session_context=session_context, **kwargs) # If empty search term, return all if not search_term: return security_groups # Handle special search syntax if search_term.startswith("port:"): try: # Extract port number and convert to int port_str = search_term.split(':', 1)[1] port = int(port_str) # Filter security groups with this port open matching_groups = [] for sg in security_groups: for rule in sg.get('IpPermissions', []): rule_protocol = rule.get('IpProtocol', '') from_port = rule.get('FromPort') to_port = rule.get('ToPort') # Check if port is in range port_match = False if rule_protocol == '-1': # All traffic port_match = True elif from_port is not None and to_port is not None: if from_port <= port <= to_port: port_match = True if port_match: matching_groups.append(sg) break # No need to check other rules return matching_groups except (ValueError, TypeError) as e: logger.warning(f"Invalid port number in search: {search_term}") # Fall back to text search pass elif search_term.startswith("protocol:"): protocol = search_term.split(':', 1)[1].lower() matching_groups = [] for sg in security_groups: for rule in sg.get('IpPermissions', []): rule_protocol = rule.get('IpProtocol', '').lower() # Handle protocol matching if rule_protocol == '-1' or rule_protocol == protocol: matching_groups.append(sg) break elif protocol == 'tcp' and rule_protocol == '6': matching_groups.append(sg) break elif protocol == 'udp' and rule_protocol == '17': matching_groups.append(sg) break return matching_groups elif search_term.startswith("public:true"): matching_groups = [] for sg in security_groups: for rule in sg.get('IpPermissions', []): is_public = False # Check for public CIDR ranges for ip_range in rule.get('IpRanges', []): cidr = ip_range.get('CidrIp', '') if cidr in ('0.0.0.0/0', '::/0'): is_public = True break # Check IPv6 ranges too if not is_public: for ipv6_range in rule.get('Ipv6Ranges', []): cidr = ipv6_range.get('CidrIpv6', '') if cidr == '::/0': is_public = True break if is_public: matching_groups.append(sg) break return matching_groups elif search_term.startswith("cidr:"): cidr = search_term.split(':', 1)[1] matching_groups = [] for sg in security_groups: for rule in sg.get('IpPermissions', []): cidr_match = False # Check IPv4 CIDR ranges for ip_range in rule.get('IpRanges', []): if ip_range.get('CidrIp', '') == cidr: cidr_match = True break # Check IPv6 ranges if not cidr_match: for ipv6_range in rule.get('Ipv6Ranges', []): if ipv6_range.get('CidrIpv6', '') == cidr: cidr_match = True break if cidr_match: matching_groups.append(sg) break return matching_groups # Standard text search for everything else # Normalize search term search_term_lower = search_term.lower() # Filter security groups by search term matching_groups = [] for sg in security_groups: # Check if search term is in name or description if search_term_lower in sg.get('GroupName', '').lower() or search_term_lower in sg.get('Description', '').lower(): matching_groups.append(sg) continue # Check if search term is in tags tags = sg.get('Tags', []) for tag in tags: if (search_term_lower in tag.get('Key', '').lower() or search_term_lower in tag.get('Value', '').lower()): matching_groups.append(sg) break return matching_groups def describe_vpcs( vpc_ids: Optional[List[str]] = None, filters: Optional[List[Dict[str, Any]]] = None, next_token: Optional[str] = None, max_results: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> Dict[str, Any]: """Describe VPCs with filtering options. Args: vpc_ids: List of VPC IDs to describe filters: List of filters to apply next_token: Token for pagination max_results: Maximum number of results to return (None for all) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_vpcs API call Returns: Dict[str, Any]: Response containing VPCs """ client = get_ec2_client(session_context=session_context) params = { **kwargs } if vpc_ids: params['VpcIds'] = vpc_ids if filters: params['Filters'] = filters if next_token: params['NextToken'] = next_token try: return client.describe_vpcs(**params) except ClientError as e: logger.error(f"Error describing VPCs: {e}") raise def get_all_vpcs( filters: Optional[List[Dict[str, Any]]] = None, vpc_ids: Optional[List[str]] = None, max_items: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """Get all VPCs with pagination handling using boto3 paginators. Args: filters: List of filters to apply vpc_ids: List of VPC IDs to describe max_items: Maximum number of VPCs to return (None for all) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_vpcs API call Returns: List[Dict[str, Any]]: List of VPCs """ # Set up parameters params = {} if filters: params['Filters'] = filters if vpc_ids: params['VpcIds'] = vpc_ids # Add additional parameters params.update(kwargs) # Get paginator paginator = get_paginator('describe_vpcs', session_context=session_context) # Configure pagination page_config = {} # Get all VPCs all_vpcs = [] try: page_iterator = paginator.paginate(**params, PaginationConfig=page_config) for page in page_iterator: all_vpcs.extend(page.get('Vpcs', [])) except ClientError as e: logger.error(f"Error getting VPCs: {e}") raise return all_vpcs def filter_vpcs_by_text(vpcs: List[Dict[str, Any]], search_term: str) -> List[Dict[str, Any]]: """Filter a list of VPCs by search term. Args: vpcs: List of VPC dictionaries search_term: Term to search for in VPC fields Returns: List[Dict[str, Any]]: Filtered list of VPCs """ if not search_term: return vpcs search_term_lower = search_term.lower() filtered_vpcs = [] for vpc in vpcs: # Get relevant fields to search in vpc_id = vpc.get('VpcId', '').lower() cidr_block = vpc.get('CidrBlock', '').lower() # Check name tag and other tags tag_match = False for tag in vpc.get('Tags', []): tag_value = tag.get('Value', '').lower() if search_term_lower in tag_value: tag_match = True break # Check if any field matches the search term if (search_term_lower in vpc_id or search_term_lower in cidr_block or tag_match): filtered_vpcs.append(vpc) return filtered_vpcs def describe_route_tables( filters: Optional[List[Dict[str, Any]]] = None, route_table_ids: Optional[List[str]] = None, next_token: Optional[str] = None, max_results: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> Dict[str, Any]: """Describe route tables with filtering options. Args: filters: List of filters to apply route_table_ids: List of route table IDs to describe next_token: Token for pagination max_results: Maximum number of results to return session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_route_tables API call Returns: Dict[str, Any]: Response containing route tables """ client = get_ec2_client(session_context=session_context) params = { **kwargs } if filters: params['Filters'] = filters if route_table_ids: params['RouteTableIds'] = route_table_ids if next_token: params['NextToken'] = next_token try: return client.describe_route_tables(**params) except ClientError as e: logger.error(f"Error describing route tables: {e}") raise def get_all_route_tables( filters: Optional[List[Dict[str, Any]]] = None, route_table_ids: Optional[List[str]] = None, max_items: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """Get all route tables with pagination handling using boto3 paginators. Args: filters: List of filters to apply route_table_ids: List of route table IDs to describe max_items: Maximum number of route tables to return (None for all) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_route_tables API call Returns: List[Dict[str, Any]]: List of route tables """ # Set up parameters params = {} if filters: params['Filters'] = filters if route_table_ids: params['RouteTableIds'] = route_table_ids # Add additional parameters params.update(kwargs) # Get paginator paginator = get_paginator('describe_route_tables', session_context=session_context) # Configure pagination page_config = {} # Get all route tables all_route_tables = [] try: page_iterator = paginator.paginate(**params, PaginationConfig=page_config) for page in page_iterator: all_route_tables.extend(page.get('RouteTables', [])) except ClientError as e: logger.error(f"Error getting route tables: {e}") raise return all_route_tables def describe_images( image_ids: Optional[List[str]] = None, owners: Optional[List[str]] = None, filters: Optional[List[Dict[str, Any]]] = None, next_token: Optional[str] = None, session_context: Optional[str] = None, **kwargs: Any ) -> Dict[str, Any]: """Describe AMIs with filtering options. Args: image_ids: List of AMI IDs to describe owners: List of AMI owners (e.g., 'self', 'amazon') filters: List of filters to apply next_token: Token for pagination session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_images API call Returns: Dict[str, Any]: Response containing AMIs """ client = get_ec2_client(session_context=session_context) params = { **kwargs } if image_ids: params['ImageIds'] = image_ids if owners: params['Owners'] = owners if filters: params['Filters'] = filters if next_token: params['NextToken'] = next_token try: return client.describe_images(**params) except ClientError as e: logger.error(f"Error describing AMIs: {e}") raise def describe_volumes( volume_ids: Optional[List[str]] = None, filters: Optional[List[Dict[str, Any]]] = None, next_token: Optional[str] = None, session_context: Optional[str] = None, **kwargs: Any ) -> Dict[str, Any]: """Describe EBS volumes with filtering options. Args: volume_ids: List of volume IDs to describe filters: List of filters to apply next_token: Token for pagination session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_volumes API call Returns: Dict[str, Any]: Response containing volumes """ client = get_ec2_client(session_context=session_context) params = { **kwargs } if volume_ids: params['VolumeIds'] = volume_ids if filters: params['Filters'] = filters if next_token: params['NextToken'] = next_token try: return client.describe_volumes(**params) except ClientError as e: logger.error(f"Error describing EBS volumes: {e}") raise def describe_internet_gateways( internet_gateway_ids: Optional[List[str]] = None, filters: Optional[List[Dict[str, Any]]] = None, next_token: Optional[str] = None, session_context: Optional[str] = None, **kwargs: Any ) -> Dict[str, Any]: """Describe internet gateways with filtering options. Args: internet_gateway_ids: List of internet gateway IDs to describe filters: List of filters to apply next_token: Token for pagination session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_internet_gateways API call Returns: Dict[str, Any]: Response containing internet gateways """ client = get_ec2_client(session_context=session_context) params = { **kwargs } if internet_gateway_ids: params['InternetGatewayIds'] = internet_gateway_ids if filters: params['Filters'] = filters if next_token: params['NextToken'] = next_token try: return client.describe_internet_gateways(**params) except ClientError as e: logger.error(f"Error describing internet gateways: {e}") raise def describe_subnets( subnet_ids: Optional[List[str]] = None, filters: Optional[List[Dict[str, Any]]] = None, next_token: Optional[str] = None, max_results: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> Dict[str, Any]: """Describe subnets with filtering options. Args: subnet_ids: List of subnet IDs to describe filters: List of filters to apply next_token: Token for pagination max_results: Maximum number of results to return session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_subnets API call Returns: Dict[str, Any]: Response containing subnets """ client = get_ec2_client(session_context=session_context) params = { **kwargs } if subnet_ids: params['SubnetIds'] = subnet_ids if filters: params['Filters'] = filters if next_token: params['NextToken'] = next_token try: return client.describe_subnets(**params) except ClientError as e: logger.error(f"Error describing subnets: {e}") raise def get_all_subnets( filters: Optional[List[Dict[str, Any]]] = None, subnet_ids: Optional[List[str]] = None, max_items: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """Get all subnets with pagination handling using boto3 paginators. Args: filters: List of filters to apply subnet_ids: List of subnet IDs to describe max_items: Maximum number of subnets to return (None for all) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_subnets API call Returns: List[Dict[str, Any]]: List of subnets """ # Set up parameters params = {} if filters: params['Filters'] = filters if subnet_ids: params['SubnetIds'] = subnet_ids # Add additional parameters params.update(kwargs) # Get paginator paginator = get_paginator('describe_subnets', session_context=session_context) # Configure pagination page_config = {} # Get all subnets all_subnets = [] try: page_iterator = paginator.paginate(**params, PaginationConfig=page_config) for page in page_iterator: all_subnets.extend(page.get('Subnets', [])) except ClientError as e: logger.error(f"Error getting subnets: {e}") raise return all_subnets def filter_subnets_by_text(subnets: List[Dict[str, Any]], search_term: str) -> List[Dict[str, Any]]: """Filter a list of subnets by search term. Args: subnets: List of subnet dictionaries search_term: Term to search for in subnet fields Returns: List[Dict[str, Any]]: Filtered list of subnets """ if not search_term: return subnets search_term_lower = search_term.lower() filtered_subnets = [] for subnet in subnets: # Get relevant fields to search in subnet_id = subnet.get('SubnetId', '').lower() vpc_id = subnet.get('VpcId', '').lower() cidr_block = subnet.get('CidrBlock', '').lower() availability_zone = subnet.get('AvailabilityZone', '').lower() # Check name tag and other tags tag_match = False for tag in subnet.get('Tags', []): tag_key = tag.get('Key', '').lower() tag_value = tag.get('Value', '').lower() if search_term_lower in tag_key or search_term_lower in tag_value: tag_match = True break # Check if any field matches the search term if (search_term_lower in subnet_id or search_term_lower in vpc_id or search_term_lower in cidr_block or search_term_lower in availability_zone or tag_match): filtered_subnets.append(subnet) return filtered_subnets def describe_network_acls( network_acl_ids: Optional[List[str]] = None, filters: Optional[List[Dict[str, Any]]] = None, next_token: Optional[str] = None, max_results: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> Dict[str, Any]: """Describe network ACLs with filtering options. Args: network_acl_ids: List of network ACL IDs to describe filters: List of filters to apply next_token: Token for pagination max_results: Maximum number of results to return session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_network_acls API call Returns: Dict[str, Any]: Response containing network ACLs """ client = get_ec2_client(session_context=session_context) params = { **kwargs } if network_acl_ids: params['NetworkAclIds'] = network_acl_ids if filters: params['Filters'] = filters if next_token: params['NextToken'] = next_token try: return client.describe_network_acls(**params) except ClientError as e: logger.error(f"Error describing network ACLs: {e}") raise def describe_addresses( allocation_ids: Optional[List[str]] = None, public_ips: Optional[List[str]] = None, filters: Optional[List[Dict[str, Any]]] = None, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """Describe Elastic IP addresses. Args: allocation_ids: List of allocation IDs to describe public_ips: List of public IP addresses to describe filters: List of filters to apply session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_addresses API call Returns: List of Elastic IP address data """ client = get_ec2_client(session_context=session_context) params = { **kwargs } if allocation_ids: params['AllocationIds'] = allocation_ids if public_ips: params['PublicIps'] = public_ips if filters: params['Filters'] = filters try: response = client.describe_addresses(**params) return response.get('Addresses', []) except ClientError as e: logger.error(f"Error describing Elastic IP addresses: {e}") raise def describe_network_interfaces( network_interface_ids: Optional[List[str]] = None, filters: Optional[List[Dict[str, Any]]] = None, next_token: Optional[str] = None, max_results: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """Describe EC2 network interfaces. Args: network_interface_ids: List of network interface IDs to describe filters: List of filters to apply next_token: Token for pagination max_results: Maximum number of results to return per page (None for all) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_network_interfaces API call Returns: List of network interface data """ client = get_ec2_client(session_context=session_context) params = { **kwargs } if network_interface_ids: params['NetworkInterfaceIds'] = network_interface_ids if filters: params['Filters'] = filters if next_token: params['NextToken'] = next_token if max_results: params['MaxResults'] = max_results try: response = client.describe_network_interfaces(**params) return response.get('NetworkInterfaces', []) except ClientError as e: logger.error(f"Error describing network interfaces: {e}") raise def get_all_network_interfaces( filters: Optional[List[Dict[str, Any]]] = None, network_interface_ids: Optional[List[str]] = None, max_items: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """Get all network interfaces with pagination handling using boto3 paginators. Args: filters: List of filters to apply network_interface_ids: List of network interface IDs to describe max_items: Maximum number of network interfaces to return (None for all) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the describe_network_interfaces API call Returns: List[Dict[str, Any]]: List of network interfaces """ # Set up parameters params = {} if filters: params['Filters'] = filters if network_interface_ids: params['NetworkInterfaceIds'] = network_interface_ids # Add additional parameters params.update(kwargs) # Get paginator paginator = get_paginator('describe_network_interfaces', session_context=session_context) # Configure pagination page_config = {} if max_items: page_config['MaxItems'] = max_items # Get all network interfaces all_interfaces = [] try: page_iterator = paginator.paginate(**params, PaginationConfig=page_config) for page in page_iterator: all_interfaces.extend(page.get('NetworkInterfaces', [])) except ClientError as e: logger.error(f"Error getting network interfaces: {e}") raise return all_interfaces

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/groovyBugify/aws-security-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server