"""
Security Context and Cryptographic Types - Agent Orchestration Platform
Architecture Integration:
- Design Patterns: Security Boundary pattern with defense-in-depth
- Security Model: Multi-layer validation with cryptographic integrity
- Performance Profile: O(1) security checks with cached validation
Technical Decisions:
- Cryptographic Keys: Ed25519 for signing, AES-256-GCM for encryption
- Key Derivation: PBKDF2 with high iteration count for key stretching
- Audit Trail: Cryptographically signed logs with tamper detection
- Isolation: Process and filesystem boundaries with strict validation
Dependencies & Integration:
- External: cryptography library for secure primitives
- Internal: Foundation for all security-sensitive operations
Quality Assurance:
- Test Coverage: Property-based testing for all cryptographic operations
- Error Handling: Secure failure modes with no information leakage
Author: Adder_3 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import hashlib
import os
import secrets
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union
# Define ValidationError locally to avoid circular import
class ValidationError(Exception):
    """Validation failure, declared locally to avoid a circular import.

    NOTE(review): a second ``ValidationError`` deriving from ``SecurityError``
    is declared further down in this module and rebinds this name once the
    module has finished loading.
    """
try:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
CRYPTOGRAPHY_AVAILABLE = True
except ImportError:
CRYPTOGRAPHY_AVAILABLE = False
# ============================================================================
# CRYPTOGRAPHIC TYPE SAFETY - Branded Types for Security
# ============================================================================
from typing import NewType
# Branded types for cryptographic primitives.
# NewType only introduces distinct names for static type checkers; at runtime
# each alias simply returns the bytes/str value it is called with.
EncryptionKey = NewType("EncryptionKey", bytes)
SigningKey = NewType("SigningKey", bytes)
VerificationKey = NewType("VerificationKey", bytes)
Salt = NewType("Salt", bytes)
Nonce = NewType("Nonce", bytes)
Signature = NewType("Signature", bytes)
EncryptedData = NewType("EncryptedData", bytes)
KeyFingerprint = NewType("KeyFingerprint", str)
# Additional crypto types kept for test compatibility.
AESKey = NewType("AESKey", bytes)
ECDSAPrivateKey = NewType("ECDSAPrivateKey", bytes)
ECDSAPublicKey = NewType("ECDSAPublicKey", bytes)
JWTToken = NewType("JWTToken", str)
SignatureBytes = NewType("SignatureBytes", bytes)
CertificateBytes = NewType("CertificateBytes", bytes)
# ============================================================================
# SECURITY LEVEL ENUMERATION - Escalating Security Policies
# ============================================================================
class SecurityLevel(Enum):
    """Named security tiers; each member's value is its lowercase string tag."""

    # Basic validation, minimal isolation
    LOW = "low"
    # Standard validation, filesystem boundaries
    MEDIUM = "medium"
    # Enhanced validation, strict isolation
    HIGH = "high"
    # Paranoid validation, full sandboxing
    MAXIMUM = "maximum"
    # Internal operations with elevated privileges
    INTERNAL = "internal"
    # Confidential data handling
    CONFIDENTIAL = "confidential"
    # Secret data handling
    SECRET = "secret"
class Permission(Enum):
    """Permissions that can be granted for security-relevant operations."""

    # Generic access rights
    READ = "read"
    WRITE = "write"
    EXECUTE = "execute"
    DELETE = "delete"
    ADMIN = "admin"

    # Rights tied to one specific operation
    CREATE_AGENT = "create_agent"
    DELETE_AGENT = "delete_agent"
    CREATE_SESSION = "create_session"
    MESSAGE_AGENT = "message_agent"
    READ_AGENT_STATUS = "read_agent_status"
class CryptographicStrength(Enum):
    """Strength tiers selectable when generating key material."""

    # For testing only - DO NOT USE IN PRODUCTION
    WEAK = "weak"
    # Standard enterprise security
    STANDARD = "standard"
    # High-security applications
    HIGH = "high"
    # Maximum security for classified data
    MAXIMUM = "maximum"
# ============================================================================
# CRYPTOGRAPHIC EXCEPTIONS - Secure Error Handling
# ============================================================================
class SecurityError(Exception):
    """Root of the security exception hierarchy.

    The exception message is the given text prefixed with the bracketed
    error code; the code is also stored on the instance as ``error_code``.
    """

    def __init__(self, message: str, error_code: str = "SECURITY_ERROR"):
        rendered = f"[{error_code}] {message}"
        super().__init__(rendered)
        self.error_code = error_code
class CryptographicError(SecurityError):
    """Raised when a cryptographic operation cannot be completed."""

    def __init__(self, message: str):
        # Every instance carries the fixed code "CRYPTO_ERROR".
        super().__init__(message, error_code="CRYPTO_ERROR")
class ValidationError(SecurityError):
    """Raised when a value or configuration fails validation."""

    def __init__(self, message: str):
        # Every instance carries the fixed code "VALIDATION_ERROR".
        super().__init__(message, error_code="VALIDATION_ERROR")
