"""
Property-Based Test Templates for Agent Orchestration Platform Security.
This module provides comprehensive property-based test templates that use Hypothesis
for security validation, including:
- Input validation property tests with malicious patterns
- Authentication and authorization property testing
- Resource boundary property validation
- State consistency property verification
- Cryptographic security property testing
- Concurrent operation safety properties
Templates are designed to be reusable across all system components.
Author: Adder_1 | Created: 2025-06-26 | Testing Infrastructure Task
"""
import pytest
import asyncio
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union
from hypothesis import given, strategies as st, assume, settings, HealthCheck
from hypothesis.stateful import RuleBasedStateMachine, invariant, rule, Bundle
from dataclasses import dataclass
import inspect
import functools
# Import our testing strategies and security framework
from tests.strategies.hypothesis_strategies import (
malicious_input_strategy, agent_name_strategy, invalid_agent_name_strategy,
agent_state_strategy, session_state_strategy, security_test_scenario_strategy,
claude_config_strategy, mcp_tool_request_strategy
)
from tests.security.penetration_tests import SecurityTestFramework
T = TypeVar('T')
# ============================================================================
# Property Test Decorators and Utilities
# ============================================================================
def property_test(
    max_examples: int = 100,
    deadline: int = 60000,  # 60 seconds
    suppress_health_checks: Optional[List[HealthCheck]] = None
):
    """
    Decorator for property-based tests with standard configuration.

    Args:
        max_examples: Maximum examples to test
        deadline: Maximum time per test in milliseconds
        suppress_health_checks: Health checks to suppress. ``None`` selects
            the default of ``[HealthCheck.too_slow]``; an explicit empty list
            means "suppress nothing".

    Returns:
        A decorator that applies the configured Hypothesis ``settings`` to the
        test function it receives.
    """
    if suppress_health_checks is None:
        suppress_checks = [HealthCheck.too_slow]
    else:
        # Copy so that later mutation of the caller's list cannot change the
        # configuration captured by the decorator.
        suppress_checks = list(suppress_health_checks)

    def decorator(test_func):
        return settings(
            max_examples=max_examples,
            deadline=deadline,
            suppress_health_check=suppress_checks
        )(test_func)

    return decorator
def security_property_test(max_examples: int = 50):
    """
    Build the settings decorator used by security property tests.

    Delegates to ``property_test`` with a 30 second deadline and with the
    ``too_slow`` and ``filter_too_much`` health checks suppressed.

    Args:
        max_examples: Maximum examples to test

    Returns:
        The decorator produced by ``property_test``.
    """
    security_deadline_ms = 30000  # 30 seconds for security tests
    suppressed_checks = [HealthCheck.too_slow, HealthCheck.filter_too_much]
    return property_test(
        max_examples=max_examples,
        deadline=security_deadline_ms,
        suppress_health_checks=suppressed_checks,
    )
# ============================================================================
# Input Validation Property Test Templates
# ============================================================================
if "SecurityError" not in globals():
    class SecurityError(Exception):
        """Fallback security exception used when the module provides none.

        The templates below name ``SecurityError`` in their ``except``
        clauses; without a binding for that name the clause itself raises
        ``NameError`` and hides the real failure.
        """


