from enum import Enum
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class SeverityLevel(str, Enum):
    """Human-readable severity captions.

    Members subclass ``str``, so each one compares equal to its caption
    text (for example ``SeverityLevel.HIGH == "High"``) and can be looked
    up from that text with ``SeverityLevel("High")``.
    """

    UNKNOWN = "Unknown"
    INFORMATIONAL = "Informational"
    LOW = "Low"
    MEDIUM = "Medium"
    HIGH = "High"
    CRITICAL = "Critical"
    # The OCSF severity enumeration also defines these two captions; without
    # them a lookup by value on a valid event raised ValueError.
    FATAL = "Fatal"
    OTHER = "Other"
class SeverityId(int, Enum):
    """Numeric severity identifiers following the OCSF ``severity_id`` enum.

    Members subclass ``int``, so they compare equal to their numeric value
    and can be looked up from it with ``SeverityId(4)``.
    """

    # OCSF reserves 0 for "Unknown"; 99 means "Other". UNKNOWN was previously
    # 99, which mislabelled "Other" events and left 0 unresolvable.
    UNKNOWN = 0
    INFORMATIONAL = 1
    LOW = 2
    MEDIUM = 3
    HIGH = 4
    CRITICAL = 5
    FATAL = 6
    # Keeps SeverityId(99) resolvable for callers that already pass 99.
    OTHER = 99
class ActivityId(int, Enum):
    """Numeric identifiers for the kind of activity an event records.

    Members subclass ``int``, so they compare equal to their numeric value
    and can be looked up from it with ``ActivityId(3)``.
    """

    UNKNOWN = 0

    # Data operations.
    CREATE = 1
    READ = 2
    UPDATE = 3
    DELETE = 4
    EXECUTE = 5

    # Connection lifecycle.
    CONNECT = 6
    DISCONNECT = 7

    # Access control.
    AUTHORIZE = 8
    AUTHENTICATE = 9
class TypeUid(int, Enum):
    """Numeric event-type identifiers, grouped by event family.

    Members subclass ``int``, so they compare equal to their numeric value
    and can be looked up from it with ``TypeUid(4001)``.
    """

    # NOTE(review): these four-digit values have the shape of OCSF
    # ``class_uid`` values (OCSF derives ``type_uid`` as
    # ``class_uid * 100 + activity_id``) -- confirm which identifier the
    # upstream data actually carries in ``type_uid``.

    # Network Activity
    NETWORK_ACTIVITY = 4001
    HTTP_ACTIVITY = 4002
    DNS_ACTIVITY = 4003
    DHCP_ACTIVITY = 4004
    RDP_ACTIVITY = 4005
    SMB_ACTIVITY = 4006
    SSH_ACTIVITY = 4007
    FTP_ACTIVITY = 4008
    EMAIL_ACTIVITY = 4009

    # Security Findings
    DETECTION_FINDING = 2001
    INCIDENT_FINDING = 2002
    DATA_SECURITY_FINDING = 2003
    MALWARE_FINDING = 2004
    VULNERABILITY_FINDING = 2005

    # Cloud API Activity
    API_ACTIVITY = 3001
class Endpoint(BaseModel):
    # Network endpoint: addressing details (IP, hostname, port, MAC) plus
    # subnet / VPC / instance / interface identifiers. Every attribute is
    # optional and defaults to None.

    # Addressing
    ip: Optional[str] = Field(default=None, description="IP address")
    hostname: Optional[str] = Field(default=None, description="Hostname")
    port: Optional[int] = Field(default=None, description="Port number")
    mac: Optional[str] = Field(default=None, description="MAC address")

    # Identifiers
    subnet_uid: Optional[str] = Field(
        default=None, description="Subnet identifier"
    )
    vpc_uid: Optional[str] = Field(default=None, description="VPC identifier")
    instance_uid: Optional[str] = Field(
        default=None, description="Instance identifier"
    )
    interface_uid: Optional[str] = Field(
        default=None, description="Interface identifier"
    )
