Skip to main content
Glama

AWS Security MCP

main.py22.3 kB
"""Entry point for AWS Security MCP server.""" import importlib import logging import sys import signal from typing import Any, Dict, List, Optional try: from fastapi import FastAPI import uvicorn except ImportError: print("ERROR: Missing required dependencies.") print("Please install required packages using:") print(" uv pip install -r requirements.txt") sys.exit(1) try: from mcp.server.fastmcp import FastMCP from mcp.server import Server # For SSE transport except ImportError: print("ERROR: Missing MCP package required for Claude Desktop integration.") print("Please install the MCP package using:") print(" uv pip install mcp>=1.0.0") sys.exit(1) # SSE transport imports try: from mcp.server.sse import SseServerTransport from starlette.applications import Starlette from starlette.routing import Route, Mount from starlette.responses import JSONResponse, RedirectResponse SSE_AVAILABLE = True except ImportError: SSE_AVAILABLE = False from aws_security_mcp.config import config from aws_security_mcp.tools import get_all_tools from aws_security_mcp.services.base import clear_client_cache # Configure logging logging.basicConfig( level=getattr(logging, config.server.log_level.upper()), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) # Create MCP server mcp = FastMCP("aws-security") # Global flag for graceful shutdown _shutdown_flag = False def signal_handler(signum, frame): """Handle shutdown signals gracefully.""" global _shutdown_flag logger.info(f"Received signal {signum}, initiating graceful shutdown...") _shutdown_flag = True cleanup_resources() sys.exit(0) # Register signal handlers signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) async def validate_aws_credentials() -> Dict[str, Any]: """Validate that basic AWS credentials are working. Returns: Dict with validation results """ if not config.server.startup_quiet: logger.info("Validating AWS credentials...") try: from aws_security_mcp.services.base import get_client # Test basic STS access sts_client = get_client('sts') identity = sts_client.get_caller_identity() if not config.server.startup_quiet: logger.info("AWS credentials validated successfully") logger.debug(f"Identity: Account={identity['Account']}, ARN={identity['Arn']}") return { "success": True, "identity": identity, "account_id": identity['Account'], "arn": identity['Arn'] } except Exception as e: logger.error(f"AWS credential validation failed: {e}") return { "success": False, "error": str(e) } async def initialize_cross_account_sessions() -> Dict[str, Any]: """Initialize cross-account sessions if auto-setup is enabled. Returns: Dict with session initialization results """ if not config.cross_account.auto_setup_on_startup: if not config.server.startup_quiet: logger.info("Cross-account auto-setup disabled, skipping session initialization") return { "success": True, "sessions_created": 0, "accounts_processed": 0, "message": "Auto-setup disabled" } if not config.server.startup_quiet: logger.info("Initializing cross-account credential sessions...") try: # Import the credentials service from aws_security_mcp.services import credentials # Set up cross-account sessions result = await credentials.setup_cross_account_sessions() if result.get("success"): sessions_created = result.get("sessions_created", 0) sessions_failed = result.get("sessions_failed", 0) accounts_processed = result.get("accounts_processed", 0) if not config.server.startup_quiet: if sessions_created > 0: logger.info(f"Multi-account access enabled for {sessions_created} accounts") if sessions_failed > 0: logger.warning(f"Failed to access {sessions_failed} accounts - check role permissions") # Always log debug details logger.debug(f"Cross-account session initialization complete:") logger.debug(f" Accounts processed: {accounts_processed}") logger.debug(f" Sessions created: {sessions_created}") logger.debug(f" Sessions failed: {sessions_failed}") return result else: error = result.get("error", "Unknown error") logger.warning(f"Cross-account session initialization failed: {error}") if not config.server.startup_quiet: logger.info("You can still set up sessions manually using credentials_security_operations") return result except Exception as e: logger.error(f"Error during cross-account session initialization: {e}") if not config.server.startup_quiet: logger.info("Cross-account access will not be available until sessions are set up manually") return { "success": False, "error": str(e), "sessions_created": 0, "accounts_processed": 0 } async def setup_aws_environment() -> Dict[str, Any]: """Set up AWS environment by validating credentials and initializing sessions. Returns: Dict with setup results and session information """ if not config.server.startup_quiet: logger.info("Setting up AWS environment...") # Step 1: Validate basic AWS credentials credential_validation = await validate_aws_credentials() if not credential_validation.get("success"): return { "success": False, "error": f"AWS credential validation failed: {credential_validation.get('error')}", "credentials_valid": False, "sessions_available": False } # Step 2: Initialize cross-account sessions session_result = await initialize_cross_account_sessions() sessions_created = session_result.get("sessions_created", 0) # Determine success criteria aws_setup_success = credential_validation.get("success", False) multi_account_available = session_result.get("success", False) and sessions_created > 0 return { "success": aws_setup_success, "credentials_valid": credential_validation.get("success", False), "account_id": credential_validation.get("account_id"), "arn": credential_validation.get("arn"), "sessions_available": multi_account_available, "sessions_created": sessions_created, "accounts_processed": session_result.get("accounts_processed", 0), "session_setup_success": session_result.get("success", False) } def register_tools_conditionally(aws_setup_result: Dict[str, Any]) -> None: """Register MCP tools conditionally based on AWS environment setup. Args: aws_setup_result: Results from AWS environment setup """ from aws_security_mcp.tools.registry import should_register_tool credentials_valid = aws_setup_result.get("credentials_valid", False) sessions_available = aws_setup_result.get("sessions_available", False) if not config.server.startup_quiet: logger.info("Registering MCP tools...") logger.debug(f"AWS credentials valid: {credentials_valid}") logger.debug(f"Multi-account sessions available: {sessions_available}") if not credentials_valid: logger.error("Cannot register tools - AWS credentials are invalid") return # List of tool modules to import tool_modules = [ # Always needed "aws_security_mcp.tools.credentials_tools", "aws_security_mcp.tools.wrappers.credentials_wrapper", # Core service modules (require basic AWS access) "aws_security_mcp.tools.guardduty_tools", "aws_security_mcp.tools.securityhub_tools", "aws_security_mcp.tools.access_analyzer_tools", "aws_security_mcp.tools.iam_tools", "aws_security_mcp.tools.ec2_tools", "aws_security_mcp.tools.load_balancer_tools", "aws_security_mcp.tools.cloudfront_tools", "aws_security_mcp.tools.route53_tools", "aws_security_mcp.tools.lambda_tools", "aws_security_mcp.tools.s3_tools", "aws_security_mcp.tools.waf_tools", "aws_security_mcp.tools.shield_tools", "aws_security_mcp.tools.resource_tagging_tools", "aws_security_mcp.tools.trusted_advisor_tools", "aws_security_mcp.tools.ecr_tools", "aws_security_mcp.tools.ecs_tools", "aws_security_mcp.tools.org_tools", # Service wrapper modules "aws_security_mcp.tools.wrappers.guardduty_wrapper", "aws_security_mcp.tools.wrappers.ec2_wrapper", "aws_security_mcp.tools.wrappers.load_balancer_wrapper", "aws_security_mcp.tools.wrappers.cloudfront_wrapper", "aws_security_mcp.tools.wrappers.ecs_wrapper", "aws_security_mcp.tools.wrappers.ecr_wrapper", "aws_security_mcp.tools.wrappers.iam_wrapper", "aws_security_mcp.tools.wrappers.lambda_wrapper", "aws_security_mcp.tools.wrappers.access_analyzer_wrapper", "aws_security_mcp.tools.wrappers.resource_tagging_wrapper", "aws_security_mcp.tools.wrappers.org_wrapper", "aws_security_mcp.tools.wrappers.s3_wrapper", "aws_security_mcp.tools.wrappers.route53_wrapper", "aws_security_mcp.tools.wrappers.securityhub_wrapper", "aws_security_mcp.tools.wrappers.shield_wrapper", "aws_security_mcp.tools.wrappers.waf_wrapper", "aws_security_mcp.tools.wrappers.trusted_advisor_wrapper", ] # Import tool modules imported_count = 0 for module_name in tool_modules: try: importlib.import_module(module_name) logger.debug(f"Imported tools from {module_name}") imported_count += 1 except ImportError as e: logger.warning(f"Could not import {module_name}: {e}") logger.debug(f"Imported {imported_count}/{len(tool_modules)} tool modules") # Get all available tools all_tools = get_all_tools() logger.debug(f"Total available tools: {len(all_tools)}") # Register tools conditionally registered_count = 0 excluded_count = 0 safe_tools_count = 0 for tool_name, tool_func in all_tools.items(): should_register = should_register_tool(tool_name) # Always register safe credential tools if tool_name in ["refresh_aws_session", "connected_aws_accounts", "aws_session_operations", "discover_aws_session_operations"]: if should_register: logger.debug(f"Registering safe credential tool: {tool_name}") mcp.tool(name=tool_name)(tool_func) registered_count += 1 safe_tools_count += 1 continue # Register other tools based on registry and credential status if should_register: logger.debug(f"Registering tool: {tool_name}") mcp.tool(name=tool_name)(tool_func) registered_count += 1 else: logger.debug(f"Excluding tool: {tool_name}") excluded_count += 1 # Log registration statistics if not config.server.startup_quiet: logger.info(f"Tool registration complete: {registered_count} tools registered") if sessions_available: logger.info(f"Multi-account tools available (sessions: {aws_setup_result.get('sessions_created', 0)})") else: logger.debug("Multi-account sessions not available - some tools may have limited functionality") # Always log debug statistics logger.debug(f"Tool Registration Summary:") logger.debug(f" Registered: {registered_count}") logger.debug(f" Safe credential tools: {safe_tools_count}") logger.debug(f" Excluded: {excluded_count}") logger.debug(f" Tool reduction: {len(all_tools)} → {registered_count}") # For FastAPI HTTP server mode (not used with Claude Desktop but kept for reference) app = FastAPI( title="AWS CloudSecurity MCP", description="MCP Server to inspect everything related to AWS Cloud Security!", version="0.1.0", ) @app.get("/") async def root(): """Root endpoint.""" return {"message": "AWS Security MCP is running"} @app.get("/health") async def health(): """Health check endpoint.""" return {"status": "healthy", "service": "aws-security-mcp"} @app.get("/tools") async def list_tools(): """List all available MCP tools.""" try: # Try different possible attributes for registered tools if hasattr(mcp, 'registered_tools'): tools = list(mcp.registered_tools.keys()) elif hasattr(mcp, '_tools'): tools = list(mcp._tools.keys()) elif hasattr(mcp, 'tools'): tools = list(mcp.tools.keys()) else: # Fallback to getting tools from the tools module from aws_security_mcp.tools import get_all_tools all_tools = get_all_tools() tools = list(all_tools.keys()) return { "tools": tools, "total_count": len(tools), "message": "Available MCP tools" } except Exception as e: # Return a safe response if there's any error return { "tools": [], "total_count": 0, "error": str(e), "message": "Unable to retrieve tools list" } def cleanup_resources() -> None: """Clean up AWS client resources.""" try: clear_client_cache() logger.info("Cleaned up AWS client cache") except Exception as e: logger.error(f"Error during cleanup: {e}") def run_sse_server() -> None: """Run the MCP server in SSE mode using FastMCP's built-in SSE support.""" if not SSE_AVAILABLE: logger.error("SSE transport dependencies not available. Please install starlette>=0.27.0") sys.exit(1) try: if not config.server.startup_quiet: logger.info("Starting AWS Security MCP SSE Server...") # Set up AWS environment and register tools conditionally import asyncio try: aws_setup_result = asyncio.run(setup_aws_environment()) register_tools_conditionally(aws_setup_result) if not aws_setup_result.get("success"): logger.error("AWS environment setup failed. Server will start with limited functionality.") elif not config.server.startup_quiet: # Show a clean startup summary sessions_count = aws_setup_result.get('sessions_created', 0) if sessions_count > 0: logger.info(f"AWS Security MCP ready: {sessions_count} accounts accessible") else: logger.info("AWS Security MCP ready: Single account mode") except Exception as e: logger.error(f"Could not set up AWS environment: {e}") if not config.server.startup_quiet: logger.info("Starting server without AWS tools...") # Create SSE app with health endpoint from starlette.applications import Starlette from starlette.routing import Route, Mount from starlette.responses import JSONResponse async def health_check(request): """Health check endpoint for ECS/ALB health checks.""" return JSONResponse({"status": "healthy", "service": "aws-security-mcp"}) # Get the base SSE app from FastMCP sse_app = mcp.sse_app() # Create a new Starlette app that includes both SSE and health endpoints app = Starlette( routes=[ Route("/health", health_check, methods=["GET"]), Mount("/", sse_app), ] ) if not config.server.startup_quiet: logger.info("SSE endpoint available at: /sse") logger.info("Health check available at: /health") logger.info(f"Use: npx @modelcontextprotocol/inspector http://127.0.0.1:8000/sse") logger.debug("Note: Load balancer should be configured to not redirect /sse to /sse/") # Run the combined app with uvicorn import uvicorn uvicorn.run( app, host="0.0.0.0", port=8000, log_level="warning" if config.server.minimal_logging else config.server.log_level, access_log=not config.server.minimal_logging ) except KeyboardInterrupt: logger.info("SSE server shutdown requested") except Exception as e: logger.error(f"SSE server error: {e}") import traceback logger.error(f"SSE server traceback: {traceback.format_exc()}") finally: cleanup_resources() def run_http_app() -> None: """Run the MCP server in HTTP mode.""" try: # Set up AWS environment and register tools conditionally import asyncio try: aws_setup_result = asyncio.run(setup_aws_environment()) register_tools_conditionally(aws_setup_result) if not aws_setup_result.get("success"): logger.error("AWS environment setup failed. Server will start with limited functionality.") elif not config.server.startup_quiet: # Show a clean startup summary sessions_count = aws_setup_result.get('sessions_created', 0) if sessions_count > 0: logger.info(f"AWS Security MCP ready: {sessions_count} accounts accessible") else: logger.info("AWS Security MCP ready: Single account mode") except Exception as e: logger.error(f"Could not set up AWS environment: {e}") if not config.server.startup_quiet: logger.info("Starting server without AWS tools...") # Start the HTTP server uvicorn.run( "aws_security_mcp.main:app", host="0.0.0.0", port=8000, reload=config.server.debug, log_level="warning" if config.server.minimal_logging else config.server.log_level, access_log=not config.server.minimal_logging ) except KeyboardInterrupt: logger.info("Server shutdown requested") except Exception as e: logger.error(f"Server error: {e}") finally: cleanup_resources() def run_mcp_stdio() -> None: """Run the MCP server in stdio mode for Claude Desktop.""" try: if not config.server.startup_quiet: logger.info("Starting MCP server...") # Set up AWS environment and register tools conditionally import asyncio try: aws_setup_result = asyncio.run(setup_aws_environment()) register_tools_conditionally(aws_setup_result) if not aws_setup_result.get("success"): logger.error("AWS environment setup failed. Server will start with limited functionality.") elif not config.server.startup_quiet: # Show a clean startup summary sessions_count = aws_setup_result.get('sessions_created', 0) if sessions_count > 0: logger.info(f"AWS Security MCP ready: {sessions_count} accounts accessible") else: logger.info("AWS Security MCP ready: Single account mode") except Exception as e: logger.error(f"Could not set up AWS environment: {e}") if not config.server.startup_quiet: logger.info("Starting server without AWS tools...") # Run MCP server with stdio transport (required for Claude Desktop) mcp.run(transport='stdio') except KeyboardInterrupt: logger.info("Server shutdown requested via keyboard interrupt") except (BrokenPipeError, ConnectionResetError) as e: logger.warning(f"Client disconnected unexpectedly: {e}") except Exception as e: logger.error(f"Server error: {e}", exc_info=True) # For anyio.BrokenResourceError, log but don't crash if "BrokenResourceError" in str(type(e)): logger.error("Stream broken - client likely disconnected") finally: # Clean up resources cleanup_resources() def print_usage(): """Print usage information.""" print("AWS Security MCP Server") print("Usage: python aws_security_mcp/main.py [mode]") print("") print("Modes:") print(" stdio - Standard I/O transport (default, for Claude Desktop)") print(" http - HTTP REST API server") print(" sse - Server-Sent Events transport (MCP over HTTP)") print("") print("Examples:") print(" python aws_security_mcp/main.py stdio # Claude Desktop") print(" python aws_security_mcp/main.py http # REST API on port 8000") print(" python aws_security_mcp/main.py sse # SSE on port 8001") if __name__ == "__main__": # Check for mode argument if len(sys.argv) > 1: mode = sys.argv[1].lower() if mode in ["help", "-h", "--help"]: print_usage() sys.exit(0) elif mode == "sse": run_sse_server() elif mode == "http": run_http_app() elif mode == "stdio": run_mcp_stdio() else: print(f"Error: Unknown mode '{mode}'") print("") print_usage() sys.exit(1) else: # Default to stdio for Claude Desktop compatibility run_mcp_stdio()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/groovyBugify/aws-security-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server