Skip to main content
Glama

AWS Security MCP

lambda_formatter.py (15.5 kB)
"""Formatters for AWS Lambda resources.

This module provides JSON-based formatting functions for AWS Lambda resources
to make them more suitable for API responses and LLM consumption.
"""

from typing import Dict, List, Any, Optional
from datetime import datetime


def _to_iso(value: Any) -> Any:
    """Return ``value.isoformat()`` for datetimes; pass anything else through."""
    if isinstance(value, datetime):
        return value.isoformat()
    return value


def _layer_name_from_arn(arn: Optional[str]) -> str:
    """Extract the layer name (second-to-last segment) from a layer version ARN.

    Returns the input unchanged (or ``''`` for a missing ARN) when it has fewer
    than two colon-separated segments, instead of raising ``IndexError``.
    """
    arn = arn or ''
    segments = arn.split(':')
    return segments[-2] if len(segments) >= 2 else arn


def _layer_version_from_arn(version_arn: Optional[str]) -> Optional[int]:
    """Return the trailing numeric version of a layer version ARN, if any.

    Only the final segment is inspected: other all-digit segments (notably the
    12-digit account ID) must not be mistaken for a version number.
    """
    tail = (version_arn or '').rsplit(':', 1)[-1]
    return int(tail) if tail.isdecimal() else None


def format_lambda_function_json(lambda_function: Dict[str, Any]) -> Dict[str, Any]:
    """Format a Lambda function into structured data for JSON output.

    Args:
        lambda_function: Lambda function data dictionary

    Returns:
        Dictionary with formatted Lambda function data
    """
    # Format the last modified date
    last_modified = _to_iso(lambda_function.get('LastModified', ''))

    # Memory allocation
    memory_size = lambda_function.get('MemorySize', 0)
    memory_display = f"{memory_size} MB"

    # Code size in a readable unit (MB once it reaches 1 MB, otherwise KB)
    code_size = lambda_function.get('CodeSize', 0)
    code_size_kb = code_size / 1024
    code_size_mb = code_size_kb / 1024
    if code_size_mb >= 1:
        code_size_display = f"{code_size_mb:.2f} MB"
    else:
        code_size_display = f"{code_size_kb:.2f} KB"

    # Timeout
    timeout = lambda_function.get('Timeout', 0)
    timeout_display = f"{timeout} seconds"

    # Runtime and architecture
    runtime = lambda_function.get('Runtime', '')
    architectures = lambda_function.get('Architectures', [])
    arch_display = ', '.join(architectures) if architectures else 'x86_64'

    # VPC configuration. ``or {}`` also covers a key that is present but None.
    vpc_config = lambda_function.get('VpcConfig') or {}
    vpc_enabled = bool(vpc_config.get('VpcId', ''))
    subnet_count = len(vpc_config.get('SubnetIds') or [])
    security_group_count = len(vpc_config.get('SecurityGroupIds') or [])

    # Layers: tolerate entries without an 'Arn' rather than raising IndexError
    layers = lambda_function.get('Layers') or []
    layer_count = len(layers)
    layer_names = [_layer_name_from_arn(layer.get('Arn', '')) for layer in layers]

    return {
        "function_name": lambda_function.get('FunctionName', ''),
        "function_arn": lambda_function.get('FunctionArn', ''),
        "runtime": runtime,
        "role": lambda_function.get('Role', ''),
        "handler": lambda_function.get('Handler', ''),
        "code_size": code_size_display,
        "code_size_bytes": code_size,
        "timeout": timeout_display,
        "timeout_seconds": timeout,
        "memory": memory_display,
        "memory_mb": memory_size,
        "last_modified": last_modified,
        "description": lambda_function.get('Description', ''),
        # NOTE(review): variable values are passed through unredacted.
        "environment_variables": (lambda_function.get('Environment') or {}).get('Variables', {}),
        "architectures": arch_display,
        "state": lambda_function.get('State', ''),
        "last_update_status": lambda_function.get('LastUpdateStatus', ''),
        "vpc": {
            "enabled": vpc_enabled,
            "vpc_id": vpc_config.get('VpcId', ''),
            "subnet_count": subnet_count,
            "security_group_count": security_group_count
        },
        "layers": {
            "count": layer_count,
            "names": layer_names
        },
        "concurrency": {
            "reserved": lambda_function.get('ReservedConcurrentExecutions', 0)
        },
        "tracing_config": (lambda_function.get('TracingConfig') or {}).get('Mode', 'PassThrough'),
        "tags": lambda_function.get('Tags', {}),
        "function_url": lambda_function.get('FunctionUrl', {})
    }


