"""
Contract Verification and Security Integration Tests
Architecture Integration:
- Testing Patterns: Contract verification with real security implementations
- Security Model: End-to-end testing of all security contracts and boundaries
- Integration: Full system testing with all security components
Technical Decisions:
- Contract Testing: Verification that all security contracts hold under real conditions
- Integration Testing: Testing interaction between all security modules
- Failure Testing: Ensuring security failures are handled correctly
Dependencies & Integration:
- External: pytest for test framework, tempfile for isolated test environments
- Internal: All security modules for comprehensive integration testing
Quality Assurance:
- Test Coverage: All security contracts tested in realistic scenarios
- Security Focus: Tests designed to verify security boundaries under stress
Author: Adder_2 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
import json
import secrets
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import pytest

# Import all security modules for integration testing
from src.boundaries.crypto import (
KeyManager, StateEncryption, AuditSigning,
EncryptionResult, SignatureResult, CryptoError
)
from src.validators.filesystem import (
FilesystemBoundaryEnforcer, SecurityContext, ResourceLimits,
FilesystemSecurityError, PermissionLevel
)
from src.validators.input import (
InputSanitizer, ValidationError, InputType,
SanitizedString, ValidationResult
)
from src.contracts.security import (
SecurityViolation, SecurityViolationType, SecurityLevel,
ContractValidationContext, SecurityContractError,
security_contract, agent_creation_contract, agent_deletion_contract,
validate_agent_name, validate_session_name, set_security_context,
get_security_context
)
class TestSecurityContractIntegration:
    """Integration tests for security contracts with real implementations.

    Every test runs against a fresh temporary directory that holds the key
    store and the session root. ``setup_method`` builds the crypto,
    filesystem and input-validation components, bundles them into a
    ``ContractValidationContext`` and installs it through
    ``set_security_context``; ``teardown_method`` clears the context and
    removes the directory.
    """

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a timezone-aware ISO-8601 string.

        Uses ``datetime.now(timezone.utc)`` rather than the deprecated,
        naive ``datetime.utcnow()``.
        """
        return datetime.now(timezone.utc).isoformat()

    def setup_method(self):
        """Setup complete security environment for testing."""
        self.temp_dir = Path(tempfile.mkdtemp())

        # Crypto components all share one key manager rooted in the temp dir.
        self.key_manager = KeyManager(self.temp_dir / "keys")
        self.state_encryption = StateEncryption(self.key_manager)
        self.audit_signing = AuditSigning(self.key_manager)

        # Filesystem security is scoped to a per-test session root.
        self.session_root = self.temp_dir / "session"
        self.session_root.mkdir()
        self.security_context_fs = SecurityContext(
            session_root=self.session_root,
            resource_limits=ResourceLimits(),
        )
        self.fs_enforcer = FilesystemBoundaryEnforcer(self.security_context_fs)

        # Input sanitizer used by the mock operations defined in the tests.
        self.input_sanitizer = InputSanitizer()

        # Contract validation context wiring every component together.
        self.contract_context = ContractValidationContext(
            agent_id="Agent_1",
            session_id="test_session_123",
            security_level=SecurityLevel.CONFIDENTIAL,
            filesystem_enforcer=self.fs_enforcer,
            key_manager=self.key_manager,
            audit_signing=self.audit_signing,
            state_encryption=self.state_encryption,
        )

        # Set global security context.
        set_security_context(self.contract_context)

    def teardown_method(self):
        """Cleanup test environment."""
        import shutil

        # Clear the context first so no test leaks it into the next one.
        set_security_context(None)
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_agent_creation_contract_success(self):
        """Test successful agent creation with all security contracts."""

        @agent_creation_contract
        def create_test_agent(agent_name: str, session_id: str) -> Dict[str, Any]:
            """Mock agent creation function with security contracts."""
            # Validate inputs using our security validators.
            sanitized_name = self.input_sanitizer.sanitize_agent_name(agent_name)

            agent_state = {
                'agent_id': sanitized_name,
                'session_id': session_id,
                'name': sanitized_name,
                'status': 'CREATED',
                'created_at': self._utc_timestamp(),
            }

            # Encrypt the agent state under a freshly generated key.
            key_id, _ = self.key_manager.generate_encryption_key()
            encrypted_state = self.state_encryption.encrypt_state(agent_state, key_id)

            return {
                'agent_id': sanitized_name,
                'encrypted_state': encrypted_state.encrypted_data,
                'key_id': key_id,
            }

        result = create_test_agent("Agent_42", "test_session_123")

        assert result['agent_id'] == "Agent_42"
        assert 'encrypted_state' in result
        assert 'key_id' in result

        # Verify no security violations were recorded on the active context.
        context = get_security_context()
        assert not context.has_violations()

    def test_agent_creation_contract_failure_invalid_name(self):
        """Test agent creation contract failure with invalid name."""

        @agent_creation_contract
        def create_test_agent_bad_name(agent_name: str, session_id: str) -> Dict[str, Any]:
            # This should fail validation.
            sanitized_name = self.input_sanitizer.sanitize_agent_name(agent_name)
            return {'agent_id': sanitized_name}

        with pytest.raises(ValidationError):
            create_test_agent_bad_name("invalid_name", "test_session_123")

    def test_agent_creation_contract_failure_no_context(self):
        """Test agent creation contract failure without proper security context."""
        # Clear the context installed by setup_method.
        set_security_context(None)

        @agent_creation_contract
        def create_test_agent_no_context(agent_name: str, session_id: str) -> Dict[str, Any]:
            return {'agent_id': agent_name}

        # Should fail due to missing security context.
        with pytest.raises(SecurityContractError):
            create_test_agent_no_context("Agent_1", "test_session")

    def test_agent_deletion_contract_success(self):
        """Test successful agent deletion with security contracts."""

        @agent_deletion_contract
        def delete_test_agent(agent_id: str) -> Dict[str, Any]:
            """Mock agent deletion with security contracts."""
            sanitized_id = self.input_sanitizer.sanitize_agent_name(agent_id)

            # Audit entry describing the deletion.
            deletion_data = {
                'operation': 'agent_deletion',
                'agent_id': sanitized_id,
                'timestamp': self._utc_timestamp(),
                'deleted_by': self.contract_context.agent_id,
            }

            # Sign the audit entry with a freshly generated signing key.
            key_id, _, _ = self.key_manager.generate_signing_key()
            signature_result = self.audit_signing.sign_audit_entry(deletion_data, key_id)

            return {
                'agent_id': sanitized_id,
                'deleted': True,
                'audit_signature': signature_result.signature,
            }

        result = delete_test_agent("Agent_42")

        assert result['agent_id'] == "Agent_42"
        assert result['deleted'] is True
        assert 'audit_signature' in result

    def test_filesystem_contract_integration(self):
        """Test filesystem access contracts with boundary enforcement."""

        @security_contract(
            operation="filesystem_access",
            required_level=SecurityLevel.INTERNAL,
            requires_agent=True,
            requires_session=True,
            audit_required=True,
        )
        def access_file(file_path: str) -> str:
            """Mock file access with security contracts."""
            # Sanitize the path, then check it against the filesystem boundary.
            sanitized_path = self.input_sanitizer.sanitize_file_path(file_path)
            validated_path = self.fs_enforcer.validate_path(sanitized_path, "file_read")

            # Create the file so there is something to read back.
            validated_path.parent.mkdir(parents=True, exist_ok=True)
            validated_path.write_text("test content")
            return validated_path.read_text()

        content = access_file("test_subdir/test_file.txt")
        assert content == "test content"

    def test_filesystem_contract_boundary_violation(self):
        """Test filesystem access contract with boundary violation."""

        @security_contract(
            operation="filesystem_access",
            required_level=SecurityLevel.INTERNAL,
            requires_agent=True,
            requires_session=True,
            audit_required=True,
        )
        def access_file_dangerous(file_path: str) -> str:
            """Mock file access that should fail."""
            # Only the side effect (raising) matters; the results are unused.
            self.fs_enforcer.validate_path(
                self.input_sanitizer.sanitize_file_path(file_path),
                "file_read",
            )
            return "should_not_reach_here"

        # Path traversal attempt must be rejected.
        with pytest.raises(ValidationError):
            access_file_dangerous("../../../etc/passwd")

    def test_state_encryption_contract_integration(self):
        """Test state encryption with security contracts."""

        @security_contract(
            operation="state_persistence",
            required_level=SecurityLevel.CONFIDENTIAL,
            requires_agent=True,
            requires_session=True,
            audit_required=True,
        )
        def save_agent_state(agent_state: Dict[str, Any]) -> EncryptionResult:
            """Mock state saving with encryption."""
            key_id, _ = self.key_manager.generate_encryption_key()
            return self.state_encryption.encrypt_state(agent_state, key_id)

        test_state = {
            'agent_id': 'Agent_1',
            'status': 'ACTIVE',
            'last_update': self._utc_timestamp(),
        }

        encrypted_result = save_agent_state(test_state)
        assert encrypted_result.encrypted_data is not None
        assert encrypted_result.key_id is not None

        # Round-trip: decrypting must give back the original state.
        decrypted_state = self.state_encryption.decrypt_state(encrypted_result)
        assert decrypted_state == test_state

    def test_audit_logging_contract_integration(self):
        """Test audit logging with security contracts."""

        @security_contract(
            operation="audit_logging",
            required_level=SecurityLevel.INTERNAL,
            requires_agent=True,
            requires_session=True,
            audit_required=True,
        )
        def log_security_event(event_data: Dict[str, Any]) -> SignatureResult:
            """Mock security event logging."""
            key_id, _, _ = self.key_manager.generate_signing_key()
            return self.audit_signing.sign_audit_entry(event_data, key_id)

        event_data = {
            'event_type': 'AGENT_CREATED',
            'agent_id': 'Agent_1',
            'timestamp': self._utc_timestamp(),
            'details': 'Agent successfully created',
        }

        signature_result = log_security_event(event_data)
        assert signature_result.signature is not None
        assert signature_result.key_id is not None

        # The signature must verify against the data that was signed.
        is_valid = self.audit_signing.verify_audit_signature(signature_result, event_data)
        assert is_valid is True

    def test_security_level_enforcement(self):
        """Test security level enforcement in contracts."""
        # Replace the context with one at INTERNAL, below what is required.
        low_context = ContractValidationContext(
            agent_id="Agent_1",
            session_id="test_session",
            security_level=SecurityLevel.INTERNAL,  # Lower than required
            key_manager=self.key_manager,
            audit_signing=self.audit_signing,
        )
        set_security_context(low_context)

        @security_contract(
            operation="high_security_operation",
            required_level=SecurityLevel.SECRET,  # Requires higher level
            requires_agent=True,
            requires_session=True,
            audit_required=True,
        )
        def high_security_operation() -> str:
            return "sensitive_data"

        # Should fail due to insufficient security level.
        with pytest.raises(SecurityContractError) as exc_info:
            high_security_operation()

        # Checked after the ``with`` block: statements following the raising
        # call inside the block would never execute.
        assert exc_info.value.violation.violation_type == SecurityViolationType.AUTHORIZATION_FAILURE

    def test_contract_violation_handling(self):
        """Test contract violation handling and recovery."""
        # Context that cannot satisfy the contract: no agent and none of the
        # optional security components.
        bad_context = ContractValidationContext(
            agent_id=None,  # Missing agent
            session_id="test_session",
            security_level=SecurityLevel.INTERNAL,
        )
        set_security_context(bad_context)

        @security_contract(
            operation="requires_all_context",
            required_level=SecurityLevel.INTERNAL,
            requires_agent=True,
            requires_session=True,
            audit_required=True,
        )
        def operation_requiring_full_context() -> str:
            return "success"

        # Should fail and accumulate violations.
        with pytest.raises(SecurityContractError):
            operation_requiring_full_context()

        # The failure must have been recorded on the active context.
        context = get_security_context()
        assert context.has_violations()
        assert len(context.violations) > 0

    def test_input_validation_contract_integration(self):
        """Test input validation integration with contracts."""

        @security_contract(
            operation="message_processing",
            required_level=SecurityLevel.INTERNAL,
            requires_agent=True,
            requires_session=True,
            audit_required=True,
        )
        def process_message(message: str, agent_name: str) -> Dict[str, Any]:
            """Process message with full validation."""
            sanitized_message = self.input_sanitizer.sanitize_message_content(message)
            sanitized_agent = self.input_sanitizer.sanitize_agent_name(agent_name)
            return {
                'message': sanitized_message,
                'agent': sanitized_agent,
                'processed_at': self._utc_timestamp(),
            }

        result = process_message("Hello, this is a test message!", "Agent_42")

        assert result['message'] is not None
        assert result['agent'] == "Agent_42"
        assert 'processed_at' in result

    def test_end_to_end_security_workflow(self):
        """Test complete end-to-end security workflow."""

        # Step 1: Agent Creation
        @agent_creation_contract
        def create_agent(agent_name: str, session_id: str) -> Dict[str, Any]:
            sanitized_name = self.input_sanitizer.sanitize_agent_name(agent_name)
            agent_state = {
                'agent_id': sanitized_name,
                'session_id': session_id,
                'status': 'CREATED',
                'created_at': self._utc_timestamp(),
            }

            # Encrypt state
            key_id, _ = self.key_manager.generate_encryption_key()
            encrypted_state = self.state_encryption.encrypt_state(agent_state, key_id)

            return {
                'agent_id': sanitized_name,
                'state_key_id': key_id,
                'encrypted_state': encrypted_state,
            }

        # Step 2: Message Processing
        @security_contract(
            operation="message_transmission",
            required_level=SecurityLevel.INTERNAL,
            requires_agent=True,
            requires_session=True,
            audit_required=True,
        )
        def send_message(agent_id: str, message: str) -> Dict[str, Any]:
            sanitized_message = self.input_sanitizer.sanitize_message_content(message)
            message_data = {
                'from_agent': self.contract_context.agent_id,
                'to_agent': agent_id,
                'message': sanitized_message,
                'timestamp': self._utc_timestamp(),
            }

            # Encrypt message
            key_id, _ = self.key_manager.generate_encryption_key()
            encrypted_message = self.state_encryption.encrypt_state(message_data, key_id)

            # Audit the transmission
            audit_key_id, _, _ = self.key_manager.generate_signing_key()
            audit_signature = self.audit_signing.sign_audit_entry(message_data, audit_key_id)

            return {
                'encrypted_message': encrypted_message,
                'audit_signature': audit_signature,
            }

        # Step 3: File Operations
        @security_contract(
            operation="filesystem_access",
            required_level=SecurityLevel.INTERNAL,
            requires_agent=True,
            requires_session=True,
            audit_required=True,
        )
        def save_message_log(log_data: str) -> Path:
            # Log file lives under the session root.
            log_file = self.session_root / "message_logs" / "agent_messages.log"
            validated_path = self.fs_enforcer.check_file_creation(log_file, len(log_data))
            validated_path.parent.mkdir(parents=True, exist_ok=True)
            validated_path.write_text(log_data)
            return validated_path

        # Execute complete workflow
        # 1. Create agent
        agent_result = create_agent("Agent_42", "test_session_123")
        assert agent_result['agent_id'] == "Agent_42"

        # 2. Send message
        message_result = send_message("Agent_42", "Hello from security test!")
        assert message_result['encrypted_message'] is not None
        assert message_result['audit_signature'] is not None

        # 3. Save log
        log_path = save_message_log("Message sent successfully")
        assert log_path.exists()
        assert log_path.read_text() == "Message sent successfully"

        # 4. Verify no security violations occurred
        context = get_security_context()
        assert not context.has_violations()
