Skip to main content
Glama
kebabmane

Amazon Security Lake MCP Server

by kebabmane
server.py16 kB
import asyncio import sys from typing import Any, Dict, List, Optional import structlog from fastmcp import FastMCP from pydantic import BaseModel, Field from .aws.discovery import AWSResourceDiscovery from .config.settings import get_settings from .tools.data_sources import DataSourcesTool from .tools.guardduty_search import GuardDutySearchTool from .tools.ip_search import IPSearchTool from .tools.universal_search import UniversalSearchTool # Configure structured logging structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), structlog.processors.JSONRenderer() ], context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, cache_logger_on_first_use=True, ) logger = structlog.get_logger(__name__) # Initialize MCP server mcp = FastMCP("Amazon Security Lake MCP Server") # Load settings settings = get_settings() # Initialize tools ip_search_tool = IPSearchTool(settings) guardduty_search_tool = GuardDutySearchTool(settings) data_sources_tool = DataSourcesTool(settings) universal_search_tool = UniversalSearchTool(settings) # Pydantic models for request validation class IPSearchRequest(BaseModel): ip_address: str = Field(..., description="IP address to search for") start_time: Optional[str] = Field(None, description="Start time for search (ISO format)") end_time: Optional[str] = Field(None, description="End time for search (ISO format)") sources: Optional[List[str]] = Field(None, description="Data sources to search in") limit: int = Field(100, description="Maximum number of results to return") class GuardDutySearchRequest(BaseModel): finding_id: Optional[str] = Field(None, description="Specific GuardDuty finding ID") severity: Optional[str] = Field(None, description="Severity level (Critical, High, Medium, Low)") finding_type: Optional[str] = Field(None, description="Type of finding to search for") start_time: Optional[str] = Field(None, description="Start time for search (ISO format)") end_time: Optional[str] = Field(None, description="End time for search (ISO format)") limit: int = Field(100, description="Maximum number of results to return") class DataSourcesRequest(BaseModel): include_schema: bool = Field(False, description="Include table schema information") class UniversalSearchRequest(BaseModel): query_type: str = Field(..., description="Type of search: 'findings', 'network', 'api_calls', 'ip_search'") filters: Dict[str, Any] = Field(..., description="Search filters (ip_address, severity, time_range, etc.)") data_sources: Optional[List[str]] = Field(None, description="Specific data sources to search") limit: int = Field(100, description="Maximum number of results to return") @mcp.tool() async def search_ip_addresses(request: IPSearchRequest) -> Dict[str, Any]: """ Search for IP addresses in Amazon Security Lake data. This tool searches for the specified IP address in both source and destination IP fields across Security Lake's OCSF-normalized data. It returns detailed information about network events, security findings, and associated metadata. Args: ip_address: The IP address to search for (IPv4 or IPv6) start_time: Start time for the search in ISO format (optional, defaults to 7 days ago) end_time: End time for the search in ISO format (optional, defaults to now) sources: List of data sources to search (optional, searches all by default) limit: Maximum number of results to return (default: 100, max: 1000) Returns: A dictionary containing: - success: Boolean indicating if the search was successful - results: List of matching events with enriched context - metadata: Query information and summary statistics - count: Number of results returned """ logger.info("IP search request received", ip_address=request.ip_address) try: result = await ip_search_tool.execute( ip_address=request.ip_address, start_time=request.start_time, end_time=request.end_time, sources=request.sources, limit=request.limit ) logger.info( "IP search completed", ip_address=request.ip_address, result_count=result.get("count", 0), success=result.get("success", False) ) return result except Exception as e: logger.error("IP search failed", ip_address=request.ip_address, error=str(e)) return { "success": False, "error": f"Search failed: {str(e)}", "results": [], "count": 0 } @mcp.tool() async def search_guardduty_findings(request: GuardDutySearchRequest) -> Dict[str, Any]: """ Search for GuardDuty findings in Amazon Security Lake data. This tool searches for GuardDuty security findings with various filtering options. It returns detailed information about threats, affected resources, and recommended remediation actions. Args: finding_id: Specific GuardDuty finding ID to search for (optional) severity: Filter by severity level - Critical, High, Medium, Low, Informational (optional) finding_type: Filter by finding type/category (optional) start_time: Start time for the search in ISO format (optional, defaults to 7 days ago) end_time: End time for the search in ISO format (optional, defaults to now) limit: Maximum number of results to return (default: 100, max: 1000) Returns: A dictionary containing: - success: Boolean indicating if the search was successful - results: List of matching findings with risk assessment and context - metadata: Query information, summary statistics, and recommendations - count: Number of results returned """ logger.info( "GuardDuty search request received", finding_id=request.finding_id, severity=request.severity, finding_type=request.finding_type ) try: result = await guardduty_search_tool.execute( finding_id=request.finding_id, severity=request.severity, finding_type=request.finding_type, start_time=request.start_time, end_time=request.end_time, limit=request.limit ) logger.info( "GuardDuty search completed", finding_id=request.finding_id, result_count=result.get("count", 0), success=result.get("success", False) ) return result except Exception as e: logger.error( "GuardDuty search failed", finding_id=request.finding_id, error=str(e) ) return { "success": False, "error": f"Search failed: {str(e)}", "results": [], "count": 0 } @mcp.tool() async def list_data_sources(request: DataSourcesRequest) -> Dict[str, Any]: """ List available data sources in Amazon Security Lake. This tool provides information about available Security Lake data sources, including table metadata, OCSF compliance, data freshness, and schema details. Args: include_schema: Whether to include detailed schema information for each table (optional) Returns: A dictionary containing: - success: Boolean indicating if the operation was successful - results: List of available data sources with metadata - metadata: Summary information about the data sources - count: Number of data sources found """ logger.info("Data sources list request received", include_schema=request.include_schema) try: result = await data_sources_tool.execute(include_schema=request.include_schema) logger.info( "Data sources list completed", source_count=result.get("count", 0), success=result.get("success", False) ) return result except Exception as e: logger.error("Data sources list failed", error=str(e)) return { "success": False, "error": f"Failed to list data sources: {str(e)}", "results": [], "count": 0 } @mcp.tool() async def universal_security_search(request: UniversalSearchRequest) -> Dict[str, Any]: """ Universal search interface that adapts to available Security Lake data sources. This tool automatically discovers available data sources and performs intelligent searches across Security Lake data based on the query type. It supports multiple OCSF versions and gracefully handles missing data sources. Args: query_type: Type of search to perform: - 'findings': Search security findings (GuardDuty, Security Hub) - 'network': Search network activity (VPC Flow, DNS, Route53) - 'api_calls': Search API activity (CloudTrail) - 'ip_search': Search by IP address across all sources filters: Search criteria dictionary: - ip_address: IP to search for (required for ip_search) - severity: Severity level filter - start_time: Start time in ISO format - end_time: End time in ISO format - Additional filters based on query type data_sources: Optional list of specific sources to search limit: Maximum results to return (default: 100) Returns: A dictionary containing: - success: Boolean indicating if the search was successful - results: List of matching events with unified format - metadata: Query information and data source details - count: Number of results returned """ logger.info( "Universal search request received", query_type=request.query_type, data_sources=request.data_sources ) try: result = await universal_search_tool.execute( query_type=request.query_type, filters=request.filters, data_sources=request.data_sources, limit=request.limit ) logger.info( "Universal search completed", query_type=request.query_type, result_count=result.get("count", 0), success=result.get("success", False) ) return result except Exception as e: logger.error("Universal search failed", query_type=request.query_type, error=str(e)) return { "success": False, "error": f"Search failed: {str(e)}", "results": [], "count": 0 } @mcp.tool() async def discover_aws_resources() -> Dict[str, Any]: """ Discover AWS resources for Security Lake automatically. This tool scans your AWS account to find Security Lake databases, S3 buckets for Athena results, and provides configuration recommendations. Returns: A dictionary containing: - discovered resources and their locations - configuration recommendations - setup status and next steps """ logger.info("AWS resource discovery requested") try: discovery = AWSResourceDiscovery( aws_region=settings.aws_region, aws_profile=settings.aws_profile ) summary = discovery.get_discovery_summary() logger.info( "AWS resource discovery completed", success=summary.get("discovery_successful", False), account_id=summary.get("account_id"), region=summary.get("region") ) return { "success": True, "discovery_results": summary, "configuration_ready": summary.get("discovery_successful", False), "next_steps": summary.get("recommendations", []) } except Exception as e: logger.error("AWS resource discovery failed", error=str(e)) return { "success": False, "error": f"Discovery failed: {str(e)}", "discovery_results": {}, "configuration_ready": False, "next_steps": [ "Ensure AWS credentials are properly configured", "Verify Security Lake is enabled in your AWS account", "Check IAM permissions for S3, Glue, and STS access" ] } @mcp.tool() async def test_connection() -> Dict[str, Any]: """ Test the connection to Amazon Security Lake and Athena. This tool verifies that the MCP server can successfully connect to AWS services and access the Security Lake database. Returns: A dictionary containing: - success: Boolean indicating if the connection test passed - message: Description of the test results - details: Additional connection information """ logger.info("Connection test requested") try: # Test Athena connection athena_client = ip_search_tool.athena_client connection_ok = await athena_client.test_connection() if connection_ok: # Get basic information about available databases databases = await athena_client.list_databases() result = { "success": True, "message": "Connection to Amazon Security Lake successful", "details": { "aws_region": settings.aws_region, "security_lake_database": settings.security_lake_database, "athena_workgroup": settings.athena_workgroup, "available_databases": databases, "database_accessible": settings.security_lake_database in databases } } else: result = { "success": False, "message": "Failed to connect to Amazon Security Lake", "details": { "aws_region": settings.aws_region, "security_lake_database": settings.security_lake_database } } logger.info("Connection test completed", success=result["success"]) return result except Exception as e: logger.error("Connection test failed", error=str(e)) return { "success": False, "message": f"Connection test failed: {str(e)}", "details": { "error_type": type(e).__name__, "error_message": str(e) } } def main(): """Main entry point for the MCP server.""" import os # Set log level from environment or settings log_level = os.getenv("ASL_MCP_LOG_LEVEL", settings.log_level).upper() logger.info( "Starting Amazon Security Lake MCP Server", version="0.1.0", log_level=log_level, aws_region=settings.aws_region, security_lake_database=settings.security_lake_database ) try: # Run the MCP server mcp.run() except KeyboardInterrupt: logger.info("Server shutdown requested") except Exception as e: logger.error("Server error", error=str(e)) sys.exit(1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kebabmane/asl-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server