Amazon Security Lake MCP Server

by kebabmane
`mcp_security_lake_analysis.md`
# Amazon Security Lake MCP Server - Issue Analysis & Improvement Recommendations

## Executive Summary

The Amazon Security Lake MCP server tools are failing because they are hardcoded to expect specific table schemas and data sources that don't match real-world Security Lake deployments. The tools need to be made more flexible and dynamic.

## Root Cause Analysis

### 1. **Hardcoded Table Name Expectations**

- **MCP Tools Expect**: `amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0`
- **MCP Tools Expect**: `amazon_security_lake_table_us_east_1_guardduty_1_0`
- **Actual Tables Available**:
  - `amazon_security_lake_table_us_east_1_eks_audit_2_0`
  - `amazon_security_lake_table_us_east_1_lambda_execution_2_0`
  - `amazon_security_lake_table_us_east_1_route53_2_0`
  - `amazon_security_lake_table_us_east_1_sh_findings_2_0`
  - `amazon_security_lake_table_us_east_1_vpc_flow_2_0`

### 2. **OCSF Version Mismatch**

- **MCP Tools Expect**: OCSF 1.0 schema (`_1_0` suffix)
- **Actual Deployment**: OCSF 2.0 schema (`_2_0` suffix)

### 3. **Data Source Availability Assumptions**

- **MCP Tools Assume**: CloudTrail and GuardDuty are always enabled
- **Reality**: Users may have different data source combinations based on their specific needs
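The naming mismatches above are mechanical enough to handle in code rather than by hardcoding. As a hedged sketch (the `parse_table_name` helper and its regex are assumptions inferred from the table names listed above, not part of the MCP server):

```python
import re

# Assumed pattern, inferred from the tables listed above:
# amazon_security_lake_table_<region>_<source>_<major>_<minor>
TABLE_NAME_RE = re.compile(
    r"^amazon_security_lake_table_"
    r"(?P<region>[a-z]{2}_[a-z]+_\d)_"
    r"(?P<source>[a-z0-9_]+?)_"
    r"(?P<major>\d+)_(?P<minor>\d+)$"
)


def parse_table_name(name: str):
    """Split a Security Lake table name into region, source, and OCSF version."""
    m = TABLE_NAME_RE.match(name)
    if m is None:
        return None
    return {
        "region": m.group("region").replace("_", "-"),
        "source": m.group("source"),
        "ocsf_version": f"{m.group('major')}.{m.group('minor')}",
    }
```

With this, `amazon_security_lake_table_us_east_1_sh_findings_2_0` parses to source `sh_findings` and OCSF version `2.0`, so tools can match on the source and version instead of the full string.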
### 4. **Table Storage Format**

- **Actual Tables**: Use Apache Iceberg format with complex nested OCSF structures
- **MCP Tools**: May not properly handle Iceberg metadata and complex struct types

## Detailed Findings

### Current Security Lake Setup

```json
{
  "account_id": "767398042573",
  "region": "us-east-1",
  "database": "amazon_security_lake_glue_db_us_east_1",
  "total_tables": 5,
  "table_format": "ICEBERG",
  "ocsf_version": "2.0"
}
```

### Available Data Sources & Record Counts

```json
{
  "eks_audit": {"records": 0, "status": "no_data"},
  "lambda_execution": {"records": 4158, "status": "active"},
  "route53": {"records": 0, "status": "no_data"},
  "security_hub_findings": {"records": 25224, "status": "active"},
  "vpc_flow": {"records": 0, "status": "no_data"}
}
```

### Error Pattern Analysis

All MCP tool failures show the same pattern:

```
TABLE_NOT_FOUND: line 2:14: Table 'awsdatacatalog.amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0' does not exist
```
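Because the failure pattern is this consistent, it can be detected programmatically and turned into actionable feedback. A hedged sketch (the helper name and regex are illustrative, not part of the server):

```python
import re


def extract_missing_table(error_message: str):
    """Pull the missing table name out of an Athena TABLE_NOT_FOUND error
    so callers can suggest an available table instead of failing opaquely."""
    m = re.search(r"TABLE_NOT_FOUND.*?Table '([^']+)' does not exist", error_message)
    if m is None:
        return None
    # The matched name is fully qualified: catalog.database.table
    return m.group(1).rsplit(".", 1)[-1]
```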
## Recommended MCP Server Improvements

### 1. **Dynamic Table Discovery**

Replace hardcoded table names with dynamic discovery:

```python
from typing import Dict, List

import boto3


def discover_security_lake_tables(database_name: str) -> Dict[str, List[str]]:
    """Discover available Security Lake tables and categorize them by data source."""
    glue_client = boto3.client('glue')
    data_sources = {
        'cloudtrail': [],
        'guardduty': [],
        'security_hub': [],
        'vpc_flow': [],
        'route53': [],
        'eks_audit': [],
        'lambda_execution': [],
        'other': []
    }
    # Paginate: get_tables returns at most one page of results per call
    paginator = glue_client.get_paginator('get_tables')
    for page in paginator.paginate(DatabaseName=database_name):
        for table in page['TableList']:
            table_name = table['Name']
            # Extract the data source from the table name pattern
            if 'cloud_trail' in table_name:
                data_sources['cloudtrail'].append(table_name)
            elif 'guardduty' in table_name:
                data_sources['guardduty'].append(table_name)
            elif 'sh_findings' in table_name:
                data_sources['security_hub'].append(table_name)
            elif 'vpc_flow' in table_name:
                data_sources['vpc_flow'].append(table_name)
            elif 'route53' in table_name:
                data_sources['route53'].append(table_name)
            elif 'eks_audit' in table_name:
                data_sources['eks_audit'].append(table_name)
            elif 'lambda_execution' in table_name:
                data_sources['lambda_execution'].append(table_name)
            else:
                data_sources['other'].append(table_name)
    return data_sources
```

### 2. **Flexible OCSF Version Support**

Detect and handle both OCSF 1.0 and 2.0 schemas:

```python
def get_ocsf_version_from_table(table_name: str) -> str:
    """Extract the OCSF version from a table name."""
    if '_1_0' in table_name:
        return '1.0'
    elif '_2_0' in table_name:
        return '2.0'
    return 'unknown'


def build_query_for_ocsf_version(base_query: str, version: str, table_name: str) -> str:
    """Adapt query structure based on OCSF version."""
    if version == '2.0':
        # Handle OCSF 2.0 nested structures
        return adapt_query_for_ocsf_v2(base_query, table_name)
    # Handle OCSF 1.0 flat structures
    return adapt_query_for_ocsf_v1(base_query, table_name)
```
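When discovery returns more than one table for a source (for example, both `_1_0` and `_2_0` variants during a schema migration), the tools also need a selection rule. One possible sketch, assuming "prefer the newest OCSF version" is the right policy (the `pick_table` helper is hypothetical):

```python
from typing import List, Optional


def pick_table(candidates: List[str]) -> Optional[str]:
    """Prefer the newest OCSF schema version among a source's tables."""
    def version_key(name: str) -> int:
        if name.endswith("_2_0"):
            return 2
        if name.endswith("_1_0"):
            return 1
        return 0  # unknown suffix sorts last
    return max(candidates, key=version_key, default=None)
```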
### 3. **Graceful Data Source Handling**

Check for data availability before querying:

```python
from typing import Any, Dict


def check_data_source_availability(table_name: str) -> Dict[str, Any]:
    """Check whether a data source has any records."""
    # Quick count query to check for data
    query = f"SELECT COUNT(*) FROM {table_name}"
    try:
        result = execute_athena_query(query)
        # Row 0 is the header; row 1 holds the count
        count = int(result['ResultSet']['Rows'][1]['Data'][0]['VarCharValue'])
        return {
            'available': count > 0,
            'record_count': count,
            'status': 'active' if count > 0 else 'no_data'
        }
    except Exception as e:
        return {
            'available': False,
            'record_count': 0,
            'status': 'error',
            'error': str(e)
        }
```

### 4. **Enhanced Error Handling**

Provide meaningful feedback when data sources aren't available:

```python
def search_guardduty_findings(request: GuardDutySearchRequest) -> Dict[str, Any]:
    """Search GuardDuty findings with improved error handling."""
    # Discover available tables
    tables = discover_security_lake_tables(DATABASE_NAME)
    if not tables['guardduty']:
        return {
            'success': False,
            'error': 'GuardDuty data source not found',
            'suggestions': [
                'Enable GuardDuty in Security Lake',
                'Check if GuardDuty findings are available in Security Hub data',
                'Use Security Hub findings search instead'
            ],
            'available_data_sources': list(tables.keys())
        }
    # Continue with the actual search using the discovered table...
```

### 5. **Universal Security Lake Query Interface**

Create a unified interface that works with any available data source:

```python
from typing import Any, Dict, List, Optional


def universal_security_search(
    query_type: str,  # 'findings', 'network', 'api_calls', etc.
    filters: Dict[str, Any],
    data_sources: Optional[List[str]] = None  # Auto-detect if None
) -> Dict[str, Any]:
    """Universal search interface that adapts to available data sources."""
    available_tables = discover_security_lake_tables(DATABASE_NAME)
    if data_sources is None:
        # Auto-select relevant data sources based on the query type
        data_sources = auto_select_data_sources(query_type, available_tables)
    results = []
    for source in data_sources:
        if available_tables.get(source):
            table_name = available_tables[source][0]  # Use the first available table
            source_results = query_data_source(table_name, query_type, filters)
            results.extend(source_results)
    return {
        'success': True,
        'results': results,
        'data_sources_used': data_sources,
        'total_results': len(results)
    }
```

## Implementation Priority

### High Priority (Critical Issues)

1. **Dynamic table discovery** - Replace all hardcoded table names
2. **OCSF 2.0 support** - Handle the newer schema format
3. **Data source availability checks** - Prevent queries against empty or missing tables

### Medium Priority (Usability)

1. **Enhanced error messages** - Provide actionable feedback
2. **Auto-fallback logic** - Use Security Hub for GuardDuty data if GuardDuty tables are missing
3. **Query optimization** - Handle the Iceberg table format efficiently

### Low Priority (Nice to Have)

1. **Multi-region support** - Currently hardcoded to us-east-1
2. **Custom data source support** - Handle non-AWS data sources
3. **Query result caching** - Improve performance for repeated queries

## Test Cases for Validation

Create test scenarios covering:

- OCSF 1.0 vs 2.0 environments
- Different data source combinations
- Empty/missing data sources
- Multiple regions
- Various query types (IP search, findings, etc.)
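Several of these scenarios can run without live AWS access if the name-matching logic inside `discover_security_lake_tables` is factored into a pure helper. A hedged sketch of that refactor plus two of the scenarios (the `categorize` helper is a hypothetical extraction, not existing server code):

```python
def categorize(table_names):
    """Pure version of the name matching in discover_security_lake_tables,
    factored out so it can be tested without AWS credentials."""
    buckets = {k: [] for k in (
        "cloudtrail", "guardduty", "security_hub", "vpc_flow",
        "route53", "eks_audit", "lambda_execution", "other")}
    for name in table_names:
        if "cloud_trail" in name:
            buckets["cloudtrail"].append(name)
        elif "guardduty" in name:
            buckets["guardduty"].append(name)
        elif "sh_findings" in name:
            buckets["security_hub"].append(name)
        elif "vpc_flow" in name:
            buckets["vpc_flow"].append(name)
        elif "route53" in name:
            buckets["route53"].append(name)
        elif "eks_audit" in name:
            buckets["eks_audit"].append(name)
        elif "lambda_execution" in name:
            buckets["lambda_execution"].append(name)
        else:
            buckets["other"].append(name)
    return buckets


# Scenario: OCSF 2.0 environment from this deployment, with no
# CloudTrail or GuardDuty tables present
v2_tables = [
    "amazon_security_lake_table_us_east_1_sh_findings_2_0",
    "amazon_security_lake_table_us_east_1_vpc_flow_2_0",
]
assert categorize(v2_tables)["guardduty"] == []
assert categorize(v2_tables)["security_hub"] == [v2_tables[0]]

# Scenario: unrecognized custom source falls through to 'other'
assert categorize(["custom_partner_feed"])["other"] == ["custom_partner_feed"]
```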
## Expected Outcomes

After implementing these improvements:

- ✅ Tools work with any Security Lake configuration
- ✅ Graceful handling of missing data sources
- ✅ Support for both OCSF 1.0 and 2.0
- ✅ Better error messages and user guidance
- ✅ Dynamic adaptation to available data

## Current Working Example

The Security Hub findings can be successfully queried using this approach:

```sql
SELECT
    severity,
    finding_info.title,
    finding_info.desc,
    resource.type,
    compliance.control
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
WHERE severity IS NOT NULL
ORDER BY
    CASE severity
        WHEN 'Critical' THEN 1
        WHEN 'High' THEN 2
        ELSE 3
    END
```

This demonstrates that the tools can work when properly adapted to the actual table structure.
