import ipaddress
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
import structlog
from dateutil import parser as date_parser
from .schemas import BaseEvent, OCSFEventFactory
logger = structlog.get_logger(__name__)
class OCSFValidator:
"""Validator for OCSF events and data structures"""
def __init__(self):
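        # Accumulated error messages; reset at the start of every validate_event() call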
self.validation_errors = []
def validate_event(self, event_data: Dict[str, Any]) -> Tuple[bool, List[str]]:
"""
Validate an OCSF event structure
Args:
event_data: Raw event data dictionary
Returns:
Tuple of (is_valid, list_of_errors)
"""
self.validation_errors = []
# Check required fields
self._validate_required_fields(event_data)
# Validate field types and formats
self._validate_field_types(event_data)
# Validate timestamps
self._validate_timestamps(event_data)
# Validate metadata structure
self._validate_metadata(event_data)
# Validate cloud context if present
if "cloud" in event_data:
self._validate_cloud_context(event_data["cloud"])
# Validate endpoints if present
for endpoint_field in ["src_endpoint", "dst_endpoint"]:
if endpoint_field in event_data:
self._validate_endpoint(event_data[endpoint_field], endpoint_field)
# Validate finding structure if present
if "finding" in event_data:
self._validate_finding(event_data["finding"])
is_valid = len(self.validation_errors) == 0
return is_valid, self.validation_errors.copy()
def _validate_required_fields(self, event_data: Dict[str, Any]) -> None:
"""Validate presence of required OCSF fields"""
required_fields = [
"time",
"type_name",
"type_uid",
"class_name",
"class_uid",
"metadata"
]
for field in required_fields:
if field not in event_data:
self.validation_errors.append(f"Required field '{field}' is missing")
elif event_data[field] is None:
self.validation_errors.append(f"Required field '{field}' cannot be null")
def _validate_field_types(self, event_data: Dict[str, Any]) -> None:
"""Validate data types of OCSF fields"""
type_validations = {
"type_uid": int,
"class_uid": int,
"category_uid": int,
"activity_id": int,
"severity_id": int,
"type_name": str,
"class_name": str,
"category_name": str,
"activity_name": str,
"severity": str,
"message": str
}
for field, expected_type in type_validations.items():
if field in event_data and event_data[field] is not None:
if not isinstance(event_data[field], expected_type):
self.validation_errors.append(
f"Field '{field}' should be {expected_type.__name__}, "
f"got {type(event_data[field]).__name__}"
)
def _validate_timestamps(self, event_data: Dict[str, Any]) -> None:
"""Validate timestamp fields"""
timestamp_fields = ["time", "start_time", "end_time"]
for field in timestamp_fields:
if field in event_data and event_data[field] is not None:
                try:
                    # Parse the timestamp value
                    parsed_time = date_parser.parse(event_data[field])
                    # Normalize naive timestamps to UTC so comparing against an
                    # aware "now" cannot raise TypeError
                    if parsed_time.tzinfo is None:
                        parsed_time = parsed_time.replace(tzinfo=timezone.utc)
                    # Check if timestamp is reasonable (not too far in the future)
                    now = datetime.now(timezone.utc)
                    if parsed_time > now:
                        days_in_future = (parsed_time - now).days
                        if days_in_future > 1:
                            self.validation_errors.append(
                                f"Timestamp '{field}' is {days_in_future} days in the future"
                            )
                except (ValueError, TypeError, OverflowError) as exc:
                    self.validation_errors.append(
                        f"Invalid timestamp format in field '{field}': {event_data[field]} ({exc})"
                    )
def _validate_metadata(self, event_data: Dict[str, Any]) -> None:
"""Validate metadata structure"""
metadata = event_data.get("metadata")
if not metadata:
return
if not isinstance(metadata, dict):
self.validation_errors.append("Metadata must be a dictionary")
return
# Check required metadata fields
required_metadata_fields = ["version", "product"]
for field in required_metadata_fields:
if field not in metadata:
self.validation_errors.append(f"Metadata missing required field '{field}'")
# Validate product structure
product = metadata.get("product")
if product and isinstance(product, dict):
if "name" not in product:
self.validation_errors.append("Metadata product missing 'name' field")
elif product is not None:
self.validation_errors.append("Metadata product must be a dictionary")
# Validate version format
version = metadata.get("version")
if version and not isinstance(version, str):
self.validation_errors.append("Metadata version must be a string")
def _validate_cloud_context(self, cloud_data: Dict[str, Any]) -> None:
"""Validate cloud context structure"""
if not isinstance(cloud_data, dict):
self.validation_errors.append("Cloud context must be a dictionary")
return
# Validate account structure
account = cloud_data.get("account")
if account:
if isinstance(account, dict):
if "uid" not in account:
self.validation_errors.append("Cloud account missing 'uid' field")
else:
self.validation_errors.append("Cloud account must be a dictionary")
# Validate region format
region = cloud_data.get("region")
if region and not isinstance(region, str):
self.validation_errors.append("Cloud region must be a string")
def _validate_endpoint(self, endpoint_data: Dict[str, Any], field_name: str) -> None:
"""Validate endpoint structure"""
if not isinstance(endpoint_data, dict):
self.validation_errors.append(f"{field_name} must be a dictionary")
return
# Validate IP address format if present
ip = endpoint_data.get("ip")
if ip:
if not self._is_valid_ip(ip):
self.validation_errors.append(f"Invalid IP address in {field_name}: {ip}")
# Validate port number if present
port = endpoint_data.get("port")
if port is not None:
if not isinstance(port, int) or port < 0 or port > 65535:
self.validation_errors.append(f"Invalid port number in {field_name}: {port}")
def _validate_finding(self, finding_data: Dict[str, Any]) -> None:
"""Validate finding structure"""
if not isinstance(finding_data, dict):
self.validation_errors.append("Finding must be a dictionary")
return
# Check required finding fields
if "uid" not in finding_data:
self.validation_errors.append("Finding missing required 'uid' field")
# Validate finding types if present
types = finding_data.get("types")
if types is not None:
if isinstance(types, str):
# Try to parse as JSON if it's a string
try:
types = json.loads(types)
except json.JSONDecodeError:
types = [types] # Treat as single type
if not isinstance(types, list):
self.validation_errors.append("Finding types must be a list or JSON array string")
    def _is_valid_ip(self, ip: str) -> bool:
        """Validate IPv4/IPv6 address format"""
        try:
            ipaddress.ip_address(ip)
            return True
        except ValueError:
            return False
def validate_security_lake_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
"""
Validate a complete Security Lake record
Args:
record: Security Lake record dictionary
Returns:
Validation result with details
"""
result = {
"is_valid": True,
"errors": [],
"warnings": [],
"ocsf_compliance": {},
"suggestions": []
}
try:
# Basic OCSF validation
is_valid, errors = self.validate_event(record)
result["is_valid"] = is_valid
result["errors"] = errors
# Check OCSF compliance
compliance = self._check_ocsf_compliance(record)
result["ocsf_compliance"] = compliance
# Generate suggestions for improvement
suggestions = self._generate_improvement_suggestions(record, errors)
result["suggestions"] = suggestions
# Check for warnings (non-critical issues)
warnings = self._check_for_warnings(record)
result["warnings"] = warnings
except Exception as e:
result["is_valid"] = False
result["errors"].append(f"Validation failed with exception: {str(e)}")
logger.error("Record validation failed", error=str(e))
return result
    def _check_ocsf_compliance(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Check OCSF compliance as the share of common base-event fields present"""
        standard_fields = 0
# Common OCSF fields to check for
ocsf_fields = [
"time", "type_name", "type_uid", "class_name", "class_uid",
"category_name", "category_uid", "activity_name", "activity_id",
"severity", "severity_id", "metadata", "cloud", "actor",
"src_endpoint", "dst_endpoint", "finding", "resources"
]
found_fields = []
for field in ocsf_fields:
if field in record:
standard_fields += 1
found_fields.append(field)
compliance_percentage = (standard_fields / len(ocsf_fields)) * 100 if ocsf_fields else 0
return {
"score": compliance_percentage,
"standard_fields_found": standard_fields,
"total_standard_fields": len(ocsf_fields),
"found_fields": found_fields,
"missing_standard_fields": [f for f in ocsf_fields if f not in record],
"compliance_level": self._get_compliance_level(compliance_percentage)
}
def _get_compliance_level(self, percentage: float) -> str:
"""Determine compliance level based on percentage"""
if percentage >= 80:
return "HIGH"
elif percentage >= 60:
return "MEDIUM"
elif percentage >= 40:
return "LOW"
else:
return "MINIMAL"
def _generate_improvement_suggestions(
self,
record: Dict[str, Any],
errors: List[str]
) -> List[str]:
"""Generate suggestions for improving OCSF compliance"""
suggestions = []
# Suggest missing critical fields
critical_fields = ["time", "type_name", "type_uid", "severity"]
for field in critical_fields:
if field not in record:
suggestions.append(f"Add required field '{field}' for better OCSF compliance")
        # Suggest metadata improvements (tolerate missing or malformed metadata)
        metadata = record.get("metadata")
        metadata = metadata if isinstance(metadata, dict) else {}
        product = metadata.get("product")
        product = product if isinstance(product, dict) else {}
        if not product.get("vendor_name"):
            suggestions.append("Add vendor_name to metadata.product for better identification")
        if not metadata.get("profiles"):
            suggestions.append("Consider adding OCSF profiles to metadata for enhanced compatibility")
        # Suggest cloud context improvements
        cloud = record.get("cloud")
        if isinstance(cloud, dict) and not cloud.get("provider"):
            suggestions.append("Add cloud provider information for better context")
        # Suggest endpoint improvements
        for endpoint_field in ["src_endpoint", "dst_endpoint"]:
            endpoint = record.get(endpoint_field)
            if isinstance(endpoint, dict) and endpoint.get("ip") and not endpoint.get("hostname"):
                suggestions.append(f"Consider adding hostname to {endpoint_field} for enrichment")
return suggestions
def _check_for_warnings(self, record: Dict[str, Any]) -> List[str]:
"""Check for non-critical issues that warrant warnings"""
warnings = []
# Check for deprecated fields or patterns
if "eventTime" in record:
warnings.append("Field 'eventTime' should be 'time' in OCSF")
# Check for unusual severity combinations
severity = record.get("severity")
severity_id = record.get("severity_id")
        if isinstance(severity, str) and severity_id is not None:
            expected_id = self._get_expected_severity_id(severity)
            if expected_id is not None and expected_id != severity_id:
                warnings.append(f"Severity '{severity}' doesn't match severity_id {severity_id}")
# Check for missing recommended fields
if "actor" not in record and "device" not in record:
warnings.append("Consider adding actor or device information for better context")
return warnings
    def _get_expected_severity_id(self, severity: str) -> Optional[int]:
        """Get the expected OCSF severity_id for a severity string, or None if unrecognized"""
        # OCSF base-event severity enum: 0=Unknown, 1-5=Informational..Critical,
        # 6=Fatal, 99=Other
        severity_mapping = {
            "unknown": 0,
            "informational": 1,
            "low": 2,
            "medium": 3,
            "high": 4,
            "critical": 5,
            "fatal": 6,
            "other": 99
        }
        return severity_mapping.get(severity.lower())
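# Illustrative usage sketch, not part of the validator API: it builds a minimal,
# hypothetical OCSF-style finding event and runs it through both validation entry
# points. All field values below are sample data chosen for the example, not taken
# from a real feed or schema export.
if __name__ == "__main__":
    sample_event = {
        "time": "2024-01-15T12:34:56Z",
        "type_name": "Security Finding: Create",
        "type_uid": 200101,
        "class_name": "Security Finding",
        "class_uid": 2001,
        "severity": "High",
        "severity_id": 4,
        "metadata": {
            "version": "1.1.0",
            "product": {"name": "Example Scanner", "vendor_name": "ExampleCorp"},
        },
        "cloud": {"provider": "AWS", "region": "us-east-1", "account": {"uid": "123456789012"}},
        "src_endpoint": {"ip": "10.0.0.1", "port": 443},
        "finding": {"uid": "finding-001", "types": ["Vulnerability"]},
    }
    validator = OCSFValidator()
    is_valid, errors = validator.validate_event(sample_event)
    print(f"valid={is_valid} errors={errors}")
    # The full record check adds compliance scoring, warnings, and suggestions
    report = validator.validate_security_lake_record(sample_event)
    print(json.dumps(report, indent=2, default=str))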