Skip to main content
Glama

AWS Security MCP

guardduty_tools.py (20 kB)
"""GuardDuty tools for AWS Security MCP.""" import logging import json from datetime import datetime from typing import List, Optional, Dict, Any from aws_security_mcp.services import guardduty from aws_security_mcp.tools import register_tool from aws_security_mcp.formatters.guardduty import ( format_guardduty_detector_json, format_guardduty_finding_json, format_guardduty_findings_list_statistics_json, format_guardduty_findings_statistics_json, format_guardduty_detectors_summary_json, format_guardduty_ip_set_json, format_guardduty_threat_intel_set_json ) # Configure logging logger = logging.getLogger(__name__) @register_tool() async def list_detectors(max_results: int = 100, session_context: Optional[str] = None) -> str: """List all GuardDuty detectors in the account. Args: max_results: Maximum number of detectors to return session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: JSON formatted string with GuardDuty detectors """ logger.info(f"Listing GuardDuty detectors (session_context={session_context})") try: logger.info("Calling guardduty.list_detectors") detectors = guardduty.list_detectors(max_results=max_results, session_context=session_context) logger.info(f"Received detectors: {detectors}") if not detectors: return json.dumps({ "count": 0, "summary": "No GuardDuty detectors found in the account", "detectors": [] }) formatted_detectors = [] detector_details = [] for detector in detectors: logger.info(f"Processing detector: {detector}") # The detector should be a dictionary with a DetectorId key detector_id = detector.get('DetectorId') logger.info(f"Detector ID: {detector_id}") if not detector_id: logger.warning(f"Detector without ID found: {detector}") continue try: # Get detector details logger.info(f"Calling guardduty.get_detector with ID: {detector_id}") details = guardduty.get_detector(detector_id, session_context=session_context) logger.info(f"Received detector details: {details}") if details: # Store the 
details for summary detector_details.append(details) # Format the detector with our JSON formatter logger.info("Formatting detector with format_guardduty_detector_json") formatted_detector = format_guardduty_detector_json(details) logger.info(f"Formatted detector: {formatted_detector}") formatted_detectors.append(formatted_detector) else: logger.warning(f"No details found for detector ID: {detector_id}") # Just include the basic detector ID formatted_detectors.append({ "detector_id": detector_id, "status": "Unknown", "created_at": None, "updated_at": None, "service_role": None, "features": [] }) except Exception as e: logger.warning(f"Error getting details for detector {detector_id}: {e}") # Just include the basic detector ID formatted_detectors.append({ "detector_id": detector_id, "status": "Unknown", "created_at": None, "updated_at": None, "service_role": None, "features": [] }) # Get a summary of all detectors logger.info(f"Creating summary from detector details: {detector_details}") if detector_details: logger.info("Calling format_guardduty_detectors_summary_json") detectors_summary = format_guardduty_detectors_summary_json(detector_details) logger.info(f"Detector summary: {detectors_summary}") else: logger.info("No detector details for summary, using default values") detectors_summary = { "total_detectors": len(formatted_detectors), "enabled_detectors": 0, "disabled_detectors": 0, "finding_publishing_frequencies": {}, "total_findings": None, "regions_covered": 0 } result = { "count": len(formatted_detectors), "summary": f"Found {len(formatted_detectors)} GuardDuty detector(s)", "detectors": formatted_detectors, "detectors_summary": detectors_summary } logger.info("Serializing result to JSON") return json.dumps(result, default=lambda obj: obj.isoformat() if hasattr(obj, 'isoformat') else str(obj)) except Exception as e: logger.error(f"Error listing GuardDuty detectors: {e}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") return 
json.dumps({ "error": { "message": f"Error listing GuardDuty detectors: {str(e)}", "type": type(e).__name__ } }) @register_tool() async def list_findings( detector_id: str, max_results: int = 50, finding_ids: Optional[List[str]] = None, severity: Optional[str] = None, search_term: Optional[str] = None, session_context: Optional[str] = None ) -> str: """List GuardDuty findings for a specific detector. Args: detector_id: GuardDuty detector ID max_results: Maximum number of findings to return finding_ids: Optional list of specific finding IDs to retrieve severity: Optional severity filter (LOW, MEDIUM, HIGH, ALL) search_term: Optional text search term session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: JSON formatted string with GuardDuty findings """ logger.info(f"Listing GuardDuty findings for detector {detector_id} (session_context={session_context})") try: # Fetch findings if finding_ids: findings = guardduty.get_findings(detector_id, finding_ids, max_results=max_results, session_context=session_context) else: finding_ids = guardduty.list_findings(detector_id, max_results=max_results, session_context=session_context) findings = guardduty.get_findings(detector_id, finding_ids, max_results=max_results, session_context=session_context) if not findings: return json.dumps({ "detector_id": detector_id, "count": 0, "summary": f"No GuardDuty findings for detector '{detector_id}'", "findings": [] }) # Apply filters if provided if severity and severity != "ALL": findings = guardduty.filter_findings_by_severity(findings, severity) if search_term: findings = guardduty.filter_findings_by_text(findings, search_term) # Get a summary of findings findings_summary = format_guardduty_findings_list_statistics_json(findings) # Format each finding formatted_findings = [] for finding in findings: formatted_finding = format_guardduty_finding_json(finding) formatted_findings.append(formatted_finding) result = { "detector_id": detector_id, 
"count": len(findings), "summary": f"Found {len(findings)} GuardDuty finding(s) for detector '{detector_id}'", "findings": formatted_findings, "severity_distribution": findings_summary.get("severity_distribution", {}), "top_finding_types": findings_summary.get("top_finding_types", []) } return json.dumps(result, default=lambda obj: obj.isoformat() if hasattr(obj, 'isoformat') else str(obj)) except Exception as e: logger.error(f"Error listing GuardDuty findings: {e}") return json.dumps({ "error": { "message": f"Error listing GuardDuty findings for detector '{detector_id}': {str(e)}", "type": type(e).__name__ } }) @register_tool() async def get_findings_statistics( detector_id: str, finding_statistic_types: Optional[List[str]] = None, group_by: Optional[str] = None, finding_criteria: Optional[Dict[str, Any]] = None, order_by: Optional[str] = None, max_results: Optional[int] = None, session_context: Optional[str] = None ) -> str: """Get official AWS-calculated statistics for GuardDuty findings. 
Args: detector_id: GuardDuty detector ID finding_statistic_types: Types of statistics to retrieve (e.g., ['COUNT_BY_SEVERITY']) group_by: Group statistics by one of: ACCOUNT, DATE, FINDING_TYPE, RESOURCE, SEVERITY finding_criteria: Criteria to filter findings for statistics order_by: Sort order: ASC or DESC (only with group_by) max_results: Maximum results to return (only with group_by, default 25, max 100) session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: JSON formatted string with official AWS GuardDuty findings statistics Examples: # Get severity count statistics finding_statistic_types=['COUNT_BY_SEVERITY'] # Get statistics grouped by finding type group_by='FINDING_TYPE', order_by='DESC', max_results=10 # Get statistics with filtering finding_criteria={'Criterion': {'severity': {'Gte': 4.0}}} """ logger.info(f"Getting GuardDuty findings statistics for detector {detector_id} (session_context={session_context})") try: # Call the service function statistics = guardduty.get_findings_statistics( detector_id=detector_id, finding_statistic_types=finding_statistic_types, group_by=group_by, finding_criteria=finding_criteria, order_by=order_by, max_results=max_results, session_context=session_context ) if not statistics: return json.dumps({ "detector_id": detector_id, "error": "Unable to retrieve findings statistics", "statistics": None }) # Extract the findings statistics from the response finding_statistics = statistics.get('FindingStatistics', {}) # Format the statistics using the existing formatter formatted_statistics = format_guardduty_findings_statistics_json(finding_statistics) result = { "detector_id": detector_id, "summary": "Official AWS GuardDuty findings statistics", "statistics_type": "severity_count" if finding_statistic_types else f"grouped_by_{group_by.lower()}" if group_by else "unknown", "official_statistics": formatted_statistics, "raw_response": { "finding_statistics": finding_statistics, 
"next_token": statistics.get('NextToken') } } return json.dumps(result, default=lambda obj: obj.isoformat() if hasattr(obj, 'isoformat') else str(obj)) except Exception as e: logger.error(f"Error getting GuardDuty findings statistics: {e}") return json.dumps({ "error": { "message": f"Error getting GuardDuty findings statistics for detector '{detector_id}': {str(e)}", "type": type(e).__name__ } }) @register_tool() async def get_finding_details(detector_id: str, finding_id: str, session_context: Optional[str] = None) -> str: """Get detailed information about a specific GuardDuty finding. Args: detector_id: GuardDuty detector ID finding_id: ID of the finding to retrieve session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: JSON formatted string with detailed finding information """ logger.info(f"Getting GuardDuty finding details for detector {detector_id}, finding {finding_id} (session_context={session_context})") try: findings = guardduty.get_findings(detector_id, [finding_id], session_context=session_context) if not findings: return json.dumps({ "error": f"Finding '{finding_id}' not found for detector '{detector_id}'" }) finding = findings[0] # Should only be one finding # Use the JSON formatter for detailed formatting formatted_finding = format_guardduty_finding_json(finding) result = { "detector_id": detector_id, "finding": formatted_finding } return json.dumps(result, default=lambda obj: obj.isoformat() if hasattr(obj, 'isoformat') else str(obj)) except Exception as e: logger.error(f"Error retrieving GuardDuty finding details: {e}") return json.dumps({ "error": { "message": f"Error retrieving GuardDuty finding details for detector '{detector_id}', finding '{finding_id}': {str(e)}", "type": type(e).__name__ } }) @register_tool() async def list_ip_sets(detector_id: str, max_results: int = 50, session_context: Optional[str] = None) -> str: """List IP sets for a GuardDuty detector. 
Args: detector_id: GuardDuty detector ID max_results: Maximum number of results to return session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: JSON formatted string with GuardDuty IP sets """ logger.info(f"Listing GuardDuty IP sets for detector {detector_id} (session_context={session_context})") try: ip_sets_ids = guardduty.list_ip_sets(detector_id, max_results=max_results, session_context=session_context) if not ip_sets_ids: return json.dumps({ "detector_id": detector_id, "count": 0, "summary": f"No GuardDuty IP sets for detector '{detector_id}'", "ip_sets": [] }) # Get details for each IP set client = guardduty.get_guardduty_client(session_context=session_context) formatted_ip_sets = [] for ip_set in ip_sets_ids: ip_set_id = ip_set.get('IpSetId') try: ip_set_details = client.get_ip_set( DetectorId=detector_id, IpSetId=ip_set_id ) # Add the IP set ID to the response if not present if 'IpSetId' not in ip_set_details: ip_set_details['IpSetId'] = ip_set_id formatted_ip_set = format_guardduty_ip_set_json(ip_set_details) formatted_ip_sets.append(formatted_ip_set) except Exception as e: logger.warning(f"Error getting details for IP set {ip_set_id}: {e}") # Include a basic entry for the IP set formatted_ip_sets.append({ "ip_set_id": ip_set_id, "name": "Unknown", "status": "Unknown", "location": "Unknown" }) result = { "detector_id": detector_id, "count": len(formatted_ip_sets), "summary": f"Found {len(formatted_ip_sets)} GuardDuty IP set(s) for detector '{detector_id}'", "ip_sets": formatted_ip_sets } return json.dumps(result, default=lambda obj: obj.isoformat() if hasattr(obj, 'isoformat') else str(obj)) except Exception as e: logger.error(f"Error listing GuardDuty IP sets: {e}") return json.dumps({ "error": { "message": f"Error listing GuardDuty IP sets for detector '{detector_id}': {str(e)}", "type": type(e).__name__ } }) @register_tool() async def list_threat_intel_sets(detector_id: str, max_results: int = 50, 
session_context: Optional[str] = None) -> str: """List threat intelligence sets for a GuardDuty detector. Args: detector_id: GuardDuty detector ID max_results: Maximum number of results to return session_context: Optional session key for cross-account access (e.g., "123456789012_aws_dev") Returns: JSON formatted string with GuardDuty threat intel sets """ logger.info(f"Listing GuardDuty threat intel sets for detector {detector_id} (session_context={session_context})") try: threat_intel_set_ids = guardduty.list_threat_intel_sets(detector_id, max_results=max_results, session_context=session_context) if not threat_intel_set_ids: return json.dumps({ "detector_id": detector_id, "count": 0, "summary": f"No GuardDuty threat intel sets for detector '{detector_id}'", "threat_intel_sets": [] }) # Get details for each threat intel set client = guardduty.get_guardduty_client(session_context=session_context) formatted_threat_intel_sets = [] for threat_intel_set in threat_intel_set_ids: threat_intel_set_id = threat_intel_set.get('ThreatIntelSetId') try: threat_intel_set_details = client.get_threat_intel_set( DetectorId=detector_id, ThreatIntelSetId=threat_intel_set_id ) # Add the threat intel set ID to the response if not present if 'ThreatIntelSetId' not in threat_intel_set_details: threat_intel_set_details['ThreatIntelSetId'] = threat_intel_set_id formatted_threat_intel_set = format_guardduty_threat_intel_set_json(threat_intel_set_details) formatted_threat_intel_sets.append(formatted_threat_intel_set) except Exception as e: logger.warning(f"Error getting details for threat intel set {threat_intel_set_id}: {e}") # Include a basic entry for the threat intel set formatted_threat_intel_sets.append({ "threat_intel_set_id": threat_intel_set_id, "name": "Unknown", "status": "Unknown", "location": "Unknown" }) result = { "detector_id": detector_id, "count": len(formatted_threat_intel_sets), "summary": f"Found {len(formatted_threat_intel_sets)} GuardDuty threat intel set(s) for 
detector '{detector_id}'", "threat_intel_sets": formatted_threat_intel_sets } return json.dumps(result, default=lambda obj: obj.isoformat() if hasattr(obj, 'isoformat') else str(obj)) except Exception as e: logger.error(f"Error listing GuardDuty threat intel sets: {e}") return json.dumps({ "error": { "message": f"Error listing GuardDuty threat intel sets for detector '{detector_id}': {str(e)}", "type": type(e).__name__ } })

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/groovyBugify/aws-security-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.