Skip to main content
Glama

AWS Security MCP

ecs_tools.py12.5 kB
"""MCP tools for AWS ECS (Elastic Container Service) security.""" import logging from typing import Any, Dict, List, Optional from aws_security_mcp.services import ecs from aws_security_mcp.tools import register_tool from aws_security_mcp.formatters.ecs_formatter import ( format_ecs_service, format_ecs_task_definition, format_ecs_task, format_ecs_container_instance, format_ecs_cluster ) logger = logging.getLogger(__name__) @register_tool() async def list_ecs_clusters(session_context: Optional[str] = None) -> Dict[str, Any]: """List all ECS clusters in the AWS account. This tool retrieves all ECS clusters and provides information about their configuration and security settings. Args: session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing ECS clusters information """ logger.info(f"Listing ECS clusters (session_context={session_context})") result = await ecs.get_clusters(session_context=session_context) if result["success"]: # Apply security-focused formatting to reduce response size formatted_clusters = [format_ecs_cluster(cluster) for cluster in result.get("clusters", [])] result["clusters"] = formatted_clusters return result @register_tool() async def list_ecs_task_definitions(family_prefix: Optional[str] = None, status: str = "ACTIVE", session_context: Optional[str] = None) -> Dict[str, Any]: """List ECS task definitions with their security configurations. This tool retrieves all task definitions and extracts security-relevant configurations such as IAM roles, secrets, volumes, and network settings. Args: family_prefix: Optional family name prefix to filter task definitions status: Task definition status to filter by (ACTIVE or INACTIVE) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing task definitions with security information """ logger.info(f"Listing ECS task definitions with status {status} (session_context={session_context})") result = await ecs.get_task_definitions(family_prefix, status, session_context=session_context) if result["success"]: # Apply security-focused formatting to reduce response size formatted_task_defs = [format_ecs_task_definition(task_def) for task_def in result.get("taskDefinitions", [])] result["taskDefinitions"] = formatted_task_defs return result @register_tool() async def get_ecs_task_definition(task_definition: str, cluster: Optional[str] = None, session_context: Optional[str] = None) -> Dict[str, Any]: """Get detailed information about the latest active ECS task definition with running tasks. This tool focuses on finding task definitions that are currently in use. When a family name is provided without a specific revision, it will find the latest active revision that has running tasks. If no running tasks are found, it falls back to the latest revision. Args: task_definition: The task definition family name, ARN, or family:revision cluster: Optional cluster name to check for running tasks (if not provided, checks all clusters) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing detailed task definition information with a flag indicating if it has running tasks """ logger.info(f"Getting active ECS task definition: {task_definition}" + (f" in cluster: {cluster}" if cluster else "") + f" (session_context={session_context})") result = await ecs.get_task_definition(task_definition, cluster, session_context=session_context) if result["success"] and result.get("taskDefinition"): # Apply security-focused formatting to reduce response size result["taskDefinition"] = format_ecs_task_definition(result["taskDefinition"]) return result @register_tool() async def list_ecs_services(cluster: str, session_context: Optional[str] = None) -> Dict[str, Any]: """List ECS services for a specific cluster with security details. This tool retrieves all services in the specified cluster and extracts security-relevant information such as network configuration, IAM roles, and load balancer settings. Args: cluster: The ECS cluster ARN or name session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing services with security information """ logger.info(f"Listing ECS services for cluster: {cluster} (session_context={session_context})") result = await ecs.get_services(cluster, session_context=session_context) if result["success"]: # Apply security-focused formatting to reduce response size formatted_services = [format_ecs_service(service) for service in result.get("services", [])] result["services"] = formatted_services return result @register_tool() async def list_ecs_tasks(cluster: str, service: Optional[str] = None, session_context: Optional[str] = None) -> Dict[str, Any]: """List ECS tasks for a specific cluster or service with security details. This tool retrieves all tasks in the specified cluster or service and extracts security-relevant information such as task IAM roles, network configuration, and container details. Args: cluster: The ECS cluster ARN or name service: Optional service name to filter tasks session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing tasks with security information """ logger.info(f"Listing ECS tasks for cluster: {cluster}" + (f" and service: {service}" if service else "") + f" (session_context={session_context})") result = await ecs.get_tasks(cluster, service, session_context=session_context) if result["success"]: # Apply security-focused formatting to reduce response size formatted_tasks = [format_ecs_task(task) for task in result.get("tasks", [])] result["tasks"] = formatted_tasks return result @register_tool() async def list_ecs_container_instances(cluster: str, session_context: Optional[str] = None) -> Dict[str, Any]: """List ECS container instances for a specific cluster with security details. This tool retrieves all container instances in the specified cluster and extracts security-relevant information such as EC2 instance details, agent status, and attached resources. Args: cluster: The ECS cluster ARN or name session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing container instances with security information """ logger.info(f"Listing ECS container instances for cluster: {cluster} (session_context={session_context})") result = await ecs.get_container_instances(cluster, session_context=session_context) if result["success"]: # Apply security-focused formatting to reduce response size formatted_instances = [format_ecs_container_instance(instance) for instance in result.get("containerInstances", [])] result["containerInstances"] = formatted_instances return result @register_tool() async def get_ecs_service(cluster: str, service: str, session_context: Optional[str] = None) -> Dict[str, Any]: """Get detailed information about a specific ECS service. This tool retrieves comprehensive details about a specified service in a cluster including its network configuration, IAM roles, and security settings. Args: cluster: The ECS cluster ARN or name service: The service ARN or name session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing detailed service information """ logger.info(f"Getting ECS service: {service} in cluster: {cluster} (session_context={session_context})") result = await ecs.get_service(cluster, service, session_context=session_context) if result["success"] and result.get("service"): # Apply security-focused formatting to reduce response size result["service"] = format_ecs_service(result["service"]) return result @register_tool() async def get_ecs_task(cluster: str, task: str, session_context: Optional[str] = None) -> Dict[str, Any]: """Get detailed information about a specific ECS task. This tool retrieves comprehensive details about a specified task in a cluster including its network interfaces, security groups, and container details. Args: cluster: The ECS cluster ARN or name task: The task ARN or ID session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing detailed task information """ logger.info(f"Getting ECS task: {task} in cluster: {cluster} (session_context={session_context})") result = await ecs.get_task(cluster, task, session_context=session_context) if result["success"] and result.get("task"): # Apply security-focused formatting to reduce response size result["task"] = format_ecs_task(result["task"]) return result @register_tool() async def search_ecs_services_by_ecr_repository(repository_name: str, max_clusters: int = 10, max_services_per_cluster: int = 50, session_context: Optional[str] = None) -> Dict[str, Any]: """Search for ECS services that use a specific ECR repository (optimized version). This tool searches through services and checks their task definitions only when needed. Includes limits to prevent timeouts in large environments. This is essential for security analysis to understand which services might be affected by vulnerabilities in a specific container image. Args: repository_name: Name of the ECR repository to search for (e.g., "test-repo") max_clusters: Maximum number of clusters to search (default: 10, helps prevent timeouts) max_services_per_cluster: Maximum services per cluster to check (default: 50, helps prevent timeouts) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing services using the specified ECR repository with security information """ logger.info(f"Searching for ECS services using ECR repository: {repository_name} (max_clusters={max_clusters}, max_services_per_cluster={max_services_per_cluster}) (session_context={session_context})") result = await ecs.search_services_by_ecr_repository(repository_name, max_clusters, max_services_per_cluster, session_context=session_context) if result["success"] and result.get("services_using_repository"): # Apply security-focused formatting to reduce response size formatted_services = [] for service in result["services_using_repository"]: # Keep all the essential security information but ensure consistent formatting formatted_service = { "service_name": service.get("service_name"), "service_arn": service.get("service_arn"), "cluster_name": service.get("cluster_name"), "cluster_arn": service.get("cluster_arn"), "task_definition": { "arn": service.get("task_definition_arn"), "family": service.get("task_definition_family"), "revision": service.get("task_definition_revision") }, "container": { "name": service.get("container_name"), "image_uri": service.get("image_uri") }, "status": service.get("status"), "running_count": service.get("running_count", 0), "desired_count": service.get("desired_count", 0), "created_at": service.get("created_at") } formatted_services.append(formatted_service) result["services_using_repository"] = formatted_services return result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/groovyBugify/aws-security-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server