from typing import Optional, List, Dict, Any

import boto3
import structlog
from botocore.exceptions import ClientError, NoCredentialsError

logger = structlog.get_logger(__name__)
class AWSResourceDiscovery:
"""Auto-discovery of AWS resources for Security Lake"""
def __init__(self, aws_region: str, aws_profile: Optional[str] = None):
self.aws_region = aws_region
self.aws_profile = aws_profile
self._session: Optional[boto3.Session] = None
def _get_session(self) -> boto3.Session:
"""Get or create AWS session"""
if self._session is None:
if self.aws_profile:
self._session = boto3.Session(
profile_name=self.aws_profile,
region_name=self.aws_region
)
else:
self._session = boto3.Session(region_name=self.aws_region)
return self._session
def discover_security_lake_bucket(self) -> Optional[str]:
"""
Automatically discover the Security Lake S3 bucket for Athena results.
Returns:
S3 URI for Athena results bucket or None if not found
"""
try:
session = self._get_session()
s3_client = session.client('s3')
sts_client = session.client('sts')
# Get current account ID
account_id = sts_client.get_caller_identity()['Account']
logger.info("Discovering Security Lake bucket", account_id=account_id, region=self.aws_region)
# List buckets and look for Security Lake patterns
response = s3_client.list_buckets()
security_lake_buckets = []
athena_result_buckets = []
for bucket in response['Buckets']:
bucket_name = bucket['Name']
# Check for Security Lake bucket patterns
if self._is_security_lake_bucket(bucket_name, account_id):
security_lake_buckets.append(bucket_name)
logger.info("Found Security Lake bucket", bucket=bucket_name)
# Check for potential Athena results buckets
if self._is_athena_results_bucket(bucket_name, account_id):
athena_result_buckets.append(bucket_name)
logger.info("Found potential Athena results bucket", bucket=bucket_name)
# Try to find the best Athena results location
athena_location = self._find_best_athena_location(
athena_result_buckets,
security_lake_buckets,
account_id
)
if athena_location:
logger.info("Selected Athena output location", location=athena_location)
return athena_location
else:
logger.warning("No suitable Athena output location found")
return None
except NoCredentialsError:
logger.error("AWS credentials not configured for bucket discovery")
return None
except ClientError as e:
logger.error("AWS error during bucket discovery", error=str(e))
return None
except Exception as e:
logger.error("Unexpected error during bucket discovery", error=str(e))
return None
def _is_security_lake_bucket(self, bucket_name: str, account_id: str) -> bool:
"""Check if bucket matches Security Lake patterns"""
patterns = [
f"aws-security-data-lake-{self.aws_region}-{account_id}",
f"amazon-security-lake-{self.aws_region}-{account_id}",
f"security-lake-{self.aws_region}-{account_id}",
"aws-security-data-lake",
"amazon-security-lake"
]
return any(pattern in bucket_name.lower() for pattern in patterns)
def _is_athena_results_bucket(self, bucket_name: str, account_id: str) -> bool:
"""Check if bucket could be used for Athena results"""
patterns = [
"athena-results",
"athena-query-results",
"aws-athena-query-results",
f"athena-results-{account_id}",
f"aws-athena-query-results-{account_id}-{self.aws_region}",
"query-results",
"analytics-results"
]
return any(pattern in bucket_name.lower() for pattern in patterns)
def _find_best_athena_location(
self,
athena_buckets: List[str],
security_lake_buckets: List[str],
account_id: str
) -> Optional[str]:
"""Find the best S3 location for Athena results"""
# Priority 1: Existing Athena results bucket
if athena_buckets:
# Prefer buckets with account ID and region
for bucket in athena_buckets:
if account_id in bucket and self.aws_region in bucket:
return f"s3://{bucket}/security-lake-queries/"
# Fall back to any athena results bucket
return f"s3://{athena_buckets[0]}/security-lake-queries/"
# Priority 2: Create path in Security Lake bucket if it exists
if security_lake_buckets:
# Check if we can write to the Security Lake bucket
for bucket in security_lake_buckets:
if self._test_bucket_write_access(bucket):
return f"s3://{bucket}/athena-results/"
        # Priority 3: Fall back to the conventional Athena results bucket name
        # (this bucket may not exist yet and may need to be created before use)
        default_bucket = f"aws-athena-query-results-{account_id}-{self.aws_region}"
        return f"s3://{default_bucket}/security-lake-queries/"
    def _test_bucket_write_access(self, bucket_name: str) -> bool:
        """
        Heuristically check whether a bucket is accessible.

        Note: only read/list access is verified here, as a proxy for write
        access; a definitive check would require an actual PutObject call.
        """
        try:
            session = self._get_session()
            s3_client = session.client('s3')
            # Fetch the bucket location (confirms the bucket exists and is readable)
            s3_client.get_bucket_location(Bucket=bucket_name)
            # List objects under a prefix (confirms basic list permissions)
            s3_client.list_objects_v2(Bucket=bucket_name, Prefix='test/', MaxKeys=1)
            return True
        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', '')
            if error_code in ('AccessDenied', 'NoSuchBucket'):
                return False
            # Other errors (e.g. throttling) do not prove a lack of access
            return True
        except Exception:
            return False
def discover_security_lake_database(self) -> Optional[str]:
"""
Discover the Security Lake Glue database name.
Returns:
Database name or None if not found
"""
try:
session = self._get_session()
glue_client = session.client('glue')
logger.info("Discovering Security Lake database")
            # List all Glue databases, paginating in case there is more than one page
            paginator = glue_client.get_paginator('get_databases')
            security_lake_databases = []
            for page in paginator.paginate():
                for database in page['DatabaseList']:
                    db_name = database['Name']
                    # Check for Security Lake database patterns
                    if self._is_security_lake_database(db_name):
                        security_lake_databases.append(db_name)
                        logger.info("Found Security Lake database", database=db_name)
            if security_lake_databases:
                # Prefer the Security Lake-managed database, whose name starts with
                # "amazon_security_lake_glue_db" (e.g. "amazon_security_lake_glue_db_us_east_1")
                for db_name in security_lake_databases:
                    if db_name.startswith("amazon_security_lake_glue_db"):
                        return db_name
                return security_lake_databases[0]
else:
logger.warning("No Security Lake database found")
return None
except Exception as e:
logger.error("Error discovering Security Lake database", error=str(e))
return None
def _is_security_lake_database(self, db_name: str) -> bool:
"""Check if database matches Security Lake patterns"""
patterns = [
"amazon_security_lake",
"security_lake",
"aws_security_lake",
"securitylake"
]
return any(pattern in db_name.lower() for pattern in patterns)
def get_discovery_summary(self) -> Dict[str, Any]:
"""Get a summary of discovered AWS resources"""
try:
session = self._get_session()
sts_client = session.client('sts')
account_id = sts_client.get_caller_identity()['Account']
bucket = self.discover_security_lake_bucket()
database = self.discover_security_lake_database()
return {
"account_id": account_id,
"region": self.aws_region,
"athena_output_location": bucket,
"security_lake_database": database,
"discovery_successful": bucket is not None and database is not None,
"recommendations": self._get_setup_recommendations(bucket, database, account_id)
}
except Exception as e:
return {
"error": str(e),
"discovery_successful": False,
"recommendations": [
"Ensure AWS credentials are properly configured",
"Verify Security Lake is enabled in your AWS account",
"Check IAM permissions for S3 and Glue access"
]
}
def _get_setup_recommendations(
self,
bucket: Optional[str],
database: Optional[str],
account_id: str
) -> List[str]:
"""Generate setup recommendations based on discovery results"""
recommendations = []
if not database:
recommendations.append(
"Enable Amazon Security Lake in your AWS account to create the required database"
)
if not bucket:
recommendations.extend([
f"Create an S3 bucket for Athena query results, e.g., 'aws-athena-query-results-{account_id}-{self.aws_region}'",
"Ensure the bucket is in the same region as your Security Lake data",
"Grant appropriate permissions for Athena to write query results"
])
if bucket and database:
recommendations.append(
"All required resources discovered successfully! You can use automatic configuration."
)
return recommendations
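

if __name__ == "__main__":
    # Minimal usage sketch (the region below is illustrative; assumes AWS
    # credentials are already configured in the environment or via a profile).
    discovery = AWSResourceDiscovery(aws_region="us-east-1")
    summary = discovery.get_discovery_summary()
    logger.info("Discovery summary", **summary)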