class InputValidationProperties:
    """
    Property-based test templates for input validation.

    Each template takes the function under test as its leading argument; the
    arguments named in ``@given`` are generated by Hypothesis.
    """

    @staticmethod
    @security_property_test()
    @given(malicious_input=malicious_input_strategy())
    def test_input_sanitization_property(
        sanitization_function: Callable[[str], str],
        malicious_input: str
    ):
        """
        Property: Input sanitization should neutralize all malicious patterns.

        The sanitizer may either return a clean string or reject the input by
        raising ``ValueError``, ``TypeError`` or ``SecurityError`` with a
        message containing one of 'malicious', 'invalid', 'security', 'unsafe'.

        Args:
            sanitization_function: Function to test for sanitization
            malicious_input: Malicious input to test with
        """
        assume(len(malicious_input) <= 10000)  # Reasonable input size
        try:
            sanitized = sanitization_function(malicious_input)
        except (ValueError, TypeError, SecurityError) as e:
            # Expected exceptions for clearly malicious input
            message = str(e).lower()
            assert any(keyword in message for keyword in (
                'malicious', 'invalid', 'security', 'unsafe'
            ))
        else:
            # Type checks come first so that a non-string result produces a
            # clear assertion failure rather than an AttributeError inside
            # the pattern check.
            assert sanitized is not None
            assert isinstance(sanitized, str)
            assert not InputValidationProperties._contains_malicious_patterns(sanitized)

    @staticmethod
    @security_property_test()
    @given(
        valid_name=agent_name_strategy(),
        invalid_name=invalid_agent_name_strategy()
    )
    def test_agent_name_validation_property(
        validation_function: Callable[[str], bool],
        valid_name: str,
        invalid_name: str
    ):
        """
        Property: Agent name validation should accept valid names and reject invalid ones.

        Args:
            validation_function: Function to test for validation
            valid_name: Valid agent name
            invalid_name: Invalid agent name
        """
        # Valid names should be accepted
        assert validation_function(valid_name) == True, (
            f"valid name rejected: {valid_name!r}"
        )
        # Invalid names should be rejected
        assert validation_function(invalid_name) == False, (
            f"invalid name accepted: {invalid_name!r}"
        )

    @staticmethod
    @security_property_test()
    @given(input_data=st.text(min_size=0, max_size=1000))
    def test_length_validation_property(
        validation_function: Callable[[str, int], bool],
        input_data: str,
        max_length: int = 500
    ):
        """
        Property: Length validation should respect maximum limits.

        Args:
            validation_function: Function that validates input length
            input_data: Input data to test
            max_length: Maximum allowed length
        """
        is_valid = validation_function(input_data, max_length)
        # Valid exactly when the input is within the limit.
        expected = len(input_data) <= max_length
        assert is_valid == expected

    @staticmethod
    def _contains_malicious_patterns(text: str) -> bool:
        """
        Check if text contains malicious patterns.

        Matching is a case-insensitive substring search.

        Args:
            text: Text to scan

        Returns:
            True if any known pattern occurs in ``text``.
        """
        malicious_patterns = [
            '<script', 'javascript:', 'data:text/html',
            'DROP TABLE', 'DELETE FROM', 'INSERT INTO',
            # Single-backslash forms cover ordinary Windows paths; the
            # doubled-backslash forms are kept for escaped input.
            '../', '..\\', '..\\\\', '/etc/passwd',
            'c:\\windows', 'c:\\\\windows',
            '$(', '`', '${', '{{', '}}'
        ]
        text_lower = text.lower()
        return any(pattern.lower() in text_lower for pattern in malicious_patterns)
# ============================================================================
# Authentication and Authorization Property Templates
# ============================================================================
if "SecurityError" not in globals():
    class SecurityError(Exception):
        """Fallback security exception used when the module provides none.

        The templates below name ``SecurityError`` in their ``except``
        clauses; without a binding for that name the clause itself raises
        ``NameError`` and hides the real failure.
        """


class AuthenticationProperties:
    """
    Property-based test templates for authentication.

    Each template takes the authentication function under test as its leading
    argument; the arguments named in ``@given`` are generated by Hypothesis.
    """

    @staticmethod
    @security_property_test()
    @given(
        username=st.text(min_size=1, max_size=50),
        password=st.text(min_size=1, max_size=100),
        is_valid_user=st.booleans()
    )
    def test_authentication_consistency_property(
        auth_function: Callable[[str, str], bool],
        username: str,
        password: str,
        is_valid_user: bool
    ):
        """
        Property: Authentication should be consistent for same credentials.

        Args:
            auth_function: Authentication function to test
            username: Username to test
            password: Password to test
            is_valid_user: Whether the in-test reference model treats these
                credentials as a valid user. It does not influence
                ``auth_function``.
        """
        def attempt():
            # A rejection raised as one of the expected exception types counts
            # as an outcome, so "raises the same way twice" is consistent.
            try:
                return ('result', auth_function(username, password))
            except (ValueError, TypeError, SecurityError) as exc:
                return ('rejected', type(exc))

        # The function under test must give the same outcome for the same
        # credentials.
        assert attempt() == attempt()

        # Mock the user database for testing
        def mock_auth(user: str, pwd: str) -> bool:
            if is_valid_user and user == username and pwd == password:
                return True
            return False

        # The reference model should be consistent
        result1 = mock_auth(username, password)
        result2 = mock_auth(username, password)
        assert result1 == result2
        # Invalid credentials should fail
        if not is_valid_user:
            assert mock_auth(username, password) == False

    @staticmethod
    @security_property_test()
    @given(
        malicious_username=malicious_input_strategy(),
        malicious_password=malicious_input_strategy()
    )
    def test_authentication_injection_resistance_property(
        auth_function: Callable[[str, str], bool],
        malicious_username: str,
        malicious_password: str
    ):
        """
        Property: Authentication should resist injection attacks.

        Args:
            auth_function: Authentication function to test
            malicious_username: Malicious username input
            malicious_password: Malicious password input
        """
        assume(len(malicious_username) <= 1000)
        assume(len(malicious_password) <= 1000)
        try:
            result = auth_function(malicious_username, malicious_password)
        except (ValueError, TypeError, SecurityError):
            # Expected exceptions for malicious input
            return
        except Exception as e:
            # Should not cause unexpected exceptions
            pytest.fail(f"Unexpected exception: {type(e).__name__}: {e}")
        # Asserted outside the try block so a failure is reported as an
        # assertion failure rather than as an "unexpected exception".
        assert result == False, (
            "malicious credentials must never authenticate"
        )
