Skip to main content
Glama

AWS Security MCP

ecs_formatter.py (13.3 kB)
"""Formatters for AWS ECS (Elastic Container Service) resources. This module provides security-focused formatting functions for AWS ECS resources to make them more suitable for API responses and LLM consumption by filtering out only security-relevant information. """ from typing import Dict, List, Any, Optional def format_ecs_service(service: Dict[str, Any]) -> Dict[str, Any]: """Format an ECS service into a security-focused representation. Args: service: The raw ECS service data dictionary Returns: Dictionary with only security-relevant service information """ # Extract network configuration (security-relevant) network_config = service.get('networkConfiguration', {}).get('awsvpcConfiguration', {}) security_groups = network_config.get('securityGroups', []) subnets = network_config.get('subnets', []) assign_public_ip = network_config.get('assignPublicIp', 'DISABLED') == 'ENABLED' # Extract IAM role information execution_role = service.get('executionRoleArn', None) task_role = service.get('taskDefinition', '').split('/')[-1] if service.get('taskDefinition') else None # Extract load balancer configuration (security-relevant) load_balancers = [] for lb in service.get('loadBalancers', []): load_balancers.append({ 'targetGroupArn': lb.get('targetGroupArn', ''), 'loadBalancerName': lb.get('loadBalancerName', ''), 'containerName': lb.get('containerName', ''), 'containerPort': lb.get('containerPort', 0) }) # Extract deployment configuration (partially security-relevant) deployment_config = service.get('deploymentConfiguration', {}) deployment_controller = service.get('deploymentController', {}).get('type', 'ECS') # Extract service discovery information (security-relevant for network exposure) service_discovery = [] for registry in service.get('serviceRegistries', []): service_discovery.append({ 'registryArn': registry.get('registryArn', ''), 'containerName': registry.get('containerName', ''), 'containerPort': registry.get('containerPort', 0) }) # Build the formatted output 
with only security-relevant information return { "service_name": service.get('serviceName', ''), "service_arn": service.get('serviceArn', ''), "cluster_arn": service.get('clusterArn', ''), "task_definition": task_role, "security": { "execution_role": execution_role, "network": { "vpc_enabled": bool(network_config), "assign_public_ip": assign_public_ip, "security_groups": security_groups, "subnet_count": len(subnets), "subnets": subnets }, "service_discovery": service_discovery, "load_balancers": load_balancers }, "deployment": { "controller_type": deployment_controller, "circuit_breaker_enabled": deployment_config.get('deploymentCircuitBreaker', {}).get('enable', False), "rollback_on_failure": deployment_config.get('deploymentCircuitBreaker', {}).get('rollback', False) }, "enable_execute_command": service.get('enableExecuteCommand', False) } def format_ecs_task_definition(task_definition: Dict[str, Any]) -> Dict[str, Any]: """Format an ECS task definition into a security-focused representation. 
Args: task_definition: The raw ECS task definition data dictionary Returns: Dictionary with only security-relevant task definition information """ # Extract container definitions with security focus secure_containers = [] for container in task_definition.get('containerDefinitions', []): # Get only security-relevant container properties secure_container = { "name": container.get('name', ''), "image": container.get('image', ''), "privileged": container.get('privileged', False), "user": container.get('user', 'None'), "readonly_rootfs": container.get('readonlyRootFilesystem', False), "secure_environment": bool(container.get('secrets', [])), "has_environment_vars": bool(container.get('environment', [])), "port_mappings_count": len(container.get('portMappings', [])), "mount_points_count": len(container.get('mountPoints', [])), "ulimits": bool(container.get('ulimits', [])), "linux_parameters": bool(container.get('linuxParameters', {})), "log_configuration": bool(container.get('logConfiguration', {})) } # If there are secrets, list them by name only (not values) if container.get('secrets'): secure_container['secrets'] = [ secret.get('name', '') for secret in container.get('secrets', []) ] # Include all environment variables with their actual values if container.get('environment'): env_vars = {} for env in container.get('environment', []): env_name = env.get('name', '') env_value = env.get('value', '') env_vars[env_name] = env_value secure_container['environment'] = env_vars secure_containers.append(secure_container) # Extract volumes with security focus secure_volumes = [] for volume in task_definition.get('volumes', []): secure_volume = { "name": volume.get('name', ''), "efs_volume": bool(volume.get('efsVolumeConfiguration', {})), "docker_volume": bool(volume.get('dockerVolumeConfiguration', {})), "host_path": volume.get('host', {}).get('sourcePath', None) } # Add EFS volume details if present if volume.get('efsVolumeConfiguration'): efs_config = 
volume.get('efsVolumeConfiguration', {}) secure_volume['efs'] = { "file_system_id": efs_config.get('fileSystemId', ''), "root_directory": efs_config.get('rootDirectory', '/'), "transit_encryption": efs_config.get('transitEncryption', 'DISABLED'), "iam_auth": efs_config.get('authorizationConfig', {}).get('iam', 'DISABLED') } secure_volumes.append(secure_volume) # Build the formatted output with only security-relevant information return { "family": task_definition.get('family', ''), "revision": task_definition.get('revision', 0), "task_definition_arn": task_definition.get('taskDefinitionArn', ''), "security": { "task_role_arn": task_definition.get('taskRoleArn', 'None'), "execution_role_arn": task_definition.get('executionRoleArn', 'None'), "network_mode": task_definition.get('networkMode', 'bridge'), "pid_mode": task_definition.get('pidMode', 'host') if task_definition.get('pidMode') else 'None', "ipc_mode": task_definition.get('ipcMode', 'host') if task_definition.get('ipcMode') else 'None', "containers": secure_containers, "volumes": secure_volumes }, "requires_compatibilities": task_definition.get('requiresCompatibilities', []), "cpu": task_definition.get('cpu', 'None'), "memory": task_definition.get('memory', 'None'), "status": task_definition.get('status', '') } def format_ecs_task(task: Dict[str, Any]) -> Dict[str, Any]: """Format an ECS task into a security-focused representation. 
Args: task: The raw ECS task data dictionary Returns: Dictionary with only security-relevant task information """ # Extract attachments (like ENIs) with security focus attachments = [] for attachment in task.get('attachments', []): if attachment.get('type') == 'ElasticNetworkInterface': eni_details = {} for detail in attachment.get('details', []): eni_details[detail.get('name')] = detail.get('value') attachments.append({ "id": attachment.get('id', ''), "type": attachment.get('type', ''), "status": attachment.get('status', ''), "subnet_id": eni_details.get('subnetId', ''), "security_groups": [eni_details.get('networkInterfaceSecurityGroups', '')], "private_ipv4_address": eni_details.get('privateIPv4Address', ''), "ipv6_address": eni_details.get('privateIPv6Address', '') }) # Extract container details with security focus containers = [] for container in task.get('containers', []): containers.append({ "name": container.get('name', ''), "image": container.get('image', ''), "image_digest": container.get('imageDigest', '').split(':')[-1] if container.get('imageDigest') else '', "last_status": container.get('lastStatus', ''), "exit_code": container.get('exitCode', None), "reason": container.get('reason', ''), "health_status": container.get('healthStatus', 'UNKNOWN') }) # Build the formatted output with only security-relevant information return { "task_arn": task.get('taskArn', ''), "cluster_arn": task.get('clusterArn', ''), "task_definition_arn": task.get('taskDefinitionArn', ''), "security": { "enableExecuteCommand": task.get('enableExecuteCommand', False), "group": task.get('group', ''), "launch_type": task.get('launchType', ''), "network_interfaces": attachments, "containers": containers, "platform_version": task.get('platformVersion', '') }, "started_by": task.get('startedBy', ''), "version": task.get('version', 0), "connectivity": task.get('connectivity', ''), "last_status": task.get('lastStatus', '') } def format_ecs_container_instance(container_instance: Dict[str, 
Any]) -> Dict[str, Any]: """Format an ECS container instance into a security-focused representation. Args: container_instance: The raw ECS container instance data dictionary Returns: Dictionary with only security-relevant container instance information """ # Extract instance attributes with security focus attributes = {} for attr in container_instance.get('attributes', []): # Only include security-relevant attributes attr_name = attr.get('name', '') if any(keyword in attr_name.lower() for keyword in ['ecs.capability', 'ecs.vpc', 'ecs.subnet', 'ecs.security-group', 'ecs.instance-type', 'ecs.ami-id', 'ecs.instance-role']): attributes[attr_name] = attr.get('value', '') # Build the formatted output with only security-relevant information return { "container_instance_arn": container_instance.get('containerInstanceArn', ''), "ec2_instance_id": container_instance.get('ec2InstanceId', ''), "security": { "status": container_instance.get('status', ''), "agent_connected": container_instance.get('agentConnected', False), "agent_version": container_instance.get('versionInfo', {}).get('agentVersion', ''), "security_attributes": attributes }, "running_tasks_count": container_instance.get('runningTasksCount', 0), "pending_tasks_count": container_instance.get('pendingTasksCount', 0) } def format_ecs_cluster(cluster: Dict[str, Any]) -> Dict[str, Any]: """Format an ECS cluster into a security-focused representation. 
Args: cluster: The raw ECS cluster data dictionary Returns: Dictionary with only security-relevant cluster information """ # Extract settings with security focus settings = {} for setting in cluster.get('settings', []): # Only include security-relevant settings setting_name = setting.get('name', '') if setting_name in ['containerInsights', 'fargateEphemeralStorageKMSKey']: settings[setting_name] = setting.get('value', '') # Build the formatted output with only security-relevant information return { "cluster_name": cluster.get('clusterName', ''), "cluster_arn": cluster.get('clusterArn', ''), "security": { "status": cluster.get('status', ''), "security_settings": settings, "capacity_providers": cluster.get('capacityProviders', []), "default_capacity_provider_strategy": cluster.get('defaultCapacityProviderStrategy', []) }, "active_services_count": cluster.get('activeServicesCount', 0), "running_tasks_count": cluster.get('runningTasksCount', 0), "pending_tasks_count": cluster.get('pendingTasksCount', 0), "registered_container_instances_count": cluster.get('registeredContainerInstancesCount', 0) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/groovyBugify/aws-security-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.