class BoundaryViolationError(SecurityError):
    """Raised when an operation crosses a security boundary."""

    def __init__(self, message: str):
        # Every instance carries the fixed code "BOUNDARY_VIOLATION".
        super().__init__(message, error_code="BOUNDARY_VIOLATION")
# ============================================================================
# CRYPTOGRAPHIC KEY MANAGEMENT - Type-Safe Key Operations
# ============================================================================
@dataclass(frozen=True)
class CryptographicKeys:
    """
    Immutable container for one set of key material.

    Holds a 32-byte AES-256 key, a raw Ed25519 key pair (32 bytes each) and a
    salt of at least 16 bytes. Lengths are checked in ``__post_init__``.

    Security Implementation:
        - The symmetric key and the private signing key are excluded from the
          generated ``__repr__`` so that printing or logging this object (or
          any dataclass that contains it) does not expose secret material.
        - Fingerprints are truncated SHA-256 digests used only to identify
          which key produced an artefact.

    Raises:
        CryptographicError: If the cryptography library is unavailable or any
            key/salt has an invalid length.
    """

    # Secret material: kept out of the auto-generated repr.
    encryption_key: EncryptionKey = field(repr=False)
    signing_private_key: SigningKey = field(repr=False)
    verification_public_key: VerificationKey
    key_salt: Salt
    created_at: datetime = field(default_factory=datetime.now)
    strength: CryptographicStrength = CryptographicStrength.STANDARD

    def __post_init__(self):
        """Validate key and salt lengths on creation."""
        if not CRYPTOGRAPHY_AVAILABLE:
            raise CryptographicError("Cryptography library not available")
        # Validate key lengths based on algorithms used
        if len(self.encryption_key) != 32:  # AES-256 requires 32 bytes
            raise CryptographicError(
                f"Invalid encryption key length: {len(self.encryption_key)}"
            )
        if len(self.signing_private_key) != 32:  # Ed25519 private key is 32 bytes
            raise CryptographicError(
                f"Invalid signing key length: {len(self.signing_private_key)}"
            )
        if len(self.verification_public_key) != 32:  # Ed25519 public key is 32 bytes
            raise CryptographicError(
                f"Invalid verification key length: {len(self.verification_public_key)}"
            )
        if len(self.key_salt) < 16:  # Minimum 16 bytes for salt
            raise CryptographicError(f"Salt too short: {len(self.key_salt)} bytes")

    def get_encryption_fingerprint(self) -> KeyFingerprint:
        """
        Generate fingerprint for encryption key identification.

        Returns:
            KeyFingerprint: ``"enc_"`` followed by the first 16 hex characters
            of the SHA-256 digest of the encryption key.
        """
        key_hash = hashlib.sha256(self.encryption_key).hexdigest()
        return KeyFingerprint(f"enc_{key_hash[:16]}")

    def get_signing_fingerprint(self) -> KeyFingerprint:
        """
        Generate fingerprint for signing key identification.

        Returns:
            KeyFingerprint: ``"sig_"`` followed by the first 16 hex characters
            of the SHA-256 digest of the verification (public) key.
        """
        key_hash = hashlib.sha256(self.verification_public_key).hexdigest()
        return KeyFingerprint(f"sig_{key_hash[:16]}")

    def is_expired(self, max_age_hours: int = 24) -> bool:
        """
        Check if keys are expired and need rotation.

        Args:
            max_age_hours: Maximum age in hours before rotation needed

        Returns:
            bool: True if the keys are strictly older than ``max_age_hours``
        """
        # created_at defaults to naive local time, so compare against the same clock.
        age = datetime.now() - self.created_at
        return age > timedelta(hours=max_age_hours)
def generate_cryptographic_keys(
    strength: CryptographicStrength = CryptographicStrength.STANDARD,
) -> CryptographicKeys:
    """
    Generate a fresh key set for the requested strength.

    Produces a random 32-byte AES-256 key, a new Ed25519 key pair exported as
    raw bytes, and a random salt whose length depends on ``strength``
    (16/32/48/64 bytes for WEAK/STANDARD/HIGH/MAXIMUM).

    Args:
        strength: Cryptographic strength level

    Returns:
        CryptographicKeys: New cryptographic key set

    Raises:
        CryptographicError: If the cryptography library is unavailable, the
            strength is not supported, or key generation fails. Generation
            failures keep the underlying exception as ``__cause__``.
    """
    if not CRYPTOGRAPHY_AVAILABLE:
        raise CryptographicError("Cryptography library not available")

    # Resolve the salt length first so an unsupported strength is rejected
    # with a clear message before any key material is generated.
    salt_lengths = {
        CryptographicStrength.WEAK: 16,  # Testing only
        CryptographicStrength.STANDARD: 32,  # Standard
        CryptographicStrength.HIGH: 48,  # High security
        CryptographicStrength.MAXIMUM: 64,  # Maximum security
    }
    try:
        salt_length = salt_lengths[strength]
    except (KeyError, TypeError):
        raise CryptographicError(
            f"Unsupported cryptographic strength: {strength!r}"
        ) from None

    try:
        # Generate encryption key (AES-256)
        encryption_key = EncryptionKey(secrets.token_bytes(32))

        # Generate signing key pair (Ed25519)
        private_key = ed25519.Ed25519PrivateKey.generate()
        public_key = private_key.public_key()

        # Extract raw key material
        signing_private_key = SigningKey(
            private_key.private_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PrivateFormat.Raw,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )
        verification_public_key = VerificationKey(
            public_key.public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw,
            )
        )

        key_salt = Salt(secrets.token_bytes(salt_length))

        return CryptographicKeys(
            encryption_key=encryption_key,
            signing_private_key=signing_private_key,
            verification_public_key=verification_public_key,
            key_salt=key_salt,
            strength=strength,
        )
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the wrapper.
        raise CryptographicError(f"Key generation failed: {e}") from e
