Skip to main content
Glama
kebabmane

Amazon Security Lake MCP Server

by kebabmane
discovery.py (10.9 kB)
from typing import Optional, List, Dict, Any

import boto3
import structlog
from botocore.exceptions import ClientError, NoCredentialsError

logger = structlog.get_logger(__name__)


class AWSResourceDiscovery:
    """Auto-discovery of AWS resources for Security Lake.

    Looks up, by name pattern, the S3 location to use for Athena query
    results and the Glue database that holds Security Lake tables. All
    AWS calls go through a lazily created ``boto3.Session`` bound to the
    configured region and (optionally) a named profile.
    """

    def __init__(self, aws_region: str, aws_profile: Optional[str] = None):
        """Store the region/profile; no AWS call is made until first use.

        Args:
            aws_region: Region used for the session and for name matching.
            aws_profile: Optional named profile passed to ``boto3.Session``.
        """
        self.aws_region = aws_region
        self.aws_profile = aws_profile
        self._session: Optional[boto3.Session] = None

    def _get_session(self) -> boto3.Session:
        """Get or create AWS session"""
        if self._session is None:
            if self.aws_profile:
                self._session = boto3.Session(
                    profile_name=self.aws_profile,
                    region_name=self.aws_region
                )
            else:
                self._session = boto3.Session(region_name=self.aws_region)
        return self._session

    def discover_security_lake_bucket(self) -> Optional[str]:
        """
        Automatically discover the Security Lake S3 bucket for Athena results.

        Lists the caller's buckets, classifies them by name pattern, and
        delegates the final choice to ``_find_best_athena_location``.

        Returns:
            S3 URI for the Athena results location, or None when an AWS or
            credential error prevented discovery. Note that
            ``_find_best_athena_location`` falls back to a *suggested*
            default bucket name, so a non-None result does not guarantee
            that the bucket exists.
        """
        try:
            session = self._get_session()
            s3_client = session.client('s3')
            sts_client = session.client('sts')

            # Get current account ID
            account_id = sts_client.get_caller_identity()['Account']
            logger.info("Discovering Security Lake bucket",
                        account_id=account_id, region=self.aws_region)

            # List buckets and look for Security Lake patterns
            response = s3_client.list_buckets()

            security_lake_buckets = []
            athena_result_buckets = []

            for bucket in response.get('Buckets', []):
                bucket_name = bucket['Name']

                # Check for Security Lake bucket patterns
                if self._is_security_lake_bucket(bucket_name, account_id):
                    security_lake_buckets.append(bucket_name)
                    logger.info("Found Security Lake bucket", bucket=bucket_name)

                # Check for potential Athena results buckets
                if self._is_athena_results_bucket(bucket_name, account_id):
                    athena_result_buckets.append(bucket_name)
                    logger.info("Found potential Athena results bucket",
                                bucket=bucket_name)

            # Try to find the best Athena results location
            athena_location = self._find_best_athena_location(
                athena_result_buckets,
                security_lake_buckets,
                account_id
            )

            if athena_location:
                logger.info("Selected Athena output location",
                            location=athena_location)
                return athena_location

            # Defensive: the selector currently always returns a URI, so
            # this branch is only reached if that contract changes.
            logger.warning("No suitable Athena output location found")
            return None

        except NoCredentialsError:
            logger.error("AWS credentials not configured for bucket discovery")
            return None
        except ClientError as e:
            logger.error("AWS error during bucket discovery", error=str(e))
            return None
        except Exception as e:
            logger.error("Unexpected error during bucket discovery", error=str(e))
            return None

    def _is_security_lake_bucket(self, bucket_name: str, account_id: str) -> bool:
        """Check if bucket matches Security Lake patterns"""
        patterns = [
            f"aws-security-data-lake-{self.aws_region}-{account_id}",
            f"amazon-security-lake-{self.aws_region}-{account_id}",
            f"security-lake-{self.aws_region}-{account_id}",
            "aws-security-data-lake",
            "amazon-security-lake"
        ]
        # Substring match against the lower-cased name, computed once.
        lowered = bucket_name.lower()
        return any(pattern in lowered for pattern in patterns)

    def _is_athena_results_bucket(self, bucket_name: str, account_id: str) -> bool:
        """Check if bucket could be used for Athena results"""
        patterns = [
            "athena-results",
            "athena-query-results",
            "aws-athena-query-results",
            f"athena-results-{account_id}",
            f"aws-athena-query-results-{account_id}-{self.aws_region}",
            "query-results",
            "analytics-results"
        ]
        # Substring match against the lower-cased name, computed once.
        lowered = bucket_name.lower()
        return any(pattern in lowered for pattern in patterns)

    def _find_best_athena_location(
        self,
        athena_buckets: List[str],
        security_lake_buckets: List[str],
        account_id: str
    ) -> Optional[str]:
        """Find the best S3 location for Athena results.

        Args:
            athena_buckets: Names that matched the Athena results patterns.
            security_lake_buckets: Names that matched Security Lake patterns.
            account_id: Account ID used for preference and the default name.

        Returns:
            An ``s3://`` URI. As written this never returns None: when no
            candidate qualifies it returns a URI built from a conventional
            default bucket name whose existence is not verified.
        """
        # Priority 1: Existing Athena results bucket
        if athena_buckets:
            # Prefer buckets with account ID and region
            for bucket in athena_buckets:
                if account_id in bucket and self.aws_region in bucket:
                    return f"s3://{bucket}/security-lake-queries/"

            # Fall back to any athena results bucket
            return f"s3://{athena_buckets[0]}/security-lake-queries/"

        # Priority 2: Create path in Security Lake bucket if it exists
        if security_lake_buckets:
            # Check if we can write to the Security Lake bucket
            for bucket in security_lake_buckets:
                if self._test_bucket_write_access(bucket):
                    return f"s3://{bucket}/athena-results/"

        # Priority 3: Suggest default naming pattern
        default_bucket = f"aws-athena-query-results-{account_id}-{self.aws_region}"
        # Make the guess visible: the caller logs the result as "Selected"
        # even though nothing has confirmed this bucket exists.
        logger.warning(
            "No existing Athena results bucket found; suggesting default name",
            bucket=default_bucket
        )
        return f"s3://{default_bucket}/security-lake-queries/"

    def _test_bucket_write_access(self, bucket_name: str) -> bool:
        """Probe whether the bucket is accessible to the current credentials.

        Despite the name, no write is attempted: the check calls
        ``get_bucket_location`` and ``list_objects_v2`` only, so a True
        result indicates the bucket is reachable, not that it is writable.

        Returns:
            False on ``AccessDenied`` / ``NoSuchBucket`` or any
            non-``ClientError`` exception; True otherwise (including other
            ``ClientError`` codes, which are treated optimistically).
        """
        try:
            session = self._get_session()
            s3_client = session.client('s3')

            # Try to get bucket location (this also tests read access)
            s3_client.get_bucket_location(Bucket=bucket_name)

            # Try to list objects with a prefix (tests basic access)
            s3_client.list_objects_v2(Bucket=bucket_name, Prefix='test/', MaxKeys=1)

            return True

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', '')
            if error_code in ('AccessDenied', 'NoSuchBucket'):
                return False
            # For other errors, assume we might have access
            return True
        except Exception:
            return False

    def discover_security_lake_database(self) -> Optional[str]:
        """
        Discover the Security Lake Glue database name.

        Walks every page of the Glue ``get_databases`` listing so that a
        matching database is found even when the catalog holds more
        databases than fit in a single response.

        Returns:
            Database name or None if not found (or if discovery failed).
            ``amazon_security_lake_glue_db`` is preferred when present;
            otherwise the first match is returned.
        """
        try:
            session = self._get_session()
            glue_client = session.client('glue')

            logger.info("Discovering Security Lake database")

            security_lake_databases = []

            # get_databases is paginated; a single call only returns the
            # first page, so iterate all pages via the paginator.
            paginator = glue_client.get_paginator('get_databases')
            for page in paginator.paginate():
                for database in page.get('DatabaseList', []):
                    db_name = database['Name']

                    # Check for Security Lake database patterns
                    if self._is_security_lake_database(db_name):
                        security_lake_databases.append(db_name)
                        logger.info("Found Security Lake database",
                                    database=db_name)

            if not security_lake_databases:
                logger.warning("No Security Lake database found")
                return None

            # Return the first match, preferring the standard name
            standard_name = "amazon_security_lake_glue_db"
            if standard_name in security_lake_databases:
                return standard_name
            return security_lake_databases[0]

        except NoCredentialsError:
            logger.error("AWS credentials not configured for database discovery")
            return None
        except ClientError as e:
            logger.error("AWS error during database discovery", error=str(e))
            return None
        except Exception as e:
            logger.error("Error discovering Security Lake database", error=str(e))
            return None

    def _is_security_lake_database(self, db_name: str) -> bool:
        """Check if database matches Security Lake patterns"""
        patterns = [
            "amazon_security_lake",
            "security_lake",
            "aws_security_lake",
            "securitylake"
        ]
        # Substring match against the lower-cased name, computed once.
        lowered = db_name.lower()
        return any(pattern in lowered for pattern in patterns)

    def get_discovery_summary(self) -> Dict[str, Any]:
        """Get a summary of discovered AWS resources.

        Returns:
            On success, a dict with ``account_id``, ``region``,
            ``athena_output_location``, ``security_lake_database``,
            ``discovery_successful`` and ``recommendations``. If an
            exception escapes (e.g. the STS identity call fails), a dict
            with ``error``, ``discovery_successful`` False and generic
            ``recommendations``.

            ``discovery_successful`` only reflects that both lookups
            returned a value; the Athena location may be a suggested
            default rather than an existing bucket.
        """
        try:
            session = self._get_session()
            sts_client = session.client('sts')
            account_id = sts_client.get_caller_identity()['Account']

            bucket = self.discover_security_lake_bucket()
            database = self.discover_security_lake_database()

            return {
                "account_id": account_id,
                "region": self.aws_region,
                "athena_output_location": bucket,
                "security_lake_database": database,
                "discovery_successful": bucket is not None and database is not None,
                "recommendations": self._get_setup_recommendations(
                    bucket, database, account_id
                )
            }

        except Exception as e:
            return {
                "error": str(e),
                "discovery_successful": False,
                "recommendations": [
                    "Ensure AWS credentials are properly configured",
                    "Verify Security Lake is enabled in your AWS account",
                    "Check IAM permissions for S3 and Glue access"
                ]
            }

    def _get_setup_recommendations(
        self,
        bucket: Optional[str],
        database: Optional[str],
        account_id: str
    ) -> List[str]:
        """Generate setup recommendations based on discovery results.

        Args:
            bucket: Discovered Athena output location, or None.
            database: Discovered Glue database name, or None.
            account_id: Account ID interpolated into the suggested bucket name.

        Returns:
            Human-readable recommendation strings; a single success message
            when both ``bucket`` and ``database`` are set.
        """
        recommendations = []

        if not database:
            recommendations.append(
                "Enable Amazon Security Lake in your AWS account to create the required database"
            )

        if not bucket:
            recommendations.extend([
                f"Create an S3 bucket for Athena query results, e.g., 'aws-athena-query-results-{account_id}-{self.aws_region}'",
                "Ensure the bucket is in the same region as your Security Lake data",
                "Grant appropriate permissions for Athena to write query results"
            ])

        if bucket and database:
            recommendations.append(
                "All required resources discovered successfully! You can use automatic configuration."
            )

        return recommendations

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kebabmane/asl-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.