import boto3
import structlog
from typing import Dict, List, Any, Optional, Tuple
from botocore.exceptions import ClientError
# Module-level structured logger; structlog emits key=value event fields.
logger = structlog.get_logger(__name__)
class SecurityLakeTableDiscovery:
    """Dynamic discovery and management of Security Lake tables.

    Uses the AWS Glue Data Catalog to enumerate Amazon Security Lake
    tables, categorize them by data source, inspect their OCSF schema,
    and select the best table for a given query type.  AWS clients are
    created lazily and cached on the instance.
    """

    # Ordered (substring patterns, data source) pairs used to classify
    # table names.  Order matters: the first matching pattern wins
    # (e.g. 'route53' is tested before the broader 'dns').
    _SOURCE_PATTERNS = (
        (('cloud_trail', 'cloudtrail'), 'cloudtrail'),
        (('guardduty',), 'guardduty'),
        (('sh_findings', 'security_hub'), 'security_hub'),
        (('vpc_flow', 'vpcflow'), 'vpc_flow'),
        (('route53',), 'route53'),
        (('eks_audit',), 'eks_audit'),
        (('lambda_execution',), 'lambda_execution'),
        (('waf',), 'waf'),
        (('dns',), 'dns'),
    )

    # OCSF version markers embedded in Security Lake table names.
    _OCSF_VERSION_MARKERS = (('_1_0', '1.0'), ('_2_0', '2.0'), ('_3_0', '3.0'))

    # Query type -> data sources most relevant to that query.  Hoisted to a
    # class attribute so the dict is not rebuilt on every call.
    _QUERY_MAPPINGS = {
        'findings': ['security_hub', 'guardduty'],
        'network': ['vpc_flow', 'dns', 'route53'],
        'api_calls': ['cloudtrail'],
        'lambda': ['lambda_execution'],
        'kubernetes': ['eks_audit'],
        'web_security': ['waf'],
        'ip_search': ['vpc_flow', 'cloudtrail', 'dns', 'security_hub'],
        'threat_detection': ['guardduty', 'security_hub'],
        'compliance': ['security_hub', 'cloudtrail']
    }

    def __init__(self, aws_region: str, aws_profile: Optional[str] = None):
        """
        Args:
            aws_region: AWS region to operate in (e.g. "us-east-1").
            aws_profile: Optional named AWS CLI profile; when omitted the
                default credential chain is used.
        """
        self.aws_region = aws_region
        self.aws_profile = aws_profile
        # Lazily-created, cached AWS handles (see _get_session / _get_glue_client).
        # Annotation quoted so the class can be defined without resolving boto3
        # at class-body time.
        self._session: Optional["boto3.Session"] = None
        self._glue_client = None

    def _get_session(self) -> "boto3.Session":
        """Get or create (and cache) the boto3 session."""
        if self._session is None:
            kwargs = {'region_name': self.aws_region}
            if self.aws_profile:
                kwargs['profile_name'] = self.aws_profile
            self._session = boto3.Session(**kwargs)
        return self._session

    def _get_glue_client(self):
        """Get or create (and cache) the Glue client."""
        if self._glue_client is None:
            self._glue_client = self._get_session().client('glue')
        return self._glue_client

    def discover_security_lake_tables(self, database_name: str) -> Dict[str, List[str]]:
        """
        Discover available Security Lake tables and categorize by data source.

        Args:
            database_name: Glue database holding the Security Lake tables.

        Returns:
            Dictionary mapping data source names to lists of table names
            (empty categories removed).  Returns an empty dict on failure;
            errors are logged, not raised.
        """
        try:
            glue_client = self._get_glue_client()
            logger.info("Discovering Security Lake tables", database=database_name)

            data_sources: Dict[str, List[str]] = {
                'cloudtrail': [], 'guardduty': [], 'security_hub': [],
                'vpc_flow': [], 'route53': [], 'eks_audit': [],
                'lambda_execution': [], 'waf': [], 'dns': [], 'other': [],
            }

            # Paginate: a single GetTables call returns at most one page of
            # tables, so larger catalogs would be silently truncated.
            paginator = glue_client.get_paginator('get_tables')
            for page in paginator.paginate(DatabaseName=database_name):
                for table in page['TableList']:
                    table_name = table['Name']
                    # Skip non-Security Lake tables
                    if not table_name.startswith('amazon_security_lake_table_'):
                        continue
                    # Extract data source from table name pattern; anything
                    # unrecognized falls into 'other'.
                    data_source = self._extract_data_source_from_table_name(table_name)
                    if data_source not in data_sources:
                        data_source = 'other'
                    data_sources[data_source].append(table_name)
                    logger.info(
                        "Found Security Lake table",
                        table=table_name,
                        data_source=data_source,
                        ocsf_version=self.get_ocsf_version_from_table(table_name)
                    )

            # Remove empty categories
            data_sources = {k: v for k, v in data_sources.items() if v}
            logger.info(
                "Table discovery completed",
                total_sources=len(data_sources),
                sources=list(data_sources.keys())
            )
            return data_sources
        except ClientError as e:
            logger.error("Failed to discover tables", error=str(e))
            return {}
        except Exception as e:
            logger.error("Unexpected error during table discovery", error=str(e))
            return {}

    def _extract_data_source_from_table_name(self, table_name: str) -> str:
        """Classify a table name into a data source type (first match wins)."""
        table_lower = table_name.lower()
        for patterns, source in self._SOURCE_PATTERNS:
            if any(p in table_lower for p in patterns):
                return source
        return 'other'

    def get_ocsf_version_from_table(self, table_name: str) -> str:
        """Extract the OCSF schema version embedded in a table name."""
        for marker, version in self._OCSF_VERSION_MARKERS:
            if marker in table_name:
                return version
        return 'unknown'

    def check_data_source_availability(self, table_name: str, database_name: str) -> Dict[str, Any]:
        """
        Check if a data source table exists and appears to have data.

        Existence is checked against the Glue catalog; "has data" is
        approximated by the presence of at least one partition, which is far
        cheaper than running an Athena query.

        Returns:
            Dict with keys ``available``, ``has_data``, ``status``, plus
            ``table_format``/``location`` on success or ``error`` on failure.
        """
        try:
            logger.info("Checking data availability", table=table_name)
            glue_client = self._get_glue_client()
            try:
                table_response = glue_client.get_table(
                    DatabaseName=database_name,
                    Name=table_name
                )
                # At least one partition is a cheap proxy for "table has data".
                try:
                    partitions = glue_client.get_partitions(
                        DatabaseName=database_name,
                        TableName=table_name,
                        MaxResults=1
                    )
                    has_partitions = len(partitions.get('Partitions', [])) > 0
                except ClientError:
                    # Unpartitioned or inaccessible partitions: treat as empty.
                    has_partitions = False

                storage = table_response['Table'].get('StorageDescriptor', {})
                return {
                    'available': True,
                    'has_data': has_partitions,
                    'status': 'active' if has_partitions else 'empty',
                    'table_format': storage.get('InputFormat', 'unknown'),
                    'location': storage.get('Location', '')
                }
            except ClientError as e:
                if e.response['Error']['Code'] == 'EntityNotFoundException':
                    return {
                        'available': False,
                        'has_data': False,
                        'status': 'not_found',
                        'error': 'Table not found in catalog'
                    }
                raise
        except Exception as e:
            logger.error("Failed to check data availability", table=table_name, error=str(e))
            return {
                'available': False,
                'has_data': False,
                'status': 'error',
                'error': str(e)
            }

    def get_best_table_for_data_source(
        self,
        data_source: str,
        available_tables: Dict[str, List[str]],
        preferred_ocsf_version: str = "2.0"
    ) -> Optional[str]:
        """
        Get the best table for a given data source, preferring a specific
        OCSF version.

        Args:
            data_source: The data source type (e.g., 'guardduty', 'security_hub').
            available_tables: Dict from discover_security_lake_tables().
            preferred_ocsf_version: Preferred OCSF version (default: "2.0").

        Returns:
            Best table name, or None if the data source has no tables.
        """
        tables = available_tables.get(data_source) or []
        if not tables:
            return None
        # First, try to find a table tagged with the preferred OCSF version
        # ("2.0" -> "_2_0" in the table name).
        version_tag = f"_{preferred_ocsf_version.replace('.', '_')}"
        for table in tables:
            if version_tag in table:
                return table
        # Preferred version not found: fall back to the first available table.
        return tables[0]

    def get_table_schema_info(self, table_name: str, database_name: str) -> Dict[str, Any]:
        """
        Get detailed schema information for a table.

        Returns:
            Dict with schema details and OCSF structure information, or
            ``{'error': ...}`` on failure (errors are logged, not raised).
        """
        try:
            glue_client = self._get_glue_client()
            table = glue_client.get_table(
                DatabaseName=database_name,
                Name=table_name
            )['Table']
            storage = table.get('StorageDescriptor', {})
            columns = storage.get('Columns', [])
            return {
                'table_name': table_name,
                'ocsf_version': self.get_ocsf_version_from_table(table_name),
                'column_count': len(columns),
                'storage_format': storage.get('InputFormat', 'unknown'),
                'location': storage.get('Location', ''),
                'ocsf_fields': self._analyze_ocsf_structure(columns),
                'has_nested_structures': any('struct<' in col.get('Type', '') for col in columns),
                'partition_keys': [pk['Name'] for pk in table.get('PartitionKeys', [])]
            }
        except Exception as e:
            logger.error("Failed to get table schema", table=table_name, error=str(e))
            return {'error': str(e)}

    def _analyze_ocsf_structure(self, columns: List[Dict[str, Any]]) -> Dict[str, List[str]]:
        """Bucket Glue columns into OCSF field categories (first match wins).

        Returns only the non-empty categories.
        """
        core_names = {'time', 'type_name', 'type_uid', 'class_name',
                      'class_uid', 'severity', 'severity_id'}
        ocsf_categories: Dict[str, List[str]] = {
            'core_fields': [], 'metadata_fields': [], 'finding_fields': [],
            'network_fields': [], 'cloud_fields': [], 'actor_fields': [],
            'device_fields': [], 'other_fields': [],
        }
        for column in columns:
            col_name = column.get('Name', '').lower()
            if col_name in core_names:
                category = 'core_fields'
            elif 'metadata' in col_name:
                category = 'metadata_fields'
            elif 'finding' in col_name:
                category = 'finding_fields'
            elif 'endpoint' in col_name or 'network' in col_name:
                category = 'network_fields'
            elif col_name.startswith('cloud') or 'account' in col_name:
                category = 'cloud_fields'
            elif col_name.startswith('actor') or 'user' in col_name:
                category = 'actor_fields'
            elif col_name.startswith(('device', 'host')):
                category = 'device_fields'
            else:
                category = 'other_fields'
            ocsf_categories[category].append(column['Name'])
        # Remove empty categories
        return {k: v for k, v in ocsf_categories.items() if v}

    def auto_select_data_sources(
        self,
        query_type: str,
        available_tables: Dict[str, List[str]]
    ) -> List[str]:
        """
        Automatically select relevant data sources based on query type.

        Args:
            query_type: Type of query ('findings', 'network', 'api_calls', etc.).
                Unknown query types fall back to every available source.
            available_tables: Available tables from discovery.

        Returns:
            List of relevant data source names that actually have tables.
        """
        suggested_sources = self._QUERY_MAPPINGS.get(
            query_type, list(available_tables.keys())
        )
        # Keep only sources that actually have tables available.
        return [source for source in suggested_sources if source in available_tables]

    def get_discovery_summary(self, database_name: str) -> Dict[str, Any]:
        """Get a comprehensive summary of discovered Security Lake resources.

        Returns a JSON-serializable dict with per-source table lists, OCSF
        versions seen, per-table availability status, and recommendations.
        On failure returns an error summary instead of raising.
        """
        try:
            available_tables = self.discover_security_lake_tables(database_name)
            summary: Dict[str, Any] = {
                'database': database_name,
                'total_data_sources': len(available_tables),
                'data_sources': {},
                'ocsf_versions': set(),
                'recommendations': []
            }
            for data_source, tables in available_tables.items():
                source_info: Dict[str, Any] = {
                    'tables': tables,
                    'table_count': len(tables),
                    'ocsf_versions': []
                }
                for table in tables:
                    ocsf_version = self.get_ocsf_version_from_table(table)
                    source_info['ocsf_versions'].append(ocsf_version)
                    summary['ocsf_versions'].add(ocsf_version)
                    # Check data availability (last table's status wins, as before)
                    availability = self.check_data_source_availability(table, database_name)
                    source_info['status'] = availability.get('status', 'unknown')
                summary['data_sources'][data_source] = source_info

            # Convert set to list for JSON serialization
            summary['ocsf_versions'] = list(summary['ocsf_versions'])

            # Generate recommendations
            if not available_tables:
                summary['recommendations'].append("No Security Lake tables found. Verify Security Lake is enabled.")
            elif len(available_tables) < 3:
                summary['recommendations'].append("Consider enabling more data sources in Security Lake for comprehensive security analysis.")
            if '2.0' in summary['ocsf_versions'] and '1.0' in summary['ocsf_versions']:
                summary['recommendations'].append("Mixed OCSF versions detected. Consider standardizing to OCSF 2.0.")
            return summary
        except Exception as e:
            logger.error("Failed to generate discovery summary", error=str(e))
            return {
                'error': str(e),
                'database': database_name,
                'total_data_sources': 0,
                'data_sources': {},
                'recommendations': ['Fix database connectivity issues before proceeding']
            }