"""
Message Sanitization Utilities - Agent Orchestration Platform
This module provides comprehensive message sanitization to prevent injection attacks
and ensure secure communication between Claude Desktop and agents.
Architecture Integration:
- Design Pattern: Strategy pattern for configurable sanitization rules
- Security Model: Defense-in-depth with multiple sanitization layers
- Performance Profile: O(n) linear sanitization with compiled patterns
Technical Decisions:
- Denylist-based approach: known-dangerous characters and patterns are removed or escaped
- Pattern compilation for performance optimization
- Configurable rules for different security levels
- Per-instance counters track sanitization actions (see get_sanitization_stats)
Security Implementation:
- Command Injection Prevention: Remove shell metacharacters
- Escape Sequence Blocking: Prevent terminal control codes
- Length Limiting: Prevent buffer overflow attempts
- Pattern Matching: Block known attack patterns
Author: ADDER_5 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import logging
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Set

from src.models.security import SecurityLevel
from src.models.validation import ValidationError

from .contracts_shim import ensure, require
class SanitizationLevel(Enum):
    """Named tiers selecting how aggressively a message is sanitized."""

    # Basic sanitization
    MINIMAL = "minimal"

    # Default sanitization
    STANDARD = "standard"

    # Maximum security
    STRICT = "strict"
@dataclass
class SanitizationConfig:
    """
    Settings bundle that drives message sanitization.

    Attributes:
        level: Sanitization tier to apply.
        max_length: Upper bound on the length of a sanitized message.
        allow_newlines: Whether newline / carriage-return characters may
            remain in the message.
        allow_formatting: Formatting toggle (stored only; nothing in this
            dataclass interprets it).
        forbidden_patterns: Additional regular-expression sources to block.
        escape_sequences: Whether quote escaping is enabled.
        remove_control_chars: Whether control characters are stripped.
        custom_rules: Extra user-supplied rules applied to the message.
    """

    level: SanitizationLevel = SanitizationLevel.STANDARD
    max_length: int = 10_000
    allow_newlines: bool = True
    allow_formatting: bool = True
    # default_factory gives every instance its own list (no shared mutable default)
    forbidden_patterns: List[str] = field(default_factory=list)
    escape_sequences: bool = True
    remove_control_chars: bool = True
    custom_rules: List[Any] = field(default_factory=list)
class MessageSanitizer:
    """
    Multi-layer message sanitizer for secure agent communication.

    ``sanitize`` runs a message through a fixed sequence of layers (control
    characters, newlines, shell metacharacters, quotes, injection patterns,
    custom rules, length limit). Which layers run is decided by the
    ``SanitizationConfig`` supplied at construction time.

    Contracts:
        Preconditions:
            - Configuration must be valid
            - Patterns should be compilable (invalid ones are skipped and
              reported through a logged warning)
        Postconditions:
            - The returned message is never longer than
              ``config.max_length`` (for a non-negative ``max_length``)
            - The caller's string is never modified; a new string is returned
    Notes:
        - The escaping layers are NOT idempotent: sanitizing an already
          sanitized message escapes the previously inserted backslashes
          and quotes again.
        - Compiled patterns are only applied at the STRICT level, even
          though default patterns are also compiled at STANDARD and custom
          ``forbidden_patterns`` are compiled at every level.
          NOTE(review): confirm this is intended.
    """

    # Shell metacharacters that could enable command injection
    SHELL_METACHARACTERS = {";", "|", "&", "$", "`", "(", ")", "<", ">", "\\"}
    # Control characters that could manipulate the terminal: code points
    # 0-31 except newline, tab and carriage return
    CONTROL_CHARS = set(chr(i) for i in range(0, 32)) - {"\n", "\t", "\r"}
    # Common injection patterns
    INJECTION_PATTERNS = [
        r"\$\{.*?\}",  # Bash variable expansion
        r"\$\(.*?\)",  # Command substitution
        r"`.*?`",  # Backtick command substitution
        r"&&.*",  # Command chaining
        r"\|\|.*",  # Command chaining
        r";\s*\w+",  # Semicolon command separation
        r">\s*[/\w]",  # Output redirection
        r"<\s*[/\w]",  # Input redirection
        r"\b(rm|mv|cp|chmod|chown|kill|sudo)\b",  # Dangerous commands
    ]
    # Marker placed at the end of a message shortened by the length limit
    _TRUNCATION_SUFFIX = "... (truncated)"

    _logger = logging.getLogger(__name__)

    def __init__(self, config: Optional[SanitizationConfig] = None):
        """
        Initialize the sanitizer.

        Args:
            config: Sanitization settings; a default ``SanitizationConfig``
                is used when omitted (or falsy).
        """
        self.config = config or SanitizationConfig()
        self._compiled_patterns = self._compile_patterns()
        self._sanitization_stats: Dict[str, int] = self._new_stats()

    @staticmethod
    def _new_stats() -> Dict[str, int]:
        """Return a zeroed statistics dictionary."""
        return {
            "messages_processed": 0,
            "characters_removed": 0,
            "patterns_blocked": 0,
            "messages_truncated": 0,
        }

    def _compile_patterns(self) -> List[re.Pattern]:
        """
        Compile the regex patterns once so they can be reused per message.

        Default injection patterns are compiled case-insensitively and only
        for the STANDARD and STRICT levels; custom ``forbidden_patterns``
        are always compiled, with no flags.

        Returns:
            Compiled patterns, default ones first, then custom ones.
        """
        sources = []
        if self.config.level in (SanitizationLevel.STANDARD, SanitizationLevel.STRICT):
            sources.extend(
                (source, re.IGNORECASE) for source in self.INJECTION_PATTERNS
            )
        sources.extend((source, 0) for source in self.config.forbidden_patterns)

        compiled = []
        for source, flags in sources:
            try:
                compiled.append(re.compile(source, flags))
            except re.error as exc:
                # Best effort: an invalid pattern is skipped, but the skip is
                # reported so a configured protection is not lost silently.
                self._logger.warning(
                    "Skipping invalid sanitization pattern %r: %s", source, exc
                )
        return compiled

    @require(lambda self, message: message is not None)
    @ensure(lambda result, self, *args: len(result) <= self.config.max_length)
    def sanitize(self, message: str) -> str:
        """
        Sanitize a message for delivery to agents.

        Layers run in this order: control-character removal, newline
        replacement, shell-metacharacter handling, quote escaping,
        injection-pattern blocking (STRICT only), custom rules, length limit.

        Args:
            message: Raw message to sanitize.

        Returns:
            The sanitized message, at most ``config.max_length`` characters
            long. An empty message yields ``""`` and is not counted in the
            statistics.
        """
        if not message:
            return ""

        self._sanitization_stats["messages_processed"] += 1
        original_length = len(message)
        sanitized = message

        # Layer 1: Remove control characters
        if self.config.remove_control_chars:
            sanitized = self._remove_control_chars(sanitized)

        # Layer 2: Handle newlines based on configuration
        if not self.config.allow_newlines:
            sanitized = sanitized.replace("\n", " ").replace("\r", " ")

        # Layer 3: Remove/escape shell metacharacters based on level
        if self.config.level in (SanitizationLevel.STANDARD, SanitizationLevel.STRICT):
            sanitized = self._remove_shell_metacharacters(sanitized)

        # Layer 4: Escape quotes
        if self.config.escape_sequences:
            sanitized = self._escape_quotes(sanitized)

        # Layer 5: Block injection patterns
        # NOTE(review): at STRICT, Layer 3 has already removed every shell
        # metacharacter, so of the default patterns only the
        # dangerous-command word list can still match here.
        if self.config.level == SanitizationLevel.STRICT:
            sanitized = self._block_injection_patterns(sanitized)

        # Layer 6: Apply custom rules (non-callable entries are ignored)
        for rule in self.config.custom_rules:
            if callable(rule):
                sanitized = rule(sanitized)

        # Layer 7: Enforce length limit. The truncation marker has to fit
        # inside the limit, otherwise the result would exceed max_length.
        limit = self.config.max_length
        if len(sanitized) > limit:
            suffix = self._TRUNCATION_SUFFIX
            if limit > len(suffix):
                sanitized = sanitized[: limit - len(suffix)] + suffix
            else:
                # No room for the marker: hard cut only
                sanitized = sanitized[: max(limit, 0)]
            self._sanitization_stats["messages_truncated"] += 1

        # Track characters removed. Escaping can make the message longer
        # than the input; never let the counter move backwards.
        self._sanitization_stats["characters_removed"] += max(
            0, original_length - len(sanitized)
        )
        return sanitized

    def _remove_control_chars(self, text: str) -> str:
        """Return ``text`` without the characters in ``CONTROL_CHARS``."""
        return "".join(char for char in text if char not in self.CONTROL_CHARS)

    def _remove_shell_metacharacters(self, text: str) -> str:
        """
        Neutralize shell metacharacters.

        At STRICT every metacharacter is dropped; at any other level each
        one is prefixed with a backslash.
        """
        metachars = self.SHELL_METACHARACTERS
        if self.config.level == SanitizationLevel.STRICT:
            # Remove all metacharacters
            return "".join(char for char in text if char not in metachars)
        # Escape in a single pass so that backslashes inserted for one
        # character are never escaped again while handling another, and so
        # the result does not depend on set iteration order.
        return "".join(
            f"\\{char}" if char in metachars else char for char in text
        )

    def _escape_quotes(self, text: str) -> str:
        """Backslash-escape double and single quotes."""
        return text.replace('"', '\\"').replace("'", "\\'")

    def _block_injection_patterns(self, text: str) -> str:
        """
        Replace every match of a compiled pattern with ``[BLOCKED]``.

        The number of replacements is added to the ``patterns_blocked``
        statistic.
        """
        for pattern in self._compiled_patterns:
            text, hits = pattern.subn("[BLOCKED]", text)
            self._sanitization_stats["patterns_blocked"] += hits
        return text

    @require(lambda self, message: message is not None)
    def validate_safe(self, message: str) -> bool:
        """
        Check a message against the configured rules without modifying it.

        Quotes and newlines are not examined by this check.

        Args:
            message: Message to validate.

        Returns:
            True if no configured check fails, False otherwise.
        """
        # Check for control characters
        if self.config.remove_control_chars and any(
            char in self.CONTROL_CHARS for char in message
        ):
            return False

        # Check for shell metacharacters
        if self.config.level in (
            SanitizationLevel.STANDARD,
            SanitizationLevel.STRICT,
        ) and any(char in self.SHELL_METACHARACTERS for char in message):
            return False

        # Check for injection patterns
        if self.config.level == SanitizationLevel.STRICT and any(
            pattern.search(message) for pattern in self._compiled_patterns
        ):
            return False

        # Check length
        return len(message) <= self.config.max_length

    def get_sanitization_stats(self) -> Dict[str, Any]:
        """
        Return the current counters plus a summary of the configuration.

        Returns:
            A new dict with the four counters and a ``"config"`` entry
            holding the level value, ``max_length`` and the number of
            compiled patterns.
        """
        return {
            **self._sanitization_stats,
            "config": {
                "level": self.config.level.value,
                "max_length": self.config.max_length,
                "patterns_count": len(self._compiled_patterns),
            },
        }

    def reset_stats(self) -> None:
        """Reset all sanitization counters to zero."""
        self._sanitization_stats = self._new_stats()
def create_sanitizer_for_security_level(
    security_level: SecurityLevel,
) -> MessageSanitizer:
    """
    Build a ``MessageSanitizer`` preconfigured for a session security level.

    ``SecurityLevel.HIGH`` maps to the STRICT profile, ``SecurityLevel.MEDIUM``
    to the STANDARD profile, and any other value to the MINIMAL profile.

    Args:
        security_level: Security level for the session

    Returns:
        Configured message sanitizer
    """
    if security_level == SecurityLevel.HIGH:
        profile = {
            "level": SanitizationLevel.STRICT,
            "max_length": 5_000,
            "allow_newlines": False,
            "allow_formatting": False,
            "escape_sequences": True,
            "remove_control_chars": True,
        }
    elif security_level == SecurityLevel.MEDIUM:
        profile = {
            "level": SanitizationLevel.STANDARD,
            "max_length": 10_000,
            "allow_newlines": True,
            "allow_formatting": True,
            "escape_sequences": True,
            "remove_control_chars": True,
        }
    else:
        # LOW, or any level not matched above
        profile = {
            "level": SanitizationLevel.MINIMAL,
            "max_length": 20_000,
            "allow_newlines": True,
            "allow_formatting": True,
            "escape_sequences": False,
            "remove_control_chars": False,
        }

    return MessageSanitizer(SanitizationConfig(**profile))
# Public API: the names exported by a star-import of this module
__all__ = [
    "MessageSanitizer",
    "SanitizationConfig",
    "SanitizationLevel",
    "create_sanitizer_for_security_level",
]