Skip to main content
Glama

AWS Security MCP

ecr_tools.py7.04 kB
"""ECR tools module for AWS Security MCP. This module provides tools for retrieving and analyzing ECR information for security assessment purposes. """ import json import logging from datetime import datetime from typing import Any, Dict, List, Optional from aws_security_mcp.services import ecr from aws_security_mcp.formatters.ecr import extract_repository_uris, extract_repository_names, format_repository_search_results from aws_security_mcp.tools import register_tool # Configure logging logger = logging.getLogger(__name__) @register_tool() async def list_ecr_repositories(session_context: Optional[str] = None) -> Dict[str, Any]: """List all ECR repositories in the AWS account. Args: session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing list of ECR repository names """ try: logger.info(f"Listing ECR repositories (session_context={session_context})") # Get repositories from the service result = await ecr.get_repositories(session_context=session_context) if not result.get("success", False): return { "error": result.get("error", "Unknown error"), "repository_names": [], "count": 0, "scan_timestamp": datetime.utcnow().isoformat() } # Extract only the repository names as requested repository_names = extract_repository_names(result.get("repositories", [])) return { "repository_names": repository_names, "count": len(repository_names), "scan_timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Error listing ECR repositories: {str(e)}") return { "repository_names": [], "count": 0, "scan_timestamp": datetime.utcnow().isoformat(), "error": str(e) } @register_tool() async def get_ecr_repository_policy(repository_name: str, session_context: Optional[str] = None) -> Dict[str, Any]: """Get the IAM policy for an ECR repository. This tool retrieves the repository policy for the specified ECR repository. Args: repository_name: Name of the ECR repository session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing repository policy information """ logger.info(f"Getting policy for ECR repository: {repository_name} (session_context={session_context})") result = await ecr.get_repository_policy(repository_name, session_context=session_context) return result @register_tool() async def get_ecr_image_scan_findings(repository_name: str, image_tag: str = 'latest', session_context: Optional[str] = None) -> Dict[str, Any]: """Get vulnerability scan findings for a container image. This tool retrieves scan findings for the specified container image. Args: repository_name: Name of the ECR repository image_tag: Tag of the image to check, defaults to 'latest' session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing vulnerability scan findings information """ logger.info(f"Getting scan findings for ECR image: {repository_name}:{image_tag} (session_context={session_context})") result = await ecr.get_repository_scan_findings(repository_name, image_tag, session_context=session_context) return result @register_tool() async def get_ecr_repository_images(repository_name: str, session_context: Optional[str] = None) -> Dict[str, Any]: """Get information about all images in an ECR repository. This tool retrieves details about all container images in the specified repository. Args: repository_name: Name of the ECR repository session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing repository images information """ logger.info(f"Getting images for ECR repository: {repository_name} (session_context={session_context})") result = await ecr.get_repository_images(repository_name, session_context=session_context) return result @register_tool() async def search_ecr_repositories(repository_name: Optional[str] = None, repository_names: Optional[List[str]] = None, session_context: Optional[str] = None) -> Dict[str, Any]: """Search for ECR repositories and get detailed information. This tool allows searching for repositories by exact name match and returns detailed information about the matched repositories, including policy information and recent images. Args: repository_name: Optional single repository name to search for exactly repository_names: Optional list of repository names to search for exactly If neither parameter is provided, details for all repositories will be returned. session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Dict containing detailed information about matching repositories """ try: if repository_name: logger.info(f"Searching ECR repository with exact name: {repository_name} (session_context={session_context})") elif repository_names: logger.info(f"Searching ECR repositories with exact names: {', '.join(repository_names)} (session_context={session_context})") else: logger.info(f"Fetching details for all ECR repositories (session_context={session_context})") # Get repository search results from the service using exact name matching search_results = await ecr.search_repositories( repository_name=repository_name, repository_names=repository_names, session_context=session_context ) if not search_results.get("success", False): return { "error": search_results.get("error", "Unknown error"), "repositories": [], "count": 0, "search_term": repository_name or (', '.join(repository_names) if repository_names else 'ALL'), "scan_timestamp": datetime.utcnow().isoformat() } # Format the search results formatted_results = format_repository_search_results(search_results) return formatted_results except Exception as e: logger.error(f"Error searching ECR repositories: {str(e)}") return { "repositories": [], "count": 0, "search_term": repository_name or (', '.join(repository_names) if repository_names else 'ALL'), "scan_timestamp": datetime.utcnow().isoformat(), "error": str(e) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/groovyBugify/aws-security-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server