"""
Property-Based Tests for Message Sanitization
This module provides comprehensive property-based testing for message
sanitization using Hypothesis to ensure security across all input spaces.
Author: ADDER_5 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import re
from typing import List
import pytest
from hypothesis import given, strategies as st, settings, assume
from hypothesis.stateful import RuleBasedStateMachine, rule, Bundle, initialize
from src.utils.message_sanitization import (
MessageSanitizer, SanitizationConfig, SanitizationLevel,
create_sanitizer_for_security_level
)
from src.models.security import SecurityLevel
class TestMessageSanitizationProperties:
    """Property-based tests for message sanitization.

    Each test constructs its own ``MessageSanitizer`` so that generated
    examples never share a sanitizer instance.
    """

    # Characters treated as dangerous by the strict-mode tests. The same list
    # feeds both the input strategy and the assertion so that every generated
    # character is also checked.
    DANGEROUS_CHARS = [';', '|', '&', '$', '`', '(', ')', '<', '>', '\\']

    @given(st.text(min_size=0, max_size=10000))
    @settings(max_examples=100)
    def test_sanitization_idempotency(self, message: str) -> None:
        """Property: Sanitizing a message multiple times gives same result."""
        sanitizer = MessageSanitizer()
        first_sanitized = sanitizer.sanitize(message)
        second_sanitized = sanitizer.sanitize(first_sanitized)
        third_sanitized = sanitizer.sanitize(second_sanitized)
        assert first_sanitized == second_sanitized == third_sanitized

    @given(
        message=st.text(min_size=0, max_size=20000),
        max_length=st.integers(min_value=100, max_value=15000)
    )
    @settings(max_examples=50)
    def test_length_constraint_respected(self, message: str, max_length: int) -> None:
        """Property: Sanitized messages never exceed configured max length."""
        config = SanitizationConfig(max_length=max_length)
        sanitizer = MessageSanitizer(config)
        sanitized = sanitizer.sanitize(message)
        assert len(sanitized) <= max_length

    @given(message=st.text(min_size=1, max_size=5000))
    @settings(max_examples=75)
    def test_sanitization_level_consistency(self, message: str) -> None:
        """Property: Higher security levels are more restrictive.

        The same message is sanitized at MINIMAL, STANDARD and STRICT and the
        resulting lengths are compared pairwise.
        """
        minimal_config = SanitizationConfig(level=SanitizationLevel.MINIMAL)
        standard_config = SanitizationConfig(level=SanitizationLevel.STANDARD)
        strict_config = SanitizationConfig(level=SanitizationLevel.STRICT)
        minimal_sanitized = MessageSanitizer(minimal_config).sanitize(message)
        standard_sanitized = MessageSanitizer(standard_config).sanitize(message)
        strict_sanitized = MessageSanitizer(strict_config).sanitize(message)
        # Stricter levels should generally result in shorter or equal length
        # messages (more characters removed).
        assert len(strict_sanitized) <= len(standard_sanitized)
        assert len(standard_sanitized) <= len(minimal_sanitized)

    @given(
        message=st.text(min_size=1, max_size=1000),
        security_level=st.sampled_from(SecurityLevel)
    )
    @settings(max_examples=50)
    def test_security_level_factory(self, message: str, security_level) -> None:
        """Property: Factory creates sanitizers with appropriate security."""
        sanitizer = create_sanitizer_for_security_level(security_level)
        sanitized = sanitizer.sanitize(message)
        # All sanitizers should respect their configuration
        assert len(sanitized) <= sanitizer.config.max_length
        # HIGH -> STRICT, MEDIUM -> STANDARD, anything else -> MINIMAL
        if security_level == SecurityLevel.HIGH:
            assert sanitizer.config.level == SanitizationLevel.STRICT
        elif security_level == SecurityLevel.MEDIUM:
            assert sanitizer.config.level == SanitizationLevel.STANDARD
        else:  # LOW
            assert sanitizer.config.level == SanitizationLevel.MINIMAL

    @given(
        message=st.text(
            alphabet=st.characters(whitelist_categories=("Lu", "Ll", "Nd", "Zs")),
            min_size=1,
            max_size=1000
        )
    )
    @settings(max_examples=50)
    def test_safe_messages_preserved(self, message: str) -> None:
        """Property: Safe messages should be minimally altered."""
        # Input is restricted to letters, decimal digits and space separators.
        sanitizer = MessageSanitizer(
            SanitizationConfig(level=SanitizationLevel.MINIMAL)
        )
        sanitized = sanitizer.sanitize(message)
        # Safe messages should be preserved exactly or with minimal changes
        # (whitespace normalization is acceptable)
        assert len(sanitized) >= len(message) * 0.9  # Allow 10% shrinkage for whitespace

    @given(
        dangerous_chars=st.lists(
            st.sampled_from(DANGEROUS_CHARS),
            min_size=1,
            max_size=20
        ),
        safe_text=st.text(
            alphabet=st.characters(whitelist_categories=("Lu", "Ll", "Nd", "Zs")),
            min_size=5,
            max_size=100
        )
    )
    @settings(max_examples=30)
    def test_dangerous_characters_removed(
        self, dangerous_chars: List[str], safe_text: str
    ) -> None:
        """Property: Dangerous characters are removed in strict mode."""
        # Interleave each dangerous character with a short slice of safe text.
        filler = safe_text[:10]
        message = safe_text + "".join(char + filler for char in dangerous_chars)
        sanitizer = MessageSanitizer(
            SanitizationConfig(level=SanitizationLevel.STRICT)
        )
        sanitized = sanitizer.sanitize(message)
        # In strict mode, every character the strategy can generate
        # (including the backslash) should be removed.
        for char in self.DANGEROUS_CHARS:
            assert char not in sanitized

    @given(
        injection_pattern=st.sampled_from([
            "$(command)",
            "${variable}",
            "`command`",
            "&& malicious",
            "|| evil",
            "; rm -rf",
            "> /dev/null",
            "< /etc/passwd"
        ]),
        prefix=st.text(min_size=0, max_size=50),
        suffix=st.text(min_size=0, max_size=50)
    )
    @settings(max_examples=40)
    def test_injection_patterns_blocked(
        self, injection_pattern: str, prefix: str, suffix: str
    ) -> None:
        """Property: Known injection patterns are blocked."""
        message = prefix + injection_pattern + suffix
        sanitizer = MessageSanitizer(
            SanitizationConfig(level=SanitizationLevel.STRICT)
        )
        sanitized = sanitizer.sanitize(message)
        # Injection patterns should be removed or blocked
        assert "[BLOCKED]" in sanitized or injection_pattern not in sanitized

    @given(
        unicode_text=st.text(min_size=1, max_size=500),
        emoji_text=st.text(
            alphabet=st.characters(min_codepoint=0x1F600, max_codepoint=0x1F64F),
            min_size=0,
            max_size=20
        )
    )
    @settings(max_examples=30)
    def test_unicode_handling(self, unicode_text: str, emoji_text: str) -> None:
        """Property: Unicode text is handled safely."""
        message = unicode_text + emoji_text
        sanitizer = MessageSanitizer()
        try:
            sanitized = sanitizer.sanitize(message)
        except UnicodeError:
            # Unicode errors are deliberately tolerated by this test.
            return
        # Should not crash and should return valid string
        assert isinstance(sanitized, str)
        assert len(sanitized) <= sanitizer.config.max_length

    @given(st.text(min_size=1, max_size=1000))
    @settings(max_examples=50)
    def test_validate_safe_consistency(self, message: str) -> None:
        """Property: validate_safe should be consistent with sanitization."""
        sanitizer = MessageSanitizer()
        is_safe = sanitizer.validate_safe(message)
        sanitized = sanitizer.sanitize(message)
        # If original is safe, sanitization should not change it much
        if is_safe:
            # Allow some minimal changes (whitespace, etc.)
            assert len(sanitized) >= len(message) * 0.95
        # Sanitized version should always be safe when re-validated
        # (using a fresh sanitizer to avoid state issues)
        fresh_sanitizer = MessageSanitizer(sanitizer.config)
        assert fresh_sanitizer.validate_safe(sanitized)
class SanitizationStateMachine(RuleBasedStateMachine):
    """Stateful testing for message sanitization operations.

    Rules create sanitizer configurations, sanitize generated messages with
    them, and then re-check idempotency and safety of the stored results.
    Sanitizers are not kept between steps; only their ``level`` and
    ``max_length`` are recorded and a sanitizer is rebuilt on demand.
    """

    messages = Bundle('messages')
    sanitizers = Bundle('sanitizers')

    def __init__(self):
        super().__init__()
        # message_id -> {'original', 'sanitized', 'sanitizer_id'}
        self.sanitized_messages = {}
        # sanitizer_id -> {'created', 'level', 'max_length'}
        self.sanitizer_stats = {}

    @initialize()
    def setup(self):
        """Initialize the state machine."""
        self.sanitized_messages.clear()
        self.sanitizer_stats.clear()

    def _sanitizer_for(self, sanitizer_id):
        """Rebuild a sanitizer from the configuration stored under ``sanitizer_id``."""
        stats = self.sanitizer_stats[sanitizer_id]
        config = SanitizationConfig(
            level=stats['level'],
            max_length=stats['max_length']
        )
        return MessageSanitizer(config)

    @rule(
        target=sanitizers,
        level=st.sampled_from(SanitizationLevel),
        max_length=st.integers(min_value=100, max_value=5000)
    )
    def create_sanitizer(self, level, max_length):
        """Create a new sanitizer with given configuration.

        Returns the identifier that is added to the ``sanitizers`` bundle.
        """
        config = SanitizationConfig(level=level, max_length=max_length)
        # Construct once here so a configuration the sanitizer rejects fails
        # in this rule rather than in a later one.
        MessageSanitizer(config)
        sanitizer_id = f"{level.value}_{max_length}"
        self.sanitizer_stats[sanitizer_id] = {
            'created': True,
            'level': level,
            'max_length': max_length
        }
        return sanitizer_id

    @rule(
        target=messages,
        sanitizer_id=sanitizers,
        message=st.text(min_size=1, max_size=2000)
    )
    def sanitize_message(self, sanitizer_id, message):
        """Sanitize a message using specified sanitizer.

        Returns the identifier that is added to the ``messages`` bundle.
        """
        sanitized = self._sanitizer_for(sanitizer_id).sanitize(message)
        # Store sanitized message; the running count keeps identifiers unique.
        message_id = f"{sanitizer_id}_{len(self.sanitized_messages)}"
        self.sanitized_messages[message_id] = {
            'original': message,
            'sanitized': sanitized,
            'sanitizer_id': sanitizer_id
        }
        # Verify invariants
        assert len(sanitized) <= self.sanitizer_stats[sanitizer_id]['max_length']
        assert isinstance(sanitized, str)
        return message_id

    @rule(message_id=messages)
    def verify_idempotency(self, message_id):
        """Verify that re-sanitizing gives same result."""
        message_data = self.sanitized_messages[message_id]
        sanitizer = self._sanitizer_for(message_data['sanitizer_id'])
        # Re-sanitize the already sanitized message
        re_sanitized = sanitizer.sanitize(message_data['sanitized'])
        # Should be identical
        assert re_sanitized == message_data['sanitized']

    @rule(message_id=messages)
    def verify_safety(self, message_id):
        """Verify that sanitized messages pass safety validation."""
        message_data = self.sanitized_messages[message_id]
        sanitizer = self._sanitizer_for(message_data['sanitizer_id'])
        # Sanitized message should be considered safe
        assert sanitizer.validate_safe(message_data['sanitized'])
class TestSanitizationStateMachine:
    """Test runner for stateful testing."""

    def test_sanitization_state_machine(self):
        """Run the sanitization state machine test.

        The example and step counts are kept small for faster execution.
        """
        # A Hypothesis ``settings`` object is not a context manager; the
        # supported way to configure a state-machine run is the ``settings``
        # keyword of ``run_state_machine_as_test``.
        run_state_machine_as_test(
            SanitizationStateMachine,
            settings=settings(max_examples=20, stateful_step_count=10),
        )
# Edge case tests
class TestSanitizationEdgeCases:
    """Hand-picked boundary inputs for the message sanitizer."""

    def test_empty_message(self):
        """An empty string sanitizes to an empty string."""
        assert MessageSanitizer().sanitize("") == ""

    def test_whitespace_only_message(self):
        """A whitespace-only message never grows when sanitized."""
        raw = " \t\n\r "
        cleaned = MessageSanitizer().sanitize(raw)
        # Normalization may shrink the input but must not lengthen it.
        assert len(cleaned) <= len(raw)

    def test_extremely_long_message(self):
        """An oversized message is cut to the limit and marked as truncated."""
        limit_config = SanitizationConfig(max_length=1000)
        cleaned = MessageSanitizer(limit_config).sanitize("A" * 100000)
        assert len(cleaned) <= 1000
        assert "truncated" in cleaned

    def test_all_dangerous_characters(self):
        """In strict mode none of the dangerous characters survive."""
        dangerous = ";|&$`()<>\\"
        strict_config = SanitizationConfig(level=SanitizationLevel.STRICT)
        cleaned = MessageSanitizer(strict_config).sanitize(dangerous)
        # Check each character individually so a failure names the culprit.
        for symbol in dangerous:
            assert symbol not in cleaned
# Import for stateful testing
try:
    from hypothesis.stateful import run_state_machine_as_test
except ImportError:
    # Fallback for older hypothesis versions
    def run_state_machine_as_test(machine_class, settings=None):
        """Stand-in used when Hypothesis does not export the real runner.

        Args:
            machine_class: The state machine class that would have been run.
            settings: Accepted for call compatibility with the real runner;
                unused here.

        The stateful test is reported as skipped instead of silently passing,
        because nothing can be exercised without the real runner.
        """
        pytest.skip(
            "hypothesis.stateful.run_state_machine_as_test is unavailable; "
            f"cannot run {getattr(machine_class, '__name__', machine_class)}"
        )
# Test markers: pytest reads the module-level ``pytestmark`` name and applies
# these marks to every test in this module.
# NOTE(review): these are custom marker names; presumably they are registered
# in the project's pytest configuration — confirm.
pytestmark = [
pytest.mark.properties,
pytest.mark.security,
pytest.mark.message_sanitization
]