Skip to main content
Glama
vitalune

Personal Knowledge Assistant

by vitalune
test_security_integration.py31.9 kB
""" Security Integration Tests Tests security features in realistic scenarios with proper integration testing. """ import pytest import asyncio import tempfile import os from datetime import datetime, timedelta from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch from typing import Dict, Any, List from src.config.settings import Settings, Environment from src.config.auth import AuthManager, AuthProvider from src.utils.encryption import EncryptionManager from src.utils.audit import AuditLogger, AuditEvent, EventType from src.integrations.client_manager import APIClientManager class TestSecurityIntegrationScenarios: """Test realistic security scenarios""" @pytest.fixture async def secure_environment(self, temp_dir): """Set up a secure test environment""" settings = Settings( environment=Environment.PRODUCTION, encryption__master_key="secure_test_key_32_bytes_for_testing", database__data_directory=temp_dir / "data", database__cache_directory=temp_dir / "cache", database__logs_directory=temp_dir / "logs", security__require_mfa=True, security__session_timeout_minutes=30, security__max_login_attempts=3, privacy__anonymize_logs=True, audit__audit_enabled=True, audit__track_api_calls=True, audit__track_data_access=True ) # Ensure directories exist with proper permissions for directory in [settings.database.data_directory, settings.database.cache_directory, settings.database.logs_directory]: directory.mkdir(parents=True, exist_ok=True) os.chmod(directory, 0o700) # Owner read/write/execute only return settings @pytest.mark.asyncio async def test_secure_authentication_flow(self, secure_environment): """Test complete secure authentication flow with audit logging""" auth_manager = AuthManager(secure_environment) await auth_manager.initialize() audit_logger = AuditLogger(secure_environment) encryption_manager = EncryptionManager(secure_environment.encryption) # Step 1: Attempt authentication with invalid credentials with pytest.raises(Exception): await auth_manager.validate_token( provider=AuthProvider.GMAIL, user_id="test_user", token="invalid_token" ) # Log failed authentication attempt failed_auth_event = AuditEvent( event_type=EventType.SECURITY_EVENT, user_id="test_user", action="authentication_failed", resource="gmail_api", timestamp=datetime.now(), ip_address="192.168.1.100", details={"reason": "invalid_token", "provider": "gmail"} ) audit_logger.log_event(failed_auth_event) # Step 2: Store encrypted credentials sensitive_credentials = { 'access_token': 'highly_sensitive_access_token_12345', 'refresh_token': 'highly_sensitive_refresh_token_67890', 'client_secret': 'client_secret_should_be_encrypted', 'expires_at': datetime.now() + timedelta(hours=1) } # Encrypt sensitive parts before storage encrypted_credentials = sensitive_credentials.copy() encrypted_credentials['access_token'] = encryption_manager.encrypt_data(sensitive_credentials['access_token']) encrypted_credentials['refresh_token'] = encryption_manager.encrypt_data(sensitive_credentials['refresh_token']) encrypted_credentials['client_secret'] = encryption_manager.encrypt_data(sensitive_credentials['client_secret']) await auth_manager.store_credentials( provider=AuthProvider.GMAIL, user_id="test_user", credentials=encrypted_credentials ) # Log successful credential storage success_auth_event = AuditEvent( event_type=EventType.AUTHENTICATION, user_id="test_user", action="credentials_stored", resource="gmail_api", timestamp=datetime.now(), ip_address="192.168.1.100", details={"provider": "gmail", "encrypted": True} ) audit_logger.log_event(success_auth_event) # Step 3: Retrieve and decrypt credentials retrieved_credentials = await auth_manager.get_user_credentials( provider=AuthProvider.GMAIL, user_id="test_user" ) assert retrieved_credentials is not None # Decrypt for verification decrypted_access_token = encryption_manager.decrypt_data(retrieved_credentials['access_token']) decrypted_refresh_token = encryption_manager.decrypt_data(retrieved_credentials['refresh_token']) assert decrypted_access_token == sensitive_credentials['access_token'] assert decrypted_refresh_token == sensitive_credentials['refresh_token'] # Step 4: Verify audit trail exists and is properly anonymized log_files = list(secure_environment.database.logs_directory.glob("audit_*.log")) assert len(log_files) > 0 log_content = log_files[0].read_text() # Should contain audit events assert "authentication_failed" in log_content assert "credentials_stored" in log_content # Should NOT contain sensitive data (anonymized) assert "highly_sensitive_access_token" not in log_content assert "192.168.1.100" not in log_content # IP should be anonymized # Should contain anonymized identifiers assert "test_user" not in log_content or log_content.count("test_user") < 2 # May be anonymized @pytest.mark.asyncio async def test_security_incident_detection_and_response(self, secure_environment): """Test security incident detection and automated response""" auth_manager = AuthManager(secure_environment) await auth_manager.initialize() audit_logger = AuditLogger(secure_environment) # Simulate multiple failed login attempts (brute force attack) attacker_ip = "10.0.0.1" attacker_user = "potential_attacker" failed_attempts = [] for attempt in range(5): # More than max_login_attempts (3) event = AuditEvent( event_type=EventType.SECURITY_EVENT, user_id=attacker_user, action="login_attempt_failed", resource="authentication", timestamp=datetime.now() - timedelta(minutes=5 - attempt), ip_address=attacker_ip, details={ "attempt_number": attempt + 1, "reason": "invalid_password", "user_agent": "Automated Tool v1.0" } ) audit_logger.log_event(event) failed_attempts.append(event) # Simulate account lockout trigger lockout_event = AuditEvent( event_type=EventType.SECURITY_EVENT, user_id=attacker_user, action="account_locked", resource="authentication", timestamp=datetime.now(), ip_address=attacker_ip, details={ "lockout_duration_minutes": secure_environment.security.lockout_duration_minutes, "failed_attempts_count": len(failed_attempts), "automated_response": True } ) audit_logger.log_event(lockout_event) # Verify security events were logged log_files = list(secure_environment.database.logs_directory.glob("audit_*.log")) log_content = log_files[0].read_text() assert "login_attempt_failed" in log_content assert "account_locked" in log_content assert "automated_response" in log_content # Verify IP and user info is anonymized in production assert attacker_ip not in log_content # Should be anonymized # Test legitimate user can still authenticate after incident legitimate_credentials = { 'access_token': 'legitimate_token', 'expires_at': datetime.now() + timedelta(hours=1) } await auth_manager.store_credentials( provider=AuthProvider.GMAIL, user_id="legitimate_user", credentials=legitimate_credentials ) # Should work fine for legitimate user is_valid = await auth_manager.validate_token( provider=AuthProvider.GMAIL, user_id="legitimate_user", token="legitimate_token" ) assert is_valid is True @pytest.mark.asyncio async def test_data_access_privacy_controls(self, secure_environment): """Test privacy controls during data access operations""" audit_logger = AuditLogger(secure_environment) # Simulate API client manager with privacy controls with patch('src.integrations.client_manager.get_auth_manager') as mock_auth_factory: mock_auth_manager = AsyncMock() mock_auth_manager.list_tokens.return_value = [ {"status": "valid", "provider": "gmail", "expires_at": datetime.now() + timedelta(hours=1)} ] mock_auth_factory.return_value = mock_auth_manager client_manager = APIClientManager() await client_manager.initialize() client_manager.configure_service( service="gmail", client_id="test_client_id", client_secret="test_client_secret", scopes=["https://www.googleapis.com/auth/gmail.readonly"] ) # Mock Gmail client with data access logging with patch('src.integrations.gmail_client.GmailClient') as mock_gmail_class: mock_gmail_client = AsyncMock() # Mock sensitive email data sensitive_emails = [ { 'id': 'email1', 'subject': 'Confidential: Salary Information', 'from': 'hr@company.com', 'to': ['employee@company.com'], 'body': 'Your salary has been increased to $75,000', 'date': datetime.now() }, { 'id': 'email2', 'subject': 'Medical Records Request', 'from': 'doctor@clinic.com', 'to': ['patient@email.com'], 'body': 'Please find your test results attached', 'date': datetime.now() } ] mock_gmail_client.search_messages.return_value = sensitive_emails mock_gmail_class.return_value = mock_gmail_client # Perform data access with privacy controls client = await client_manager.get_client("gmail") # Log data access attempt access_event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id="test_user", action="email_search_initiated", resource="gmail_messages", timestamp=datetime.now(), ip_address="192.168.1.50", details={ "query": "confidential OR medical", "max_results": 10, "privacy_mode": True } ) audit_logger.log_event(access_event) # Perform the search results = await client.search_messages(query="confidential OR medical", max_results=10) # Log data access results (with privacy controls) result_event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id="test_user", action="email_search_completed", resource="gmail_messages", timestamp=datetime.now(), ip_address="192.168.1.50", details={ "results_count": len(results), "sensitive_data_detected": True, "privacy_controls_applied": True, "data_minimization": True } ) audit_logger.log_event(result_event) # Verify privacy controls are applied assert len(results) == 2 # Verify audit log contains access events log_files = list(secure_environment.database.logs_directory.glob("audit_*.log")) log_content = log_files[0].read_text() assert "email_search_initiated" in log_content assert "email_search_completed" in log_content assert "privacy_controls_applied" in log_content # Verify sensitive data is NOT in logs assert "$75,000" not in log_content assert "test results" not in log_content assert "192.168.1.50" not in log_content # IP anonymized await client_manager.shutdown() @pytest.mark.asyncio async def test_encryption_key_rotation(self, secure_environment): """Test encryption key rotation process""" encryption_manager = EncryptionManager(secure_environment.encryption) auth_manager = AuthManager(secure_environment) await auth_manager.initialize() audit_logger = AuditLogger(secure_environment) # Step 1: Store data with original key original_data = "sensitive_information_to_be_re_encrypted" encrypted_with_old_key = encryption_manager.encrypt_data(original_data) # Store encrypted credentials credentials = { 'access_token': encrypted_with_old_key, 'expires_at': datetime.now() + timedelta(hours=1) } await auth_manager.store_credentials( provider=AuthProvider.GMAIL, user_id="rotation_test_user", credentials=credentials ) # Step 2: Simulate key rotation # In real implementation, this would involve generating new key # and re-encrypting all stored data # Log key rotation event rotation_event = AuditEvent( event_type=EventType.CONFIGURATION_CHANGE, user_id="system_admin", action="encryption_key_rotation_initiated", resource="encryption_system", timestamp=datetime.now(), details={ "old_key_id": "key_001", "new_key_id": "key_002", "rotation_reason": "scheduled_rotation", "affected_records": 1 } ) audit_logger.log_event(rotation_event) # Step 3: Verify old data can still be decrypted retrieved_credentials = await auth_manager.get_user_credentials( provider=AuthProvider.GMAIL, user_id="rotation_test_user" ) decrypted_data = encryption_manager.decrypt_data(retrieved_credentials['access_token']) assert decrypted_data == original_data # Step 4: Encrypt new data with new key (simulated) new_data = "new_sensitive_information_with_new_key" encrypted_with_new_key = encryption_manager.encrypt_data(new_data) # Should be different from old encryption assert encrypted_with_new_key != encrypted_with_old_key # Both should decrypt correctly assert encryption_manager.decrypt_data(encrypted_with_old_key) == original_data assert encryption_manager.decrypt_data(encrypted_with_new_key) == new_data # Log completion completion_event = AuditEvent( event_type=EventType.CONFIGURATION_CHANGE, user_id="system_admin", action="encryption_key_rotation_completed", resource="encryption_system", timestamp=datetime.now(), details={ "new_key_id": "key_002", "records_migrated": 1, "migration_successful": True } ) audit_logger.log_event(completion_event) # Verify audit trail log_files = list(secure_environment.database.logs_directory.glob("audit_*.log")) log_content = log_files[0].read_text() assert "encryption_key_rotation_initiated" in log_content assert "encryption_key_rotation_completed" in log_content @pytest.mark.asyncio async def test_gdpr_compliance_workflow(self, secure_environment): """Test GDPR compliance features (data export, deletion, consent)""" auth_manager = AuthManager(secure_environment) await auth_manager.initialize() audit_logger = AuditLogger(secure_environment) encryption_manager = EncryptionManager(secure_environment.encryption) user_id = "gdpr_test_user" # Step 1: Store user data with consent tracking user_data = { 'access_token': 'user_access_token', 'refresh_token': 'user_refresh_token', 'email': 'user@example.com', 'consent_given': True, 'consent_timestamp': datetime.now(), 'data_processing_purposes': ['email_management', 'productivity_analysis'], 'expires_at': datetime.now() + timedelta(hours=1) } # Encrypt sensitive data encrypted_data = user_data.copy() encrypted_data['access_token'] = encryption_manager.encrypt_data(user_data['access_token']) encrypted_data['refresh_token'] = encryption_manager.encrypt_data(user_data['refresh_token']) encrypted_data['email'] = encryption_manager.encrypt_data(user_data['email']) await auth_manager.store_credentials( provider=AuthProvider.GMAIL, user_id=user_id, credentials=encrypted_data ) # Log consent consent_event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id=user_id, action="consent_recorded", resource="user_data", timestamp=datetime.now(), details={ "consent_given": True, "purposes": user_data['data_processing_purposes'], "gdpr_compliant": True } ) audit_logger.log_event(consent_event) # Step 2: User requests data export (GDPR Article 20) export_request_event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id=user_id, action="data_export_requested", resource="user_data", timestamp=datetime.now(), details={ "request_type": "gdpr_article_20", "format_requested": "json" } ) audit_logger.log_event(export_request_event) # Simulate data export exported_data = await auth_manager.get_user_credentials( provider=AuthProvider.GMAIL, user_id=user_id ) # Decrypt for export export_ready_data = { 'user_id': user_id, 'provider': 'gmail', 'email': encryption_manager.decrypt_data(exported_data['email']), 'consent_given': exported_data['consent_given'], 'consent_timestamp': exported_data['consent_timestamp'].isoformat(), 'data_processing_purposes': exported_data['data_processing_purposes'], 'export_timestamp': datetime.now().isoformat() } # Log successful export export_completion_event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id=user_id, action="data_export_completed", resource="user_data", timestamp=datetime.now(), details={ "export_format": "json", "records_exported": 1, "sensitive_data_encrypted": False # Decrypted for export } ) audit_logger.log_event(export_completion_event) # Step 3: User requests data deletion (GDPR Article 17 - Right to be forgotten) deletion_request_event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id=user_id, action="data_deletion_requested", resource="user_data", timestamp=datetime.now(), details={ "request_type": "gdpr_article_17", "reason": "user_request" } ) audit_logger.log_event(deletion_request_event) # Perform deletion deletion_success = await auth_manager.delete_credentials( provider=AuthProvider.GMAIL, user_id=user_id ) assert deletion_success is True # Verify deletion deleted_data = await auth_manager.get_user_credentials( provider=AuthProvider.GMAIL, user_id=user_id ) assert deleted_data is None # Log deletion completion deletion_completion_event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id=user_id, action="data_deletion_completed", resource="user_data", timestamp=datetime.now(), details={ "records_deleted": 1, "secure_deletion": True, "gdpr_compliant": True } ) audit_logger.log_event(deletion_completion_event) # Step 4: Verify GDPR compliance in audit logs log_files = list(secure_environment.database.logs_directory.glob("audit_*.log")) log_content = log_files[0].read_text() # Should contain all GDPR-related events gdpr_events = [ "consent_recorded", "data_export_requested", "data_export_completed", "data_deletion_requested", "data_deletion_completed" ] for event in gdpr_events: assert event in log_content # Should indicate GDPR compliance assert "gdpr_compliant" in log_content # Should NOT contain sensitive exported data in logs assert "user@example.com" not in log_content # Should be anonymized @pytest.mark.asyncio async def test_security_monitoring_and_alerting(self, secure_environment): """Test security monitoring and alerting systems""" audit_logger = AuditLogger(secure_environment) # Enable security alerts secure_environment.audit.security_alerts_enabled = True secure_environment.audit.failed_login_alert_threshold = 3 # Simulate various security events security_events = [ # Unusual access patterns AuditEvent( event_type=EventType.SECURITY_EVENT, user_id="suspicious_user", action="unusual_access_pattern", resource="gmail_api", timestamp=datetime.now(), ip_address="203.0.113.1", # Example IP from different country details={ "pattern": "bulk_download", "data_volume": "100MB", "time_of_day": "03:00 AM", "risk_score": 0.8 } ), # Privilege escalation attempt AuditEvent( event_type=EventType.SECURITY_EVENT, user_id="standard_user", action="privilege_escalation_attempt", resource="admin_panel", timestamp=datetime.now(), details={ "attempted_action": "modify_security_settings", "current_privileges": "user", "requested_privileges": "admin", "blocked": True } ), # Data exfiltration attempt AuditEvent( event_type=EventType.SECURITY_EVENT, user_id="compromised_account", action="potential_data_exfiltration", resource="email_export", timestamp=datetime.now(), details={ "export_volume": "10000 emails", "export_destination": "external_service", "blocked": True, "alert_triggered": True } ) ] # Log all security events for event in security_events: audit_logger.log_event(event) # Simulate alert generation alert_event = AuditEvent( event_type=EventType.SECURITY_EVENT, user_id="security_system", action="security_alert_generated", resource="monitoring_system", timestamp=datetime.now(), details={ "alert_type": "multiple_security_incidents", "incidents_count": len(security_events), "risk_level": "high", "automated_response": "temporary_account_suspension", "notification_sent": True } ) audit_logger.log_event(alert_event) # Verify security events and alerts are logged log_files = list(secure_environment.database.logs_directory.glob("audit_*.log")) log_content = log_files[0].read_text() # Should contain all security events assert "unusual_access_pattern" in log_content assert "privilege_escalation_attempt" in log_content assert "potential_data_exfiltration" in log_content assert "security_alert_generated" in log_content # Should indicate protective actions were taken assert "blocked" in log_content assert "automated_response" in log_content # Sensitive data should be anonymized assert "203.0.113.1" not in log_content # IP should be anonymized in production class TestSecurityPerformance: """Test security feature performance characteristics""" @pytest.mark.asyncio async def test_encryption_performance(self, secure_environment): """Test encryption/decryption performance under load""" encryption_manager = EncryptionManager(secure_environment.encryption) # Test data of various sizes test_data_sizes = [ ("small", "small_data" * 10), # ~100 bytes ("medium", "medium_data" * 100), # ~1KB ("large", "large_data" * 1000), # ~10KB ] import time for size_name, test_data in test_data_sizes: # Measure encryption time start_time = time.time() encrypted_data = encryption_manager.encrypt_data(test_data) encryption_time = time.time() - start_time # Measure decryption time start_time = time.time() decrypted_data = encryption_manager.decrypt_data(encrypted_data) decryption_time = time.time() - start_time # Verify correctness assert decrypted_data == test_data # Performance assertions (generous limits for testing) assert encryption_time < 1.0, f"Encryption too slow for {size_name} data: {encryption_time:.3f}s" assert decryption_time < 1.0, f"Decryption too slow for {size_name} data: {decryption_time:.3f}s" @pytest.mark.asyncio async def test_concurrent_authentication_performance(self, secure_environment): """Test authentication performance with concurrent users""" auth_manager = AuthManager(secure_environment) await auth_manager.initialize() # Create multiple concurrent authentication tasks async def authenticate_user(user_id: str): credentials = { 'access_token': f'token_for_{user_id}', 'expires_at': datetime.now() + timedelta(hours=1) } await auth_manager.store_credentials( provider=AuthProvider.GMAIL, user_id=user_id, credentials=credentials ) is_valid = await auth_manager.validate_token( provider=AuthProvider.GMAIL, user_id=user_id, token=f'token_for_{user_id}' ) return is_valid # Test with multiple concurrent users import time user_count = 10 start_time = time.time() tasks = [authenticate_user(f"user_{i}") for i in range(user_count)] results = await asyncio.gather(*tasks) total_time = time.time() - start_time # Verify all authentications succeeded assert all(results), "All authentications should succeed" # Performance check - should handle concurrent load efficiently avg_time_per_user = total_time / user_count assert avg_time_per_user < 1.0, f"Average auth time too slow: {avg_time_per_user:.3f}s per user" assert total_time < 5.0, f"Total concurrent auth time too slow: {total_time:.3f}s" @pytest.mark.asyncio async def test_audit_logging_performance(self, secure_environment): """Test audit logging performance under high load""" audit_logger = AuditLogger(secure_environment) # Generate many audit events event_count = 1000 import time start_time = time.time() for i in range(event_count): event = AuditEvent( event_type=EventType.DATA_ACCESS, user_id=f"performance_test_user_{i % 10}", # 10 different users action="performance_test_action", resource="test_resource", timestamp=datetime.now(), details={"iteration": i, "test_data": f"data_{i}"} ) audit_logger.log_event(event) total_time = time.time() - start_time # Performance assertions events_per_second = event_count / total_time assert events_per_second > 100, f"Audit logging too slow: {events_per_second:.1f} events/sec" # Verify all events were logged log_files = list(secure_environment.database.logs_directory.glob("audit_*.log")) assert len(log_files) > 0 # Check that log files contain our test events total_log_content = "" for log_file in log_files: total_log_content += log_file.read_text() # Should contain performance test events assert "performance_test_action" in total_log_content

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vitalune/Nexus-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server