
Amazon Security Lake MCP Server

by kebabmane
table_discovery.py (16.2 kB)
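The module below drives table discovery for the server: it enumerates Security Lake tables from the AWS Glue Data Catalog, groups them by data source (CloudTrail, GuardDuty, VPC Flow, and so on), and inspects their OCSF versions and schemas.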
import boto3
import structlog
from typing import Dict, List, Any, Optional
from botocore.exceptions import ClientError

logger = structlog.get_logger(__name__)


class SecurityLakeTableDiscovery:
    """Dynamic discovery and management of Security Lake tables"""

    def __init__(self, aws_region: str, aws_profile: Optional[str] = None):
        self.aws_region = aws_region
        self.aws_profile = aws_profile
        self._session: Optional[boto3.Session] = None
        self._glue_client = None

    def _get_session(self) -> boto3.Session:
        """Get or create AWS session"""
        if self._session is None:
            if self.aws_profile:
                self._session = boto3.Session(
                    profile_name=self.aws_profile,
                    region_name=self.aws_region
                )
            else:
                self._session = boto3.Session(region_name=self.aws_region)
        return self._session

    def _get_glue_client(self):
        """Get or create Glue client"""
        if self._glue_client is None:
            session = self._get_session()
            self._glue_client = session.client('glue')
        return self._glue_client

    def discover_security_lake_tables(self, database_name: str) -> Dict[str, List[str]]:
        """
        Discover available Security Lake tables and categorize them by data source

        Returns:
            Dictionary mapping data source names to lists of table names
        """
        try:
            glue_client = self._get_glue_client()
            logger.info("Discovering Security Lake tables", database=database_name)

            # Get all tables in the database (paginated: GetTables returns at
            # most 100 tables per call)
            paginator = glue_client.get_paginator('get_tables')
            table_list: List[Dict[str, Any]] = []
            for page in paginator.paginate(DatabaseName=database_name):
                table_list.extend(page['TableList'])

            data_sources: Dict[str, List[str]] = {
                'cloudtrail': [],
                'guardduty': [],
                'security_hub': [],
                'vpc_flow': [],
                'route53': [],
                'eks_audit': [],
                'lambda_execution': [],
                'waf': [],
                'dns': [],
                'other': []
            }

            for table in table_list:
                table_name = table['Name']

                # Skip non-Security Lake tables
                if not table_name.startswith('amazon_security_lake_table_'):
                    continue

                # Extract the data source from the table name pattern
                data_source = self._extract_data_source_from_table_name(table_name)

                if data_source in data_sources:
                    data_sources[data_source].append(table_name)
                else:
                    data_sources['other'].append(table_name)

                logger.info(
                    "Found Security Lake table",
                    table=table_name,
                    data_source=data_source,
                    ocsf_version=self.get_ocsf_version_from_table(table_name)
                )

            # Remove empty categories
            data_sources = {k: v for k, v in data_sources.items() if v}

            logger.info(
                "Table discovery completed",
                total_sources=len(data_sources),
                sources=list(data_sources.keys())
            )

            return data_sources

        except ClientError as e:
            logger.error("Failed to discover tables", error=str(e))
            return {}
        except Exception as e:
            logger.error("Unexpected error during table discovery", error=str(e))
            return {}

    def _extract_data_source_from_table_name(self, table_name: str) -> str:
        """Extract the data source type from a table name"""
        table_lower = table_name.lower()

        # Map table name patterns to data source types
        if 'cloud_trail' in table_lower or 'cloudtrail' in table_lower:
            return 'cloudtrail'
        elif 'guardduty' in table_lower:
            return 'guardduty'
        elif 'sh_findings' in table_lower or 'security_hub' in table_lower:
            return 'security_hub'
        elif 'vpc_flow' in table_lower or 'vpcflow' in table_lower:
            return 'vpc_flow'
        elif 'route53' in table_lower:
            return 'route53'
        elif 'eks_audit' in table_lower:
            return 'eks_audit'
        elif 'lambda_execution' in table_lower:
            return 'lambda_execution'
        elif 'waf' in table_lower:
            return 'waf'
        elif 'dns' in table_lower:
            return 'dns'
        else:
            return 'other'

    def get_ocsf_version_from_table(self, table_name: str) -> str:
        """Extract the OCSF version from a table name"""
        if '_1_0' in table_name:
            return '1.0'
        elif '_2_0' in table_name:
            return '2.0'
        elif '_3_0' in table_name:
            return '3.0'
        else:
            return 'unknown'

    def check_data_source_availability(self, table_name: str, database_name: str) -> Dict[str, Any]:
        """
        Check if a data source has available data

        Returns:
            Dictionary with availability status and metadata
        """
        try:
            logger.info("Checking data availability", table=table_name)

            # Simplified check: rather than running an Athena COUNT(*) query,
            # treat presence in the Glue catalog (plus at least one partition)
            # as evidence of data. A production implementation would verify
            # through Athena.
            glue_client = self._get_glue_client()

            try:
                table_response = glue_client.get_table(
                    DatabaseName=database_name,
                    Name=table_name
                )

                # Check if the table has any partitions (indicates data)
                try:
                    partitions = glue_client.get_partitions(
                        DatabaseName=database_name,
                        TableName=table_name,
                        MaxResults=1
                    )
                    has_partitions = len(partitions.get('Partitions', [])) > 0
                except ClientError:
                    has_partitions = False

                return {
                    'available': True,
                    'has_data': has_partitions,
                    'status': 'active' if has_partitions else 'empty',
                    'table_format': table_response['Table'].get('StorageDescriptor', {}).get('InputFormat', 'unknown'),
                    'location': table_response['Table'].get('StorageDescriptor', {}).get('Location', '')
                }

            except ClientError as e:
                if e.response['Error']['Code'] == 'EntityNotFoundException':
                    return {
                        'available': False,
                        'has_data': False,
                        'status': 'not_found',
                        'error': 'Table not found in catalog'
                    }
                else:
                    raise

        except Exception as e:
            logger.error("Failed to check data availability", table=table_name, error=str(e))
            return {
                'available': False,
                'has_data': False,
                'status': 'error',
                'error': str(e)
            }

    def get_best_table_for_data_source(
        self,
        data_source: str,
        available_tables: Dict[str, List[str]],
        preferred_ocsf_version: str = "2.0"
    ) -> Optional[str]:
        """
        Get the best table for a given data source, preferring a specific OCSF version

        Args:
            data_source: The data source type (e.g., 'guardduty', 'security_hub')
            available_tables: Dict from discover_security_lake_tables()
            preferred_ocsf_version: Preferred OCSF version (default: "2.0")

        Returns:
            Best table name, or None if no suitable table was found
        """
        if data_source not in available_tables or not available_tables[data_source]:
            return None

        tables = available_tables[data_source]

        # First, try to find a table with the preferred OCSF version
        # (e.g. "2.0" matches the "_2_0" suffix in the table name)
        version_marker = f"_{preferred_ocsf_version.replace('.', '_')}"
        for table in tables:
            if version_marker in table:
                return table

        # If the preferred version is not found, return the first available table
        return tables[0]

    def get_table_schema_info(self, table_name: str, database_name: str) -> Dict[str, Any]:
        """
        Get detailed schema information for a table

        Returns:
            Dictionary with schema details and OCSF structure information
        """
        try:
            glue_client = self._get_glue_client()

            table_response = glue_client.get_table(
                DatabaseName=database_name,
                Name=table_name
            )

            table = table_response['Table']
            columns = table.get('StorageDescriptor', {}).get('Columns', [])

            # Analyze the OCSF structure
            ocsf_fields = self._analyze_ocsf_structure(columns)

            return {
                'table_name': table_name,
                'ocsf_version': self.get_ocsf_version_from_table(table_name),
                'column_count': len(columns),
                'storage_format': table.get('StorageDescriptor', {}).get('InputFormat', 'unknown'),
                'location': table.get('StorageDescriptor', {}).get('Location', ''),
                'ocsf_fields': ocsf_fields,
                'has_nested_structures': any('struct<' in col.get('Type', '') for col in columns),
                'partition_keys': [pk['Name'] for pk in table.get('PartitionKeys', [])]
            }

        except Exception as e:
            logger.error("Failed to get table schema", table=table_name, error=str(e))
            return {'error': str(e)}

    def _analyze_ocsf_structure(self, columns: List[Dict[str, Any]]) -> Dict[str, List[str]]:
        """Analyze columns to identify OCSF field categories"""
        ocsf_categories: Dict[str, List[str]] = {
            'core_fields': [],
            'metadata_fields': [],
            'finding_fields': [],
            'network_fields': [],
            'cloud_fields': [],
            'actor_fields': [],
            'device_fields': [],
            'other_fields': []
        }

        for column in columns:
            col_name = column.get('Name', '').lower()

            if col_name in ['time', 'type_name', 'type_uid', 'class_name', 'class_uid', 'severity', 'severity_id']:
                ocsf_categories['core_fields'].append(column['Name'])
            elif 'metadata' in col_name:
                ocsf_categories['metadata_fields'].append(column['Name'])
            elif 'finding' in col_name:
                ocsf_categories['finding_fields'].append(column['Name'])
            elif 'endpoint' in col_name or 'network' in col_name:
                ocsf_categories['network_fields'].append(column['Name'])
            elif col_name.startswith('cloud') or 'account' in col_name:
                ocsf_categories['cloud_fields'].append(column['Name'])
            elif col_name.startswith('actor') or 'user' in col_name:
                ocsf_categories['actor_fields'].append(column['Name'])
            elif col_name.startswith('device') or col_name.startswith('host'):
                ocsf_categories['device_fields'].append(column['Name'])
            else:
                ocsf_categories['other_fields'].append(column['Name'])

        # Remove empty categories
        return {k: v for k, v in ocsf_categories.items() if v}

    def auto_select_data_sources(
        self,
        query_type: str,
        available_tables: Dict[str, List[str]]
    ) -> List[str]:
        """
        Automatically select relevant data sources based on query type

        Args:
            query_type: Type of query ('findings', 'network', 'api_calls', etc.)
            available_tables: Available tables from discovery

        Returns:
            List of relevant data source names
        """
        query_mappings = {
            'findings': ['security_hub', 'guardduty'],
            'network': ['vpc_flow', 'dns', 'route53'],
            'api_calls': ['cloudtrail'],
            'lambda': ['lambda_execution'],
            'kubernetes': ['eks_audit'],
            'web_security': ['waf'],
            'ip_search': ['vpc_flow', 'cloudtrail', 'dns', 'security_hub'],
            'threat_detection': ['guardduty', 'security_hub'],
            'compliance': ['security_hub', 'cloudtrail']
        }

        # Get the suggested sources for this query type; fall back to everything
        suggested_sources = query_mappings.get(query_type, list(available_tables.keys()))

        # Return only sources that actually have tables available
        return [source for source in suggested_sources if source in available_tables]

    def get_discovery_summary(self, database_name: str) -> Dict[str, Any]:
        """Get a comprehensive summary of discovered Security Lake resources"""
        try:
            available_tables = self.discover_security_lake_tables(database_name)

            summary: Dict[str, Any] = {
                'database': database_name,
                'total_data_sources': len(available_tables),
                'data_sources': {},
                'ocsf_versions': set(),
                'recommendations': []
            }

            for data_source, tables in available_tables.items():
                source_info: Dict[str, Any] = {
                    'tables': tables,
                    'table_count': len(tables),
                    'ocsf_versions': []
                }

                for table in tables:
                    ocsf_version = self.get_ocsf_version_from_table(table)
                    source_info['ocsf_versions'].append(ocsf_version)
                    summary['ocsf_versions'].add(ocsf_version)

                    # Check data availability (note: 'status' reflects the
                    # last table checked for this source)
                    availability = self.check_data_source_availability(table, database_name)
                    source_info['status'] = availability.get('status', 'unknown')

                summary['data_sources'][data_source] = source_info

            # Convert the set to a list for JSON serialization
            summary['ocsf_versions'] = list(summary['ocsf_versions'])

            # Generate recommendations
            if not available_tables:
                summary['recommendations'].append("No Security Lake tables found. Verify Security Lake is enabled.")
            elif len(available_tables) < 3:
                summary['recommendations'].append("Consider enabling more data sources in Security Lake for comprehensive security analysis.")

            if '2.0' in summary['ocsf_versions'] and '1.0' in summary['ocsf_versions']:
                summary['recommendations'].append("Mixed OCSF versions detected. Consider standardizing on OCSF 2.0.")

            return summary

        except Exception as e:
            logger.error("Failed to generate discovery summary", error=str(e))
            return {
                'error': str(e),
                'database': database_name,
                'total_data_sources': 0,
                'data_sources': {},
                'recommendations': ['Fix database connectivity issues before proceeding']
            }
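
Usage

A minimal sketch of how the class might be driven, assuming the module is importable as table_discovery, the caller has AWS credentials with Glue read access, and Security Lake's default Glue database naming pattern (amazon_security_lake_glue_db_<region>); the region and database name below are placeholders, so verify them in your account:

from table_discovery import SecurityLakeTableDiscovery

# Illustrative region and database name; adjust for your deployment.
discovery = SecurityLakeTableDiscovery(aws_region="us-east-1")
database = "amazon_security_lake_glue_db_us_east_1"

# Tables grouped by data source, e.g. {'guardduty': [...], 'vpc_flow': [...]}
tables = discovery.discover_security_lake_tables(database)

# Prefer the OCSF 2.0 GuardDuty table when one exists
best = discovery.get_best_table_for_data_source("guardduty", tables)
if best:
    print(discovery.get_table_schema_info(best, database))

# Full summary with per-source status and recommendations
print(discovery.get_discovery_summary(database))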
