Skip to main content
Glama
vitalune

Personal Knowledge Assistant

by vitalune
test_security_features.py31 kB
""" Unit tests for Security Features Tests encryption, authentication, privacy controls, and audit logging. """ import pytest import os import tempfile from datetime import datetime, timedelta from pathlib import Path from unittest.mock import MagicMock, patch, mock_open from typing import Dict, Any from src.config.settings import Settings, Environment, EncryptionSettings, SecuritySettings, PrivacySettings from src.config.auth import AuthProvider, AuthManager, TokenInfo from src.utils.encryption import EncryptionManager, DataEncryption from src.utils.audit import AuditLogger, AuditEvent, EventType class TestEncryptionSettings: """Test encryption configuration and validation""" def test_encryption_settings_defaults(self): """Test encryption settings have secure defaults""" settings = EncryptionSettings() assert settings.pbkdf2_iterations >= 50000 assert settings.argon2_time_cost >= 2 assert settings.argon2_memory_cost >= 1024 assert settings.token_expiry_hours >= 1 assert settings.refresh_token_expiry_days >= 1 def test_encryption_settings_validation(self): """Test encryption settings validation""" # Test minimum values settings = EncryptionSettings( pbkdf2_iterations=100000, argon2_time_cost=2, argon2_memory_cost=65536 ) assert settings.pbkdf2_iterations == 100000 assert settings.argon2_time_cost == 2 assert settings.argon2_memory_cost == 65536 def test_master_key_generation(self): """Test master key is properly generated""" settings = EncryptionSettings() master_key = settings.master_key.get_secret_value() assert len(master_key) > 0 assert isinstance(master_key, str) # Should be different each time settings2 = EncryptionSettings() master_key2 = settings2.master_key.get_secret_value() assert master_key != master_key2 class TestSecuritySettings: """Test security configuration and validation""" def test_security_settings_defaults(self): """Test security settings have reasonable defaults""" settings = SecuritySettings() assert settings.session_timeout_minutes >= 5 assert settings.max_login_attempts >= 3 assert settings.lockout_duration_minutes >= 5 assert settings.api_rate_limit_per_minute >= 10 assert settings.max_file_size_mb >= 1 assert len(settings.allowed_file_types) > 0 def test_rate_limiting_configuration(self): """Test rate limiting configuration""" settings = SecuritySettings( api_rate_limit_per_minute=120, cache_rate_limit_per_hour=1000 ) assert settings.api_rate_limit_per_minute == 120 assert settings.cache_rate_limit_per_hour == 1000 def test_cors_configuration(self): """Test CORS configuration""" settings = SecuritySettings( allowed_origins=["https://example.com", "https://app.example.com"] ) assert "https://example.com" in settings.allowed_origins assert "https://app.example.com" in settings.allowed_origins def test_file_upload_security(self): """Test file upload security settings""" settings = SecuritySettings( max_file_size_mb=5, allowed_file_types=[".pdf", ".txt", ".json"] ) assert settings.max_file_size_mb == 5 assert ".pdf" in settings.allowed_file_types assert ".exe" not in settings.allowed_file_types class TestPrivacySettings: """Test privacy configuration and controls""" def test_privacy_settings_defaults(self): """Test privacy settings default to privacy-protective values""" settings = PrivacySettings() assert settings.anonymize_logs is True # Privacy by default assert settings.require_explicit_consent is True assert settings.enable_data_export is True # User rights assert settings.enable_data_deletion is True # User rights assert settings.cache_retention_days >= 1 assert settings.log_retention_days >= 30 def test_data_retention_configuration(self): """Test data retention configuration""" settings = PrivacySettings( cache_retention_days=14, log_retention_days=180, audit_retention_years=3 ) assert settings.cache_retention_days == 14 assert settings.log_retention_days == 180 assert settings.audit_retention_years == 3 def test_geographic_restrictions(self): """Test geographic restriction configuration""" settings = PrivacySettings( allowed_countries=["US", "CA", "GB"], blocked_countries=["XX", "YY"] ) assert "US" in settings.allowed_countries assert "XX" in settings.blocked_countries def test_processing_purposes(self): """Test data processing purposes configuration""" settings = PrivacySettings() assert "personal_knowledge_management" in settings.processing_purposes assert len(settings.processing_purposes) > 0 class TestEncryptionManager: """Test encryption functionality""" @pytest.fixture def encryption_manager(self, test_settings): """Create an encryption manager for testing""" return EncryptionManager(test_settings.encryption) def test_encryption_manager_initialization(self, encryption_manager): """Test encryption manager initializes properly""" assert encryption_manager is not None assert encryption_manager.settings is not None def test_data_encryption_decryption(self, encryption_manager): """Test data encryption and decryption""" original_data = "This is sensitive test data that needs encryption" # Encrypt data encrypted_data = encryption_manager.encrypt_data(original_data) assert encrypted_data != original_data assert len(encrypted_data) > 0 assert isinstance(encrypted_data, str) # Decrypt data decrypted_data = encryption_manager.decrypt_data(encrypted_data) assert decrypted_data == original_data def test_different_data_produces_different_ciphertext(self, encryption_manager): """Test that different data produces different ciphertext""" data1 = "First piece of data" data2 = "Second piece of data" encrypted1 = encryption_manager.encrypt_data(data1) encrypted2 = encryption_manager.encrypt_data(data2) assert encrypted1 != encrypted2 def test_same_data_produces_different_ciphertext(self, encryption_manager): """Test that same data produces different ciphertext (due to randomness)""" data = "Same data encrypted twice" encrypted1 = encryption_manager.encrypt_data(data) encrypted2 = encryption_manager.encrypt_data(data) # Should be different due to random IV/salt assert encrypted1 != encrypted2 # But both should decrypt to the same original data assert encryption_manager.decrypt_data(encrypted1) == data assert encryption_manager.decrypt_data(encrypted2) == data def test_password_hashing(self, encryption_manager): """Test password hashing functionality""" password = "secure_test_password_123" hashed_password = encryption_manager.hash_password(password) assert hashed_password != password assert len(hashed_password) > 0 assert isinstance(hashed_password, str) # Should be able to verify the password assert encryption_manager.verify_password(password, hashed_password) is True assert encryption_manager.verify_password("wrong_password", hashed_password) is False def test_key_derivation(self, encryption_manager): """Test key derivation functionality""" password = "test_password" salt = "test_salt" key1 = encryption_manager.derive_key(password, salt) key2 = encryption_manager.derive_key(password, salt) # Same password and salt should produce same key assert key1 == key2 # Different salt should produce different key key3 = encryption_manager.derive_key(password, "different_salt") assert key1 != key3 def test_secure_random_generation(self, encryption_manager): """Test secure random data generation""" random1 = encryption_manager.generate_secure_random(32) random2 = encryption_manager.generate_secure_random(32) assert len(random1) == 32 assert len(random2) == 32 assert random1 != random2 assert isinstance(random1, bytes) def test_encryption_with_invalid_data(self, encryption_manager): """Test encryption with invalid data""" # Should handle None gracefully with pytest.raises((TypeError, ValueError)): encryption_manager.encrypt_data(None) # Should handle empty string encrypted_empty = encryption_manager.encrypt_data("") decrypted_empty = encryption_manager.decrypt_data(encrypted_empty) assert decrypted_empty == "" def test_decryption_with_invalid_data(self, encryption_manager): """Test decryption with invalid data""" # Should raise exception for invalid ciphertext with pytest.raises(Exception): encryption_manager.decrypt_data("invalid_ciphertext") with pytest.raises((TypeError, ValueError)): encryption_manager.decrypt_data(None) class TestAuthManager: """Test authentication manager functionality""" @pytest.fixture async def auth_manager(self, test_settings, temp_dir): """Create an auth manager for testing""" settings = test_settings settings.database.data_directory = temp_dir manager = AuthManager(settings) await manager.initialize() return manager @pytest.mark.asyncio async def test_auth_manager_initialization(self, auth_manager): """Test auth manager initializes properly""" assert auth_manager is not None assert auth_manager.settings is not None assert auth_manager._initialized is True @pytest.mark.asyncio async def test_store_and_retrieve_credentials(self, auth_manager): """Test storing and retrieving credentials""" credentials = { 'access_token': 'test_access_token_123', 'refresh_token': 'test_refresh_token_456', 'expires_at': datetime.now() + timedelta(hours=1), 'token_type': 'Bearer', 'scope': ['read', 'write'] } # Store credentials await auth_manager.store_credentials( provider=AuthProvider.GMAIL, user_id="test_user", credentials=credentials ) # Retrieve credentials retrieved = await auth_manager.get_user_credentials( provider=AuthProvider.GMAIL, user_id="test_user" ) assert retrieved is not None assert retrieved['access_token'] == 'test_access_token_123' assert retrieved['refresh_token'] == 'test_refresh_token_456' assert retrieved['token_type'] == 'Bearer' @pytest.mark.asyncio async def test_token_validation(self, auth_manager): """Test token validation functionality""" # Store valid token credentials = { 'access_token': 'valid_token', 'expires_at': datetime.now() + timedelta(hours=1) } await auth_manager.store_credentials( provider=AuthProvider.GMAIL, user_id="test_user", credentials=credentials ) # Validate token is_valid = await auth_manager.validate_token( provider=AuthProvider.GMAIL, user_id="test_user", token="valid_token" ) assert is_valid is True # Test invalid token is_invalid = await auth_manager.validate_token( provider=AuthProvider.GMAIL, user_id="test_user", token="invalid_token" ) assert is_invalid is False @pytest.mark.asyncio async def test_token_expiration_handling(self, auth_manager): """Test handling of expired tokens""" # Store expired token expired_credentials = { 'access_token': 'expired_token', 'expires_at': datetime.now() - timedelta(hours=1) # Expired } await auth_manager.store_credentials( provider=AuthProvider.TWITTER, user_id="test_user", credentials=expired_credentials ) # Should detect as expired is_valid = await auth_manager.validate_token( provider=AuthProvider.TWITTER, user_id="test_user", token="expired_token" ) assert is_valid is False @pytest.mark.asyncio async def test_token_refresh(self, auth_manager): """Test token refresh functionality""" old_credentials = { 'access_token': 'old_access_token', 'refresh_token': 'refresh_token_123', 'expires_at': datetime.now() - timedelta(minutes=5) # Recently expired } await auth_manager.store_credentials( provider=AuthProvider.LINKEDIN, user_id="test_user", credentials=old_credentials ) new_credentials = { 'access_token': 'new_access_token', 'refresh_token': 'new_refresh_token', 'expires_at': datetime.now() + timedelta(hours=1) } # Simulate token refresh success = await auth_manager.refresh_token( provider=AuthProvider.LINKEDIN, user_id="test_user", new_credentials=new_credentials ) assert success is True # Verify new credentials are stored retrieved = await auth_manager.get_user_credentials( provider=AuthProvider.LINKEDIN, user_id="test_user" ) assert retrieved['access_token'] == 'new_access_token' @pytest.mark.asyncio async def test_list_tokens(self, auth_manager): """Test listing stored tokens""" # Store multiple tokens providers_and_tokens = [ (AuthProvider.GMAIL, 'gmail_token'), (AuthProvider.DRIVE, 'drive_token'), (AuthProvider.TWITTER, 'twitter_token') ] for provider, token in providers_and_tokens: credentials = { 'access_token': token, 'expires_at': datetime.now() + timedelta(hours=1) } await auth_manager.store_credentials( provider=provider, user_id="test_user", credentials=credentials ) # List all tokens tokens = await auth_manager.list_tokens() assert len(tokens) >= 3 token_providers = [token['provider'] for token in tokens] assert 'gmail' in token_providers assert 'drive' in token_providers assert 'twitter' in token_providers @pytest.mark.asyncio async def test_delete_credentials(self, auth_manager): """Test deleting stored credentials""" credentials = { 'access_token': 'token_to_delete', 'expires_at': datetime.now() + timedelta(hours=1) } await auth_manager.store_credentials( provider=AuthProvider.GMAIL, user_id="test_user", credentials=credentials ) # Verify it exists retrieved = await auth_manager.get_user_credentials( provider=AuthProvider.GMAIL, user_id="test_user" ) assert retrieved is not None # Delete it success = await auth_manager.delete_credentials( provider=AuthProvider.GMAIL, user_id="test_user" ) assert success is True # Verify it's gone retrieved_after = await auth_manager.get_user_credentials( provider=AuthProvider.GMAIL, user_id="test_user" ) assert retrieved_after is None @pytest.mark.asyncio async def test_cleanup_expired_tokens(self, auth_manager): """Test cleanup of expired tokens""" # Store mix of valid and expired tokens valid_credentials = { 'access_token': 'valid_token', 'expires_at': datetime.now() + timedelta(hours=1) } expired_credentials = { 'access_token': 'expired_token', 'expires_at': datetime.now() - timedelta(hours=1) } await auth_manager.store_credentials( provider=AuthProvider.GMAIL, user_id="valid_user", credentials=valid_credentials ) await auth_manager.store_credentials( provider=AuthProvider.TWITTER, user_id="expired_user", credentials=expired_credentials ) # Clean up expired tokens cleaned_count = await auth_manager.cleanup_expired_tokens() assert cleaned_count >= 1 # Verify valid token still exists valid_still_exists = await auth_manager.get_user_credentials( provider=AuthProvider.GMAIL, user_id="valid_user" ) assert valid_still_exists is not None # Verify expired token is gone expired_gone = await auth_manager.get_user_credentials( provider=AuthProvider.TWITTER, user_id="expired_user" ) assert expired_gone is None class TestAuditLogger: """Test audit logging functionality""" @pytest.fixture def audit_logger(self, test_settings, temp_dir): """Create an audit logger for testing""" settings = test_settings settings.database.logs_directory = temp_dir settings.audit.audit_enabled = True return AuditLogger(settings) def test_audit_logger_initialization(self, audit_logger): """Test audit logger initializes properly""" assert audit_logger is not None assert audit_logger.settings is not None assert audit_logger.settings.audit.audit_enabled is True def test_log_authentication_event(self, audit_logger): """Test logging authentication events""" event = AuditEvent( event_type=EventType.AUTHENTICATION, user_id="test_user", action="login_success", resource="gmail_api", timestamp=datetime.now(), ip_address="192.168.1.100", user_agent="Test User Agent", details={"provider": "gmail", "method": "oauth2"} ) # Should not raise any exceptions audit_logger.log_event(event) # Verify log file was created and contains the event log_files = list(audit_logger.settings.database.logs_directory.glob("audit_*.log")) assert len(log_files) > 0 # Read the log file and verify content log_content = log_files[0].read_text() assert "login_success" in log_content assert "test_user" in log_content assert "gmail_api" in log_content def test_log_data_access_event(self, audit_logger): """Test logging data access events""" event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id="test_user", action="email_search", resource="gmail_messages", timestamp=datetime.now(), details={ "query": "project alpha", "results_count": 15, "time_range": "last_30_days" } ) audit_logger.log_event(event) # Verify event was logged log_files = list(audit_logger.settings.database.logs_directory.glob("audit_*.log")) assert len(log_files) > 0 log_content = log_files[0].read_text() assert "email_search" in log_content assert "gmail_messages" in log_content def test_log_configuration_change_event(self, audit_logger): """Test logging configuration change events""" event = AuditEvent( event_type=EventType.CONFIGURATION_CHANGE, user_id="admin_user", action="privacy_setting_changed", resource="privacy_settings", timestamp=datetime.now(), details={ "setting": "anonymize_logs", "old_value": False, "new_value": True } ) audit_logger.log_event(event) log_files = list(audit_logger.settings.database.logs_directory.glob("audit_*.log")) log_content = log_files[0].read_text() assert "privacy_setting_changed" in log_content assert "anonymize_logs" in log_content def test_log_security_event(self, audit_logger): """Test logging security events""" event = AuditEvent( event_type=EventType.SECURITY_EVENT, user_id="potential_attacker", action="failed_login_attempt", resource="authentication", timestamp=datetime.now(), ip_address="10.0.0.1", details={ "reason": "invalid_credentials", "attempt_count": 5, "lockout_triggered": True } ) audit_logger.log_event(event) log_files = list(audit_logger.settings.database.logs_directory.glob("audit_*.log")) log_content = log_files[0].read_text() assert "failed_login_attempt" in log_content assert "lockout_triggered" in log_content def test_audit_log_rotation(self, audit_logger, temp_dir): """Test audit log file rotation""" # Configure small max file size to trigger rotation audit_logger.settings.audit.audit_max_file_size_mb = 0.001 # Very small for testing # Log many events to trigger rotation for i in range(100): event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id=f"user_{i}", action="test_action", resource="test_resource", timestamp=datetime.now(), details={"iteration": i, "data": "x" * 1000} # Make it large ) audit_logger.log_event(event) # Should have multiple log files due to rotation log_files = list(temp_dir.glob("audit_*.log")) assert len(log_files) >= 1 # At least one file should exist def test_audit_log_with_privacy_mode(self, audit_logger): """Test audit logging with privacy mode enabled""" # Enable privacy mode audit_logger.settings.privacy.anonymize_logs = True event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id="sensitive_user@example.com", action="email_access", resource="gmail_messages", timestamp=datetime.now(), ip_address="192.168.1.100", details={ "email_from": "secret@company.com", "email_subject": "Confidential Information" } ) audit_logger.log_event(event) # Verify sensitive information is anonymized log_files = list(audit_logger.settings.database.logs_directory.glob("audit_*.log")) log_content = log_files[0].read_text() # Original sensitive data should not appear in logs assert "sensitive_user@example.com" not in log_content assert "secret@company.com" not in log_content assert "192.168.1.100" not in log_content # But action and resource should still be logged assert "email_access" in log_content assert "gmail_messages" in log_content def test_audit_log_integrity_check(self, audit_logger): """Test audit log integrity verification""" # Log some events events = [ AuditEvent( event_type=EventType.AUTHENTICATION, user_id="user1", action="login", resource="system", timestamp=datetime.now() ), AuditEvent( event_type=EventType.DATA_ACCESS, user_id="user1", action="search", resource="emails", timestamp=datetime.now() ) ] for event in events: audit_logger.log_event(event) # Verify integrity integrity_ok = audit_logger.verify_log_integrity() assert integrity_ok is True # Corrupt the log file log_files = list(audit_logger.settings.database.logs_directory.glob("audit_*.log")) if log_files: log_file = log_files[0] corrupted_content = log_file.read_text() + "\nCORRUPTED LINE" log_file.write_text(corrupted_content) # Integrity check should now fail integrity_after_corruption = audit_logger.verify_log_integrity() assert integrity_after_corruption is False class TestSecurityIntegration: """Test integration between security components""" @pytest.mark.asyncio async def test_end_to_end_security_flow(self, test_settings, temp_dir): """Test complete security flow from authentication to audit""" # Initialize all security components test_settings.database.data_directory = temp_dir test_settings.database.logs_directory = temp_dir encryption_manager = EncryptionManager(test_settings.encryption) auth_manager = AuthManager(test_settings) await auth_manager.initialize() audit_logger = AuditLogger(test_settings) # Step 1: Store encrypted credentials sensitive_credentials = { 'access_token': 'very_sensitive_access_token_12345', 'refresh_token': 'very_sensitive_refresh_token_67890', 'expires_at': datetime.now() + timedelta(hours=1) } await auth_manager.store_credentials( provider=AuthProvider.GMAIL, user_id="test_user", credentials=sensitive_credentials ) # Step 2: Log authentication event auth_event = AuditEvent( event_type=EventType.AUTHENTICATION, user_id="test_user", action="credentials_stored", resource="gmail_api", timestamp=datetime.now(), details={"provider": "gmail"} ) audit_logger.log_event(auth_event) # Step 3: Retrieve and verify credentials retrieved_credentials = await auth_manager.get_user_credentials( provider=AuthProvider.GMAIL, user_id="test_user" ) assert retrieved_credentials is not None assert retrieved_credentials['access_token'] == sensitive_credentials['access_token'] # Step 4: Log data access event access_event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id="test_user", action="credentials_retrieved", resource="gmail_api", timestamp=datetime.now(), details={"success": True} ) audit_logger.log_event(access_event) # Step 5: Verify audit trail exists log_files = list(temp_dir.glob("audit_*.log")) assert len(log_files) > 0 log_content = log_files[0].read_text() assert "credentials_stored" in log_content assert "credentials_retrieved" in log_content # Step 6: Clean up await auth_manager.delete_credentials( provider=AuthProvider.GMAIL, user_id="test_user" ) # Verify cleanup worked after_cleanup = await auth_manager.get_user_credentials( provider=AuthProvider.GMAIL, user_id="test_user" ) assert after_cleanup is None def test_production_security_validation(self): """Test that production environment enforces strong security""" # Production settings should enforce strong security with pytest.raises(ValueError, match="Master encryption key must be set"): Settings( environment=Environment.PRODUCTION, encryption__master_key="" # Empty key should fail ) with pytest.raises(ValueError, match="Audit logging must be enabled"): Settings( environment=Environment.PRODUCTION, encryption__master_key="secure_production_key_32_bytes_test", audit__audit_enabled=False # Disabled audit should fail ) with pytest.raises(ValueError, match="Session timeout too long"): Settings( environment=Environment.PRODUCTION, encryption__master_key="secure_production_key_32_bytes_test", security__session_timeout_minutes=300 # Too long should fail ) def test_security_headers_generation(self, test_settings): """Test security headers are properly generated""" headers = test_settings.get_security_headers() required_headers = [ 'X-Content-Type-Options', 'X-Frame-Options', 'X-XSS-Protection', 'Strict-Transport-Security', 'Referrer-Policy' ] for header in required_headers: assert header in headers assert headers[header] is not None # Production should include CSP prod_settings = Settings( environment=Environment.PRODUCTION, encryption__master_key="production_key_32_bytes_for_test" ) prod_headers = prod_settings.get_security_headers() assert 'Content-Security-Policy' in prod_headers

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vitalune/Nexus-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server