"""
Core ID Types - Agent Orchestration Platform
Architecture Integration:
- Design Patterns: NewType pattern for brand-based type safety
- Security Model: Input validation with comprehensive sanitization
- Performance Profile: O(1) validation for all ID operations
Technical Decisions:
- Branded Types: Prevent category errors between different ID types
- Validation Functions: Separate validation logic for testing and reuse
- Security Focus: Input sanitization to prevent injection attacks
Dependencies & Integration:
- External: None (stdlib only for maximum portability)
- Internal: Foundation types for all other modules
Quality Assurance:
- Test Coverage: Property-based testing for all ID operations
- Error Handling: Comprehensive validation with typed exceptions
Author: Adder_3 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import re
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import NewType, Optional
# ============================================================================
# BRANDED ID TYPES - Maximum Type Safety
# ============================================================================
# Core branded types to prevent category errors.
# NewType gives each ID its own static type for type checkers; at runtime the
# constructors simply return their argument unchanged, so building one of
# these performs no validation by itself.
AgentId = NewType("AgentId", str)  # Agent identifier (string-backed)
SessionId = NewType("SessionId", str)  # Session identifier (string-backed)
ProcessId = NewType("ProcessId", int)  # Process identifier (integer-backed)
TabId = NewType("TabId", str)  # iTerm2 tab identifier
ITermTabId = TabId  # Alias for backwards compatibility (same type as TabId)
MessageId = NewType("MessageId", str)  # Message identifier (string-backed)
StateId = NewType("StateId", str)  # State management identifier
BackupId = NewType("BackupId", str)  # Backup identifier
# ============================================================================
# VALIDATION EXCEPTIONS - Typed Error Handling
# ============================================================================
class IDValidationError(Exception):
    """Base exception for ID validation failures.

    Attributes:
        id_type: Name of the ID kind that failed validation (e.g. "AgentId").
        value: Text form of the offending value; anything longer than 50
            characters is cut to its first 50 characters plus "...".
        reason: Human-readable explanation of the failure.

    Args:
        id_type: Name of the ID kind being validated.
        value: The rejected value. Normally a string; any other object is
            converted with ``str()`` so that building the error can never
            fail and hide the validation problem being reported.
        reason: Why the value was rejected.
    """

    def __init__(self, id_type: str, value: str, reason: str):
        self.id_type = id_type
        # len() and slicing below need a string; coerce instead of raising a
        # TypeError from inside the error constructor.
        text = value if isinstance(value, str) else str(value)
        # Truncate so oversized (possibly hostile) input cannot flood logs.
        self.value = text[:50] + "..." if len(text) > 50 else text
        self.reason = reason
        super().__init__(f"Invalid {id_type}: {reason} (value: {self.value})")
class AgentIDValidationError(IDValidationError):
    """Raised when a candidate agent ID fails validation.

    Specialises IDValidationError by fixing the reported ID type to "AgentId".

    Args:
        value: The rejected agent ID text.
        reason: Why the value was rejected.
    """

    def __init__(self, value: str, reason: str):
        super().__init__(id_type="AgentId", value=value, reason=reason)
class SessionIDValidationError(IDValidationError):
    """Raised when a candidate session ID fails validation.

    Specialises IDValidationError by fixing the reported ID type to
    "SessionId".

    Args:
        value: The rejected session ID text.
        reason: Why the value was rejected.
    """

    def __init__(self, value: str, reason: str):
        super().__init__(id_type="SessionId", value=value, reason=reason)
# Alias for backwards compatibility: this is the same class object as
# IDValidationError, so an ``except`` clause on either name catches both.
InvalidIdentifierError = IDValidationError
# ============================================================================
# VALIDATION PATTERNS - Security-Focused Input Validation
# ============================================================================
# NOTE: every pattern below is anchored with \Z instead of $. In Python, $
# also matches just before a trailing newline, so "Agent_1\n" would otherwise
# be accepted by Pattern.match(). Character classes are spelled out as ASCII
# ranges because \d on str patterns also matches non-ASCII digits.

# Agent name pattern: "Agent_" followed by 1-99 with no leading zero
AGENT_NAME_PATTERN = re.compile(r"^Agent_([1-9]|[1-9][0-9])\Z")
# Session ID pattern: session_[uuid4] (lowercase hex, version nibble 4,
# variant nibble 8/9/a/b)
SESSION_ID_PATTERN = re.compile(
    r"^session_[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}\Z"
)
# Tab ID pattern: iTerm2 tab identifiers (1-64 ASCII alphanumerics, hyphens
# or underscores)
TAB_ID_PATTERN = re.compile(r"^[a-zA-Z0-9\-_]{1,64}\Z")
ITERM_TAB_ID_PATTERN = TAB_ID_PATTERN  # Alias for backwards compatibility
# Agent ID pattern: identical to the agent name pattern
AGENT_ID_PATTERN = AGENT_NAME_PATTERN
# Message ID pattern: msg_[10-digit epoch seconds]_[first 8 hex chars of a uuid4]
MESSAGE_ID_PATTERN = re.compile(r"^msg_[0-9]{10}_[0-9a-f]{8}\Z")
# Security constraints
MAX_ID_LENGTH = 128
MIN_AGENT_NUMBER = 1
MAX_AGENT_NUMBER = 99
MAX_PROCESS_ID = 2**31 - 1  # Maximum safe 32-bit signed integer
# ============================================================================
# VALIDATION FUNCTIONS - Design by Contract Implementation
# ============================================================================
def validate_agent_name(agent_name: str) -> AgentId:
    """
    Validate a raw agent name and return it as a type-safe AgentId.

    A valid name is exactly ``Agent_<n>`` where ``<n>`` is a number in
    [MIN_AGENT_NUMBER, MAX_AGENT_NUMBER] written without a leading zero.
    Nothing may precede or follow it - in particular a trailing newline is
    rejected.

    Contracts:
        Preconditions:
            - agent_name is a string no longer than MAX_ID_LENGTH
        Postconditions:
            - The returned AgentId equals the input, unchanged
            - It matches AGENT_NAME_PATTERN over its whole length
            - It contains none of the shell metacharacters checked below
    Args:
        agent_name: Raw agent name string to validate
    Returns:
        AgentId: Type-safe agent identifier
    Raises:
        AgentIDValidationError: Input is None, not a string, too long,
            blank, malformed, out of range, or contains dangerous characters
    """
    # Precondition validation
    if agent_name is None:
        raise AgentIDValidationError("", "agent name cannot be None")
    if not isinstance(agent_name, str):
        raise AgentIDValidationError(
            str(agent_name), f"must be string, got {type(agent_name)}"
        )
    if len(agent_name) > MAX_ID_LENGTH:
        raise AgentIDValidationError(
            agent_name, f"exceeds maximum length {MAX_ID_LENGTH}"
        )
    if not agent_name.strip():
        raise AgentIDValidationError(agent_name, "cannot be empty or whitespace only")

    # Whitelist check. fullmatch() is required here: with match(), a pattern
    # ending in "$" also accepts the name followed by a newline.
    if not AGENT_NAME_PATTERN.fullmatch(agent_name):
        raise AgentIDValidationError(agent_name, "must match pattern Agent_[1-99]")

    # Defence in depth: re-derive the number and range-check it so the limits
    # still hold if the pattern and the MIN/MAX constants ever drift apart.
    try:
        agent_number = int(agent_name.split("_")[1])
    except (ValueError, IndexError) as exc:
        raise AgentIDValidationError(
            agent_name, "invalid agent number format"
        ) from exc
    if not (MIN_AGENT_NUMBER <= agent_number <= MAX_AGENT_NUMBER):
        raise AgentIDValidationError(
            agent_name,
            f"agent number {agent_number} not in range [{MIN_AGENT_NUMBER}, {MAX_AGENT_NUMBER}]",
        )

    # Security check for injection patterns (shell metacharacters / quoting)
    dangerous_chars = ["`", "$", "(", ")", ";", "|", "&", "<", ">", '"', "'", "\\"]
    if any(char in agent_name for char in dangerous_chars):
        raise AgentIDValidationError(agent_name, "contains dangerous characters")
    return AgentId(agent_name)
def create_session_id() -> SessionId:
    """
    Build a fresh session identifier of the form ``session_<uuid4>``.

    The UUID comes from ``uuid.uuid4()`` and is rendered in its canonical
    hyphenated text form.

    Contracts:
        Preconditions: None
        Postconditions:
            - Result starts with the ``session_`` prefix
            - Result matches SESSION_ID_PATTERN (checked with an assert, so
              the check is skipped when Python runs with -O)
    Returns:
        SessionId: Newly generated session identifier
    """
    candidate = "session_" + str(uuid.uuid4())
    # Post-condition validation
    pattern_hit = SESSION_ID_PATTERN.match(candidate)
    assert pattern_hit, "Generated session ID must match pattern"
    return SessionId(candidate)
def validate_session_name(session_name: str) -> str:
    """
    Validate a human-readable session name and return it trimmed.

    Leading and trailing whitespace is removed from the result. What remains
    may contain only ASCII letters, digits, plain spaces, hyphens and
    underscores; interior newlines, tabs and other whitespace are rejected so
    a name can never span lines or smuggle control whitespace.

    Contracts:
        Preconditions:
            - session_name is a string of at most 64 characters
        Postconditions:
            - Returned name is non-empty, has no surrounding whitespace and
              consists solely of the allowed characters
    Args:
        session_name: Raw session name string to validate
    Returns:
        str: Validated session name with surrounding whitespace stripped
    Raises:
        IDValidationError: Input is None, not a string, too long, blank, or
            contains characters outside the allowed set
    """
    # Precondition validation
    if session_name is None:
        raise IDValidationError("SessionName", "", "session name cannot be None")
    if not isinstance(session_name, str):
        raise IDValidationError(
            "SessionName",
            str(session_name),
            f"must be string, got {type(session_name)}",
        )
    if len(session_name) > 64:
        raise IDValidationError(
            "SessionName", session_name, "exceeds maximum length 64"
        )
    trimmed = session_name.strip()
    if not trimmed:
        raise IDValidationError(
            "SessionName", session_name, "cannot be empty or whitespace only"
        )

    # Security validation. The whitelist below would reject these too; the
    # separate check exists to give a more specific error message.
    dangerous_tokens = (
        "`",
        "$",
        "(",
        ")",
        ";",
        "|",
        "&",
        "<",
        ">",
        '"',
        "'",
        "\\",
        "/",
        "..",
    )
    if any(token in session_name for token in dangerous_tokens):
        raise IDValidationError(
            "SessionName", session_name, "contains dangerous characters"
        )

    # Whitelist applied to the value actually returned. A literal space is
    # used instead of \s, which would also admit newlines, tabs and Unicode
    # whitespace inside the name.
    if not re.fullmatch(r"[a-zA-Z0-9 \-_]+", trimmed):
        raise IDValidationError(
            "SessionName",
            session_name,
            "must contain only alphanumeric characters, spaces, hyphens, and underscores",
        )
    return trimmed
def validate_session_id(session_id_str: str) -> SessionId:
    """
    Validate a raw session ID string and return it as a type-safe SessionId.

    A valid ID is exactly ``session_`` followed by a lowercase, hyphenated
    UUID4. Nothing may precede or follow it - in particular a trailing
    newline is rejected.

    Contracts:
        Preconditions:
            - session_id_str is a string no longer than MAX_ID_LENGTH
        Postconditions:
            - The returned SessionId equals the input, unchanged
            - It matches SESSION_ID_PATTERN over its whole length
    Args:
        session_id_str: Raw session ID string to validate
    Returns:
        SessionId: Type-safe session identifier
    Raises:
        SessionIDValidationError: Input is None, not a string, too long,
            blank, malformed, or contains dangerous characters
    """
    # Precondition validation
    if session_id_str is None:
        raise SessionIDValidationError("", "session ID cannot be None")
    if not isinstance(session_id_str, str):
        raise SessionIDValidationError(
            str(session_id_str), f"must be string, got {type(session_id_str)}"
        )
    if len(session_id_str) > MAX_ID_LENGTH:
        raise SessionIDValidationError(
            session_id_str, f"exceeds maximum length {MAX_ID_LENGTH}"
        )
    if not session_id_str.strip():
        raise SessionIDValidationError(
            session_id_str, "cannot be empty or whitespace only"
        )

    # Pattern validation. fullmatch() is required here: with match(), a
    # pattern ending in "$" also accepts the ID followed by a newline.
    if not SESSION_ID_PATTERN.fullmatch(session_id_str):
        raise SessionIDValidationError(
            session_id_str, "must match pattern session_[uuid4]"
        )

    # Security validation, kept as defence in depth behind the whitelist
    dangerous_chars = ["`", "$", "(", ")", ";", "|", "&", "<", ">", '"', "'", "\\"]
    if any(char in session_id_str for char in dangerous_chars):
        raise SessionIDValidationError(session_id_str, "contains dangerous characters")
    return SessionId(session_id_str)
def validate_process_id(process_id: int) -> ProcessId:
    """
    Validate a raw process ID and return it as a type-safe ProcessId.

    Contracts:
        Preconditions:
            - process_id is a real integer (bool is rejected even though it
              is an int subclass)
            - 1 <= process_id <= MAX_PROCESS_ID
        Postconditions:
            - The returned ProcessId equals the input, unchanged
    Args:
        process_id: Raw process ID integer to validate
    Returns:
        ProcessId: Type-safe process identifier
    Raises:
        IDValidationError: Input is None, not an integer, non-positive, or
            greater than MAX_PROCESS_ID
    """
    # Precondition validation
    if process_id is None:
        raise IDValidationError(
            "ProcessId", str(process_id), "process ID cannot be None"
        )
    # bool is a subclass of int, so isinstance(True, int) is True; without the
    # explicit bool test, True would be accepted as process ID 1.
    if isinstance(process_id, bool) or not isinstance(process_id, int):
        raise IDValidationError(
            "ProcessId", str(process_id), f"must be integer, got {type(process_id)}"
        )
    if process_id <= 0:
        raise IDValidationError(
            "ProcessId", str(process_id), "must be positive integer"
        )
    if process_id > MAX_PROCESS_ID:
        raise IDValidationError(
            "ProcessId", str(process_id), f"exceeds maximum {MAX_PROCESS_ID}"
        )
    return ProcessId(process_id)
def validate_tab_id(tab_id_str: str) -> TabId:
    """
    Validate a raw iTerm2 tab ID string and return it as a type-safe TabId.

    A valid ID consists of 1 to 64 ASCII letters, digits, hyphens or
    underscores and nothing else - in particular a trailing newline is
    rejected.

    Contracts:
        Preconditions:
            - tab_id_str is a string
        Postconditions:
            - The returned TabId equals the input, unchanged
            - It matches TAB_ID_PATTERN over its whole length
    Args:
        tab_id_str: Raw tab ID string to validate
    Returns:
        TabId: Type-safe iTerm2 tab identifier
    Raises:
        IDValidationError: Input is None, not a string, blank, too long, or
            contains characters outside the allowed set
    """
    # Precondition validation
    if tab_id_str is None:
        raise IDValidationError("TabId", "", "tab ID cannot be None")
    if not isinstance(tab_id_str, str):
        raise IDValidationError(
            "TabId", str(tab_id_str), f"must be string, got {type(tab_id_str)}"
        )
    if not tab_id_str.strip():
        raise IDValidationError(
            "TabId", tab_id_str, "cannot be empty or whitespace only"
        )

    # Pattern and length validation (the pattern itself caps length at 64).
    # fullmatch() is required here: with match(), a pattern ending in "$"
    # also accepts the ID followed by a newline.
    if not TAB_ID_PATTERN.fullmatch(tab_id_str):
        raise IDValidationError(
            "TabId",
            tab_id_str,
            "must contain only alphanumeric characters, hyphens, and underscores",
        )
    return TabId(tab_id_str)
def create_message_id() -> MessageId:
    """
    Build a message identifier of the form ``msg_<epoch seconds>_<8 hex>``.

    The middle part is the current time as whole seconds since the epoch;
    the last part is the first eight hex characters of a fresh
    ``uuid.uuid4()``.

    Contracts:
        Preconditions: None
        Postconditions:
            - Result starts with the ``msg_`` prefix
            - Result matches MESSAGE_ID_PATTERN (checked with an assert, so
              the check is skipped when Python runs with -O)
    Returns:
        MessageId: Newly generated message identifier
    """
    epoch_seconds = int(datetime.now().timestamp())
    # .hex is the UUID without hyphens; its first 8 characters are the same
    # as the first hyphen-separated group of the canonical text form.
    short_random = uuid.uuid4().hex[:8]
    candidate = "msg_" + str(epoch_seconds) + "_" + short_random
    # Post-condition validation
    pattern_hit = MESSAGE_ID_PATTERN.match(candidate)
    assert pattern_hit, "Generated message ID must match pattern"
    return MessageId(candidate)
# ============================================================================
# ID METADATA - Structured Information Extraction
# ============================================================================
@dataclass(frozen=True)
class AgentIdInfo:
    """Immutable metadata extracted from AgentId.

    Construction fails unless ``display_name`` is exactly
    ``Agent_<agent_number>``.
    """

    # The identifier this metadata was derived from
    agent_id: AgentId
    # Numeric part of the identifier
    agent_number: int
    # Text form, required to equal f"Agent_{agent_number}"
    display_name: str

    def __post_init__(self):
        """Reject a display name that disagrees with the agent number."""
        canonical = "Agent_" + str(self.agent_number)
        if self.display_name == canonical:
            return
        raise ValueError(
            f"Display name {self.display_name} inconsistent with agent number {self.agent_number}"
        )
@dataclass(frozen=True)
class MessageIdInfo:
    """Immutable metadata extracted from MessageId.

    Construction fails unless ``message_id`` can be rebuilt exactly from
    ``timestamp`` (as whole epoch seconds) and ``random_component``.
    """

    # The identifier this metadata was derived from
    message_id: MessageId
    # Time encoded in the identifier
    timestamp: datetime
    # Trailing part of the identifier after the timestamp
    random_component: str

    def __post_init__(self):
        """Reject fields that do not reassemble into the source MessageId."""
        # Reconstruct expected message ID and verify consistency
        epoch_seconds = int(self.timestamp.timestamp())
        rebuilt = f"msg_{epoch_seconds}_{self.random_component}"
        if rebuilt == str(self.message_id):
            return
        raise ValueError(f"Metadata inconsistent with message ID {self.message_id}")
def extract_agent_info(agent_id: AgentId) -> AgentIdInfo:
    """
    Break an AgentId into its structured metadata.

    The number is taken from the text between the first and second
    underscore; the display name is the identifier's own text.

    Args:
        agent_id: Validated AgentId
    Returns:
        AgentIdInfo: Immutable metadata structure
    Raises:
        ValueError: If the identifier has no underscore-separated number, the
            number is not an integer, or AgentIdInfo rejects the fields
    """
    raw = str(agent_id)
    try:
        segments = raw.split("_")
        number = int(segments[1])
        # Built inside the try so a consistency failure raised by AgentIdInfo
        # is reported as corruption as well.
        info = AgentIdInfo(agent_id=agent_id, agent_number=number, display_name=raw)
    except (ValueError, IndexError):
        raise ValueError(f"Corrupted AgentId format: {agent_id}")
    return info
def extract_message_info(message_id: MessageId) -> MessageIdInfo:
    """
    Extract structured metadata from MessageId.

    The identifier is split on underscores: the second piece is read as
    whole epoch seconds and converted to a local naive ``datetime``; the
    third piece is taken as the random component.

    Args:
        message_id: MessageId to analyze
    Returns:
        MessageIdInfo: Immutable metadata structure
    Raises:
        ValueError: If the identifier has too few pieces, the timestamp is
            not an integer or is outside the range the platform can convert,
            or MessageIdInfo rejects the fields. The underlying error is
            attached as ``__cause__``.
    """
    message_id_str = str(message_id)
    try:
        parts = message_id_str.split("_")
        timestamp = datetime.fromtimestamp(int(parts[1]))
        random_component = parts[2]
        return MessageIdInfo(
            message_id=message_id,
            timestamp=timestamp,
            random_component=random_component,
        )
    # fromtimestamp() raises OverflowError or OSError (not ValueError) when
    # the number is out of range for the platform; fold those into the
    # documented ValueError instead of letting them escape.
    except (ValueError, IndexError, OverflowError, OSError) as exc:
        raise ValueError(f"Corrupted MessageId format: {message_id}") from exc
# ============================================================================
# UTILITY FUNCTIONS - Type-Safe Operations
# ============================================================================
def is_valid_agent_name(agent_name: str) -> bool:
    """
    Report whether validate_agent_name() accepts the given name.

    Args:
        agent_name: Agent name to check
    Returns:
        bool: True when validation succeeds, False when it raises
            AgentIDValidationError
    """
    try:
        validate_agent_name(agent_name)
    except AgentIDValidationError:
        return False
    else:
        return True
def is_valid_session_id(session_id_str: str) -> bool:
    """
    Report whether validate_session_id() accepts the given string.

    Args:
        session_id_str: Session ID to check
    Returns:
        bool: True when validation succeeds, False when it raises
            SessionIDValidationError
    """
    try:
        validate_session_id(session_id_str)
    except SessionIDValidationError:
        return False
    else:
        return True
def agent_ids_equal(agent_id1: AgentId, agent_id2: AgentId) -> bool:
    """
    Compare two AgentIds by their text form.

    Args:
        agent_id1: First AgentId
        agent_id2: Second AgentId
    Returns:
        bool: True when ``str()`` of both arguments is the same text
    """
    left_text = str(agent_id1)
    right_text = str(agent_id2)
    return left_text == right_text
def session_ids_equal(session_id1: SessionId, session_id2: SessionId) -> bool:
    """
    Compare two SessionIds by their text form.

    Args:
        session_id1: First SessionId
        session_id2: Second SessionId
    Returns:
        bool: True when ``str()`` of both arguments is the same text
    """
    left_text = str(session_id1)
    right_text = str(session_id2)
    return left_text == right_text
# ============================================================================
# CREATION FUNCTIONS - Test Support
# ============================================================================
def create_agent_id(agent_number: Optional[int] = None) -> AgentId:
    """
    Create AgentId for testing and default scenarios.

    Args:
        agent_number: Agent number in [MIN_AGENT_NUMBER, MAX_AGENT_NUMBER].
            When None, a number is drawn at random from that range. The draw
            is NOT guaranteed unique: with so few possible values, repeated
            calls will collide.
    Returns:
        AgentId: Valid agent identifier of the form ``Agent_<number>``
    Raises:
        ValueError: agent_number is outside the allowed range
        AgentIDValidationError: the resulting name fails validate_agent_name
            (e.g. agent_number is not a plain integer)
    """
    if agent_number is None:
        # Fold a random 128-bit UUID value into the allowed range. Offsetting
        # by the minimum keeps this correct if the bounds are ever changed.
        span = MAX_AGENT_NUMBER - MIN_AGENT_NUMBER + 1
        agent_number = MIN_AGENT_NUMBER + uuid.uuid4().int % span
    if not (MIN_AGENT_NUMBER <= agent_number <= MAX_AGENT_NUMBER):
        raise ValueError(
            f"Agent number {agent_number} not in range [{MIN_AGENT_NUMBER}, {MAX_AGENT_NUMBER}]"
        )
    # Route through the validator so the result meets the same contract as
    # any externally supplied agent name.
    return validate_agent_name(f"Agent_{agent_number}")
def create_iterm_tab_id() -> ITermTabId:
    """
    Create ITermTabId for testing scenarios.

    The ID is ``tab_`` followed by the 32 hex characters of a fresh
    ``uuid.uuid4()`` (no hyphens), passed through validate_tab_id().

    Returns:
        ITermTabId: Valid iTerm2 tab identifier
    """
    # .hex is the UUID's text form with the hyphens already removed
    hex_digits = uuid.uuid4().hex
    return validate_tab_id("tab_" + hex_digits)
def create_process_id(pid: Optional[int] = None) -> ProcessId:
    """
    Create ProcessId for testing scenarios.

    Args:
        pid: Process ID number in [1, MAX_PROCESS_ID]. When None, the ID of
            the current process (``os.getpid()``) is used.
    Returns:
        ProcessId: Valid process identifier
    Raises:
        TypeError: pid is not a plain integer (bool and float are rejected)
        ValueError: pid is outside [1, MAX_PROCESS_ID]
    """
    if pid is None:
        import os

        return ProcessId(os.getpid())
    # bool is an int subclass and floats satisfy the range comparison, so
    # both would otherwise be wrapped as a ProcessId unchanged.
    if isinstance(pid, bool) or not isinstance(pid, int):
        raise TypeError(f"Process ID must be an integer, got {type(pid).__name__}")
    # Use the shared module limit rather than repeating the literal here
    if not (1 <= pid <= MAX_PROCESS_ID):
        raise ValueError(
            f"Process ID {pid} not in valid range [1, {MAX_PROCESS_ID}]"
        )
    return ProcessId(pid)
def is_valid_agent_id(agent_id_str: str) -> bool:
    """
    Report whether the given string is a valid agent ID.

    Delegates to is_valid_agent_name(), so agent IDs and agent names share
    one definition of validity.

    Args:
        agent_id_str: Agent ID string to check
    Returns:
        bool: True if valid, False otherwise
    """
    verdict = is_valid_agent_name(agent_id_str)
    return verdict