# ============================================================================
# SECURE OPERATIONS - Cryptographic Primitives
# ============================================================================
@dataclass(frozen=True)
class EncryptionResult:
    """Frozen record of one AES-GCM encryption: ciphertext plus its metadata."""

    ciphertext: EncryptedData
    nonce: Nonce
    tag: bytes  # Authentication tag for AEAD
    key_fingerprint: KeyFingerprint

    def __post_init__(self):
        """Reject nonces that are not 12 bytes and tags that are not 16 bytes."""
        nonce_size = len(self.nonce)
        if nonce_size != 12:  # GCM nonce is 12 bytes
            raise CryptographicError(f"Invalid nonce length: {nonce_size}")

        tag_size = len(self.tag)
        if tag_size != 16:  # GCM tag is 16 bytes
            raise CryptographicError(f"Invalid tag length: {tag_size}")
@dataclass(frozen=True)
class SignatureResult:
    """Frozen record of one signing operation: signature plus its metadata."""

    signature: Signature
    data_hash: str  # SHA-256 hash of signed data
    key_fingerprint: KeyFingerprint
    timestamp: datetime = field(default_factory=datetime.now)

    def __post_init__(self):
        """Reject signatures that are not 64 bytes and hashes that are not 64 characters."""
        signature_size = len(self.signature)
        if signature_size != 64:  # Ed25519 signature is 64 bytes
            raise CryptographicError(f"Invalid signature length: {signature_size}")

        hash_size = len(self.data_hash)
        if hash_size != 64:  # SHA-256 hex digest is 64 characters
            raise CryptographicError(f"Invalid hash length: {hash_size}")
def encrypt_data(data: bytes, keys: CryptographicKeys) -> EncryptionResult:
    """
    Encrypt ``data`` with AES-256-GCM under ``keys.encryption_key``.

    A fresh random 12-byte nonce is drawn for every call. The returned result
    carries the ciphertext, that nonce, the GCM authentication tag and the
    fingerprint of the encryption key.

    Args:
        data: Plain data to encrypt (must be non-empty)
        keys: Cryptographic keys for encryption

    Returns:
        EncryptionResult: Encrypted data with metadata

    Raises:
        CryptographicError: If the cryptography library is unavailable, the
            data is empty, or encryption fails. Only the type name of the
            underlying failure appears in the message.
    """
    if not CRYPTOGRAPHY_AVAILABLE:
        raise CryptographicError("Cryptography library not available")
    if not data:
        raise CryptographicError("Cannot encrypt empty data")

    try:
        iv = Nonce(secrets.token_bytes(12))
        aes_gcm = Cipher(
            algorithms.AES(keys.encryption_key),
            modes.GCM(iv),
            backend=default_backend(),
        )
        sealer = aes_gcm.encryptor()
        sealed = sealer.update(data) + sealer.finalize()
        # The tag is read after finalize(), as in the original sequence.
        return EncryptionResult(
            ciphertext=EncryptedData(sealed),
            nonce=iv,
            tag=sealer.tag,
            key_fingerprint=keys.get_encryption_fingerprint(),
        )
    except Exception as e:
        raise CryptographicError(f"Encryption failed: {type(e).__name__}")
def decrypt_data(encryption_result: EncryptionResult, keys: CryptographicKeys) -> bytes:
    """
    Decrypt an ``EncryptionResult`` with AES-256-GCM and verify its tag.

    The key fingerprint stored in the result must match the fingerprint of
    ``keys.encryption_key``; otherwise decryption is not attempted.

    Args:
        encryption_result: Encrypted data with metadata
        keys: Cryptographic keys for decryption

    Returns:
        bytes: Decrypted plain data

    Raises:
        CryptographicError: If the cryptography library is unavailable, the
            fingerprints differ, or decryption/authentication fails.
    """
    if not CRYPTOGRAPHY_AVAILABLE:
        raise CryptographicError("Cryptography library not available")

    expected_fingerprint = keys.get_encryption_fingerprint()
    if encryption_result.key_fingerprint != expected_fingerprint:
        raise CryptographicError("Key fingerprint mismatch")

    try:
        gcm_mode = modes.GCM(encryption_result.nonce, encryption_result.tag)
        aes_gcm = Cipher(
            algorithms.AES(keys.encryption_key),
            gcm_mode,
            backend=default_backend(),
        )
        opener = aes_gcm.decryptor()
        body = opener.update(encryption_result.ciphertext)
        return body + opener.finalize()
    except Exception as e:
        raise CryptographicError(f"Decryption failed: {type(e).__name__}")