class TestSecurityBoundaryStress:
    """Stress testing for security boundaries under load.

    Each test gets its own temporary directory with a ``KeyManager`` and a
    ``StateEncryption`` built on it; the directory is removed on teardown.
    """

    @staticmethod
    def _run_concurrently(worker, worker_count: int) -> None:
        """Run ``worker(worker_id)`` on ``worker_count`` threads and join them.

        Args:
            worker: Callable taking the integer worker id (``0..worker_count-1``).
            worker_count: Number of threads to start.
        """
        import threading

        threads = [
            threading.Thread(target=worker, args=(worker_id,))
            for worker_id in range(worker_count)
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def setup_method(self):
        """Setup stress test environment."""
        self.temp_dir = Path(tempfile.mkdtemp())
        self.key_manager = KeyManager(self.temp_dir / "keys")
        self.state_encryption = StateEncryption(self.key_manager)

    def teardown_method(self):
        """Cleanup stress test environment."""
        import shutil

        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_concurrent_encryption_operations(self):
        """Test concurrent encryption operations for race conditions."""
        results = []
        errors = []

        def encrypt_worker(worker_id: int):
            # Exceptions raised in a thread do not reach pytest, so they are
            # collected here and asserted on from the main thread.
            try:
                for i in range(100):
                    key_id, _ = self.key_manager.generate_encryption_key()
                    data = f"worker_{worker_id}_data_{i}"
                    encrypted = self.state_encryption.encrypt_state(data, key_id)
                    # Decrypt to verify the round trip.
                    decrypted = self.state_encryption.decrypt_state(encrypted)
                    assert decrypted == data
                    results.append((worker_id, i))
            except Exception as e:
                # repr() keeps the exception type visible even when the
                # message is empty (e.g. a bare AssertionError).
                errors.append((worker_id, repr(e)))

        # Run 10 concurrent workers.
        self._run_concurrently(encrypt_worker, 10)

        assert not errors, f"Encryption errors: {errors}"
        assert len(results) == 1000  # 10 workers * 100 operations each

    def test_key_rotation_under_load(self):
        """Test key rotation behavior under heavy load."""
        # Generate initial keys.
        encryption_keys = []
        for _ in range(10):
            key_id, _ = self.key_manager.generate_encryption_key(validity_hours=1)
            encryption_keys.append(key_id)

        results = []
        errors = []

        def key_access_worker(worker_id: int):
            try:
                for _ in range(50):
                    for key_id in encryption_keys:
                        try:
                            # Unpacking is kept deliberately: it fails loudly
                            # if the call stops returning a pair.
                            _key, _metadata = self.key_manager.get_encryption_key(key_id)
                        except Exception as e:
                            # Key rotation errors are acceptable.
                            if "expired" in str(e).lower():
                                continue
                            raise
                        results.append((worker_id, key_id))
            except Exception as e:
                errors.append((worker_id, repr(e)))

        # Run 5 concurrent workers.
        self._run_concurrently(key_access_worker, 5)

        # Should have no unexpected errors.
        # NOTE(review): ``results`` is collected but never asserted on, so the
        # test would also pass if every access reported "expired" - confirm
        # whether a minimum success count should be required.
        assert not errors, f"Key access errors: {errors}"

    def test_filesystem_boundary_stress(self):
        """Test filesystem boundary enforcement under stress."""
        session_root = self.temp_dir / "session"
        session_root.mkdir()

        context = SecurityContext(
            session_root=session_root,
            resource_limits=ResourceLimits(max_total_files=1000),
        )
        enforcer = FilesystemBoundaryEnforcer(context)

        # Test many file creations.
        for i in range(100):
            file_path = f"test_files/file_{i}.txt"
            try:
                validated_path = enforcer.check_file_creation(file_path, 1024)
                validated_path.parent.mkdir(parents=True, exist_ok=True)
                validated_path.write_text(f"Content for file {i}")
                # Update resource usage.
                enforcer.update_resource_usage(validated_path, "file_created")
            except Exception as e:
                # Resource limit errors are acceptable and end the loop;
                # anything else is a real failure.
                if "limit" in str(e).lower():
                    break
                raise

        # Verify resource tracking.
        # NOTE(review): ``>= 0`` can only fail for negative counters; a
        # tighter bound needs the enforcer's counting semantics - TODO confirm.
        usage = enforcer.get_resource_usage()
        assert usage['file_count'] >= 0
        assert usage['total_size'] >= 0
# Export test classes
__all__ = [
    'TestSecurityContractIntegration',
    'TestSecurityBoundaryStress',
]

if __name__ == "__main__":
    # Run the integration tests and hand pytest's exit code back to the
    # shell; discarding it would make a failing run exit with status 0.
    raise SystemExit(pytest.main([__file__, "-v"]))