def format_lambda_alias_json(alias: Dict[str, Any]) -> Dict[str, Any]:
    """Format a Lambda alias into structured data for JSON output.

    Args:
        alias: Lambda alias data dictionary

    Returns:
        Dictionary with formatted Lambda alias data. ``routing_config`` is
        ``None`` unless the alias carries ``AdditionalVersionWeights``.
    """
    # Format routing config if present (a None 'RoutingConfig' counts as absent)
    routing_config = None
    raw_routing = alias.get('RoutingConfig') or {}
    if 'AdditionalVersionWeights' in raw_routing:
        routing_config = {
            "additional_version_weights": raw_routing['AdditionalVersionWeights']
        }

    return {
        "alias_name": alias.get('Name', ''),
        "alias_arn": alias.get('AliasArn', ''),
        "function_version": alias.get('FunctionVersion', ''),
        "description": alias.get('Description', ''),
        "routing_config": routing_config,
        "revision_id": alias.get('RevisionId', '')
    }


def format_lambda_event_source_mapping_json(mapping: Dict[str, Any]) -> Dict[str, Any]:
    """Format a Lambda event source mapping into structured data for JSON output.

    Args:
        mapping: Lambda event source mapping data dictionary

    Returns:
        Dictionary with formatted Lambda event source mapping data. All
        datetime fields are rendered as ISO-8601 strings.
    """
    # Format dates
    created_date = _to_iso(mapping.get('CreationDate'))
    last_modified = _to_iso(mapping.get('LastModified'))
    # Converted like the other dates so the result stays JSON-serializable.
    starting_position_timestamp = _to_iso(mapping.get('StartingPositionTimestamp', ''))

    return {
        "uuid": mapping.get('UUID', ''),
        "function_arn": mapping.get('FunctionArn', ''),
        "event_source_arn": mapping.get('EventSourceArn', ''),
        "enabled": mapping.get('State', '') == 'Enabled',
        "batch_size": mapping.get('BatchSize', 0),
        "starting_position": mapping.get('StartingPosition', ''),
        "starting_position_timestamp": starting_position_timestamp,
        "last_processing_result": mapping.get('LastProcessingResult', ''),
        "state": mapping.get('State', ''),
        "state_transition_reason": mapping.get('StateTransitionReason', ''),
        "maximum_retry_attempts": mapping.get('MaximumRetryAttempts', 0),
        "maximum_record_age_in_seconds": mapping.get('MaximumRecordAgeInSeconds', 0),
        "parallelization_factor": mapping.get('ParallelizationFactor', 0),
        "topics": mapping.get('Topics', []),
        "queues": mapping.get('Queues', []),
        "source_access_configurations": mapping.get('SourceAccessConfigurations', []),
        "created_date": created_date,
        "last_modified": last_modified
    }