def sign_data(data: bytes, keys: CryptographicKeys) -> SignatureResult:
    """
    Sign ``data`` with the Ed25519 private key held in ``keys``.

    The result bundles the signature, the SHA-256 hex digest of ``data`` and
    the fingerprint of the matching verification key.

    Args:
        data: Data to sign (must be non-empty)
        keys: Cryptographic keys for signing

    Returns:
        SignatureResult: Digital signature with metadata

    Raises:
        CryptographicError: If the cryptography library is unavailable, the
            data is empty, or signing fails.
    """
    if not CRYPTOGRAPHY_AVAILABLE:
        raise CryptographicError("Cryptography library not available")
    if not data:
        raise CryptographicError("Cannot sign empty data")

    try:
        # Rebuild the key object from its raw 32-byte form.
        signer = ed25519.Ed25519PrivateKey.from_private_bytes(
            keys.signing_private_key
        )
        raw_signature = signer.sign(data)
        digest = hashlib.sha256(data).hexdigest()
        return SignatureResult(
            signature=Signature(raw_signature),
            data_hash=digest,
            key_fingerprint=keys.get_signing_fingerprint(),
        )
    except Exception as e:
        raise CryptographicError(f"Signing failed: {type(e).__name__}")
def verify_signature(
    data: bytes, signature_result: SignatureResult, keys: CryptographicKeys
) -> bool:
    """
    Check an Ed25519 signature over ``data``.

    Verification never raises: a missing cryptography library, empty data, a
    fingerprint or hash mismatch, or any exception during verification all
    yield ``False``.

    Args:
        data: Original data that was signed
        signature_result: Signature result to verify
        keys: Cryptographic keys for verification

    Returns:
        bool: True if signature is valid, False otherwise
    """
    if not (CRYPTOGRAPHY_AVAILABLE and data):
        return False

    try:
        # The signature must have been produced by this key pair...
        if signature_result.key_fingerprint != keys.get_signing_fingerprint():
            return False

        # ...over exactly this data.
        if hashlib.sha256(data).hexdigest() != signature_result.data_hash:
            return False

        verifier = ed25519.Ed25519PublicKey.from_public_bytes(
            keys.verification_public_key
        )
        # verify() raises when the signature does not match.
        verifier.verify(signature_result.signature, data)
    except Exception:
        return False  # Any exception means invalid signature
    return True
# ============================================================================
# SECURITY CONTEXT - Complete Security Environment
# ============================================================================
@dataclass(frozen=True)
class SecurityBoundary:
    """
    Frozen description of what a session may touch: paths, executables and
    resource ceilings.

    Validation performed on construction:
        - every allowed and forbidden path must be absolute
        - no forbidden path may lie inside an allowed path
        - no executable may be both allowed and forbidden
        - each resource limit must fall inside its permitted range
    """

    allowed_paths: Set[Path]
    forbidden_paths: Set[Path] = field(default_factory=set)
    allowed_executables: Set[str] = field(
        default_factory=lambda: {"claude", "python", "git"}
    )
    forbidden_executables: Set[str] = field(
        default_factory=lambda: {"rm", "dd", "mkfs", "format"}
    )
    max_file_size_mb: int = 100
    max_total_files: int = 10000
    max_memory_mb: int = 512
    max_cpu_percent: float = 25.0
    network_access: bool = False
    shell_access: bool = False

    def __post_init__(self):
        """Validate security boundary configuration."""
        # Absolute-path requirement
        for candidate in self.allowed_paths:
            if not candidate.is_absolute():
                raise ValidationError(f"Allowed path must be absolute: {candidate}")
        for candidate in self.forbidden_paths:
            if not candidate.is_absolute():
                raise ValidationError(f"Forbidden path must be absolute: {candidate}")

        # Path conflicts: relative_to() raises ValueError when the forbidden
        # path is NOT under the allowed path, which is the conflict-free case.
        for allowed in self.allowed_paths:
            for forbidden in self.forbidden_paths:
                try:
                    forbidden.resolve().relative_to(allowed.resolve())
                except ValueError:
                    continue  # No conflict
                raise ValidationError(
                    f"Forbidden path {forbidden} within allowed path {allowed}"
                )

        # Executable validation
        clash = self.allowed_executables & self.forbidden_executables
        if clash:
            raise ValidationError(
                f"Executables cannot be both allowed and forbidden: {clash}"
            )

        # Resource limit validation
        file_size = self.max_file_size_mb
        if file_size <= 0 or file_size > 1024:
            raise ValidationError(
                f"Max file size {self.max_file_size_mb} must be in range (0, 1024]"
            )
        file_count = self.max_total_files
        if file_count <= 0 or file_count > 100000:
            raise ValidationError(
                f"Max total files {self.max_total_files} must be in range (0, 100000]"
            )
        memory = self.max_memory_mb
        if memory <= 0 or memory > 4096:
            raise ValidationError(
                f"Max memory {self.max_memory_mb} must be in range (0, 4096]"
            )
        cpu = self.max_cpu_percent
        if cpu <= 0 or cpu > 100:
            raise ValidationError(
                f"Max CPU {self.max_cpu_percent} must be in range (0, 100]"
            )

    def is_path_allowed(self, path: Path) -> bool:
        """
        Decide whether ``path`` may be accessed.

        The path is resolved first; forbidden roots are consulted before
        allowed roots, and a path under neither is rejected.

        Args:
            path: Path to validate

        Returns:
            bool: True if path access is allowed
        """
        try:
            target = path.resolve()

            # Forbidden areas win over allowed ones.
            for forbidden_root in self.forbidden_paths:
                try:
                    target.relative_to(forbidden_root.resolve())
                except ValueError:
                    continue
                return False  # Path is within forbidden area

            for allowed_root in self.allowed_paths:
                try:
                    target.relative_to(allowed_root.resolve())
                except ValueError:
                    continue
                return True  # Path is within allowed area

            return False  # Path not in any allowed area
        except (OSError, ValueError):
            return False  # Invalid path

    def is_executable_allowed(self, executable: str) -> bool:
        """
        Decide whether ``executable`` may be run.

        Only the final path component is compared, case-insensitively; the
        forbidden list is consulted before the allowed list.

        Args:
            executable: Executable name to check

        Returns:
            bool: True if executable is allowed
        """
        name = Path(executable).name.lower()

        denied = {exe.lower() for exe in self.forbidden_executables}
        if name in denied:
            return False

        permitted = {exe.lower() for exe in self.allowed_executables}
        return name in permitted