class Device(BaseModel):
    # Device description. Every attribute is optional and defaults to None;
    # ``os`` is a free-form mapping whose keys are not constrained here.

    name: Optional[str] = Field(default=None, description="Device name")
    type: Optional[str] = Field(default=None, description="Device type")
    uid: Optional[str] = Field(
        default=None, description="Device unique identifier"
    )
    hostname: Optional[str] = Field(
        default=None, description="Device hostname"
    )
    ip: Optional[str] = Field(default=None, description="Device IP address")
    mac: Optional[str] = Field(default=None, description="Device MAC address")
    os: Optional[Dict[str, Any]] = Field(
        default=None, description="Operating system information"
    )
class Actor(BaseModel):
    # Actor description. Each attribute is an optional free-form mapping
    # (keys are not constrained here) and defaults to None.

    user: Optional[Dict[str, Any]] = Field(
        default=None, description="User information"
    )
    session: Optional[Dict[str, Any]] = Field(
        default=None, description="Session information"
    )
    process: Optional[Dict[str, Any]] = Field(
        default=None, description="Process information"
    )
    app: Optional[Dict[str, Any]] = Field(
        default=None, description="Application information"
    )
class Cloud(BaseModel):
    # Cloud context of an event. Every attribute is optional and defaults
    # to None.

    # ``account`` accepts arbitrary value types, like the other free-form
    # object fields in this file. It was ``Dict[str, str]``, which rejects
    # (or silently stringifies, depending on the pydantic version) the
    # non-string members an OCSF account object may carry, such as an
    # integer ``type_id`` or a ``labels`` list.
    account: Optional[Dict[str, Any]] = Field(
        default=None, description="Cloud account information"
    )
    provider: Optional[str] = Field(default=None, description="Cloud provider")
    region: Optional[str] = Field(default=None, description="Cloud region")
    zone: Optional[str] = Field(default=None, description="Availability zone")
    project_uid: Optional[str] = Field(
        default=None, description="Project identifier"
    )
class Metadata(BaseModel):
    # Event metadata. ``version`` and ``product`` are required; the
    # remaining attributes are optional and default to None.

    # Required
    version: str = Field(..., description="OCSF version")
    product: Dict[str, Any] = Field(..., description="Product information")

    # Optional
    profiles: Optional[List[str]] = Field(
        default=None, description="OCSF profiles"
    )
    event_code: Optional[str] = Field(
        default=None, description="Product-specific event code"
    )
    correlation_uid: Optional[str] = Field(
        default=None, description="Event correlation ID"
    )
    original_time: Optional[str] = Field(
        default=None, description="Original event timestamp"
    )
class Finding(BaseModel):
    # Finding details. Only ``uid`` is required; the remaining attributes
    # are optional and default to None.

    # Required
    uid: str = Field(..., description="Finding unique identifier")

    # Optional
    title: Optional[str] = Field(default=None, description="Finding title")
    desc: Optional[str] = Field(
        default=None, description="Finding description"
    )
    types: Optional[List[str]] = Field(
        default=None, description="Finding types/categories"
    )
    src_url: Optional[str] = Field(
        default=None, description="Source URL for more information"
    )
    supporting_data: Optional[Dict[str, Any]] = Field(
        default=None, description="Additional supporting data"
    )
class Resource(BaseModel):
    # Resource description. Only ``uid`` is required; the remaining
    # attributes are optional and default to None.

    # Required
    uid: str = Field(..., description="Resource unique identifier")

    # Optional
    name: Optional[str] = Field(default=None, description="Resource name")
    type: Optional[str] = Field(default=None, description="Resource type")
    region: Optional[str] = Field(default=None, description="Resource region")
    account_uid: Optional[str] = Field(
        default=None, description="Account identifier"
    )
    labels: Optional[List[str]] = Field(
        default=None, description="Resource labels/tags"
    )
    data: Optional[Dict[str, Any]] = Field(
        default=None, description="Resource-specific data"
    )