def format_lambda_version_json(version: Dict[str, Any]) -> Dict[str, Any]:
    """Format a Lambda version into structured data for JSON output.

    Args:
        version: Lambda version data dictionary

    Returns:
        Dictionary with formatted Lambda version data
    """
    # Format the last modified time
    last_modified = _to_iso(version.get('LastModified'))

    return {
        "function_name": version.get('FunctionName', ''),
        "function_arn": version.get('FunctionArn', ''),
        "version": version.get('Version', ''),
        "description": version.get('Description', ''),
        "runtime": version.get('Runtime', ''),
        "code_size": version.get('CodeSize', 0),
        "timeout": version.get('Timeout', 0),
        "memory_size": version.get('MemorySize', 0),
        "last_modified": last_modified,
        "code_sha256": version.get('CodeSha256', ''),
        "architectures": version.get('Architectures', [])
    }


def format_lambda_layer_json(layer: Dict[str, Any]) -> Dict[str, Any]:
    """Format a Lambda layer into structured data for JSON output.

    Args:
        layer: Lambda layer data dictionary

    Returns:
        Dictionary with formatted Lambda layer data. ``latest_version`` is the
        integer version of ``LatestMatchingVersion`` or ``None`` if unknown.
    """
    latest = layer.get('LatestMatchingVersion') or {}
    layer_version_arn = latest.get('LayerVersionArn', '')

    # Prefer an explicit integer 'Version' field; otherwise fall back to the
    # trailing ARN segment. Scanning every numeric ARN segment would pick up
    # the account ID instead of the version.
    latest_version = latest.get('Version')
    if isinstance(latest_version, bool) or not isinstance(latest_version, int):
        latest_version = _layer_version_from_arn(layer_version_arn)

    # Format the creation date
    created_date = _to_iso(latest.get('CreatedDate'))

    return {
        "layer_name": layer.get('LayerName', ''),
        "layer_arn": layer.get('LayerArn', ''),
        "latest_version": latest_version,
        "description": latest.get('Description', ''),
        "created_date": created_date,
        "compatible_runtimes": latest.get('CompatibleRuntimes', []),
        "compatible_architectures": latest.get('CompatibleArchitectures', []),
        "license_info": latest.get('LicenseInfo', ''),
        "layer_version_arn": layer_version_arn
    }