@dataclass(frozen=True)
class AuditEntry:
    """
    Immutable audit log entry, optionally carrying a signature.

    Validation performed on construction:
        - ``event_type`` must be non-blank and one of the known event types
        - ``event_data`` must be non-empty

    Raises:
        ValidationError: If any of the checks above fails.
    """

    timestamp: datetime
    event_type: str
    event_data: Dict[str, Any]
    user_context: Optional[str] = None
    signature: Optional[SignatureResult] = None

    # Unannotated on purpose so the dataclass machinery does not treat it as
    # a field; built once instead of on every construction.
    _VALID_EVENT_TYPES = frozenset(
        {
            "AGENT_CREATED",
            "AGENT_DELETED",
            "AGENT_STATUS_CHANGED",
            "SESSION_CREATED",
            "SESSION_DELETED",
            "SESSION_STATUS_CHANGED",
            "MESSAGE_SENT",
            "MESSAGE_RECEIVED",
            "FILE_ACCESS",
            "SECURITY_VIOLATION",
            "AUTHENTICATION_ATTEMPT",
            "PROCESS_STARTED",
            "PROCESS_TERMINATED",
            "ERROR_OCCURRED",
        }
    )

    def __post_init__(self):
        """Validate audit entry structure."""
        if not self.event_type or not self.event_type.strip():
            raise ValidationError("Event type cannot be empty")
        if not self.event_data:
            raise ValidationError("Event data cannot be empty")
        if self.event_type not in self._VALID_EVENT_TYPES:
            raise ValidationError(f"Invalid event type: {self.event_type}")

    def get_event_summary(self) -> str:
        """
        Get human-readable event summary.

        The summary always starts with the ISO timestamp and event type, then
        adds ``agent_name``, the first 8 characters of ``session_id`` and the
        first 50 characters of ``error_message`` when those keys are present.

        Returns:
            str: Event summary for logging/display
        """
        summary_parts = [f"[{self.timestamp.isoformat()}]", f"{self.event_type}:"]
        data = self.event_data
        if "agent_name" in data:
            summary_parts.append(f"Agent={data['agent_name']}")
        # event_data values are typed Any (e.g. a UUID or an exception), so
        # convert to text before slicing instead of assuming a str.
        if "session_id" in data:
            summary_parts.append(f"Session={str(data['session_id'])[:8]}...")
        if "error_message" in data:
            summary_parts.append(f"Error={str(data['error_message'])[:50]}...")
        return " ".join(summary_parts)

    def is_signed(self) -> bool:
        """Check if audit entry has cryptographic signature."""
        return self.signature is not None
