Skip to main content
Glama

AWS Security MCP

lambda_tools.py (30.3 kB)
"""Lambda tools for AWS Security MCP.

Each public coroutine in this module is registered as an MCP tool via
``register_tool`` and returns a string (JSON for most tools, plain text for
the layer and invocation listings). Errors are reported inside the returned
string rather than raised, so a failing AWS call never propagates to the
tool caller.
"""

import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from aws_security_mcp.services import lambda_service
from aws_security_mcp.tools import register_tool
from aws_security_mcp.utils.policy_evaluator import evaluate_policy_conditions

# Import lambda formatters from the correct module
from aws_security_mcp.formatters.lambda_formatter import (
    format_lambda_function_json,
    format_lambda_functions_summary_json,
    format_lambda_alias_json,
    format_lambda_event_source_mapping_json,
    format_lambda_layer_json,
    format_function_url_config_json,
    format_function_url_discrepancy_json
)

# Configure logging
logger = logging.getLogger(__name__)

# Policy action that allows invoking a function through its function URL.
_INVOKE_FUNCTION_URL_ACTION = 'lambda:InvokeFunctionUrl'


def _json_default(value: Any) -> str:
    """Serialize values ``json`` cannot encode: ISO format if available, else ``str``."""
    return value.isoformat() if hasattr(value, 'isoformat') else str(value)


def _to_json(payload: Any) -> str:
    """Dump *payload* to compact JSON using the module's fallback serializer."""
    return json.dumps(payload, default=_json_default)


def _policy_statements(policy_json: Dict[str, Any]) -> List[Any]:
    """Return the policy's ``Statement`` entries as a list.

    A policy may carry a single statement as a bare object instead of a
    list; iterating that dict directly would yield its keys, so it is
    wrapped. A missing or null ``Statement`` yields an empty list.
    """
    statements = policy_json.get('Statement', [])
    if isinstance(statements, dict):
        return [statements]
    return statements or []


def _grants_function_url_invoke(action: Any) -> bool:
    """Tell whether a statement's ``Action`` mentions ``lambda:InvokeFunctionUrl``.

    For a string this is a substring test, for a list an exact-membership
    test; any other type is treated as not granting the action.
    """
    return isinstance(action, (str, list)) and _INVOKE_FUNCTION_URL_ACTION in action


def _analyze_resource_policy(policy_json: Dict[str, Any]) -> Dict[str, Any]:
    """Build the per-statement ``policy_analysis`` summary for a parsed policy."""
    statements = _policy_statements(policy_json)
    analysis: Dict[str, Any] = {
        "has_policy": True,
        "statement_count": len(statements),
        "statements_analyzed": [],
    }

    any_conditions = False
    any_url_permissions = False

    for statement in statements:
        statement_info = {
            "sid": statement.get('Sid', 'No SID'),
            "effect": statement.get('Effect', 'Unknown'),
            "principal": statement.get('Principal', {}),
            "action": statement.get('Action', []),
            "resource": statement.get('Resource', []),
        }

        # Check if statement relates to function URL
        url_related = _grants_function_url_invoke(statement.get('Action'))
        any_url_permissions = any_url_permissions or url_related
        statement_info["is_function_url_related"] = url_related

        # Always report has_conditions so consumers see a uniform shape.
        conditions = statement.get('Condition', {})
        statement_info["has_conditions"] = bool(conditions)
        if conditions:
            any_conditions = True
            # Use the policy_evaluator utility for condition analysis
            condition_analysis = evaluate_policy_conditions(statement)
            statement_info["conditions"] = condition_analysis
            statement_info["has_strong_restrictions"] = (
                condition_analysis.get('restriction_level') == 'Strong'
            )
            statement_info["potential_public_access"] = condition_analysis.get(
                'potential_public_access', True
            )

        analysis["statements_analyzed"].append(statement_info)

    analysis["has_conditional_statements"] = any_conditions
    analysis["has_function_url_permissions"] = any_url_permissions
    return analysis