# ============================================================================
# Resource Boundary Property Templates
# ============================================================================
class ResourceBoundaryProperties:
    """
    Property-based test templates for resource boundary validation.

    Each template takes the function under test as its leading argument; the
    arguments named in ``@given`` are generated by Hypothesis.
    """

    @staticmethod
    @security_property_test()
    @given(
        resource_request=st.integers(min_value=0, max_value=10000),
        resource_limit=st.integers(min_value=100, max_value=1000)
    )
    def test_resource_limit_enforcement_property(
        resource_allocator: Callable[[int, int], bool],
        resource_request: int,
        resource_limit: int
    ):
        """
        Property: Resource allocation should respect configured limits.

        Args:
            resource_allocator: Function that allocates resources
            resource_request: Amount of resource requested
            resource_limit: Maximum resource limit
        """
        can_allocate = resource_allocator(resource_request, resource_limit)
        # Allocation must succeed exactly when the request fits the limit.
        should_allocate = resource_request <= resource_limit
        assert can_allocate == should_allocate

    @staticmethod
    @security_property_test()
    @given(
        concurrent_requests=st.lists(
            st.integers(min_value=1, max_value=100),
            min_size=1,
            max_size=20
        ),
        total_limit=st.integers(min_value=500, max_value=2000)
    )
    def test_concurrent_resource_limits_property(
        resource_manager: Callable[[List[int], int], Dict[int, bool]],
        concurrent_requests: List[int],
        total_limit: int
    ):
        """
        Property: Concurrent resource allocation should respect total limits.

        Results are paired with requests positionally, in the iteration order
        of the returned mapping's values.

        Args:
            resource_manager: Function that manages concurrent allocations
            concurrent_requests: List of concurrent resource requests
            total_limit: Total resource limit
        """
        allocation_results = resource_manager(concurrent_requests, total_limit)
        outcomes = list(allocation_results.values())

        # All results should be boolean
        assert all(isinstance(outcome, bool) for outcome in outcomes)

        # zip() stops at the shorter input, so a missing result would silently
        # drop requests from the total below.
        assert len(outcomes) == len(concurrent_requests), (
            "resource manager must return one result per request"
        )

        # Calculate total allocated resources
        total_allocated = sum(
            request
            for request, allocated in zip(concurrent_requests, outcomes)
            if allocated
        )
        # Should not exceed total limit
        assert total_allocated <= total_limit