@dataclass(frozen=True)
class SecurityContext:
    """
    Security context combining a security level, key material, a boundary
    definition and an audit trail.

    The dataclass is frozen: ``add_audit_entry`` does not modify the context
    it is called on but returns a new context whose trail includes the entry.

    Validation performed on construction:
        - the audit trail must not exceed ``max_audit_entries``
        - audit entries must be in non-decreasing timestamp order
        - ``max_audit_entries`` must be in the range (0, 100000]

    Raises:
        ValidationError: If any of the checks above fails.
    """

    security_level: SecurityLevel
    cryptographic_keys: CryptographicKeys
    security_boundary: SecurityBoundary
    audit_trail: List[AuditEntry] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    max_audit_entries: int = 10000

    def __post_init__(self):
        """Validate security context configuration."""
        # Audit trail validation
        if len(self.audit_trail) > self.max_audit_entries:
            raise ValidationError(
                f"Audit trail exceeds maximum {self.max_audit_entries} entries"
            )
        # Validate audit trail ordering
        previous = None
        for entry in self.audit_trail:
            if previous is not None and entry.timestamp < previous.timestamp:
                raise ValidationError(
                    "Audit trail entries must be chronologically ordered"
                )
            previous = entry
        # Validate maximum audit entries
        if self.max_audit_entries <= 0 or self.max_audit_entries > 100000:
            raise ValidationError("Max audit entries must be in range (0, 100000]")

    @staticmethod
    def _audit_payload(entry: AuditEntry) -> bytes:
        """Serialize the fields of ``entry`` that its signature covers.

        Signing and verification both go through this helper so the two can
        never disagree on the payload format. ``user_context`` is not part of
        the signed payload.
        """
        text = f"{entry.timestamp.isoformat()}|{entry.event_type}|{str(entry.event_data)}"
        return text.encode("utf-8")

    def add_audit_entry(
        self,
        event_type: str,
        event_data: Dict[str, Any],
        user_context: Optional[str] = None,
        sign_entry: bool = True,
    ) -> "SecurityContext":
        """
        Create new security context with added audit entry.

        The entry is timestamped with ``datetime.now()``. When the trail would
        exceed ``max_audit_entries`` the oldest entries are dropped.

        Args:
            event_type: Type of event being audited
            event_data: Event-specific data
            user_context: Optional user/agent context
            sign_entry: Whether to cryptographically sign the entry

        Returns:
            SecurityContext: New context with audit entry added; ``self`` is
            left unchanged.
        """
        # Build the unsigned entry first so it is validated before signing.
        entry = AuditEntry(
            timestamp=datetime.now(),
            event_type=event_type,
            event_data=event_data,
            user_context=user_context,
        )
        if sign_entry:
            signature = sign_data(self._audit_payload(entry), self.cryptographic_keys)
            entry = AuditEntry(
                timestamp=entry.timestamp,
                event_type=entry.event_type,
                event_data=entry.event_data,
                user_context=entry.user_context,
                signature=signature,
            )

        new_audit_trail = list(self.audit_trail)
        new_audit_trail.append(entry)
        # Drop the oldest entries in one slice rather than repeated pop(0).
        overflow = len(new_audit_trail) - self.max_audit_entries
        if overflow > 0:
            del new_audit_trail[:overflow]

        return SecurityContext(
            security_level=self.security_level,
            cryptographic_keys=self.cryptographic_keys,
            security_boundary=self.security_boundary,
            audit_trail=new_audit_trail,
            created_at=self.created_at,
            max_audit_entries=self.max_audit_entries,
        )

    def validate_path_access(self, path: Path, operation: str = "read") -> bool:
        """
        Validate path access against the boundary and, for HIGH/MAXIMUM
        levels, against the file-size limit on write/append operations.

        Args:
            path: Path to validate
            operation: Type of operation (read, write, execute, delete)

        Returns:
            bool: True if access is allowed
        """
        # Basic boundary check
        if not self.security_boundary.is_path_allowed(path):
            # NOTE(review): add_audit_entry returns a NEW context; the result
            # is discarded here, so this violation never reaches any audit
            # trail a caller can observe. Left unchanged because this method's
            # interface only returns a bool.
            self.add_audit_entry(
                "SECURITY_VIOLATION",
                {
                    "violation_type": "path_access_denied",
                    "path": str(path),
                    "operation": operation,
                    "reason": "outside_security_boundary",
                },
            )
            return False

        # Additional checks based on security level
        if self.security_level in [SecurityLevel.HIGH, SecurityLevel.MAXIMUM]:
            # Check file size limits for write operations
            if operation in ["write", "append"] and path.exists():
                file_size_mb = path.stat().st_size / (1024 * 1024)
                if file_size_mb > self.security_boundary.max_file_size_mb:
                    # NOTE(review): result discarded, see note above.
                    self.add_audit_entry(
                        "SECURITY_VIOLATION",
                        {
                            "violation_type": "file_size_exceeded",
                            "path": str(path),
                            "file_size_mb": file_size_mb,
                            "max_size_mb": self.security_boundary.max_file_size_mb,
                        },
                    )
                    return False
        return True

    def validate_executable_access(self, executable: str) -> bool:
        """
        Validate executable access with security policy enforcement.

        Args:
            executable: Executable to validate

        Returns:
            bool: True if execution is allowed
        """
        if not self.security_boundary.is_executable_allowed(executable):
            # NOTE(review): the context returned by add_audit_entry is
            # discarded, so this violation is not retained anywhere.
            self.add_audit_entry(
                "SECURITY_VIOLATION",
                {
                    "violation_type": "executable_access_denied",
                    "executable": executable,
                    "reason": "not_in_allowed_list",
                },
            )
            return False
        return True

    def encrypt_sensitive_data(self, data: bytes) -> EncryptionResult:
        """
        Encrypt sensitive data using session cryptographic keys.

        Args:
            data: Sensitive data to encrypt

        Returns:
            EncryptionResult: Encrypted data with metadata
        """
        return encrypt_data(data, self.cryptographic_keys)

    def decrypt_sensitive_data(self, encryption_result: EncryptionResult) -> bytes:
        """
        Decrypt sensitive data using session cryptographic keys.

        Args:
            encryption_result: Encrypted data to decrypt

        Returns:
            bytes: Decrypted sensitive data
        """
        return decrypt_data(encryption_result, self.cryptographic_keys)

    def verify_audit_integrity(self) -> bool:
        """
        Verify cryptographic integrity of audit trail.

        Unsigned entries are skipped; they neither pass nor fail the check.

        Returns:
            bool: True if all signed entries are valid
        """
        for entry in self.audit_trail:
            if entry.signature and not verify_signature(
                self._audit_payload(entry), entry.signature, self.cryptographic_keys
            ):
                return False
        return True

    def get_security_summary(self) -> Dict[str, Any]:
        """
        Get comprehensive security context summary.

        Returns:
            Dict[str, Any]: Security summary information
        """
        signed_entries = sum(1 for entry in self.audit_trail if entry.is_signed())
        return {
            "security_level": self.security_level.value,
            "cryptographic_strength": self.cryptographic_keys.strength.value,
            "encryption_fingerprint": self.cryptographic_keys.get_encryption_fingerprint(),
            "signing_fingerprint": self.cryptographic_keys.get_signing_fingerprint(),
            "audit_entries": len(self.audit_trail),
            "signed_entries": signed_entries,
            "audit_integrity": self.verify_audit_integrity(),
            "keys_age_hours": (
                datetime.now() - self.cryptographic_keys.created_at
            ).total_seconds()
            / 3600,
            "network_access": self.security_boundary.network_access,
            "shell_access": self.security_boundary.shell_access,
            "allowed_paths": len(self.security_boundary.allowed_paths),
            "forbidden_paths": len(self.security_boundary.forbidden_paths),
        }
