import sys
from typing import Any, Dict, List, Optional
import structlog
from fastmcp import FastMCP
from pydantic import BaseModel, Field
from .aws.discovery import AWSResourceDiscovery
from .config.settings import get_settings
from .tools.data_sources import DataSourcesTool
from .tools.guardduty_search import GuardDutySearchTool
from .tools.ip_search import IPSearchTool
from .tools.universal_search import UniversalSearchTool
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
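# With this configuration each record is rendered as a single JSON line via the
# stdlib logging stream; an illustrative (not actual) example:
# {"event": "Connection test requested", "logger": "<module>", "level": "info",
#  "timestamp": "2025-06-01T12:00:00Z"}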
logger = structlog.get_logger(__name__)
# Initialize MCP server
mcp = FastMCP("Amazon Security Lake MCP Server")
# Load settings
settings = get_settings()
# Initialize tools
ip_search_tool = IPSearchTool(settings)
guardduty_search_tool = GuardDutySearchTool(settings)
data_sources_tool = DataSourcesTool(settings)
universal_search_tool = UniversalSearchTool(settings)
# Pydantic models for request validation
class IPSearchRequest(BaseModel):
ip_address: str = Field(..., description="IP address to search for")
start_time: Optional[str] = Field(None, description="Start time for search (ISO format)")
end_time: Optional[str] = Field(None, description="End time for search (ISO format)")
sources: Optional[List[str]] = Field(None, description="Data sources to search in")
limit: int = Field(100, ge=1, le=1000, description="Maximum number of results to return (1-1000)")
class GuardDutySearchRequest(BaseModel):
finding_id: Optional[str] = Field(None, description="Specific GuardDuty finding ID")
severity: Optional[str] = Field(None, description="Severity level (Critical, High, Medium, Low, Informational)")
finding_type: Optional[str] = Field(None, description="Type of finding to search for")
start_time: Optional[str] = Field(None, description="Start time for search (ISO format)")
end_time: Optional[str] = Field(None, description="End time for search (ISO format)")
limit: int = Field(100, ge=1, le=1000, description="Maximum number of results to return (1-1000)")
class DataSourcesRequest(BaseModel):
include_schema: bool = Field(False, description="Include table schema information")
class UniversalSearchRequest(BaseModel):
query_type: str = Field(..., description="Type of search: 'findings', 'network', 'api_calls', 'ip_search'")
filters: Dict[str, Any] = Field(..., description="Search filters (ip_address, severity, time_range, etc.)")
data_sources: Optional[List[str]] = Field(None, description="Specific data sources to search")
limit: int = Field(100, description="Maximum number of results to return")
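# Illustrative note (not executed): FastMCP parses and validates incoming tool
# arguments against these models, e.g. an ip_address of "203.0.113.10" with a
# limit of 50 becomes IPSearchRequest(ip_address="203.0.113.10", limit=50);
# omitted optional fields default to None.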
@mcp.tool()
async def search_ip_addresses(request: IPSearchRequest) -> Dict[str, Any]:
"""
Search for IP addresses in Amazon Security Lake data.
This tool searches for the specified IP address in both source and destination
IP fields across Security Lake's OCSF-normalized data. It returns detailed
information about network events, security findings, and associated metadata.
Request fields:
ip_address: The IP address to search for (IPv4 or IPv6)
start_time: Start time for the search in ISO format (optional, defaults to 7 days ago)
end_time: End time for the search in ISO format (optional, defaults to now)
sources: List of data sources to search (optional, searches all by default)
limit: Maximum number of results to return (default: 100, max: 1000)
Returns:
A dictionary containing:
- success: Boolean indicating if the search was successful
- results: List of matching events with enriched context
- metadata: Query information and summary statistics
- count: Number of results returned
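Example request fields (illustrative placeholder values, not real data):
- ip_address: "203.0.113.10"
- start_time: "2025-06-01T00:00:00Z"
- limit: 50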
"""
logger.info("IP search request received", ip_address=request.ip_address)
try:
result = await ip_search_tool.execute(
ip_address=request.ip_address,
start_time=request.start_time,
end_time=request.end_time,
sources=request.sources,
limit=request.limit
)
logger.info(
"IP search completed",
ip_address=request.ip_address,
result_count=result.get("count", 0),
success=result.get("success", False)
)
return result
except Exception as e:
logger.error("IP search failed", ip_address=request.ip_address, error=str(e))
return {
"success": False,
"error": f"Search failed: {str(e)}",
"results": [],
"count": 0
}
@mcp.tool()
async def search_guardduty_findings(request: GuardDutySearchRequest) -> Dict[str, Any]:
"""
Search for GuardDuty findings in Amazon Security Lake data.
This tool searches for GuardDuty security findings with various filtering options.
It returns detailed information about threats, affected resources, and
recommended remediation actions.
Request fields:
finding_id: Specific GuardDuty finding ID to search for (optional)
severity: Filter by severity level - Critical, High, Medium, Low, Informational (optional)
finding_type: Filter by finding type/category (optional)
start_time: Start time for the search in ISO format (optional, defaults to 7 days ago)
end_time: End time for the search in ISO format (optional, defaults to now)
limit: Maximum number of results to return (default: 100, max: 1000)
Returns:
A dictionary containing:
- success: Boolean indicating if the search was successful
- results: List of matching findings with risk assessment and context
- metadata: Query information, summary statistics, and recommendations
- count: Number of results returned
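Example request fields (illustrative placeholder values, not real data):
- severity: "High"
- start_time: "2025-06-01T00:00:00Z"
- limit: 25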
"""
logger.info(
"GuardDuty search request received",
finding_id=request.finding_id,
severity=request.severity,
finding_type=request.finding_type
)
try:
result = await guardduty_search_tool.execute(
finding_id=request.finding_id,
severity=request.severity,
finding_type=request.finding_type,
start_time=request.start_time,
end_time=request.end_time,
limit=request.limit
)
logger.info(
"GuardDuty search completed",
finding_id=request.finding_id,
result_count=result.get("count", 0),
success=result.get("success", False)
)
return result
except Exception as e:
logger.error(
"GuardDuty search failed",
finding_id=request.finding_id,
error=str(e)
)
return {
"success": False,
"error": f"Search failed: {str(e)}",
"results": [],
"count": 0
}
@mcp.tool()
async def list_data_sources(request: DataSourcesRequest) -> Dict[str, Any]:
"""
List available data sources in Amazon Security Lake.
This tool provides information about available Security Lake data sources,
including table metadata, OCSF compliance, data freshness, and schema details.
Request fields:
include_schema: Whether to include detailed schema information for each table (optional)
Returns:
A dictionary containing:
- success: Boolean indicating if the operation was successful
- results: List of available data sources with metadata
- metadata: Summary information about the data sources
- count: Number of data sources found
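Example request fields (illustrative):
- include_schema: true (adds per-table schema details to each result entry)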
"""
logger.info("Data sources list request received", include_schema=request.include_schema)
try:
result = await data_sources_tool.execute(include_schema=request.include_schema)
logger.info(
"Data sources list completed",
source_count=result.get("count", 0),
success=result.get("success", False)
)
return result
except Exception as e:
logger.error("Data sources list failed", error=str(e))
return {
"success": False,
"error": f"Failed to list data sources: {str(e)}",
"results": [],
"count": 0
}
@mcp.tool()
async def universal_security_search(request: UniversalSearchRequest) -> Dict[str, Any]:
"""
Universal search interface that adapts to available Security Lake data sources.
This tool automatically discovers available data sources and performs intelligent
searches across Security Lake data based on the query type. It supports multiple
OCSF versions and gracefully handles missing data sources.
Request fields:
query_type: Type of search to perform:
- 'findings': Search security findings (GuardDuty, Security Hub)
- 'network': Search network activity (VPC Flow, DNS, Route53)
- 'api_calls': Search API activity (CloudTrail)
- 'ip_search': Search by IP address across all sources
filters: Search criteria dictionary:
- ip_address: IP to search for (required for ip_search)
- severity: Severity level filter
- start_time: Start time in ISO format
- end_time: End time in ISO format
- Additional filters based on query type
data_sources: Optional list of specific sources to search
limit: Maximum results to return (default: 100)
Returns:
A dictionary containing:
- success: Boolean indicating if the search was successful
- results: List of matching events with unified format
- metadata: Query information and data source details
- count: Number of results returned
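Example request fields (illustrative placeholder values, not real data):
- query_type: "ip_search"
- filters: {"ip_address": "203.0.113.10", "start_time": "2025-06-01T00:00:00Z"}
- limit: 50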
"""
logger.info(
"Universal search request received",
query_type=request.query_type,
data_sources=request.data_sources
)
try:
result = await universal_search_tool.execute(
query_type=request.query_type,
filters=request.filters,
data_sources=request.data_sources,
limit=request.limit
)
logger.info(
"Universal search completed",
query_type=request.query_type,
result_count=result.get("count", 0),
success=result.get("success", False)
)
return result
except Exception as e:
logger.error("Universal search failed", query_type=request.query_type, error=str(e))
return {
"success": False,
"error": f"Search failed: {str(e)}",
"results": [],
"count": 0
}
@mcp.tool()
async def discover_aws_resources() -> Dict[str, Any]:
"""
Discover AWS resources for Security Lake automatically.
This tool scans your AWS account to find Security Lake databases and
S3 buckets for Athena results, and provides configuration recommendations.
Returns:
A dictionary containing:
- discovered resources and their locations
- configuration recommendations
- setup status and next steps
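Example response shape (illustrative, keys as returned by this tool):
{"success": true, "discovery_results": {...}, "configuration_ready": true,
"next_steps": ["..."]}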
"""
logger.info("AWS resource discovery requested")
try:
discovery = AWSResourceDiscovery(
aws_region=settings.aws_region,
aws_profile=settings.aws_profile
)
summary = discovery.get_discovery_summary()
logger.info(
"AWS resource discovery completed",
success=summary.get("discovery_successful", False),
account_id=summary.get("account_id"),
region=summary.get("region")
)
return {
"success": True,
"discovery_results": summary,
"configuration_ready": summary.get("discovery_successful", False),
"next_steps": summary.get("recommendations", [])
}
except Exception as e:
logger.error("AWS resource discovery failed", error=str(e))
return {
"success": False,
"error": f"Discovery failed: {str(e)}",
"discovery_results": {},
"configuration_ready": False,
"next_steps": [
"Ensure AWS credentials are properly configured",
"Verify Security Lake is enabled in your AWS account",
"Check IAM permissions for S3, Glue, and STS access"
]
}
@mcp.tool()
async def test_connection() -> Dict[str, Any]:
"""
Test the connection to Amazon Security Lake and Athena.
This tool verifies that the MCP server can successfully connect to AWS services
and access the Security Lake database.
Returns:
A dictionary containing:
- success: Boolean indicating if the connection test passed
- message: Description of the test results
- details: Additional connection information
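Example response shape (illustrative values):
{"success": true, "message": "Connection to Amazon Security Lake successful",
"details": {"aws_region": "...", "available_databases": ["..."], ...}}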
"""
logger.info("Connection test requested")
try:
# Test Athena connection
athena_client = ip_search_tool.athena_client
connection_ok = await athena_client.test_connection()
if connection_ok:
# Get basic information about available databases
databases = await athena_client.list_databases()
result = {
"success": True,
"message": "Connection to Amazon Security Lake successful",
"details": {
"aws_region": settings.aws_region,
"security_lake_database": settings.security_lake_database,
"athena_workgroup": settings.athena_workgroup,
"available_databases": databases,
"database_accessible": settings.security_lake_database in databases
}
}
else:
result = {
"success": False,
"message": "Failed to connect to Amazon Security Lake",
"details": {
"aws_region": settings.aws_region,
"security_lake_database": settings.security_lake_database
}
}
logger.info("Connection test completed", success=result["success"])
return result
except Exception as e:
logger.error("Connection test failed", error=str(e))
return {
"success": False,
"message": f"Connection test failed: {str(e)}",
"details": {
"error_type": type(e).__name__,
"error_message": str(e)
}
}
def main():
"""Main entry point for the MCP server."""
import logging
import os
# Apply the configured log level to the stdlib logging backend so that
# structlog's filter_by_level processor takes effect; output goes to stderr.
log_level = os.getenv("ASL_MCP_LOG_LEVEL", settings.log_level).upper()
logging.basicConfig(format="%(message)s", stream=sys.stderr, level=log_level)
logger.info(
"Starting Amazon Security Lake MCP Server",
version="0.1.0",
log_level=log_level,
aws_region=settings.aws_region,
security_lake_database=settings.security_lake_database
)
try:
# Run the MCP server
mcp.run()
except KeyboardInterrupt:
logger.info("Server shutdown requested")
except Exception as e:
logger.error("Server error", error=str(e))
sys.exit(1)
if __name__ == "__main__":
main()