def format_lambda_functions_summary_json(functions: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Format a summary of Lambda functions into structured data for JSON output.

    Args:
        functions: List of Lambda function data dictionaries

    Returns:
        Dictionary with summary statistics for Lambda functions
    """
    # Count runtimes
    runtimes: Dict[str, int] = {}
    for function in functions:
        runtime = function.get('Runtime', 'Unknown')
        runtimes[runtime] = runtimes.get(runtime, 0) + 1

    # Calculate total code size
    total_code_size = sum(function.get('CodeSize', 0) for function in functions)

    # Memory usage statistics (0 for an empty list to avoid dividing by zero)
    memory_sizes = [function.get('MemorySize', 0) for function in functions]
    avg_memory = sum(memory_sizes) / len(memory_sizes) if memory_sizes else 0

    # VPC vs non-VPC functions
    vpc_functions = sum(
        1 for function in functions
        if (function.get('VpcConfig') or {}).get('VpcId')
    )

    # Functions with environment variables
    with_env_vars = sum(
        1 for function in functions
        if (function.get('Environment') or {}).get('Variables')
    )

    return {
        "total_functions": len(functions),
        "runtimes": runtimes,
        "total_code_size_bytes": total_code_size,
        "avg_memory_size_mb": avg_memory,
        "vpc_functions": vpc_functions,
        "non_vpc_functions": len(functions) - vpc_functions,
        "with_environment_variables": with_env_vars,
        "without_environment_variables": len(functions) - with_env_vars
    }


def format_function_url_config_json(url_config: Dict[str, Any]) -> Dict[str, Any]:
    """Format a Lambda function URL configuration into structured data for JSON output.

    Args:
        url_config: Lambda function URL configuration dictionary

    Returns:
        Dictionary with formatted Lambda function URL configuration data
    """
    # Handle creation and last modified times
    creation_time = _to_iso(url_config.get('CreationTime'))
    last_modified = _to_iso(url_config.get('LastModifiedTime'))

    return {
        "function_url": url_config.get('FunctionUrl', ''),
        "function_arn": url_config.get('FunctionArn', ''),
        "auth_type": url_config.get('AuthType', ''),
        "cors": url_config.get('Cors', {}),
        "creation_time": creation_time,
        "last_modified": last_modified,
        "invoke_mode": url_config.get('InvokeMode', 'BUFFERED')
    }


def format_function_url_discrepancy_json(discrepancy_check: Dict[str, Any]) -> Dict[str, Any]:
    """Format Lambda function URL discrepancy check results into structured data for JSON output.

    Args:
        discrepancy_check: Lambda function URL discrepancy check results

    Returns:
        Dictionary with formatted Lambda function URL discrepancy check
        results. Optional keys (``function_url``, ``auth_type``,
        ``policy_conditions``, ``security_level``, ``condition_note``,
        ``discrepancy_type``, ``description``, ``recommendation``) are only
        present when the corresponding input data is.
    """
    auth_type = discrepancy_check.get('auth_type')

    result = {
        "function_name": discrepancy_check.get('function_name', ''),
        "has_function_url": discrepancy_check.get('has_function_url', False),
        "has_function_url_policy": discrepancy_check.get('has_function_url_policy', False),
        "discrepancy_detected": discrepancy_check.get('discrepancy', False),
        "security_issue": False
    }

    # Add function URL if it exists
    if discrepancy_check.get('function_url'):
        result["function_url"] = discrepancy_check.get('function_url')

    # Add authentication type if available
    if auth_type:
        result["auth_type"] = auth_type
        # Mark as security issue if it's a public URL (NONE auth type)
        if auth_type == 'NONE':
            result["security_issue"] = True

    # Add policy condition details if available
    policy_condition_details = discrepancy_check.get('policy_condition_details')
    if policy_condition_details:
        result["policy_conditions"] = {
            "has_conditions": any(
                condition.get('has_conditions', False)
                for condition in policy_condition_details
            ),
            "details": []
        }

        # Format each condition result; entries without conditions are skipped
        for condition in policy_condition_details:
            if not condition.get('has_conditions'):
                continue
            condition_info = {
                "types": condition.get('condition_types', []),
                "restriction_level": condition.get('restriction_level', 'None'),
                "potential_public_access": condition.get('potential_public_access', True)
            }
            # Add condition details if available
            if condition.get('details'):
                condition_info["details"] = condition.get('details')
            result["policy_conditions"]["details"].append(condition_info)

        # Update security assessment based on conditions
        if result["policy_conditions"]["has_conditions"]:
            # Check if any conditions provide strong restrictions
            has_strong_restrictions = any(
                condition.get('restriction_level') == 'Strong'
                for condition in policy_condition_details
            )
            # Lower security issue severity if strong conditions exist
            if has_strong_restrictions and result["security_issue"]:
                result["security_level"] = "Low"
                result["condition_note"] = "Policy has strong conditional restrictions that may limit access"

    # Add discrepancy type if there's a discrepancy. This runs last, so its
    # security_level overrides any level set by the condition analysis above.
    if discrepancy_check.get('discrepancy'):
        discrepancy_type = discrepancy_check.get('discrepancy_type')
        result["discrepancy_type"] = discrepancy_type

        # Provide description based on discrepancy type
        if discrepancy_type == 'MISSING_POLICY':
            result["description"] = "Function URL exists but no policy explicitly grants access to it"
            # If auth type is NONE, this isn't necessarily a security issue
            if auth_type == 'NONE':
                result["security_level"] = "Info"
            else:
                result["security_level"] = "Low"
        elif discrepancy_type == 'MISSING_URL':
            result["description"] = "Policy grants access to function URL but no URL is configured"
            result["security_level"] = "Medium"
            result["security_issue"] = True
            result["recommendation"] = "Remove unused function URL permissions from the resource policy"

    return result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/groovyBugify/aws-security-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.