@dataclass(frozen=True)
class AuditEvent:
    """
    Frozen audit event used for security logging.

    Validation performed on construction:
        - ``event_id`` and ``operation`` must be non-blank
        - at most 50 metadata entries
        - metadata keys must be strings of at most 100 characters
        - metadata values must be str, int, float or bool
    """

    event_id: str
    # NOTE(review): defaults to naive UTC, whereas other timestamps in this
    # module default to datetime.now() (naive local time).
    timestamp: datetime = field(default_factory=datetime.utcnow)
    user_id: str = ""
    operation: str = ""
    resource_type: str = ""
    resource_id: str = ""
    success: bool = True
    error_message: Optional[str] = None
    security_level: SecurityLevel = SecurityLevel.MEDIUM
    metadata: Dict[str, Any] = field(default_factory=dict)
    signature: Optional[str] = None

    def __post_init__(self):
        """Validate audit event structure."""
        event_id_blank = not self.event_id or not self.event_id.strip()
        if event_id_blank:
            raise ValidationError("Event ID cannot be empty")

        operation_blank = not self.operation or not self.operation.strip()
        if operation_blank:
            raise ValidationError("Operation cannot be empty")

        # Metadata validation
        if len(self.metadata) > 50:
            raise ValidationError("Too many metadata entries")

        scalar_types = (str, int, float, bool)
        for key, value in self.metadata.items():
            key_ok = isinstance(key, str) and len(key) <= 100
            if not key_ok:
                raise ValidationError(f"Invalid metadata key: {key}")
            if not isinstance(value, scalar_types):
                raise ValidationError(f"Invalid metadata value type for key {key}")

    def get_event_data(self) -> str:
        """Return the pipe-separated string over which a signature is computed."""
        return f"{self.timestamp.isoformat()}|{self.event_id}|{self.operation}|{self.user_id}|{str(self.metadata)}"
@dataclass(frozen=True)
class EncryptionContext:
    """
    Parameters for an encryption/decryption operation: the key, an
    initialization vector, and the names of the cipher and key-derivation
    scheme.

    Raises:
        ValidationError: If the algorithm name is empty or not recognised.
        ValueError: If the initialization vector is not exactly 16 bytes.
    """

    encryption_key: AESKey
    initialization_vector: bytes = field(
        default_factory=lambda: secrets.token_bytes(16)
    )
    algorithm: str = "AES-256-GCM"
    key_derivation: str = "PBKDF2"

    def __post_init__(self):
        """Validate encryption context."""
        if not self.algorithm:
            raise ValidationError("Algorithm cannot be empty")

        recognised = {
            "AES-256-GCM",
            "AES-256-CBC",
            "AES-GCM",
            "AES-CBC",
            "ChaCha20-Poly1305",
        }
        if self.algorithm not in recognised:
            raise ValidationError(f"Invalid algorithm: {self.algorithm}")

        # This check raises ValueError, not ValidationError.
        iv_size = len(self.initialization_vector)
        if iv_size != 16:
            raise ValueError(
                f"Initialization vector must be 16 bytes, got {iv_size}"
            )

    def is_secure_algorithm(self) -> bool:
        """Check if algorithm provides authenticated encryption."""
        authenticated = {"AES-256-GCM", "AES-GCM", "ChaCha20-Poly1305"}
        return self.algorithm in authenticated
