Skip to main content
Glama

AWS Security MCP

guardduty.py (21.5 kB)
"""AWS GuardDuty service client module. This module provides functions for interacting with the AWS GuardDuty service. """ from typing import Dict, List, Optional, Any import logging import boto3 from botocore.exceptions import ClientError, NoCredentialsError from aws_security_mcp.services.base import get_aws_session, get_client, handle_pagination logger = logging.getLogger(__name__) def get_guardduty_client(session_context: Optional[str] = None, **kwargs: Any) -> boto3.client: """Get AWS GuardDuty client. Args: session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the boto3 client constructor Returns: boto3.client: An initialized GuardDuty client """ return get_client('guardduty', session_context=session_context, **kwargs) def list_detectors(max_results: int = 100, session_context: Optional[str] = None, **kwargs: Any) -> List[Dict[str, Any]]: """List all GuardDuty detectors in the current region. Args: max_results: Maximum number of detectors to return session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the list_detectors API call Returns: List[Dict[str, Any]]: List of detector details including DetectorId """ client = get_guardduty_client(session_context=session_context) # Set up parameters params = { 'MaxResults': min(50, max_results), # API max is 50 **kwargs } try: # Call the API directly and handle pagination manually response = client.list_detectors(**params) detector_ids = response.get('DetectorIds', []) # Handle pagination if there's a next token next_token = response.get('NextToken') while next_token and len(detector_ids) < max_results: params['NextToken'] = next_token # Adjust max_results for the next request params['MaxResults'] = min(50, max_results - len(detector_ids)) response = client.list_detectors(**params) detector_ids.extend(response.get('DetectorIds', [])) next_token = 
response.get('NextToken') # Break if we have enough detectors if len(detector_ids) >= max_results: break # Limit the results if we got more than requested if len(detector_ids) > max_results: detector_ids = detector_ids[:max_results] # Return list of dictionaries with DetectorId for consistency return [{"DetectorId": detector_id} for detector_id in detector_ids] except NoCredentialsError: logger.error("AWS credentials not found for listing GuardDuty detectors") return [] except ClientError as e: error_code = e.response['Error']['Code'] logger.error(f"AWS API error listing GuardDuty detectors: {error_code} - {e}") return [] except Exception as e: logger.error(f"Unexpected error listing GuardDuty detectors: {e}") return [] def get_detector(detector_id: str, session_context: Optional[str] = None, **kwargs: Any) -> Optional[Dict[str, Any]]: """Get detailed information about a GuardDuty detector. Args: detector_id: GuardDuty detector ID session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the get_detector API call Returns: Optional[Dict[str, Any]]: Detector details or None if not found """ client = get_guardduty_client(session_context=session_context) try: response = client.get_detector( DetectorId=detector_id, **kwargs ) # Add the DetectorId to the response for consistency detector_details = response.copy() detector_details['DetectorId'] = detector_id return detector_details except NoCredentialsError: logger.error(f"AWS credentials not found for getting GuardDuty detector {detector_id}") return None except ClientError as e: error_code = e.response['Error']['Code'] if error_code == 'BadRequestException': logger.warning(f"GuardDuty detector {detector_id} not found") else: logger.error(f"AWS API error getting GuardDuty detector {detector_id}: {error_code} - {e}") return None except Exception as e: logger.error(f"Unexpected error getting GuardDuty detector {detector_id}: {e}") return None def 
get_detector_id(session_context: Optional[str] = None) -> Optional[str]: """Get the first GuardDuty detector ID in the current region. Args: session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: Optional[str]: The first detector ID or None if no detectors exist """ detectors = list_detectors(session_context=session_context) if detectors: return detectors[0].get('DetectorId') return None def list_findings( detector_id: Optional[str] = None, finding_criteria: Optional[Dict[str, Any]] = None, max_results: int = 50, session_context: Optional[str] = None, **kwargs: Any ) -> List[str]: """List GuardDuty findings based on criteria. Args: detector_id: GuardDuty detector ID (if None, gets the first detector) finding_criteria: Criteria to filter findings max_results: Maximum number of results to return session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the list_findings API call Returns: List[str]: List of finding IDs """ # Get detector ID if not provided if detector_id is None: detector_id = get_detector_id(session_context=session_context) if detector_id is None: return [] client = get_guardduty_client(session_context=session_context) # Set up parameters params = { 'DetectorId': detector_id, 'MaxResults': min(50, max_results), # API max is 50 **kwargs } if finding_criteria: params['FindingCriteria'] = finding_criteria try: # Call the API directly and handle pagination manually response = client.list_findings(**params) finding_ids = response.get('FindingIds', []) # Handle pagination if there's a next token next_token = response.get('NextToken') while next_token and len(finding_ids) < max_results: params['NextToken'] = next_token # Adjust max_results for the next request params['MaxResults'] = min(50, max_results - len(finding_ids)) response = client.list_findings(**params) finding_ids.extend(response.get('FindingIds', [])) next_token = 
response.get('NextToken') # Break if we have enough findings if len(finding_ids) >= max_results: break # Limit the results if we got more than requested if len(finding_ids) > max_results: finding_ids = finding_ids[:max_results] return finding_ids except NoCredentialsError: logger.error(f"AWS credentials not found for listing GuardDuty findings for detector {detector_id}") return [] except ClientError as e: error_code = e.response['Error']['Code'] logger.error(f"AWS API error listing GuardDuty findings for detector {detector_id}: {error_code} - {e}") return [] except Exception as e: logger.error(f"Unexpected error listing GuardDuty findings for detector {detector_id}: {e}") return [] def get_findings( detector_id: Optional[str] = None, finding_ids: Optional[List[str]] = None, max_results: int = 50, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """Get detailed information about GuardDuty findings. Args: detector_id: GuardDuty detector ID (if None, gets the first detector) finding_ids: List of finding IDs to get details for max_results: Maximum number of results to return session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the get_findings API call Returns: List[Dict[str, Any]]: List of finding details """ # Get detector ID if not provided if detector_id is None: detector_id = get_detector_id(session_context=session_context) if detector_id is None: return [] # If no finding IDs provided, list findings if not finding_ids: finding_ids = list_findings(detector_id=detector_id, max_results=max_results, session_context=session_context) if not finding_ids: return [] client = get_guardduty_client(session_context=session_context) all_findings = [] try: # Process finding IDs in batches of 50 (API limitation) for i in range(0, len(finding_ids), 50): batch_ids = finding_ids[i:i+50] findings_response = client.get_findings( DetectorId=detector_id, 
FindingIds=batch_ids, **kwargs ) findings = findings_response.get('Findings', []) all_findings.extend(findings) # Check if we have enough findings if len(all_findings) >= max_results: return all_findings[:max_results] return all_findings except NoCredentialsError: logger.error(f"AWS credentials not found for getting GuardDuty findings for detector {detector_id}") return [] except ClientError as e: error_code = e.response['Error']['Code'] logger.error(f"AWS API error getting GuardDuty findings for detector {detector_id}: {error_code} - {e}") return [] except Exception as e: logger.error(f"Unexpected error getting GuardDuty findings for detector {detector_id}: {e}") return [] def get_findings_statistics( detector_id: Optional[str] = None, finding_statistic_types: Optional[List[str]] = None, group_by: Optional[str] = None, finding_criteria: Optional[Dict[str, Any]] = None, order_by: Optional[str] = None, max_results: Optional[int] = None, session_context: Optional[str] = None, **kwargs: Any ) -> Optional[Dict[str, Any]]: """Get official AWS-calculated statistics for GuardDuty findings. Args: detector_id: GuardDuty detector ID (if None, gets the first detector) finding_statistic_types: Types of statistics to retrieve (e.g., ['COUNT_BY_SEVERITY']) group_by: Group statistics by one of: ACCOUNT, DATE, FINDING_TYPE, RESOURCE, SEVERITY finding_criteria: Criteria to filter findings for statistics order_by: Sort order: ASC or DESC (only with group_by) max_results: Maximum results to return (only with group_by, default 25, max 100) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the get_findings_statistics API call Returns: Optional[Dict[str, Any]]: Official AWS findings statistics or None if error Note: You must provide either finding_statistic_types OR group_by parameter, but not both. The order_by and max_results parameters can only be used with group_by. 
""" # Get detector ID if not provided if detector_id is None: detector_id = get_detector_id(session_context=session_context) if detector_id is None: return None client = get_guardduty_client(session_context=session_context) # Validate parameters if finding_statistic_types and group_by: logger.error("Cannot provide both finding_statistic_types and group_by parameters") return None if not finding_statistic_types and not group_by: logger.error("Must provide either finding_statistic_types or group_by parameter") return None if (order_by or max_results) and not group_by: logger.error("order_by and max_results can only be used with group_by parameter") return None # Set up parameters params = { 'DetectorId': detector_id, **kwargs } if finding_statistic_types: params['FindingStatisticTypes'] = finding_statistic_types if group_by: params['GroupBy'] = group_by if order_by: params['OrderBy'] = order_by if max_results: params['MaxResults'] = min(100, max_results) # API max is 100 if finding_criteria: params['FindingCriteria'] = finding_criteria try: response = client.get_findings_statistics(**params) # Add the DetectorId to the response for consistency statistics = response.copy() statistics['DetectorId'] = detector_id return statistics except NoCredentialsError: logger.error(f"AWS credentials not found for getting GuardDuty findings statistics for detector {detector_id}") return None except ClientError as e: error_code = e.response['Error']['Code'] logger.error(f"AWS API error getting GuardDuty findings statistics for detector {detector_id}: {error_code} - {e}") return None except Exception as e: logger.error(f"Unexpected error getting GuardDuty findings statistics for detector {detector_id}: {e}") return None def list_ip_sets( detector_id: Optional[str] = None, max_results: int = 50, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """List IP sets for a GuardDuty detector. 
Args: detector_id: GuardDuty detector ID (if None, gets the first detector) max_results: Maximum number of results to return session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the list_ip_sets API call Returns: List[Dict[str, Any]]: List of IP set details """ # Get detector ID if not provided if detector_id is None: detector_id = get_detector_id(session_context=session_context) if detector_id is None: return [] client = get_guardduty_client(session_context=session_context) # Set up parameters params = { 'DetectorId': detector_id, 'MaxResults': min(50, max_results), # API max is 50 **kwargs } try: # Call the API directly and handle pagination manually response = client.list_ip_sets(**params) ip_set_ids = response.get('IpSetIds', []) # Handle pagination if there's a next token next_token = response.get('NextToken') while next_token and len(ip_set_ids) < max_results: params['NextToken'] = next_token # Adjust max_results for the next request params['MaxResults'] = min(50, max_results - len(ip_set_ids)) response = client.list_ip_sets(**params) ip_set_ids.extend(response.get('IpSetIds', [])) next_token = response.get('NextToken') # Break if we have enough IP sets if len(ip_set_ids) >= max_results: break # Limit the results if we got more than requested if len(ip_set_ids) > max_results: ip_set_ids = ip_set_ids[:max_results] return [{"IpSetId": ip_set_id} for ip_set_id in ip_set_ids] except NoCredentialsError: logger.error(f"AWS credentials not found for listing GuardDuty IP sets for detector {detector_id}") return [] except ClientError as e: error_code = e.response['Error']['Code'] logger.error(f"AWS API error listing GuardDuty IP sets for detector {detector_id}: {error_code} - {e}") return [] except Exception as e: logger.error(f"Unexpected error listing GuardDuty IP sets for detector {detector_id}: {e}") return [] def list_threat_intel_sets( detector_id: Optional[str] = None, 
max_results: int = 50, session_context: Optional[str] = None, **kwargs: Any ) -> List[Dict[str, Any]]: """List threat intelligence sets for a GuardDuty detector. Args: detector_id: GuardDuty detector ID (if None, gets the first detector) max_results: Maximum number of results to return session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") **kwargs: Additional arguments to pass to the list_threat_intel_sets API call Returns: List[Dict[str, Any]]: List of threat intel set details """ # Get detector ID if not provided if detector_id is None: detector_id = get_detector_id(session_context=session_context) if detector_id is None: return [] client = get_guardduty_client(session_context=session_context) # Set up parameters params = { 'DetectorId': detector_id, 'MaxResults': min(50, max_results), # API max is 50 **kwargs } try: # Call the API directly and handle pagination manually response = client.list_threat_intel_sets(**params) threat_intel_set_ids = response.get('ThreatIntelSetIds', []) # Handle pagination if there's a next token next_token = response.get('NextToken') while next_token and len(threat_intel_set_ids) < max_results: params['NextToken'] = next_token # Adjust max_results for the next request params['MaxResults'] = min(50, max_results - len(threat_intel_set_ids)) response = client.list_threat_intel_sets(**params) threat_intel_set_ids.extend(response.get('ThreatIntelSetIds', [])) next_token = response.get('NextToken') # Break if we have enough threat intel sets if len(threat_intel_set_ids) >= max_results: break # Limit the results if we got more than requested if len(threat_intel_set_ids) > max_results: threat_intel_set_ids = threat_intel_set_ids[:max_results] return [{"ThreatIntelSetId": threat_intel_set_id} for threat_intel_set_id in threat_intel_set_ids] except NoCredentialsError: logger.error(f"AWS credentials not found for listing GuardDuty threat intel sets for detector {detector_id}") return [] except ClientError 
as e: error_code = e.response['Error']['Code'] logger.error(f"AWS API error listing GuardDuty threat intel sets for detector {detector_id}: {error_code} - {e}") return [] except Exception as e: logger.error(f"Unexpected error listing GuardDuty threat intel sets for detector {detector_id}: {e}") return [] def filter_findings_by_severity( findings: List[Dict[str, Any]], severity: str = "ALL" ) -> List[Dict[str, Any]]: """Filter findings by severity level. Args: findings: List of findings to filter severity: Severity level ("LOW", "MEDIUM", "HIGH", or "ALL") Returns: List[Dict[str, Any]]: Filtered list of findings """ if severity == "ALL": return findings severity_ranges = { "LOW": (1.0, 3.9), "MEDIUM": (4.0, 6.9), "HIGH": (7.0, 10.0) } if severity not in severity_ranges: raise ValueError(f"Invalid severity level: {severity}") min_severity, max_severity = severity_ranges[severity] return [ finding for finding in findings if min_severity <= finding.get('Severity', 0) <= max_severity ] def filter_findings_by_text( findings: List[Dict[str, Any]], search_term: str = "" ) -> List[Dict[str, Any]]: """Filter findings by search term. Args: findings: List of findings to filter search_term: Term to search for in finding fields Returns: List[Dict[str, Any]]: Filtered list of findings """ if not search_term: return findings search_term_lower = search_term.lower() filtered_findings = [] for finding in findings: # Check if the search term matches any important fields if (search_term_lower in finding.get('Id', '').lower() or search_term_lower in finding.get('Type', '').lower() or search_term_lower in finding.get('Description', '').lower() or search_term_lower in str(finding.get('Resource', {})).lower() or search_term_lower in finding.get('Title', '').lower()): filtered_findings.append(finding) return filtered_findings

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/groovyBugify/aws-security-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server