"""
Comprehensive Input Validation and Sanitization
This module provides secure input validation and sanitization for the Agent Orchestration Platform,
implementing whitelist-based validation with security-focused sanitization for all external inputs.
Architecture Integration:
- Design Patterns: Chain of Responsibility for multi-stage validation, Factory for validator creation
- Security Model: Whitelist-based validation with fail-safe defaults and comprehensive sanitization
- Performance Profile: O(n) validation where n = input length, with early termination for violations
Technical Decisions:
- Whitelist Approach: Only explicitly allowed patterns and characters permitted
- Multi-Stage Validation: Format validation, content validation, security validation, length validation
- Escape Sequences: Comprehensive escaping for all output contexts (HTML, shell, SQL, etc.)
- Unicode Normalization: Consistent Unicode handling to prevent bypass attacks
Dependencies & Integration:
- External: re, html, urllib.parse for sanitization functions
- Internal: contracts for validation enforcement, audit for security violation logging
Quality Assurance:
- Test Coverage: Property-based testing with hypothesis for edge cases and attack patterns
- Error Handling: Clear validation errors with security-focused failure modes
Author: Adder_4 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import base64
import html
import json
import re
import unicodedata
import urllib.parse
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional, Set, Type, Union
from src.boundaries.audit import AuditCategory, AuditLevel, get_audit_logger
from src.utils.contracts_shim import ensure, require
class InputType(Enum):
    """Kinds of external input; used as the key when looking up validation rules."""

    # Identifiers
    AGENT_NAME = "agent_name"
    SESSION_NAME = "session_name"

    # Free-form and structured content
    MESSAGE_CONTENT = "message_content"
    FILE_PATH = "file_path"
    USER_ID = "user_id"
    EMAIL = "email"
    URL = "url"
    JSON_DATA = "json_data"

    # Content destined for an interpreter
    SHELL_COMMAND = "shell_command"
    SQL_QUERY = "sql_query"

    # Catch-all
    GENERAL_TEXT = "general_text"
class ValidationSeverity(Enum):
    """
    How serious a validation violation is.

    Members are declared from least to most severe, so their ``auto()``
    values increase with severity and can be compared numerically.
    """

    INFO = auto()
    WARNING = auto()
    ERROR = auto()
    CRITICAL = auto()
@dataclass(frozen=True)
class ValidationRule:
    """
    Immutable description of one validation check.

    Every constraint is optional; a constraint left as ``None`` is not enforced.
    """

    name: str
    pattern: Optional[re.Pattern] = None
    max_length: Optional[int] = None
    min_length: Optional[int] = None
    allowed_chars: Optional[Set[str]] = None
    blocked_chars: Optional[Set[str]] = None
    required: bool = True
    sanitize: bool = True
    severity: ValidationSeverity = ValidationSeverity.ERROR

    def __post_init__(self):
        """
        Reject contradictory length bounds.

        Raises:
            ValueError: If both bounds are set and ``max_length < min_length``.
        """
        # Nothing to cross-check unless both bounds were supplied.
        if self.max_length is None or self.min_length is None:
            return
        if self.min_length > self.max_length:
            raise ValueError("max_length must be >= min_length")
@dataclass
class ValidationResult:
    """
    Mutable outcome of validating one input.

    Starts at the caller-supplied validity and is downgraded as violations
    or security issues are recorded.
    """

    is_valid: bool
    original_input: str
    sanitized_input: Optional[str] = None
    violations: List[str] = field(default_factory=list)
    security_issues: List[str] = field(default_factory=list)
    severity: ValidationSeverity = ValidationSeverity.INFO

    def add_violation(
        self, message: str, severity: ValidationSeverity = ValidationSeverity.ERROR
    ):
        """
        Record a rule violation and mark the result invalid.

        Args:
            message: Human-readable description of the violation.
            severity: Severity of this violation; the result's overall
                severity is raised to it when it is higher.
        """
        self.violations.append(message)
        # Severity only ever escalates, never drops back down.
        if self.severity.value < severity.value:
            self.severity = severity
        self.is_valid = False

    def add_security_issue(self, message: str):
        """
        Record a security issue; this always escalates the result to CRITICAL.

        Args:
            message: Human-readable description of the issue.
        """
        self.security_issues.append(message)
        self.severity = ValidationSeverity.CRITICAL
        self.is_valid = False
class InputValidationError(Exception):
    """Raised when input fails validation; optionally carries the full result."""

    def __init__(self, message: str, result: Optional["ValidationResult"] = None):
        """
        Args:
            message: Error text passed on to ``Exception``.
            result: The ``ValidationResult`` that triggered the error, if any.
        """
        super().__init__(message)
        self.result = result


# Older call sites use the shorter name; keep it bound to the same class.
ValidationError = InputValidationError
class InputSanitizer:
    """
    Context-aware sanitizer for untrusted text.

    Offers Unicode normalization, pattern-based injection detection, and
    escaping/filtering helpers for HTML, shell, JSON, file-path and
    agent-name contexts. All helpers are static and keep no state.
    """

    # Regex sources treated as signs of an injection attempt.
    INJECTION_PATTERNS = [
        r"<script[^>]*>.*?</script>",  # Script tags
        r"javascript:",  # JavaScript URLs
        r"vbscript:",  # VBScript URLs
        r"on\w+\s*=",  # Event handlers
        r"expression\s*\(",  # CSS expressions
        r"@import",  # CSS imports
        r"\beval\s*\(",  # Eval calls
        r"\bexec\s*\(",  # Exec calls
        r"\bsystem\s*\(",  # System calls
        r"\bpasswd\b",  # Password files
        r"/etc/shadow",  # Shadow files
        r"\.\./.*",  # Path traversal
        # Windows path traversal: two literal dots followed by a backslash.
        # (The previous source escaped the backslashes instead of the dots
        # and therefore never matched "..\".)
        r"\.\.\\.*",
        r"\$\{.*\}",  # Variable expansion
        r"`.*`",  # Command substitution
        r"\|\s*\w+",  # Pipe commands
        r"&&\s*\w+",  # Command chaining
        r";\s*\w+",  # Command termination
        r"SELECT.*FROM",  # SQL injection
        r"UNION.*SELECT",  # SQL union attacks
        r"DROP.*TABLE",  # SQL drops
        r"--\s*",  # SQL comments
        r"/\*.*\*/",  # SQL block comments
    ]

    # Compiled once at class creation; matching is case-insensitive and
    # "." also spans newlines.
    COMPILED_PATTERNS = [
        re.compile(pattern, re.IGNORECASE | re.DOTALL) for pattern in INJECTION_PATTERNS
    ]

    @staticmethod
    def normalize_unicode(text: str) -> str:
        """
        Normalize to NFC and drop characters outside the allowed categories.

        Letters, numbers, punctuation, symbols and separators (Unicode major
        categories L, N, P, S, Z) are kept, together with tab, newline and
        carriage return. Everything else is removed, which covers the control,
        format, surrogate, private-use and unassigned categories (C*) and also
        combining marks (M*) that NFC could not fold into a precomposed
        character.

        Args:
            text: Raw input text.

        Returns:
            The normalized, filtered text.
        """
        normalized = unicodedata.normalize("NFC", text)
        return "".join(
            char
            for char in normalized
            if unicodedata.category(char)[0] in "LNPSZ" or char in " \t\n\r"
        )

    @staticmethod
    def detect_injection_attempts(text: str) -> List[str]:
        """
        Check ``text`` against every compiled injection pattern.

        Args:
            text: Text to scan.

        Returns:
            One message per matching pattern, in pattern order; empty when
            nothing matched.
        """
        return [
            f"Potential injection pattern detected: {pattern.pattern}"
            for pattern in InputSanitizer.COMPILED_PATTERNS
            if pattern.search(text)
        ]

    @staticmethod
    def sanitize_for_html(text: str) -> str:
        """
        Escape ``text`` for inclusion in HTML.

        ``html.escape(quote=True)`` already converts ``&``, ``<``, ``>``,
        ``"`` and ``'``; re-replacing ``&`` afterwards would double-escape the
        entities it just produced, so only ``/`` is handled in addition.

        Args:
            text: Text to escape.

        Returns:
            The HTML-escaped text.
        """
        escaped = html.escape(text, quote=True)
        # None of the entities emitted above contain "/", so this cannot
        # corrupt them.
        return escaped.replace("/", "&#x2F;")

    @staticmethod
    def sanitize_for_shell(text: str) -> str:
        """
        Replace every character outside a small ASCII whitelist with ``_``.

        Args:
            text: Text to sanitize.

        Returns:
            Text of the same length containing only ASCII letters, digits,
            ``._-+=`` and spaces, with underscores standing in for anything
            else.
        """
        allowed_chars = set(
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789._-+= "
        )
        return "".join(char if char in allowed_chars else "_" for char in text)

    @staticmethod
    def sanitize_for_json(text: str) -> str:
        """
        Escape ``text`` so it can be placed inside a JSON string literal.

        Args:
            text: Text to escape.

        Returns:
            The JSON-encoded text without its surrounding double quotes.
        """
        return json.dumps(text)[1:-1]

    @staticmethod
    def sanitize_file_path(path: str) -> str:
        """
        Strip traversal components and unsafe characters from a path.

        Backslashes become forward slashes and runs of slashes collapse to
        one. Components equal to ``..``, ``.``, ``~`` or ``$`` and any
        component starting with a dot are dropped; in the remaining
        components every character other than ASCII letters, digits,
        ``.``, ``_`` and ``-`` is replaced by ``_``. A leading or trailing
        slash is preserved.

        Args:
            path: Raw path.

        Returns:
            The sanitized path.
        """
        # Collapse *every* run of separators; a single replace("//", "/")
        # pass leaves "//" behind for runs of three or more.
        normalized = re.sub(r"/{2,}", "/", path.replace("\\", "/"))
        dangerous_components = {"..", ".", "~", "$"}
        safe_parts = [
            re.sub(r"[^a-zA-Z0-9._-]", "_", part)
            for part in normalized.split("/")
            if part not in dangerous_components and not part.startswith(".")
        ]
        return "/".join(safe_parts)

    @staticmethod
    def sanitize_agent_name(name: str) -> str:
        """
        Coerce ``name`` into the ``Agent_#`` format.

        Args:
            name: Raw agent name.

        Returns:
            ``Agent_<n>`` where ``<n>`` is the first run of digits found in
            ``name``, or ``Agent_1`` when it contains no digits.
        """
        first_number = re.search(r"\d+", name)
        if first_number is None:
            return "Agent_1"  # Default fallback
        return f"Agent_{first_number.group(0)}"
class InputValidator:
    """
    Multi-stage validator for external input.

    Each call normalizes Unicode, scans for injection patterns, applies the
    rules registered for the requested ``InputType``, sanitizes for the
    requested context, and reports security issues to the audit logger.
    """

    def __init__(self):
        """Create the validator and register the default rules."""
        self.rules: Dict[InputType, List[ValidationRule]] = {}
        self.sanitizer = InputSanitizer()
        # Strong references to in-flight audit tasks so they are not
        # garbage-collected before they finish.
        self._audit_tasks: Set[Any] = set()
        self._setup_default_rules()

    def _setup_default_rules(self) -> None:
        """
        Register the built-in rules for each supported input type.

        Patterns end in ``\\Z`` rather than ``$``: ``$`` also matches just
        before a trailing newline, which would let e.g. ``"Agent_1\\n"``
        pass a format check.
        """
        # Agent name validation
        self.rules[InputType.AGENT_NAME] = [
            ValidationRule(
                name="agent_name_format",
                pattern=re.compile(r"^Agent_\d+\Z"),
                max_length=20,
                min_length=7,
                severity=ValidationSeverity.CRITICAL,
            )
        ]
        # Session name validation
        self.rules[InputType.SESSION_NAME] = [
            ValidationRule(
                name="session_name_format",
                pattern=re.compile(r"^[a-zA-Z0-9_-]+\Z"),
                max_length=50,
                min_length=3,
                severity=ValidationSeverity.ERROR,
            )
        ]
        # Message content validation
        self.rules[InputType.MESSAGE_CONTENT] = [
            ValidationRule(
                name="message_length",
                max_length=100000,  # 100KB
                min_length=1,
                severity=ValidationSeverity.ERROR,
            ),
            ValidationRule(
                name="message_content_safe",
                blocked_chars={
                    "\x00",
                    "\x01",
                    "\x02",
                    "\x03",
                    "\x04",
                    "\x05",
                },  # Control characters
                severity=ValidationSeverity.WARNING,
            ),
        ]
        # File path validation
        self.rules[InputType.FILE_PATH] = [
            ValidationRule(
                name="file_path_safe",
                pattern=re.compile(r"^[a-zA-Z0-9._/-]+\Z"),
                max_length=500,
                severity=ValidationSeverity.CRITICAL,
            )
        ]
        # User ID validation
        self.rules[InputType.USER_ID] = [
            ValidationRule(
                name="user_id_format",
                pattern=re.compile(r"^[a-zA-Z0-9@._-]+\Z"),
                max_length=100,
                min_length=3,
                severity=ValidationSeverity.ERROR,
            )
        ]
        # Email validation
        self.rules[InputType.EMAIL] = [
            ValidationRule(
                name="email_format",
                pattern=re.compile(
                    r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\Z"
                ),
                max_length=320,  # RFC 5321 limit
                severity=ValidationSeverity.ERROR,
            )
        ]
        # URL validation
        self.rules[InputType.URL] = [
            ValidationRule(
                name="url_format",
                pattern=re.compile(r"^https?://[a-zA-Z0-9.-]+[a-zA-Z0-9._/-]*\Z"),
                max_length=2048,
                severity=ValidationSeverity.WARNING,
            )
        ]
        # General text validation
        self.rules[InputType.GENERAL_TEXT] = [
            ValidationRule(
                name="general_text_safe",
                max_length=10000,
                blocked_chars={"\x00"},  # Null bytes
                severity=ValidationSeverity.WARNING,
            )
        ]

    @require(lambda input_text: input_text is not None)
    @ensure(lambda result: isinstance(result, ValidationResult))
    def validate_input(
        self,
        input_text: str,
        input_type: InputType,
        additional_rules: Optional[List[ValidationRule]] = None,
        context: Optional[str] = None,
    ) -> ValidationResult:
        """
        Validate ``input_text`` as ``input_type`` and sanitize it.

        Args:
            input_text: Raw input; must not be ``None``.
            input_type: Selects which registered rules apply.
            additional_rules: Extra rules for this call only; they are not
                added to the validator's registered rules.
            context: Output context (``"html"``, ``"shell"`` or ``"json"``)
                used when sanitizing message content.

        Returns:
            A ``ValidationResult``. Any exception raised while validating is
            recorded on the result as a CRITICAL violation instead of being
            propagated. ``sanitized_input`` is filled in when the input is
            valid or at least one applied rule has ``sanitize=True``.
        """
        result = ValidationResult(is_valid=True, original_input=input_text)
        try:
            # Step 1: Unicode normalization
            normalized_input = self.sanitizer.normalize_unicode(input_text)

            # Step 2: Injection detection
            for attempt in self.sanitizer.detect_injection_attempts(normalized_input):
                result.add_security_issue(attempt)

            # Step 3: Apply type-specific rules. Work on a copy so that
            # per-call additional_rules never leak into self.rules.
            type_rules = list(self.rules.get(input_type, []))
            if additional_rules:
                type_rules.extend(additional_rules)
            if not type_rules:
                # Fail closed: a type with no rules cannot be vouched for.
                result.add_violation(
                    f"No validation rules registered for input type: {input_type.value}",
                    ValidationSeverity.CRITICAL,
                )
            for rule in type_rules:
                self._apply_rule(rule, normalized_input, result)

            # Step 4: Sanitization based on context
            wants_sanitization = any(rule.sanitize for rule in type_rules)
            if result.is_valid or wants_sanitization:
                result.sanitized_input = self._sanitize_for_context(
                    normalized_input, input_type, context
                )

            # Step 5: Log security violations
            if result.security_issues:
                self._dispatch_security_log(
                    input_text, input_type, list(result.security_issues)
                )
            return result
        except Exception as e:
            result.add_violation(
                f"Validation error: {str(e)}", ValidationSeverity.CRITICAL
            )
            return result

    def _apply_rule(
        self, rule: ValidationRule, input_text: str, result: ValidationResult
    ) -> None:
        """
        Check ``input_text`` against one rule, recording violations on ``result``.

        All checks run even after one fails, so a single call can add
        several violations.
        """
        # Length checks
        if rule.max_length is not None and len(input_text) > rule.max_length:
            result.add_violation(
                f"Input exceeds maximum length of {rule.max_length} characters",
                rule.severity,
            )
        if rule.min_length is not None and len(input_text) < rule.min_length:
            result.add_violation(
                f"Input below minimum length of {rule.min_length} characters",
                rule.severity,
            )
        # Pattern matching
        if rule.pattern and not rule.pattern.match(input_text):
            result.add_violation(
                f"Input does not match required pattern for {rule.name}", rule.severity
            )
        # Character allowlist/blocklist (sorted so messages are deterministic)
        if rule.allowed_chars:
            invalid_chars = set(input_text) - rule.allowed_chars
            if invalid_chars:
                result.add_violation(
                    f"Input contains disallowed characters: {', '.join(sorted(invalid_chars))}",
                    rule.severity,
                )
        if rule.blocked_chars:
            found_blocked = set(input_text) & rule.blocked_chars
            if found_blocked:
                result.add_violation(
                    f"Input contains blocked characters: {', '.join(sorted(found_blocked))}",
                    rule.severity,
                )
        # Required field check
        if rule.required and not input_text.strip():
            result.add_violation("Required field cannot be empty", rule.severity)

    def _sanitize_for_context(
        self, input_text: str, input_type: InputType, context: Optional[str]
    ) -> str:
        """
        Pick the sanitizer matching the input type and output context.

        Agent names and file paths have dedicated sanitizers. Message content
        is escaped for ``"html"``, ``"shell"`` or ``"json"`` and returned
        unchanged for any other context. Every other type is HTML-escaped.
        """
        if input_type == InputType.AGENT_NAME:
            return self.sanitizer.sanitize_agent_name(input_text)
        if input_type == InputType.FILE_PATH:
            return self.sanitizer.sanitize_file_path(input_text)
        if input_type == InputType.MESSAGE_CONTENT:
            if context == "html":
                return self.sanitizer.sanitize_for_html(input_text)
            if context == "shell":
                return self.sanitizer.sanitize_for_shell(input_text)
            if context == "json":
                return self.sanitizer.sanitize_for_json(input_text)
            return input_text  # Keep original for general message content
        # Default sanitization
        return self.sanitizer.sanitize_for_html(input_text)

    def _dispatch_security_log(
        self, input_text: str, input_type: InputType, violations: List[str]
    ) -> None:
        """
        Run the async audit logger from this synchronous code path.

        Calling ``_log_security_violations`` directly only creates a coroutine
        object that is never executed. When an event loop is already running
        in this thread the coroutine is scheduled on it as a task; otherwise
        it is driven to completion with ``asyncio.run``. Failures are
        swallowed because audit logging is best-effort and must not change
        the validation outcome.
        """
        import asyncio  # local import: only needed on the violation path

        coro = self._log_security_violations(input_text, input_type, violations)
        try:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No loop running in this thread.
                asyncio.run(coro)
            else:
                task = loop.create_task(coro)
                self._audit_tasks.add(task)
                task.add_done_callback(self._audit_tasks.discard)
        except Exception:
            # Avoid a "coroutine was never awaited" warning if scheduling failed.
            coro.close()

    async def _log_security_violations(
        self, input_text: str, input_type: InputType, violations: List[str]
    ) -> None:
        """
        Send one CRITICAL audit event describing the security violations.

        Only the input's type and length are logged, not the input itself.
        """
        try:
            audit_logger = get_audit_logger()
            await audit_logger.log_event(
                level=AuditLevel.CRITICAL,
                category=AuditCategory.SYSTEM_HEALTH,
                operation="input_validation",
                resource_type="user_input",
                resource_id=input_type.value,
                success=False,
                error_message=f"Security violations: {'; '.join(violations)}",
                metadata={
                    "input_type": input_type.value,
                    "input_length": len(input_text),
                    "violations": violations,
                },
            )
        except Exception:
            # Don't fail validation if audit logging fails
            pass

    def add_custom_rule(self, input_type: InputType, rule: ValidationRule) -> None:
        """Append ``rule`` to the rules registered for ``input_type``."""
        self.rules.setdefault(input_type, []).append(rule)

    def get_supported_types(self) -> List[InputType]:
        """Return the input types that currently have registered rules."""
        return list(self.rules)
# Module-wide validator, created on first request
_input_validator_instance: Optional[InputValidator] = None


def get_input_validator() -> InputValidator:
    """Return the shared ``InputValidator``, creating it on the first call."""
    global _input_validator_instance
    if _input_validator_instance is not None:
        return _input_validator_instance
    _input_validator_instance = InputValidator()
    return _input_validator_instance
# Convenience functions for common validation scenarios
def validate_agent_name(name: str) -> ValidationResult:
    """Validate ``name`` as an agent name using the shared validator."""
    return get_input_validator().validate_input(name, InputType.AGENT_NAME)
def validate_message_content(
    content: str, context: str = "general"
) -> ValidationResult:
    """
    Validate message content using the shared validator.

    Args:
        content: Raw message text.
        context: Output context forwarded to the validator for sanitization.

    Returns:
        The full ``ValidationResult``.
    """
    shared_validator = get_input_validator()
    outcome = shared_validator.validate_input(
        content, InputType.MESSAGE_CONTENT, context=context
    )
    return outcome
def validate_file_path(path: str) -> ValidationResult:
    """
    Validate ``path`` as a file path and return the full result.

    NOTE(review): a second ``validate_file_path`` is defined further down in
    this module and rebinds the name, so importers receive that later,
    str-returning function rather than this one.
    """
    return get_input_validator().validate_input(path, InputType.FILE_PATH)
def validate_user_id(user_id: str) -> ValidationResult:
    """Validate ``user_id`` as a user ID using the shared validator."""
    return get_input_validator().validate_input(user_id, InputType.USER_ID)
def sanitize_for_safe_output(text: str, output_context: str = "html") -> str:
    """
    Escape ``text`` for the given output context.

    Args:
        text: Text to sanitize.
        output_context: ``"html"``, ``"shell"`` or ``"json"``; any other
            value is treated as ``"html"``.

    Returns:
        The sanitized text.
    """
    sanitizer = InputSanitizer()
    if output_context == "shell":
        return sanitizer.sanitize_for_shell(text)
    if output_context == "json":
        return sanitizer.sanitize_for_json(text)
    # "html" and every unrecognized context share the HTML escaper.
    return sanitizer.sanitize_for_html(text)
def validate_and_sanitize(
    input_text: str,
    input_type: InputType,
    context: Optional[str] = None,
    raise_on_invalid: bool = True,
) -> str:
    """
    Validate input and return its sanitized form.

    Args:
        input_text: Raw input.
        input_type: Which rule set to validate against.
        context: Output context forwarded to the validator.
        raise_on_invalid: When True, invalid input raises instead of
            returning.

    Returns:
        The sanitized input, including an empty string when sanitization
        removed everything. The original ``input_text`` is returned only
        when the validator produced no sanitized value at all.

    Raises:
        InputValidationError: If validation fails and ``raise_on_invalid``
            is True. The message lists both rule violations and security
            issues, and the exception carries the full result.
    """
    validator = get_input_validator()
    result = validator.validate_input(input_text, input_type, context=context)
    if not result.is_valid and raise_on_invalid:
        # Security issues are kept separately from rule violations; include
        # both so the message is not empty when only an injection pattern hit.
        problems = result.violations + result.security_issues
        raise InputValidationError(
            f"Input validation failed: {'; '.join(problems)}", result
        )
    # Compare against None explicitly: an empty sanitized string is a real
    # result and must not fall back to the unsanitized input.
    if result.sanitized_input is not None:
        return result.sanitized_input
    return input_text
def sanitize_user_input(user_input: str, context: Optional[str] = None) -> str:
    """
    Sanitize user input as message content without raising on failure.

    Args:
        user_input: Raw user input
        context: Optional output context forwarded to the validator

    Returns:
        str: Result of ``validate_and_sanitize`` with ``raise_on_invalid=False``
    """
    sanitized = validate_and_sanitize(
        user_input,
        InputType.MESSAGE_CONTENT,
        context=context,
        raise_on_invalid=False,
    )
    return sanitized
def validate_file_path(file_path: str) -> str:
    """
    Validate a file path and return its sanitized form.

    Args:
        file_path: Raw file path

    Returns:
        str: Sanitized file path

    Raises:
        InputValidationError: If file path is invalid
    """
    sanitized_path = validate_and_sanitize(
        file_path, InputType.FILE_PATH, raise_on_invalid=True
    )
    return sanitized_path
# Additional utility class for tests
class SanitizedString(str):
    """A ``str`` tagged as sanitized, remembering which sanitizer produced it."""

    def __new__(cls, value: str, sanitizer_used: str = "default"):
        """
        Args:
            value: The string content.
            sanitizer_used: Label stored on the instance as ``sanitizer_used``.
        """
        tagged = super().__new__(cls, value)
        tagged.is_sanitized = True
        tagged.sanitizer_used = sanitizer_used
        return tagged
class ValidatedEmail(str):
    """A ``str`` that could only be constructed from a valid email address."""

    def __new__(cls, value: str):
        """
        Validate ``value`` as an email and build the string from the result.

        Args:
            value: Candidate email address.

        Raises:
            ValidationError: If ``value`` fails email validation.
        """
        outcome = get_input_validator().validate_input(value, InputType.EMAIL)
        if not outcome.is_valid:
            raise ValidationError(f"Invalid email address: {value}")
        # Prefer the sanitized form; fall back to the raw value if it is empty.
        email = super().__new__(cls, outcome.sanitized_input or value)
        email.is_validated = True
        email.validation_result = outcome
        return email