# ============================================================================
# State Consistency Property Templates
# ============================================================================
class StateConsistencyProperties:
    """
    Property-based test templates for state consistency validation.

    Each template takes the function(s) under test as leading arguments; the
    arguments named in ``@given`` are generated by Hypothesis.
    """

    @staticmethod
    @security_property_test()
    @given(
        initial_state=agent_state_strategy(),
        operations=st.lists(
            st.sampled_from(['activate', 'deactivate', 'update', 'restart']),
            min_size=1,
            max_size=10
        )
    )
    def test_agent_state_consistency_property(
        state_manager: Callable[[Dict[str, Any], List[str]], Dict[str, Any]],
        initial_state: Dict[str, Any],
        operations: List[str]
    ):
        """
        Property: Agent state should remain consistent through operations.

        Args:
            state_manager: Function that manages agent state
            initial_state: Initial agent state
            operations: List of operations to perform
        """
        # Capture the identifier before the call: a state manager that mutates
        # and returns the same dict would otherwise be compared with itself.
        original_agent_id = initial_state['agent_id']

        final_state = state_manager(initial_state, operations)

        # State should maintain required fields
        required_fields = ['agent_id', 'session_id', 'name', 'status']
        for field in required_fields:
            assert field in final_state
            assert final_state[field] is not None
        # Agent ID should never change
        assert final_state['agent_id'] == original_agent_id
        # Status should be valid
        valid_statuses = ['ACTIVE', 'IDLE', 'ERROR', 'TERMINATED']
        assert final_state['status'] in valid_statuses

    @staticmethod
    @security_property_test()
    @given(state_data=st.dictionaries(st.text(), st.text(), min_size=1, max_size=10))
    def test_state_serialization_roundtrip_property(
        serializer: Callable[[Dict[str, str]], str],
        deserializer: Callable[[str], Dict[str, str]],
        state_data: Dict[str, str]
    ):
        """
        Property: State serialization should be roundtrip safe.

        Args:
            serializer: Function to serialize state
            deserializer: Function to deserialize state
            state_data: State data to test
        """
        # Snapshot the input so a serializer that mutates its argument cannot
        # make the final comparison pass vacuously.
        expected = dict(state_data)

        serialized = serializer(state_data)
        # Serialized form should be a non-empty string; checked before
        # deserializing so the failure points at the serializer.
        assert isinstance(serialized, str)
        assert len(serialized) > 0

        deserialized = deserializer(serialized)
        # Should get back the same data
        assert deserialized == expected
# ============================================================================
# Cryptographic Security Property Templates
# ============================================================================
if "CryptographicError" not in globals():
    class CryptographicError(Exception):
        """Fallback cryptographic exception used when the module provides none.

        The key-sensitivity template names ``CryptographicError`` in its
        ``except`` clause; without a binding for that name the clause itself
        raises ``NameError`` and hides the real failure.
        """


class CryptographicProperties:
    """
    Property-based test templates for cryptographic security.

    Each template takes the encrypt/decrypt functions under test as leading
    arguments; the arguments named in ``@given`` are generated by Hypothesis.
    """

    @staticmethod
    @security_property_test()
    @given(
        plaintext=st.binary(min_size=1, max_size=1000),
        key=st.binary(min_size=16, max_size=64)
    )
    def test_encryption_roundtrip_property(
        encrypt_function: Callable[[bytes, bytes], bytes],
        decrypt_function: Callable[[bytes, bytes], bytes],
        plaintext: bytes,
        key: bytes
    ):
        """
        Property: Encryption should be reversible with correct key.

        Args:
            encrypt_function: Function to encrypt data
            decrypt_function: Function to decrypt data
            plaintext: Data to encrypt
            key: Encryption key
        """
        # Encrypt and decrypt
        ciphertext = encrypt_function(plaintext, key)
        decrypted = decrypt_function(ciphertext, key)
        # Should get back original plaintext
        assert decrypted == plaintext
        # Ciphertext should be different from plaintext (unless plaintext is very special)
        if len(plaintext) > 16:  # Avoid edge cases with very short plaintexts
            assert ciphertext != plaintext

    @staticmethod
    @security_property_test()
    @given(
        data=st.binary(min_size=1, max_size=500),
        correct_key=st.binary(min_size=16, max_size=64),
        wrong_key=st.binary(min_size=16, max_size=64)
    )
    def test_encryption_key_sensitivity_property(
        encrypt_function: Callable[[bytes, bytes], bytes],
        decrypt_function: Callable[[bytes, bytes], bytes],
        data: bytes,
        correct_key: bytes,
        wrong_key: bytes
    ):
        """
        Property: Encryption should be sensitive to key changes.

        Decrypting with the wrong key must either raise ``ValueError``,
        ``TypeError`` or ``CryptographicError``, or return something other
        than the original data.

        Args:
            encrypt_function: Function to encrypt data
            decrypt_function: Function to decrypt data
            data: Data to encrypt
            correct_key: Correct encryption key
            wrong_key: Wrong encryption key
        """
        assume(correct_key != wrong_key)
        # Encrypt with correct key
        ciphertext = encrypt_function(data, correct_key)
        try:
            # Attempt to decrypt with wrong key
            wrong_decryption = decrypt_function(ciphertext, wrong_key)
        except (ValueError, TypeError, CryptographicError):
            # Expected exception for wrong key
            pass
        else:
            # Should not get back original data with wrong key
            assert wrong_decryption != data