class BaseEvent(BaseModel):
    """Base OCSF event class"""

    # Required attributes: time, type_name, type_uid, class_name, class_uid
    # and metadata. Everything else is optional and defaults to None.

    # NOTE(review): ``time`` is typed as ``str`` here, whereas OCSF defines
    # the event time as an epoch-millisecond integer -- confirm upstream
    # producers stringify it before building these models.
    time: str = Field(..., description="Event timestamp")

    # Type / category / class classification
    type_name: str = Field(..., description="Event type name")
    type_uid: int = Field(..., description="Event type identifier")
    category_name: Optional[str] = Field(
        default=None, description="Event category name"
    )
    category_uid: Optional[int] = Field(
        default=None, description="Event category identifier"
    )
    class_name: str = Field(..., description="Event class name")
    class_uid: int = Field(..., description="Event class identifier")

    # Activity and severity
    activity_name: Optional[str] = Field(
        default=None, description="Activity name"
    )
    activity_id: Optional[int] = Field(
        default=None, description="Activity identifier"
    )
    severity: Optional[str] = Field(default=None, description="Event severity")
    severity_id: Optional[int] = Field(
        default=None, description="Severity identifier"
    )

    message: Optional[str] = Field(default=None, description="Event message")

    # Nested context objects
    metadata: Metadata = Field(..., description="Event metadata")
    cloud: Optional[Cloud] = Field(default=None, description="Cloud context")
    actor: Optional[Actor] = Field(
        default=None, description="Actor information"
    )
    device: Optional[Device] = Field(
        default=None, description="Device information"
    )
class NetworkActivityEvent(BaseEvent):
    """OCSF Network Activity event"""

    # Extends BaseEvent with connection endpoints, free-form connection and
    # traffic mappings, and protocol details. All added attributes are
    # optional and default to None.

    # Endpoints
    src_endpoint: Optional[Endpoint] = Field(
        default=None, description="Source endpoint"
    )
    dst_endpoint: Optional[Endpoint] = Field(
        default=None, description="Destination endpoint"
    )

    # Free-form mappings
    connection_info: Optional[Dict[str, Any]] = Field(
        default=None, description="Connection details"
    )
    traffic: Optional[Dict[str, Any]] = Field(
        default=None, description="Traffic information"
    )

    # Protocol
    protocol_name: Optional[str] = Field(
        default=None, description="Protocol name"
    )
    protocol_ver: Optional[str] = Field(
        default=None, description="Protocol version"
    )
class SecurityFindingEvent(BaseEvent):
    """OCSF Security Finding event"""

    # Extends BaseEvent with a required ``finding`` plus optional resource,
    # remediation, malware, vulnerability and compliance details (each
    # defaulting to None).

    # Required
    finding: Finding = Field(..., description="Finding details")

    # Optional
    resources: Optional[List[Resource]] = Field(
        default=None, description="Affected resources"
    )
    remediation: Optional[Dict[str, Any]] = Field(
        default=None, description="Remediation information"
    )
    malware: Optional[List[Dict[str, Any]]] = Field(
        default=None, description="Malware information"
    )
    vulnerabilities: Optional[List[Dict[str, Any]]] = Field(
        default=None, description="Vulnerability details"
    )
    compliance: Optional[Dict[str, Any]] = Field(
        default=None, description="Compliance information"
    )
class APIActivityEvent(BaseEvent):
    """OCSF API Activity event"""

    # Extends BaseEvent with a required free-form ``api`` mapping plus
    # optional HTTP, endpoint, user-agent and session details (each
    # defaulting to None).

    # Required
    api: Dict[str, Any] = Field(..., description="API information")

    # Optional
    http_request: Optional[Dict[str, Any]] = Field(
        default=None, description="HTTP request details"
    )
    http_response: Optional[Dict[str, Any]] = Field(
        default=None, description="HTTP response details"
    )
    src_endpoint: Optional[Endpoint] = Field(
        default=None, description="Source endpoint"
    )
    user_agent: Optional[str] = Field(
        default=None, description="User agent string"
    )
    session: Optional[Dict[str, Any]] = Field(
        default=None, description="Session information"
    )
