Amazon Security Lake MCP Server

by kebabmane
validators.py
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple

import structlog
from dateutil import parser as date_parser

from .schemas import BaseEvent, OCSFEventFactory

logger = structlog.get_logger(__name__)


class OCSFValidator:
    """Validator for OCSF events and data structures"""

    def __init__(self):
        self.validation_errors = []

    def validate_event(self, event_data: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """
        Validate an OCSF event structure

        Args:
            event_data: Raw event data dictionary

        Returns:
            Tuple of (is_valid, list_of_errors)
        """
        self.validation_errors = []

        # Check required fields
        self._validate_required_fields(event_data)

        # Validate field types and formats
        self._validate_field_types(event_data)

        # Validate timestamps
        self._validate_timestamps(event_data)

        # Validate metadata structure
        self._validate_metadata(event_data)

        # Validate cloud context if present
        if "cloud" in event_data:
            self._validate_cloud_context(event_data["cloud"])

        # Validate endpoints if present
        for endpoint_field in ["src_endpoint", "dst_endpoint"]:
            if endpoint_field in event_data:
                self._validate_endpoint(event_data[endpoint_field], endpoint_field)

        # Validate finding structure if present
        if "finding" in event_data:
            self._validate_finding(event_data["finding"])

        is_valid = len(self.validation_errors) == 0
        return is_valid, self.validation_errors.copy()

    def _validate_required_fields(self, event_data: Dict[str, Any]) -> None:
        """Validate presence of required OCSF fields"""
        required_fields = [
            "time", "type_name", "type_uid", "class_name", "class_uid", "metadata"
        ]

        for field in required_fields:
            if field not in event_data:
                self.validation_errors.append(f"Required field '{field}' is missing")
            elif event_data[field] is None:
                self.validation_errors.append(f"Required field '{field}' cannot be null")

    def _validate_field_types(self, event_data: Dict[str, Any]) -> None:
        """Validate data types of OCSF fields"""
        type_validations = {
            "type_uid": int,
            "class_uid": int,
            "category_uid": int,
            "activity_id": int,
            "severity_id": int,
            "type_name": str,
            "class_name": str,
            "category_name": str,
            "activity_name": str,
            "severity": str,
            "message": str
        }

        for field, expected_type in type_validations.items():
            if field in event_data and event_data[field] is not None:
                if not isinstance(event_data[field], expected_type):
                    self.validation_errors.append(
                        f"Field '{field}' should be {expected_type.__name__}, "
                        f"got {type(event_data[field]).__name__}"
                    )

    def _validate_timestamps(self, event_data: Dict[str, Any]) -> None:
        """Validate timestamp fields"""
        timestamp_fields = ["time", "start_time", "end_time"]

        for field in timestamp_fields:
            if field in event_data and event_data[field] is not None:
                try:
                    # Try to parse the timestamp
                    parsed_time = date_parser.parse(event_data[field])

                    # Treat naive timestamps as UTC so aware ("Z"-suffixed) and
                    # naive values can be compared without raising TypeError
                    if parsed_time.tzinfo is None:
                        parsed_time = parsed_time.replace(tzinfo=timezone.utc)

                    # Check if timestamp is reasonable (not too far in the future)
                    now = datetime.now(timezone.utc)
                    if parsed_time > now:
                        days_in_future = (parsed_time - now).days
                        if days_in_future > 1:
                            self.validation_errors.append(
                                f"Timestamp '{field}' is {days_in_future} days in the future"
                            )
                except (ValueError, TypeError):
                    self.validation_errors.append(
                        f"Invalid timestamp format in field '{field}': {event_data[field]}"
                    )

    def _validate_metadata(self, event_data: Dict[str, Any]) -> None:
        """Validate metadata structure"""
        metadata = event_data.get("metadata")
        if not metadata:
            return

        if not isinstance(metadata, dict):
            self.validation_errors.append("Metadata must be a dictionary")
            return

        # Check required metadata fields
        required_metadata_fields = ["version", "product"]
        for field in required_metadata_fields:
            if field not in metadata:
                self.validation_errors.append(f"Metadata missing required field '{field}'")

        # Validate product structure
        product = metadata.get("product")
        if product and isinstance(product, dict):
            if "name" not in product:
                self.validation_errors.append("Metadata product missing 'name' field")
        elif product is not None:
            self.validation_errors.append("Metadata product must be a dictionary")

        # Validate version format
        version = metadata.get("version")
        if version and not isinstance(version, str):
            self.validation_errors.append("Metadata version must be a string")

    def _validate_cloud_context(self, cloud_data: Dict[str, Any]) -> None:
        """Validate cloud context structure"""
        if not isinstance(cloud_data, dict):
            self.validation_errors.append("Cloud context must be a dictionary")
            return

        # Validate account structure
        account = cloud_data.get("account")
        if account:
            if isinstance(account, dict):
                if "uid" not in account:
                    self.validation_errors.append("Cloud account missing 'uid' field")
            else:
                self.validation_errors.append("Cloud account must be a dictionary")

        # Validate region format
        region = cloud_data.get("region")
        if region and not isinstance(region, str):
            self.validation_errors.append("Cloud region must be a string")

    def _validate_endpoint(self, endpoint_data: Dict[str, Any], field_name: str) -> None:
        """Validate endpoint structure"""
        if not isinstance(endpoint_data, dict):
            self.validation_errors.append(f"{field_name} must be a dictionary")
            return

        # Validate IP address format if present
        ip = endpoint_data.get("ip")
        if ip:
            if not self._is_valid_ip(ip):
                self.validation_errors.append(f"Invalid IP address in {field_name}: {ip}")

        # Validate port number if present
        port = endpoint_data.get("port")
        if port is not None:
            if not isinstance(port, int) or port < 0 or port > 65535:
                self.validation_errors.append(f"Invalid port number in {field_name}: {port}")

    def _validate_finding(self, finding_data: Dict[str, Any]) -> None:
        """Validate finding structure"""
        if not isinstance(finding_data, dict):
            self.validation_errors.append("Finding must be a dictionary")
            return

        # Check required finding fields
        if "uid" not in finding_data:
            self.validation_errors.append("Finding missing required 'uid' field")

        # Validate finding types if present
        types = finding_data.get("types")
        if types is not None:
            if isinstance(types, str):
                # Try to parse as JSON if it's a string
                try:
                    types = json.loads(types)
                except json.JSONDecodeError:
                    types = [types]  # Treat as single type

            if not isinstance(types, list):
                self.validation_errors.append("Finding types must be a list or JSON array string")

    def _is_valid_ip(self, ip: str) -> bool:
        """Validate IP address format"""
        try:
            import ipaddress
            ipaddress.ip_address(ip)
            return True
        except ValueError:
            return False

    def validate_security_lake_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate a complete Security Lake record

        Args:
            record: Security Lake record dictionary

        Returns:
            Validation result with details
        """
        result = {
            "is_valid": True,
            "errors": [],
            "warnings": [],
            "ocsf_compliance": {},
            "suggestions": []
        }

        try:
            # Basic OCSF validation
            is_valid, errors = self.validate_event(record)
            result["is_valid"] = is_valid
            result["errors"] = errors

            # Check OCSF compliance
            compliance = self._check_ocsf_compliance(record)
            result["ocsf_compliance"] = compliance

            # Generate suggestions for improvement
            suggestions = self._generate_improvement_suggestions(record, errors)
            result["suggestions"] = suggestions

            # Check for warnings (non-critical issues)
            warnings = self._check_for_warnings(record)
            result["warnings"] = warnings
        except Exception as e:
            result["is_valid"] = False
            result["errors"].append(f"Validation failed with exception: {str(e)}")
            logger.error("Record validation failed", error=str(e))

        return result

    def _check_ocsf_compliance(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Check OCSF compliance level"""
        total_fields = len(record)
        standard_fields = 0

        # Common OCSF fields to check for
        ocsf_fields = [
            "time", "type_name", "type_uid", "class_name", "class_uid",
            "category_name", "category_uid", "activity_name", "activity_id",
            "severity", "severity_id", "metadata", "cloud", "actor",
            "src_endpoint", "dst_endpoint", "finding", "resources"
        ]

        found_fields = []
        for field in ocsf_fields:
            if field in record:
                standard_fields += 1
                found_fields.append(field)

        compliance_percentage = (standard_fields / len(ocsf_fields)) * 100 if ocsf_fields else 0

        return {
            "score": compliance_percentage,
            "standard_fields_found": standard_fields,
            "total_standard_fields": len(ocsf_fields),
            "found_fields": found_fields,
            "missing_standard_fields": [f for f in ocsf_fields if f not in record],
            "compliance_level": self._get_compliance_level(compliance_percentage)
        }

    def _get_compliance_level(self, percentage: float) -> str:
        """Determine compliance level based on percentage"""
        if percentage >= 80:
            return "HIGH"
        elif percentage >= 60:
            return "MEDIUM"
        elif percentage >= 40:
            return "LOW"
        else:
            return "MINIMAL"

    def _generate_improvement_suggestions(
        self, record: Dict[str, Any], errors: List[str]
    ) -> List[str]:
        """Generate suggestions for improving OCSF compliance"""
        suggestions = []

        # Suggest missing critical fields
        critical_fields = ["time", "type_name", "type_uid", "severity"]
        for field in critical_fields:
            if field not in record:
                suggestions.append(f"Add required field '{field}' for better OCSF compliance")

        # Suggest metadata improvements
        metadata = record.get("metadata", {})
        if not metadata.get("product", {}).get("vendor_name"):
            suggestions.append("Add vendor_name to metadata.product for better identification")

        if not metadata.get("profiles"):
            suggestions.append("Consider adding OCSF profiles to metadata for enhanced compatibility")

        # Suggest cloud context improvements
        if "cloud" in record:
            cloud = record["cloud"]
            if not cloud.get("provider"):
                suggestions.append("Add cloud provider information for better context")

        # Suggest endpoint improvements
        for endpoint_field in ["src_endpoint", "dst_endpoint"]:
            if endpoint_field in record:
                endpoint = record[endpoint_field]
                if endpoint.get("ip") and not endpoint.get("hostname"):
                    suggestions.append(f"Consider adding hostname to {endpoint_field} for enrichment")

        return suggestions

    def _check_for_warnings(self, record: Dict[str, Any]) -> List[str]:
        """Check for non-critical issues that warrant warnings"""
        warnings = []

        # Check for deprecated fields or patterns
        if "eventTime" in record:
            warnings.append("Field 'eventTime' should be 'time' in OCSF")

        # Check for unusual severity combinations
        severity = record.get("severity")
        severity_id = record.get("severity_id")
        if severity and severity_id:
            expected_id = self._get_expected_severity_id(severity)
            if expected_id and expected_id != severity_id:
                warnings.append(f"Severity '{severity}' doesn't match severity_id {severity_id}")

        # Check for missing recommended fields
        if "actor" not in record and "device" not in record:
            warnings.append("Consider adding actor or device information for better context")

        return warnings

    def _get_expected_severity_id(self, severity: str) -> int:
        """Get expected severity ID for severity string"""
        severity_mapping = {
            "unknown": 99,
            "informational": 1,
            "low": 2,
            "medium": 3,
            "high": 4,
            "critical": 5
        }
        return severity_mapping.get(severity.lower())
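For reference, here is a minimal usage sketch of the validator above. The import path ("asl_mcp") and the sample event values are assumptions made for illustration, not part of this repository; adjust them to the actual package layout and to real Security Lake records.

# Usage sketch (illustrative only): the "asl_mcp" package name is assumed,
# and the event below is a hand-built example, not a real Security Lake record.
from asl_mcp.validators import OCSFValidator

validator = OCSFValidator()

sample_event = {
    "time": "2024-05-01T12:00:00Z",
    "type_name": "Security Finding: Create",
    "type_uid": 200401,
    "class_name": "Security Finding",
    "class_uid": 2004,
    "severity": "High",
    "severity_id": 4,
    "metadata": {
        "version": "1.1.0",
        "product": {"name": "GuardDuty", "vendor_name": "AWS"},
    },
    "cloud": {
        "provider": "AWS",
        "region": "us-east-1",
        "account": {"uid": "123456789012"},
    },
    "finding": {"uid": "finding-0001", "types": ["Recon"]},
}

# Structural validation: returns (is_valid, list of error strings)
is_valid, errors = validator.validate_event(sample_event)
print(is_valid, errors)

# Full report: errors, warnings, OCSF compliance score/level, and suggestions
report = validator.validate_security_lake_record(sample_event)
print(report["ocsf_compliance"]["compliance_level"])
print(report["suggestions"])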

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kebabmane/asl-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.