# ============================================================================
# Stateful Property Testing Framework
# ============================================================================
class AgentOrchestrationStateMachine(RuleBasedStateMachine):
    """
    Stateful property testing for agent orchestration system.

    Tests complex interactions and state transitions in the system
    using Hypothesis's stateful testing framework. The machine keeps an
    in-memory model of sessions and agents and checks after every step that
    the two stay mutually consistent.
    """

    def __init__(self):
        super().__init__()
        self.sessions: Dict[str, Dict[str, Any]] = {}
        self.agents: Dict[str, Dict[str, Any]] = {}
        self.operation_count = 0
        # Monotonic counters for identifier generation. Deriving identifiers
        # from len(dict) reuses numbers after a deletion and can overwrite a
        # session or agent that is still live.
        self._next_session_number = 0
        self._next_agent_number = 0

    # Define bundles for different types of objects
    session_ids = Bundle('session_ids')
    agent_ids = Bundle('agent_ids')

    @rule(
        target=session_ids,
        session_name=st.text(min_size=1, max_size=50),
        root_path=st.text(min_size=1, max_size=100)
    )
    def create_session(self, session_name: str, root_path: str) -> str:
        """Rule: Create a new session and return its identifier."""
        session_id = f"session_{self._next_session_number}"
        self._next_session_number += 1
        self.sessions[session_id] = {
            'session_id': session_id,
            'name': session_name,
            'root_path': root_path,
            'agents': {},
            'created_at': self.operation_count
        }
        self.operation_count += 1
        return session_id

    @rule(
        target=agent_ids,
        session_id=session_ids,
        agent_name=agent_name_strategy()
    )
    def create_agent(self, session_id: str, agent_name: str) -> str:
        """Rule: Create a new agent in a session and return its identifier."""
        # The bundle still holds identifiers of deleted sessions.
        assume(session_id in self.sessions)
        agent_id = f"agent_{self._next_agent_number}"
        self._next_agent_number += 1
        agent_data = {
            'agent_id': agent_id,
            'session_id': session_id,
            'name': agent_name,
            'status': 'ACTIVE',
            'created_at': self.operation_count
        }
        # The same dict is registered globally and under its session, so a
        # status update is visible through both.
        self.agents[agent_id] = agent_data
        self.sessions[session_id]['agents'][agent_id] = agent_data
        self.operation_count += 1
        return agent_id

    @rule(
        agent_id=agent_ids,
        new_status=st.sampled_from(['ACTIVE', 'IDLE', 'ERROR'])
    )
    def update_agent_status(self, agent_id: str, new_status: str = 'ACTIVE'):
        """
        Rule: Update agent status.

        The new status is drawn by Hypothesis as a rule argument so that it
        is recorded and shrunk with the rest of the run; calling
        ``.example()`` on a strategy inside a rule is not supported.
        """
        # The bundle still holds identifiers of agents removed with a session.
        assume(agent_id in self.agents)
        self.agents[agent_id]['status'] = new_status
        self.operation_count += 1

    @rule(session_id=session_ids)
    def delete_session(self, session_id: str):
        """Rule: Delete a session and all its agents."""
        assume(session_id in self.sessions)
        # Collect first; the dict cannot be resized while it is iterated.
        agents_to_remove = [
            agent_id for agent_id, agent in self.agents.items()
            if agent['session_id'] == session_id
        ]
        for agent_id in agents_to_remove:
            del self.agents[agent_id]
        del self.sessions[session_id]
        self.operation_count += 1

    @invariant()
    def agent_session_consistency(self):
        """Invariant: All agents must belong to existing sessions."""
        for agent in self.agents.values():
            assert agent['session_id'] in self.sessions
            assert agent['agent_id'] in self.sessions[agent['session_id']]['agents']

    @invariant()
    def session_agent_consistency(self):
        """Invariant: All session agent references must exist."""
        for session in self.sessions.values():
            for agent_id in session['agents']:
                assert agent_id in self.agents
                assert self.agents[agent_id]['session_id'] == session['session_id']

    @invariant()
    def unique_identifiers(self):
        """Invariant: All identifiers must be unique."""
        known_session_ids = list(self.sessions.keys())
        known_agent_ids = list(self.agents.keys())
        assert len(known_session_ids) == len(set(known_session_ids))
        assert len(known_agent_ids) == len(set(known_agent_ids))
        # Dict keys are unique by construction, so also check that each record
        # is stored under the identifier it carries.
        for session_id, session in self.sessions.items():
            assert session['session_id'] == session_id
        for agent_id, agent in self.agents.items():
            assert agent['agent_id'] == agent_id
