Skip to main content
Glama
kebabmane

Amazon Security Lake MCP Server

by kebabmane
schemas.py9.97 kB
from enum import Enum from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field class SeverityLevel(str, Enum): UNKNOWN = "Unknown" INFORMATIONAL = "Informational" LOW = "Low" MEDIUM = "Medium" HIGH = "High" CRITICAL = "Critical" class SeverityId(int, Enum): UNKNOWN = 99 INFORMATIONAL = 1 LOW = 2 MEDIUM = 3 HIGH = 4 CRITICAL = 5 class ActivityId(int, Enum): UNKNOWN = 0 CREATE = 1 READ = 2 UPDATE = 3 DELETE = 4 EXECUTE = 5 CONNECT = 6 DISCONNECT = 7 AUTHORIZE = 8 AUTHENTICATE = 9 class TypeUid(int, Enum): # Network Activity NETWORK_ACTIVITY = 4001 HTTP_ACTIVITY = 4002 DNS_ACTIVITY = 4003 DHCP_ACTIVITY = 4004 RDP_ACTIVITY = 4005 SMB_ACTIVITY = 4006 SSH_ACTIVITY = 4007 FTP_ACTIVITY = 4008 EMAIL_ACTIVITY = 4009 # Security Findings DETECTION_FINDING = 2001 INCIDENT_FINDING = 2002 DATA_SECURITY_FINDING = 2003 MALWARE_FINDING = 2004 VULNERABILITY_FINDING = 2005 # Cloud API Activity API_ACTIVITY = 3001 class Endpoint(BaseModel): ip: Optional[str] = Field(None, description="IP address") hostname: Optional[str] = Field(None, description="Hostname") port: Optional[int] = Field(None, description="Port number") mac: Optional[str] = Field(None, description="MAC address") subnet_uid: Optional[str] = Field(None, description="Subnet identifier") vpc_uid: Optional[str] = Field(None, description="VPC identifier") instance_uid: Optional[str] = Field(None, description="Instance identifier") interface_uid: Optional[str] = Field(None, description="Interface identifier") class Device(BaseModel): name: Optional[str] = Field(None, description="Device name") type: Optional[str] = Field(None, description="Device type") uid: Optional[str] = Field(None, description="Device unique identifier") hostname: Optional[str] = Field(None, description="Device hostname") ip: Optional[str] = Field(None, description="Device IP address") mac: Optional[str] = Field(None, description="Device MAC address") os: Optional[Dict[str, Any]] = Field(None, description="Operating system information") class Actor(BaseModel): user: Optional[Dict[str, Any]] = Field(None, description="User information") session: Optional[Dict[str, Any]] = Field(None, description="Session information") process: Optional[Dict[str, Any]] = Field(None, description="Process information") app: Optional[Dict[str, Any]] = Field(None, description="Application information") class Cloud(BaseModel): account: Optional[Dict[str, str]] = Field(None, description="Cloud account information") provider: Optional[str] = Field(None, description="Cloud provider") region: Optional[str] = Field(None, description="Cloud region") zone: Optional[str] = Field(None, description="Availability zone") project_uid: Optional[str] = Field(None, description="Project identifier") class Metadata(BaseModel): version: str = Field(..., description="OCSF version") product: Dict[str, Any] = Field(..., description="Product information") profiles: Optional[List[str]] = Field(None, description="OCSF profiles") event_code: Optional[str] = Field(None, description="Product-specific event code") correlation_uid: Optional[str] = Field(None, description="Event correlation ID") original_time: Optional[str] = Field(None, description="Original event timestamp") class Finding(BaseModel): uid: str = Field(..., description="Finding unique identifier") title: Optional[str] = Field(None, description="Finding title") desc: Optional[str] = Field(None, description="Finding description") types: Optional[List[str]] = Field(None, description="Finding types/categories") src_url: Optional[str] = Field(None, description="Source URL for more information") supporting_data: Optional[Dict[str, Any]] = Field(None, description="Additional supporting data") class Resource(BaseModel): uid: str = Field(..., description="Resource unique identifier") name: Optional[str] = Field(None, description="Resource name") type: Optional[str] = Field(None, description="Resource type") region: Optional[str] = Field(None, description="Resource region") account_uid: Optional[str] = Field(None, description="Account identifier") labels: Optional[List[str]] = Field(None, description="Resource labels/tags") data: Optional[Dict[str, Any]] = Field(None, description="Resource-specific data") class BaseEvent(BaseModel): """Base OCSF event class""" time: str = Field(..., description="Event timestamp") type_name: str = Field(..., description="Event type name") type_uid: int = Field(..., description="Event type identifier") category_name: Optional[str] = Field(None, description="Event category name") category_uid: Optional[int] = Field(None, description="Event category identifier") class_name: str = Field(..., description="Event class name") class_uid: int = Field(..., description="Event class identifier") activity_name: Optional[str] = Field(None, description="Activity name") activity_id: Optional[int] = Field(None, description="Activity identifier") severity: Optional[str] = Field(None, description="Event severity") severity_id: Optional[int] = Field(None, description="Severity identifier") message: Optional[str] = Field(None, description="Event message") metadata: Metadata = Field(..., description="Event metadata") cloud: Optional[Cloud] = Field(None, description="Cloud context") actor: Optional[Actor] = Field(None, description="Actor information") device: Optional[Device] = Field(None, description="Device information") class NetworkActivityEvent(BaseEvent): """OCSF Network Activity event""" src_endpoint: Optional[Endpoint] = Field(None, description="Source endpoint") dst_endpoint: Optional[Endpoint] = Field(None, description="Destination endpoint") connection_info: Optional[Dict[str, Any]] = Field(None, description="Connection details") traffic: Optional[Dict[str, Any]] = Field(None, description="Traffic information") protocol_name: Optional[str] = Field(None, description="Protocol name") protocol_ver: Optional[str] = Field(None, description="Protocol version") class SecurityFindingEvent(BaseEvent): """OCSF Security Finding event""" finding: Finding = Field(..., description="Finding details") resources: Optional[List[Resource]] = Field(None, description="Affected resources") remediation: Optional[Dict[str, Any]] = Field(None, description="Remediation information") malware: Optional[List[Dict[str, Any]]] = Field(None, description="Malware information") vulnerabilities: Optional[List[Dict[str, Any]]] = Field(None, description="Vulnerability details") compliance: Optional[Dict[str, Any]] = Field(None, description="Compliance information") class APIActivityEvent(BaseEvent): """OCSF API Activity event""" api: Dict[str, Any] = Field(..., description="API information") http_request: Optional[Dict[str, Any]] = Field(None, description="HTTP request details") http_response: Optional[Dict[str, Any]] = Field(None, description="HTTP response details") src_endpoint: Optional[Endpoint] = Field(None, description="Source endpoint") user_agent: Optional[str] = Field(None, description="User agent string") session: Optional[Dict[str, Any]] = Field(None, description="Session information") class OCSFEventFactory: """Factory class for creating OCSF events from raw data""" @staticmethod def create_event(raw_data: Dict[str, Any]) -> BaseEvent: """Create an OCSF event from raw data""" type_uid = raw_data.get("type_uid") if type_uid in [4001, 4002, 4003, 4004, 4005, 4006, 4007, 4008, 4009]: return NetworkActivityEvent(**raw_data) elif type_uid in [2001, 2002, 2003, 2004, 2005]: return SecurityFindingEvent(**raw_data) elif type_uid == 3001: return APIActivityEvent(**raw_data) else: return BaseEvent(**raw_data) @staticmethod def validate_event(event_data: Dict[str, Any]) -> bool: """Validate if event data conforms to OCSF schema""" required_fields = ["time", "type_name", "type_uid", "class_name", "class_uid", "metadata"] for field in required_fields: if field not in event_data: return False # Validate metadata structure metadata = event_data.get("metadata", {}) if not isinstance(metadata, dict) or "version" not in metadata or "product" not in metadata: return False return True # Common OCSF field mappings for different data sources class OCSFFieldMappings: """Common field mappings for different Security Lake data sources""" GUARDDUTY = { "finding_id": "finding.uid", "finding_title": "finding.title", "finding_description": "finding.desc", "finding_type": "finding.types", "severity": "severity", "service_name": "metadata.product.name", "account_id": "cloud.account.uid", "region": "cloud.region" } CLOUDTRAIL = { "event_name": "api.operation", "event_source": "api.service.name", "user_name": "actor.user.name", "source_ip": "src_endpoint.ip", "user_agent": "http_request.user_agent", "aws_region": "cloud.region", "account_id": "cloud.account.uid" } VPC_FLOW = { "srcaddr": "src_endpoint.ip", "dstaddr": "dst_endpoint.ip", "srcport": "src_endpoint.port", "dstport": "dst_endpoint.port", "protocol": "connection_info.protocol_num", "bytes": "traffic.bytes", "packets": "traffic.packets" }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kebabmane/asl-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server