"""
Type Validation Functions - Agent Orchestration Platform
Comprehensive validation functions for all type operations with security focus,
property-based testing support, and defensive programming patterns.
Author: Adder_3 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Type, TypeVar, Union
from src.contracts_compat import ensure, require
from .agent import AgentState, AgentStatus, ClaudeConfig
from .ids import AgentId, ProcessId, SessionId, TabId
from .iterm import TabState
from .mcp_results import MCPToolResult
from .security import Permission, SecurityContext
from .session import SecurityLevel, SessionState
# Generic type variable; used in the ``Type[T]`` annotation of
# is_valid_type_instance's ``expected_type`` parameter.
T = TypeVar("T")
class ValidationError(ValueError):
    """Validation failure that records which field was rejected and why.

    The offending value is stored only as a string truncated to 100
    characters, so large or sensitive payloads are not carried in full on
    the exception object.

    Attributes:
        message: The caller-supplied reason, without the field prefix.
        field_name: Name of the field that failed validation.
        value: ``str(value)`` truncated to at most 100 characters.

    Args:
        message: Human-readable reason for the failure.
        field_name: Name of the field that failed validation.
        value: The rejected value; it is stringified and truncated.
    """

    def __init__(self, message: str, field_name: str, value: Any):
        self.message = message
        self.field_name = field_name
        self.value = str(value)[:100]  # Truncate for security
        super().__init__(f"Validation failed for {field_name}: {message}")

    def __reduce__(self):
        """Describe how to rebuild this exception for copy/pickle.

        The default exception reduction re-creates the object as
        ``cls(*self.args)``. ``args`` holds only the formatted message, so
        that call would fail with ``TypeError`` because ``field_name`` and
        ``value`` are required. Supplying the three constructor arguments
        explicitly keeps ``copy.copy`` and pickle round-trips working.
        """
        return (type(self), (self.message, self.field_name, self.value))
@dataclass(frozen=True)
class ValidationResult:
    """Immutable outcome of a single validation check.

    Fields:
        is_valid: True when the check passed.
        message: Free-form description of the outcome (default empty).
        field_name: Field the result refers to (default empty).
        error_code: Machine-readable code for failures (default empty).
    """

    is_valid: bool
    message: str = ""
    field_name: str = ""
    error_code: str = ""

    @classmethod
    def success(cls, message: str = "Validation passed") -> "ValidationResult":
        """Return a passing result; field_name and error_code keep their defaults."""
        return cls(True, message)

    @classmethod
    def failure(
        cls, message: str, field_name: str = "", error_code: str = "VALIDATION_ERROR"
    ) -> "ValidationResult":
        """Return a failing result carrying the given message, field and code."""
        outcome = cls(False, message, field_name, error_code)
        return outcome
@require(lambda value: value is not None)
@ensure(lambda result: isinstance(result, bool))
def is_valid_type_instance(value: Any, expected_type: Type[T]) -> bool:
    """
    Report whether ``value`` is an instance of ``expected_type``.

    Contracts (as declared by the decorators):
        Preconditions:
            - value is not None
        Postconditions:
            - result is a bool

    Args:
        value: Object whose type is being checked
        expected_type: Class (or anything ``isinstance`` accepts) to check against

    Returns:
        bool: Outcome of ``isinstance(value, expected_type)``; False if the
        check itself raises

    Notes:
        Any exception raised by the ``isinstance`` call is swallowed and
        reported as False rather than propagated to the caller.
    """
    try:
        matches = isinstance(value, expected_type)
    except Exception:
        # The check itself failed, e.g. expected_type is not a usable type.
        return False
    return matches
@require(lambda agent_state: isinstance(agent_state, AgentState))
@ensure(lambda result: isinstance(result, bool))
def validate_agent_state_consistency(agent_state: AgentState) -> bool:
    """
    Check that the fields of an agent state agree with each other.

    Contracts (as declared by the decorators):
        Preconditions:
            - agent_state is an AgentState instance
        Postconditions:
            - result is a bool

    Args:
        agent_state: Agent state to inspect

    Returns:
        bool: True when every check below passes; False when any check
        fails or any attribute access / call raises

    Checks performed, in order:
        - An ACTIVE agent must have a process_id
        - resource_metrics.is_within_limits(claude_config) may only be
          false when the status is ERROR or TERMINATED
        - last_heartbeat must not precede created_at
        - conversation_history must hold at most 1000 entries
    """
    try:
        active_without_process = (
            agent_state.status == AgentStatus.ACTIVE
            and agent_state.process_id is None
        )
        if active_without_process:
            return False

        # Exceeding limits is tolerated only for agents that already stopped.
        within_limits = agent_state.resource_metrics.is_within_limits(
            agent_state.claude_config
        )
        if not within_limits and agent_state.status not in {
            AgentStatus.ERROR,
            AgentStatus.TERMINATED,
        }:
            return False

        if agent_state.last_heartbeat < agent_state.created_at:
            return False

        return len(agent_state.conversation_history) <= 1000
    except Exception:
        return False
@require(lambda session_state: isinstance(session_state, SessionState))
@ensure(lambda result: isinstance(result, bool))
def validate_session_state_consistency(session_state: SessionState) -> bool:
    """
    Check that the fields of a session state agree with each other.

    Contracts (as declared by the decorators):
        Preconditions:
            - session_state is a SessionState instance
        Postconditions:
            - result is a bool

    Args:
        session_state: Session state to inspect

    Returns:
        bool: True when every check below passes; False when any check
        fails or any attribute access / call raises

    Checks performed, in order:
        - Number of agents does not exceed session_state.max_agents
        - Number of agents does not exceed security_level.get_max_agents()
        - Every agent's session_id equals the session's session_id
        - filesystem_boundaries is non-empty
        - root_path.exists() is true
        - metrics.is_within_limits(security_level) is true
    """
    try:
        agent_count = len(session_state.agents)
        if agent_count > session_state.max_agents:
            return False
        if agent_count > session_state.security_level.get_max_agents():
            return False

        # Only the agent objects matter here; the mapping keys are not used.
        if any(
            member.session_id != session_state.session_id
            for member in session_state.agents.values()
        ):
            return False

        if not (
            session_state.filesystem_boundaries and session_state.root_path.exists()
        ):
            return False

        if not session_state.metrics.is_within_limits(session_state.security_level):
            return False
    except Exception:
        return False
    return True
@require(lambda security_context: isinstance(security_context, SecurityContext))
@ensure(lambda result: isinstance(result, bool))
def validate_security_context(security_context: SecurityContext) -> bool:
    """
    Check that a security context is usable.

    Contracts (as declared by the decorators):
        Preconditions:
            - security_context is a SecurityContext instance
        Postconditions:
            - result is a bool

    Args:
        security_context: Security context to inspect

    Returns:
        bool: True when every check below passes; False when any check
        fails or any attribute access / call raises

    Checks performed, in order:
        - security_context.is_valid() is true
        - permissions is non-empty
        - user_id is not blank after stripping whitespace
        - every value in session_permissions is non-empty
    """
    try:
        basics_ok = (
            security_context.is_valid()
            and security_context.permissions
            and security_context.user_id.strip()
        )
        if not basics_ok:
            return False

        # A session entry with no permissions at all makes the context invalid.
        return all(security_context.session_permissions.values())
    except Exception:
        return False
@require(lambda path: isinstance(path, (str, Path)))
@ensure(lambda result: isinstance(result, bool))
def validate_filesystem_path(path: Union[str, Path]) -> bool:
    """
    Validate a filesystem path: no parent-directory traversal, and it exists.

    Contracts (as declared by the decorators):
        Preconditions:
            - path is a str or Path
        Postconditions:
            - result is a bool

    Args:
        path: Filesystem path to validate (absolute or relative)

    Returns:
        bool: True if the path contains no ``..`` component and the
        resolved path exists; False otherwise, including when resolution
        or any other step raises

    Security Implementation:
        - Path Traversal Prevention: any path component equal to ``..``
          is rejected. The comparison is per component, so a bare ``..``
          (which is also what ``Path("../")`` normalises to) is caught,
          while file names that merely contain dots, such as ``..hidden``
          or ``file..txt``, are not mistaken for traversal.
        - Symbolic Link Handling: the path is resolved with
          ``Path.resolve()`` before the existence check.
        - Access Validation: the resolved path must exist.
    """
    try:
        path_obj = path if isinstance(path, Path) else Path(path)

        # Component-wise check: substring tests miss a lone ".." and
        # misfire on names that only start or end with two dots.
        if ".." in path_obj.parts:
            return False

        try:
            resolved_path = path_obj.resolve()
        except (OSError, ValueError):
            # e.g. embedded NUL byte or an OS-level resolution failure
            return False

        return resolved_path.exists()
    except Exception:
        return False
@require(lambda config: isinstance(config, ClaudeConfig))
@ensure(lambda result: isinstance(result, bool))
def validate_claude_config(config: ClaudeConfig) -> bool:
    """
    Validate a Claude configuration against fixed resource and safety bounds.

    Contracts (as declared by the decorators):
        Preconditions:
            - config is a ClaudeConfig instance
        Postconditions:
            - result is a bool

    Args:
        config: Claude configuration to validate

    Returns:
        bool: True when every check below passes; False when any check
        fails or any attribute access / call raises

    Checks performed, in order:
        - 64 <= max_memory_mb <= 1024
        - 1.0 <= max_cpu_percent <= 50.0
        - 30 <= timeout_seconds <= 3600
        - every entry of custom_commands passes validate_command_safety
    """
    try:
        # All bounds use the chained "low <= x <= high" form. Unlike a pair
        # of one-sided comparisons, it is false for NaN, so a NaN limit is
        # rejected instead of slipping through.
        if not (64 <= config.max_memory_mb <= 1024):
            return False
        if not (1.0 <= config.max_cpu_percent <= 50.0):
            return False
        if not (30 <= config.timeout_seconds <= 3600):
            return False

        return all(
            validate_command_safety(command) for command in config.custom_commands
        )
    except Exception:
        return False
# Patterns that mark a command as unsafe. Compiled once at import time
# instead of on every call; matching is case-insensitive.
_DANGEROUS_COMMAND_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r";\s*rm\s",  # Command chaining with rm
        r"&&\s*rm\s",  # Command chaining with rm
        # Pipe to shell. A word boundary rather than trailing whitespace,
        # so "curl x | sh" at the very end of the command (or "|sh;") is
        # caught while "| shuf" / "| sha256sum" are still not matched.
        r"\|\s*sh\b",
        r"`.*`",  # Command substitution
        r"\$\(",  # Command substitution
        r">\s*/dev/",  # Redirect to device files
        r"<\s*/dev/",  # Read from device files
    )
)


@require(lambda command: isinstance(command, str))
@ensure(lambda result: isinstance(result, bool))
def validate_command_safety(command: str) -> bool:
    """
    Heuristically reject command strings that look like shell injection.

    Contracts (as declared by the decorators):
        Preconditions:
            - command is a str
        Postconditions:
            - result is a bool

    Args:
        command: Command string to validate

    Returns:
        bool: True if no rule below rejects the command; False otherwise,
        including when any step raises

    Security Implementation:
        - Length Validation: commands longer than 1000 characters are rejected
        - Injection Prevention: commands matching any entry of
          ``_DANGEROUS_COMMAND_PATTERNS`` are rejected
        - Character Filtering: more than 3 occurrences in total of
          ``;``, ``&`` and ``|`` are rejected
    """
    try:
        if len(command) > 1000:
            return False

        if any(pattern.search(command) for pattern in _DANGEROUS_COMMAND_PATTERNS):
            return False

        separator_count = sum(command.count(ch) for ch in ";&|")
        return separator_count <= 3
    except Exception:
        return False
@require(lambda data: data is not None)
@ensure(lambda result: isinstance(result, bool))
def validate_json_serializable(data: Any) -> bool:
    """
    Report whether ``json.dumps`` can serialize ``data``.

    Contracts (as declared by the decorators):
        Preconditions:
            - data is not None
        Postconditions:
            - result is a bool

    Args:
        data: Data to validate

    Returns:
        bool: True if ``json.dumps(data)`` succeeds; False if it raises
        TypeError (unsupported type), ValueError (e.g. circular
        reference) or RecursionError (structure nested too deeply)
    """
    import json

    try:
        json.dumps(data)
    except (TypeError, ValueError, RecursionError):
        # RecursionError is included so that pathologically deep nesting
        # yields False instead of escaping from a validator.
        return False
    return True
# Registry mapping a project type to the validator function that checks
# instances of that type; consulted by validate_object.
TYPE_VALIDATORS: Dict[type, Any] = {
    AgentState: validate_agent_state_consistency,
    SessionState: validate_session_state_consistency,
    SecurityContext: validate_security_context,
    ClaudeConfig: validate_claude_config,
}
@require(lambda obj: obj is not None)
@ensure(lambda result: isinstance(result, bool))
def validate_object(obj: Any) -> bool:
    """
    Validate an object with the validator registered for its type.

    The object's class and then its base classes (in method resolution
    order) are looked up in ``TYPE_VALIDATORS``; the first match is used.
    The exact type is tried first, so objects of a registered type behave
    as before. Instances of a subclass of a registered type are now routed
    to that type's validator instead of passing the generic fallback
    without any real checks.

    Contracts (as declared by the decorators):
        Preconditions:
            - obj is not None
        Postconditions:
            - result is a bool

    Args:
        obj: Object to validate

    Returns:
        bool: The registered validator's result when one applies;
        otherwise True if the object is not None and has a ``__dict__``
        attribute, and False if that fallback check raises
    """
    for candidate in type(obj).__mro__:
        validator = TYPE_VALIDATORS.get(candidate)
        if validator is not None:
            return validator(obj)

    # Default validation - check basic integrity
    try:
        return obj is not None and hasattr(obj, "__dict__")
    except Exception:
        return False