# ============================================================================
# CRYPTOGRAPHIC UTILITY FUNCTIONS - Key Generation and Management
# ============================================================================
def create_aes_key(key_bytes: Optional[bytes] = None) -> AESKey:
    """
    Create AES-256 key from provided bytes or generate new one.

    Args:
        key_bytes: Optional key material (must be exactly 32 bytes for
            AES-256). When omitted, a random key is generated.

    Returns:
        AESKey: 256-bit AES key

    Raises:
        TypeError: If key_bytes is given but is not bytes or bytearray
        ValueError: If key_bytes length is invalid
    """
    if key_bytes is None:
        return AESKey(secrets.token_bytes(32))

    # A 32-character str would pass the length check below but is not key
    # material, so reject anything that is not raw bytes.
    if not isinstance(key_bytes, (bytes, bytearray)):
        raise TypeError(
            f"AES-256 key must be bytes, got {type(key_bytes).__name__}"
        )
    if len(key_bytes) != 32:
        raise ValueError(f"AES-256 key must be 32 bytes, got {len(key_bytes)}")
    return AESKey(bytes(key_bytes))
def create_jwt_token(payload: Dict[str, Any]) -> JWTToken:
    """
    Build a JWT-shaped token for testing.

    The header and payload are JSON-encoded and base64url-encoded without
    padding. The third segment is the literal text ``signature``: no
    signature is computed, so the token is not verifiable.

    Args:
        payload: Token payload

    Returns:
        JWTToken: Encoded JWT token
    """
    import base64
    import json

    def _segment(obj: Dict[str, Any]) -> str:
        """JSON-encode *obj* and return its unpadded base64url form."""
        raw = json.dumps(obj).encode()
        return base64.urlsafe_b64encode(raw).decode().rstrip("=")

    # Simple JWT implementation for testing
    header_b64 = _segment({"alg": "HS256", "typ": "JWT"})
    payload_b64 = _segment(payload)
    return JWTToken(f"{header_b64}.{payload_b64}.signature")
def create_security_context(
    security_level: SecurityLevel, session_root: Path
) -> SecurityContext:
    """
    Build a security context configured for ``security_level``.

    Key strength, the file-size ceiling, and network/shell access are all
    derived from the level. ``session_root`` becomes the only allowed path.
    The returned context already contains a signed ``SESSION_CREATED`` entry.

    Args:
        security_level: Desired security level
        session_root: Root path for session boundaries

    Returns:
        SecurityContext: New security context
    """
    # Key strength per level
    strength_by_level = {
        SecurityLevel.LOW: CryptographicStrength.STANDARD,
        SecurityLevel.MEDIUM: CryptographicStrength.STANDARD,
        SecurityLevel.HIGH: CryptographicStrength.HIGH,
        SecurityLevel.MAXIMUM: CryptographicStrength.MAXIMUM,
        SecurityLevel.INTERNAL: CryptographicStrength.HIGH,
        SecurityLevel.CONFIDENTIAL: CryptographicStrength.HIGH,
        SecurityLevel.SECRET: CryptographicStrength.MAXIMUM,
    }
    crypto_strength = strength_by_level[security_level]
    keys = generate_cryptographic_keys(crypto_strength)

    # File-size ceiling (MB) per level
    file_size_by_level = {
        SecurityLevel.LOW: 1024,  # 1GB
        SecurityLevel.MEDIUM: 512,  # 512MB
        SecurityLevel.HIGH: 256,  # 256MB
        SecurityLevel.MAXIMUM: 100,  # 100MB
        SecurityLevel.INTERNAL: 512,  # 512MB for internal ops
        SecurityLevel.CONFIDENTIAL: 256,  # 256MB for confidential
        SecurityLevel.SECRET: 100,  # 100MB for secret
    }
    max_file_size = file_size_by_level[security_level]

    network_levels = [SecurityLevel.LOW, SecurityLevel.MEDIUM, SecurityLevel.INTERNAL]
    shell_levels = [SecurityLevel.LOW, SecurityLevel.INTERNAL]

    boundary = SecurityBoundary(
        allowed_paths={session_root},
        max_file_size_mb=max_file_size,
        network_access=security_level in network_levels,
        shell_access=security_level in shell_levels,
    )

    fresh_context = SecurityContext(
        security_level=security_level,
        cryptographic_keys=keys,
        security_boundary=boundary,
    )

    # Record the session creation as the first audit entry.
    creation_details = {
        "security_level": security_level.value,
        "session_root": str(session_root),
        "cryptographic_strength": crypto_strength.value,
    }
    return fresh_context.add_audit_entry("SESSION_CREATED", creation_details)