class OCSFEventFactory:
    """Factory class for creating OCSF events from raw data"""

    # ``type_uid`` values handled by each specialised event model. Tuples
    # (not sets) so a membership test on an unhashable ``type_uid`` simply
    # evaluates to False instead of raising TypeError.
    _NETWORK_ACTIVITY_TYPE_UIDS = (
        4001, 4002, 4003, 4004, 4005, 4006, 4007, 4008, 4009,
    )
    _SECURITY_FINDING_TYPE_UIDS = (2001, 2002, 2003, 2004, 2005)
    _API_ACTIVITY_TYPE_UID = 3001

    # Top-level keys and metadata keys that validate_event insists on.
    _REQUIRED_FIELDS = (
        "time", "type_name", "type_uid", "class_name", "class_uid", "metadata",
    )
    _REQUIRED_METADATA_FIELDS = ("version", "product")

    @staticmethod
    def create_event(raw_data: Dict[str, Any]) -> "BaseEvent":
        """Build the event model that matches ``raw_data["type_uid"]``.

        Args:
            raw_data: Event fields keyed by attribute name; passed to the
                selected model's constructor as keyword arguments.

        Returns:
            A ``NetworkActivityEvent`` for type_uid 4001-4009, a
            ``SecurityFindingEvent`` for 2001-2005, an ``APIActivityEvent``
            for 3001, and a plain ``BaseEvent`` for anything else
            (including a missing ``type_uid``).

        Raises:
            Whatever validation error the selected model's constructor
            raises when ``raw_data`` does not satisfy it.
        """
        type_uid = raw_data.get("type_uid")
        if type_uid in OCSFEventFactory._NETWORK_ACTIVITY_TYPE_UIDS:
            return NetworkActivityEvent(**raw_data)
        if type_uid in OCSFEventFactory._SECURITY_FINDING_TYPE_UIDS:
            return SecurityFindingEvent(**raw_data)
        if type_uid == OCSFEventFactory._API_ACTIVITY_TYPE_UID:
            return APIActivityEvent(**raw_data)
        return BaseEvent(**raw_data)

    @staticmethod
    def validate_event(event_data: Dict[str, Any]) -> bool:
        """Check that ``event_data`` has the minimum required structure.

        This is a presence check only: it does not validate value types
        and does not construct any model.

        Args:
            event_data: Candidate event mapping.

        Returns:
            True when every required top-level key is present and
            ``metadata`` is a dict containing both ``version`` and
            ``product``; False otherwise, including when ``event_data``
            is not a mapping at all.
        """
        from collections.abc import Mapping

        # Non-mapping input (None, numbers, lists, ...) used to surface as
        # TypeError/AttributeError; a validator should just say "invalid".
        if not isinstance(event_data, Mapping):
            return False

        required = OCSFEventFactory._REQUIRED_FIELDS
        if any(name not in event_data for name in required):
            return False

        # Validate metadata structure
        metadata = event_data["metadata"]
        if not isinstance(metadata, dict):
            return False
        return all(
            key in metadata
            for key in OCSFEventFactory._REQUIRED_METADATA_FIELDS
        )
# Common OCSF field mappings for different data sources
class OCSFFieldMappings:
    """Lookup tables translating source field names to OCSF attribute paths.

    One table per Security Lake data source. Keys are the field names used
    by that source; values are dotted paths into the OCSF event structure.
    """

    # GuardDuty findings
    GUARDDUTY: Dict[str, str] = {
        "finding_id": "finding.uid",
        "finding_title": "finding.title",
        "finding_description": "finding.desc",
        "finding_type": "finding.types",
        "severity": "severity",
        "service_name": "metadata.product.name",
        "account_id": "cloud.account.uid",
        "region": "cloud.region",
    }

    # CloudTrail API records
    CLOUDTRAIL: Dict[str, str] = {
        "event_name": "api.operation",
        "event_source": "api.service.name",
        "user_name": "actor.user.name",
        "source_ip": "src_endpoint.ip",
        "user_agent": "http_request.user_agent",
        "aws_region": "cloud.region",
        "account_id": "cloud.account.uid",
    }

    # VPC flow log records
    VPC_FLOW: Dict[str, str] = {
        "srcaddr": "src_endpoint.ip",
        "dstaddr": "dst_endpoint.ip",
        "srcport": "src_endpoint.port",
        "dstport": "dst_endpoint.port",
        "protocol": "connection_info.protocol_num",
        "bytes": "traffic.bytes",
        "packets": "traffic.packets",
    }