# ============================================================================
# Property Test Template Factory
# ============================================================================
class PropertyTestTemplateFactory:
    """
    Factory for creating property-based test templates.

    Each factory method closes over the function under test and returns a
    ready-to-run Hypothesis test.
    """

    @staticmethod
    def create_validation_test(
        function_to_test: Callable,
        valid_strategy: st.SearchStrategy,
        invalid_strategy: st.SearchStrategy
    ) -> Callable:
        """
        Create a property test for validation functions.

        Args:
            function_to_test: Function to test
            valid_strategy: Strategy for valid inputs
            invalid_strategy: Strategy for invalid inputs

        Returns:
            Property test function
        """
        @security_property_test()
        @given(
            valid_input=valid_strategy,
            invalid_input=invalid_strategy
        )
        def test_validation_property(valid_input, invalid_input):
            # Valid inputs should be accepted
            assert function_to_test(valid_input) == True
            # Invalid inputs should be rejected
            assert function_to_test(invalid_input) == False

        return test_validation_property

    @staticmethod
    def create_idempotence_test(
        function_to_test: Callable,
        input_strategy: st.SearchStrategy
    ) -> Callable:
        """
        Create a property test for idempotent functions.

        Args:
            function_to_test: Function to test
            input_strategy: Strategy for inputs

        Returns:
            Property test function
        """
        @property_test()
        @given(input_data=input_strategy)
        def test_idempotence_property(input_data):
            result1 = function_to_test(input_data)
            result2 = function_to_test(result1)
            # f(f(x)) should equal f(x)
            assert result1 == result2

        return test_idempotence_property

    @staticmethod
    def create_immutability_test(
        function_to_test: Callable,
        input_strategy: st.SearchStrategy
    ) -> Callable:
        """
        Create a property test for immutability.

        Objects with a ``__dict__`` are compared attribute by attribute;
        everything else is compared by ``repr``.

        Args:
            function_to_test: Function to test
            input_strategy: Strategy for inputs

        Returns:
            Property test function
        """
        @property_test()
        @given(input_data=input_strategy)
        def test_immutability_property(input_data):
            # Decide the comparison mode once, before the call.
            has_attributes = hasattr(input_data, '__dict__')

            # Capture original state
            if has_attributes:
                original_dict = input_data.__dict__.copy()
                # The shallow copy shares nested containers with the input,
                # so an in-place change to one of them compares equal to
                # itself. The repr snapshot records their contents as text.
                original_repr = repr(input_data.__dict__)
            else:
                original_repr = repr(input_data)

            # Call function; only its effect on the input matters here.
            function_to_test(input_data)

            # Check that input wasn't modified
            if has_attributes:
                assert input_data.__dict__ == original_dict
                assert repr(input_data.__dict__) == original_repr
            else:
                assert repr(input_data) == original_repr

        return test_immutability_property
# Export main components
# Public API of this module: the template classes, the stateful machine,
# the factory, and the two settings decorators.
__all__ = [
    'InputValidationProperties',
    'AuthenticationProperties',
    'ResourceBoundaryProperties',
    'StateConsistencyProperties',
    'CryptographicProperties',
    'AgentOrchestrationStateMachine',
    'PropertyTestTemplateFactory',
    'property_test',
    'security_property_test',
]