def _attach_resource_policy(
    fn_name: str,
    session_context: Optional[str],
    formatted_data: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
    """Fetch the function's resource policy and record it on *formatted_data*.

    Writes ``resource_policy`` and/or ``policy_analysis`` into
    *formatted_data* (mutated in place). Returns the parsed policy, or
    ``None`` when there is no policy or it could not be parsed. If the policy
    parsed but its analysis failed, the parsed policy is still returned so
    the function-URL check can use it.
    """
    policy_json = None
    try:
        policy = lambda_service.get_policy(fn_name, session_context=session_context)
        if not policy:
            formatted_data["policy_analysis"] = {
                "has_policy": False,
                "message": "No resource policy found for this function"
            }
            return None

        policy_document = policy.get('Policy', '{}')
        try:
            policy_json = json.loads(policy_document)
        except json.JSONDecodeError:
            formatted_data["resource_policy"] = {
                "error": "Failed to parse policy document",
                "raw_policy": policy_document
            }
            formatted_data["policy_analysis"] = {
                "has_policy": True,
                "error": "Failed to parse policy document for analysis"
            }
            return None

        formatted_data["resource_policy"] = policy_json
        formatted_data["policy_analysis"] = _analyze_resource_policy(policy_json)
    except Exception as policy_error:
        formatted_data["resource_policy"] = {
            "error": f"Error retrieving policy: {str(policy_error)}"
        }
        formatted_data["policy_analysis"] = {
            "has_policy": False,
            "error": f"Error retrieving policy: {str(policy_error)}"
        }
    return policy_json


def _assess_configured_function_url(
    url_config: Dict[str, Any],
    discrepancy_check: Optional[Dict[str, Any]],
    formatted_discrepancy: Dict[str, Any],
    url_security: Dict[str, Any],
) -> None:
    """Fill *url_security* (in place) for a function that has a function URL."""
    auth_type = url_config.get('AuthType')
    if auth_type == 'NONE':
        url_security["security_level"] = "High"
        url_security["security_issue"] = True
        url_security["description"] = "Function URL is publicly accessible without authentication"
        url_security["recommendation"] = "Consider changing AuthType to AWS_IAM or implement custom authorization"
    else:
        url_security["security_level"] = "Low"
        url_security["security_issue"] = False
        url_security["description"] = "Function URL requires AWS IAM authentication"

    # The discrepancy check may come back empty; treat that as "no findings"
    # rather than failing the whole assessment.
    discrepancy = discrepancy_check or {}

    # Policy conditions analysis
    has_policy_conditions = False
    has_strong_conditions = False
    for condition in discrepancy.get('policy_condition_details') or []:
        if condition.get('has_conditions'):
            has_policy_conditions = True
            if condition.get('restriction_level') == 'Strong':
                has_strong_conditions = True
                break

    url_security["has_policy_conditions"] = has_policy_conditions
    url_security["has_strong_conditions"] = has_strong_conditions

    if has_policy_conditions:
        url_security["policy_conditions"] = formatted_discrepancy.get("policy_conditions", {})
        # Update security assessment if strong conditions exist
        if has_strong_conditions and auth_type == 'NONE':
            url_security["security_level"] = "Medium"
            url_security["description"] += " (mitigated by strong policy conditions)"

    # Add discrepancy information if detected
    if not discrepancy.get('discrepancy'):
        url_security["discrepancy_detected"] = False
        return

    discrepancy_type = discrepancy.get('discrepancy_type')
    url_security["discrepancy_detected"] = True
    url_security["discrepancy_type"] = discrepancy_type
    url_security["discrepancy_details"] = formatted_discrepancy

    if discrepancy_type == 'MISSING_URL':
        url_security["security_level"] = "Medium"
        url_security["security_issue"] = True
        url_security["recommendation"] = "Remove unused function URL permissions from the resource policy"
    elif discrepancy_type == 'MISSING_POLICY':
        if discrepancy.get('auth_type') == 'NONE':
            url_security["note"] = "Function URL is public but no explicit policy exists - this is normal with AuthType=NONE"
        else:
            url_security["security_level"] = "Low"
            url_security["recommendation"] = "Consider adding an explicit resource policy for the function URL"


def _flag_unused_url_permissions(
    policy_json: Optional[Dict[str, Any]],
    url_security: Dict[str, Any],
) -> None:
    """Flag (in place) a policy that grants function-URL invoke with no URL configured."""
    if not policy_json:
        return
    grants_url_invoke = any(
        _grants_function_url_invoke(statement.get('Action'))
        for statement in _policy_statements(policy_json)
    )
    if grants_url_invoke:
        url_security["security_issue"] = True
        url_security["security_level"] = "Low"
        url_security["description"] = "Policy grants function URL permissions, but no function URL is configured"
        url_security["recommendation"] = "Remove unused function URL permissions from resource policy"


def _attach_function_url_security(
    fn_name: str,
    session_context: Optional[str],
    policy_json: Optional[Dict[str, Any]],
    formatted_data: Dict[str, Any],
) -> None:
    """Record function-URL findings and the consolidated assessment on *formatted_data*.

    Any failure is captured in ``function_url_security["error"]``; findings
    written before the failure are kept, and ``security_assessment`` is then
    omitted.
    """
    url_security: Dict[str, Any] = {}
    formatted_data["function_url_security"] = url_security

    try:
        # Get function URL configuration if available
        url_config = lambda_service.get_function_url_config(fn_name, session_context=session_context)

        if url_config:
            url_security["has_function_url"] = True
            url_security["function_url"] = url_config.get('FunctionUrl')
            url_security["auth_type"] = url_config.get('AuthType')

            # Format the full URL config
            formatted_data["function_url_config"] = format_function_url_config_json(url_config)

            # Check for potential discrepancies between function URL and policy
            discrepancy_check = lambda_service.check_function_url_discrepancy(
                fn_name, session_context=session_context
            )
            formatted_discrepancy = format_function_url_discrepancy_json(discrepancy_check)

            _assess_configured_function_url(
                url_config, discrepancy_check, formatted_discrepancy, url_security
            )
        else:
            url_security["has_function_url"] = False
            url_security["description"] = "No function URL configured for this Lambda function"
            _flag_unused_url_permissions(policy_json, url_security)

        # Add consolidated security assessment
        security_assessment: Dict[str, Any] = {
            "security_level": url_security.get("security_level", "Low"),
            "issues": [],
            "recommendations": []
        }
        if url_security.get("security_issue"):
            security_assessment["issues"].append(url_security.get("description", ""))
        if url_security.get("recommendation"):
            security_assessment["recommendations"].append(url_security.get("recommendation"))

        formatted_data["security_assessment"] = security_assessment
    except Exception as url_error:
        url_security["error"] = f"Error analyzing function URL: {str(url_error)}"


def _describe_function(fn_name: str, session_context: Optional[str]) -> Dict[str, Any]:
    """Collect configuration, policy and function-URL details for one function.

    Never raises: a failure is returned as an ``error`` entry for *fn_name*.
    """
    try:
        function = lambda_service.get_function(fn_name, session_context=session_context)
        if not function:
            return {
                "function_name": fn_name,
                "error": f"Lambda function '{fn_name}' not found"
            }

        # Use the JSON formatter for better structured output
        formatted_data = format_lambda_function_json(function.get('Configuration', {}))

        # Add code location if available
        location = function.get('Code', {}).get('Location')
        if location:
            formatted_data["code_location"] = location

        policy_json = _attach_resource_policy(fn_name, session_context, formatted_data)
        _attach_function_url_security(fn_name, session_context, policy_json, formatted_data)
        return formatted_data
    except Exception as e:
        logger.error("Error getting Lambda function details: %s", e)
        return {
            "function_name": fn_name,
            "error": {
                "message": f"Error getting Lambda function details: {str(e)}",
                "type": type(e).__name__
            }
        }


def _split_layer_arn(layer_arn: str) -> "tuple[str, str]":
    """Return ``(name, version)`` parsed from a layer-version ARN.

    Expects the ``...:layer:<name>:<version>`` form; either part is
    ``'Unknown'`` when it cannot be determined.
    """
    _, marker, tail = layer_arn.partition(':layer:')
    if not marker:
        return 'Unknown', 'Unknown'
    name, _, version = tail.partition(':')
    return name, version or 'Unknown'


def _format_code_size(code_size: Union[int, float]) -> str:
    """Render a byte count as B, KB or MB (two decimals for KB/MB)."""
    if code_size < 1024:
        return f"{code_size} B"
    if code_size < 1024 * 1024:
        return f"{code_size / 1024:.2f} KB"
    return f"{code_size / (1024 * 1024):.2f} MB"


def _format_metric_line(label: str, value: Any, unit: str) -> str:
    """Render one ``label: value unit`` report line, or ``""`` when the value is absent.

    Values that do not support two-decimal formatting are printed as-is
    instead of failing the whole report.
    """
    if value is None or value == 'Unknown':
        return ""
    try:
        return f"{label}: {value:.2f} {unit}\n"
    except (TypeError, ValueError):
        return f"{label}: {value} {unit}\n"


@register_tool()
async def list_functions(region: Optional[str] = None, search_term: str = "", next_token: Optional[str] = None, session_context: Optional[str] = None) -> str:
    """List Lambda functions in the AWS account.

    Args:
        region: Optional region to filter functions
        search_term: Optional search term to filter functions by name
        next_token: Pagination token for fetching the next set of functions (optional)
        session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev")

    Returns:
        JSON formatted string with Lambda functions, or a JSON ``error``
        object if the lookup fails
    """
    logger.info(
        "Listing Lambda functions in region %s (search_term='%s', next_token=%s)",
        region or 'all regions', search_term, next_token,
    )

    try:
        # Only forward the optional settings that were actually supplied.
        client_kwargs = {}
        if region:
            client_kwargs['region_name'] = region
        if session_context:
            client_kwargs['session_context'] = session_context

        functions_response = lambda_service.get_all_functions(
            search_term=search_term,
            marker=next_token,
            **client_kwargs
        )
        functions = functions_response['functions']
        marker = functions_response['marker']

        if not functions:
            match_suffix = f" matching {search_term}" if search_term else ""
            return json.dumps({
                "count": 0,
                "summary": f"No Lambda functions found{match_suffix}",
                "functions": [],
                "pagination": {
                    "next_token": None,
                    "is_truncated": False
                }
            })

        formatted_data = {
            "count": len(functions),
            "summary": format_lambda_functions_summary_json(functions),
            "functions": [format_lambda_function_json(function) for function in functions],
            "pagination": {
                "next_token": marker,
                "is_truncated": marker is not None
            }
        }

        # Add region information if provided
        if region:
            formatted_data["region"] = region

        # Add search term if provided
        if search_term:
            formatted_data["search_term"] = search_term
            formatted_data["description"] = f"Found {len(functions)} Lambda function(s) matching '{search_term}'"

        return _to_json(formatted_data)
    except Exception as e:
        logger.error("Error listing Lambda functions: %s", e)
        return json.dumps({
            "error": {
                "message": f"Error listing Lambda functions: {str(e)}",
                "type": type(e).__name__
            }
        })


@register_tool()
async def get_function_details(function_name: Union[str, List[str]], session_context: Optional[str] = None) -> str:
    """Get detailed information about one or more Lambda functions.

    For each function the result combines its formatted configuration, code
    location, resource policy with a per-statement analysis, function URL
    findings, and a consolidated security assessment.

    Args:
        function_name: Name/ARN of the Lambda function, or a list of function names/ARNs
        session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev")

    Returns:
        JSON formatted string with Lambda function details: a single object
        when a string was passed, otherwise an array with one entry per name

    Examples:
        # Single account (default)
        get_function_details("my-function")

        # Cross-account access
        get_function_details("my-function", session_context="123456789012_aws_dev")
    """
    single_function = isinstance(function_name, str)
    if single_function:
        logger.info("Getting details for Lambda function: %s", function_name)
        function_names = [function_name]
    else:
        logger.info("Getting details for multiple Lambda functions: %s", function_name)
        function_names = function_name

    all_results = [_describe_function(fn_name, session_context) for fn_name in function_names]

    # Return single result or list depending on input
    if single_function:
        return _to_json(all_results[0])
    return _to_json(all_results)


@register_tool()
async def get_function_policy(function_name: Union[str, List[str]], session_context: Optional[str] = None) -> str:
    """Get the resource policy for one or more Lambda functions.

    Args:
        function_name: Name or ARN of the Lambda function, or a list of function names/ARNs
        session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev")

    Returns:
        JSON formatted string with Lambda function policies. Each entry's
        ``result`` is one of ``success``, ``no_policy``, ``parse_error`` or
        ``error``. A single object is returned when a string was passed,
        otherwise an array.

    Examples:
        # Single account (default)
        get_function_policy("my-function")

        # Cross-account access
        get_function_policy("my-function", session_context="123456789012_aws_dev")
    """
    # If a single function name is provided, convert to list for consistent processing
    if isinstance(function_name, str):
        function_names = [function_name]
        logger.info("Getting policy for Lambda function: %s", function_name)
    else:
        function_names = function_name
        logger.info("Getting policies for %s Lambda functions", len(function_names))

    results = []

    for fn_name in function_names:
        try:
            policy = lambda_service.get_policy(fn_name, session_context=session_context)

            result = {
                "functionName": fn_name,
                "result": "success" if policy else "no_policy",
                "policy": None
            }

            if policy:
                policy_document = policy.get('Policy', '{}')
                revision_id = policy.get('RevisionId', 'Unknown')

                # Parse and format the policy
                try:
                    result["policy"] = json.loads(policy_document)
                    result["revision_id"] = revision_id
                except json.JSONDecodeError:
                    result["result"] = "parse_error"
                    result["error"] = "Invalid JSON in policy document"
                    result["raw_policy"] = policy_document

            results.append(result)
        except Exception as e:
            logger.error("Error getting policy for Lambda function '%s': %s", fn_name, e)
            results.append({
                "functionName": fn_name,
                "result": "error",
                "error": str(e),
                "error_type": type(e).__name__
            })

    # If only one function was requested, return just that result for backwards compatibility
    if len(results) == 1 and isinstance(function_name, str):
        return json.dumps(results[0], indent=2)

    # Otherwise return the array of results
    return json.dumps(results, indent=2)


@register_tool()
async def list_function_permissions(function_name: str, session_context: Optional[str] = None) -> str:
    """List permissions granted to invoke a Lambda function.

    Args:
        function_name: Name or ARN of the Lambda function
        session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev")

    Returns:
        JSON formatted string with Lambda function permissions, one entry per
        statement of the function's resource policy

    Examples:
        # Single account (default)
        list_function_permissions("my-function")

        # Cross-account access
        list_function_permissions("my-function", session_context="123456789012_aws_dev")
    """
    logger.info("Listing permissions for Lambda function: %s", function_name)

    try:
        policy = lambda_service.get_policy(function_name, session_context=session_context)

        if not policy or 'Policy' not in policy:
            return json.dumps({
                "function_name": function_name,
                "count": 0,
                "permissions": [],
                "summary": f"No permissions found for Lambda function '{function_name}'"
            })

        policy_document = policy.get('Policy', '{}')

        # Parse the policy
        try:
            policy_json = json.loads(policy_document)
        except json.JSONDecodeError:
            return json.dumps({
                "error": {
                    "message": f"Error parsing policy for Lambda function '{function_name}': Invalid JSON",
                    "raw_policy": policy_document
                }
            })

        permissions = []
        # A lone statement may be a bare object; normalize before iterating.
        for statement in _policy_statements(policy_json):
            principal = statement.get('Principal', {})

            # Format the principal for better readability
            if not isinstance(principal, dict):
                principal = {"Value": str(principal)}

            permission = {
                "sid": statement.get('Sid', ''),
                "effect": statement.get('Effect', 'Unknown'),
                "principal": principal,
                "action": statement.get('Action', 'Unknown')
            }

            condition = statement.get('Condition', {})
            if condition:
                permission["condition"] = condition

            permissions.append(permission)

        return _to_json({
            "function_name": function_name,
            "count": len(permissions),
            "permissions": permissions,
            "summary": f"Found {len(permissions)} permission(s) for Lambda function '{function_name}'"
        })
    except Exception as e:
        logger.error("Error listing Lambda function permissions: %s", e)
        return json.dumps({
            "error": {
                "message": f"Error listing permissions for Lambda function '{function_name}': {str(e)}",
                "type": type(e).__name__
            }
        })


@register_tool()
async def list_function_layers(function_name: str, session_context: Optional[str] = None) -> str:
    """List layers used by a Lambda function.

    Args:
        function_name: Name or ARN of the Lambda function
        session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev")

    Returns:
        Formatted string with Lambda function layers

    Examples:
        # Single account (default)
        list_function_layers("my-function")

        # Cross-account access
        list_function_layers("my-function", session_context="123456789012_aws_dev")
    """
    logger.info("Listing layers for Lambda function: %s", function_name)

    try:
        function = lambda_service.get_function(function_name, session_context=session_context)

        if not function:
            return f"Lambda function '{function_name}' not found"

        layers = function.get('Configuration', {}).get('Layers', [])

        if not layers:
            return f"No layers found for Lambda function '{function_name}'"

        results = []

        for layer in layers:
            layer_arn = layer.get('Arn') or 'Unknown'
            layer_name, arn_version = _split_layer_arn(layer_arn)

            # The layer entry normally has no 'Version' key; the version is
            # the trailing segment of the layer ARN, so fall back to that.
            layer_version = layer.get('Version')
            if layer_version is None:
                layer_version = arn_version

            lines = [
                f"Layer: {layer_name}\n",
                f"ARN: {layer_arn}\n",
                f"Version: {layer_version}\n",
            ]

            code_size = layer.get('CodeSize', 0)
            if code_size:
                lines.append(f"Code Size: {_format_code_size(code_size)}\n")

            results.append("".join(lines))

        return f"Layers for Lambda function '{function_name}':\n\n" + "\n\n".join(results)
    except Exception as e:
        logger.error("Error listing Lambda function layers: %s", e)
        return f"Error listing layers for Lambda function '{function_name}': {str(e)}"


@register_tool()
async def list_invocations(function_name: str, limit: int = 10, session_context: Optional[str] = None) -> str:
    """Get recent invocations of a Lambda function from CloudWatch logs.

    Args:
        function_name: Name or ARN of the Lambda function
        limit: Maximum number of invocations to return
        session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev")

    Returns:
        Formatted string with recent Lambda function invocations

    Examples:
        # Single account (default)
        list_invocations("my-function", limit=5)

        # Cross-account access
        list_invocations("my-function", limit=5, session_context="123456789012_aws_dev")
    """
    logger.info("Getting recent invocations for Lambda function: %s (limit=%s)", function_name, limit)

    try:
        invocations = lambda_service.get_recent_invocations(
            function_name, limit=limit, session_context=session_context
        )

        if not invocations:
            return f"No recent invocations found for Lambda function '{function_name}'"

        results = []

        for invocation in invocations:
            timestamp = invocation.get('timestamp')

            # Format timestamp
            if isinstance(timestamp, datetime):
                formatted_time = timestamp.strftime('%Y-%m-%d %H:%M:%S')
            else:
                formatted_time = str(timestamp)

            results.append("".join([
                f"Invocation Time: {formatted_time}\n",
                f"Request ID: {invocation.get('request_id', 'Unknown')}\n",
                f"Status: {invocation.get('status', 'Unknown')}\n",
                _format_metric_line("Duration", invocation.get('duration', 'Unknown'), "ms"),
                _format_metric_line("Memory Used", invocation.get('memory_used', 'Unknown'), "MB"),
            ]))

        return f"Recent invocations for Lambda function '{function_name}':\n\n" + "\n\n".join(results)
    except Exception as e:
        logger.error("Error getting Lambda function invocations: %s", e)
        return f"Error getting recent invocations for Lambda function '{function_name}': {str(e)}"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/groovyBugify/aws-security-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.