
Adversary MCP Server

by brettbergin
test_security_telemetry_integration.py (21 kB)
"""Comprehensive integration tests for Security and Telemetry systems. This test suite validates the integration between Phase III Security components and Phase II Telemetry & Monitoring systems to ensure they work together correctly. """ import asyncio import tempfile import time from pathlib import Path from unittest.mock import Mock, patch from uuid import uuid4 import pytest from adversary_mcp_server.scanner.types import Category, Severity, ThreatMatch from adversary_mcp_server.security import SecurityError from adversary_mcp_server.security.input_validator import InputValidator from adversary_mcp_server.security.log_sanitizer import sanitize_for_logging from adversary_mcp_server.telemetry.integration import MetricsCollectionOrchestrator from adversary_mcp_server.telemetry.service import TelemetryService class TestSecurityTelemetryIntegration: """Test integration between security and telemetry systems.""" @pytest.fixture def temp_test_file(self): """Create a temporary test file for path validation.""" with tempfile.NamedTemporaryFile(suffix=".py", mode="w", delete=False) as f: f.write("# Test Python file\nprint('hello world')\n") f.flush() yield Path(f.name) # Cleanup Path(f.name).unlink(missing_ok=True) @pytest.fixture def mock_database(self): """Create mock database for telemetry.""" mock_db = Mock() mock_db.create_tables.return_value = None mock_db.insert_scan_event.return_value = None mock_db.insert_threat_finding.return_value = None mock_db.insert_performance_metric.return_value = None return mock_db @pytest.fixture def mock_telemetry_service(self, mock_database): """Create mock telemetry service.""" with patch.object(TelemetryService, "__init__", return_value=None): telemetry = TelemetryService(mock_database) telemetry.db = mock_database telemetry.record_scan_event = Mock() telemetry.record_security_event = Mock() telemetry.record_performance_metric = Mock() telemetry.get_scan_statistics = Mock(return_value={}) return telemetry @pytest.fixture def mock_metrics_orchestrator(self, mock_telemetry_service): """Create mock metrics collection orchestrator.""" with patch.object(MetricsCollectionOrchestrator, "__init__", return_value=None): orchestrator = MetricsCollectionOrchestrator(mock_telemetry_service) orchestrator.telemetry_service = mock_telemetry_service orchestrator.collect_scan_metrics = Mock() orchestrator.collect_security_metrics = Mock() orchestrator.collect_performance_metrics = Mock() return orchestrator @pytest.fixture def sample_threat_match(self): """Create sample threat match for testing.""" return ThreatMatch( rule_id="security_integration_test", rule_name="Security Integration Test", description="Test threat for security integration", category=Category.INJECTION, severity=Severity.HIGH, file_path="test.py", line_number=10, confidence=0.9, ) def test_security_validation_with_telemetry_logging( self, mock_metrics_orchestrator, sample_threat_match, tmp_path ): """Test security validation with telemetry logging.""" validator = InputValidator() # Create a temporary file for validation test_file = tmp_path / "test.py" test_file.write_text("# test file") # Test successful validation with telemetry validated_path = validator.validate_file_path(str(test_file)) assert str(validated_path) == str(test_file) # Verify telemetry orchestrator is available for security events assert mock_metrics_orchestrator.collect_security_metrics is not None # Test security validation failure with telemetry with pytest.raises(SecurityError, match="Path traversal"): 
validator.validate_file_path("../../../etc/passwd") # Simulate security event recording mock_metrics_orchestrator.collect_security_metrics( { "event_type": "path_traversal_blocked", "severity": "high", "threat_id": sample_threat_match.rule_id, } ) # Verify security metrics collection was called mock_metrics_orchestrator.collect_security_metrics.assert_called_once() def test_log_sanitization_with_telemetry_preservation( self, mock_metrics_orchestrator ): """Test log sanitization preserves telemetry while removing secrets.""" sensitive_data = { "api_key": "sk-secret123", "file_path": "/safe/path/test.py", "scan_results": "5 threats found", "bearer_token": "bearer_abc123", "scan_duration_ms": 1500.5, "threat_count": 5, "validation_confidence": 0.85, } # Sanitize for logging sanitized = sanitize_for_logging(sensitive_data) # Verify sensitive data is redacted assert "sk-secret123" not in sanitized assert "bearer_abc123" not in sanitized assert "[REDACTED]" in sanitized # Verify telemetry data is preserved assert "1500.5" in sanitized # scan_duration_ms assert "5" in sanitized # threat_count assert "0.85" in sanitized # validation_confidence assert "/safe/path/test.py" in sanitized # file_path def test_security_error_telemetry_integration( self, mock_telemetry_service, mock_metrics_orchestrator ): """Test security error integration with telemetry system.""" validator = InputValidator() # Test multiple security violations dangerous_paths = [ "../../../etc/passwd", "../../windows/system32", "/etc/shadow", "~/.ssh/id_rsa", ] security_events = [] for dangerous_path in dangerous_paths: try: validator.validate_file_path(dangerous_path) except (SecurityError, FileNotFoundError) as e: # Record security event (SecurityError for path traversal, FileNotFoundError for non-existent files) event = { "event_type": "security_violation", "error": str(e), "attempted_path": dangerous_path, "timestamp": time.time(), "severity": "high", } security_events.append(event) # Verify all security violations were caught assert len(security_events) == len(dangerous_paths) # Test telemetry recording of security events for event in security_events: mock_telemetry_service.record_security_event(event) # Verify telemetry service recorded all events assert mock_telemetry_service.record_security_event.call_count == len( dangerous_paths ) def test_scan_security_telemetry_workflow( self, mock_telemetry_service, mock_metrics_orchestrator, sample_threat_match, temp_test_file, ): """Test complete scan workflow with security and telemetry integration.""" validator = InputValidator() # Simulate scan workflow with security validation scan_data = { "scan_id": str(uuid4()), "file_path": str(temp_test_file), # Use temporary test file "threats": [sample_threat_match], "scan_duration_ms": 2500.0, "validation_enabled": True, "security_checks_passed": True, } # Validate scan file path (resolve both paths for comparison due to symlinks) validated_path = validator.validate_file_path(scan_data["file_path"]) expected_resolved_path = Path(scan_data["file_path"]).resolve() assert validated_path == expected_resolved_path # Record scan event with telemetry mock_telemetry_service.record_scan_event( { "scan_id": scan_data["scan_id"], "file_path": validated_path, "threat_count": len(scan_data["threats"]), "duration_ms": scan_data["scan_duration_ms"], } ) # Record security metrics mock_metrics_orchestrator.collect_security_metrics( { "security_validation_passed": True, "path_validation_success": True, "scan_id": scan_data["scan_id"], } ) # Record performance 
metrics mock_metrics_orchestrator.collect_performance_metrics( { "scan_duration_ms": scan_data["scan_duration_ms"], "threat_processing_time_ms": 150.0, "validation_time_ms": 75.0, } ) # Verify all telemetry calls were made mock_telemetry_service.record_scan_event.assert_called_once() mock_metrics_orchestrator.collect_security_metrics.assert_called_once() mock_metrics_orchestrator.collect_performance_metrics.assert_called_once() def test_security_telemetry_error_handling( self, mock_telemetry_service, mock_metrics_orchestrator, tmp_path ): """Test error handling in security-telemetry integration.""" # Simulate telemetry service failure mock_telemetry_service.record_security_event.side_effect = Exception( "Telemetry error" ) validator = InputValidator() # Create a temporary file for validation test_file = tmp_path / "test.py" test_file.write_text("# Test file content") # Security validation should still work despite telemetry errors validated_path = validator.validate_file_path(str(test_file)) assert validated_path == test_file.resolve() # Try to record security event (will fail) try: mock_telemetry_service.record_security_event( { "event_type": "validation_success", "path": str(test_file), } ) except Exception as e: assert str(e) == "Telemetry error" # Verify security validation wasn't affected by telemetry failure with pytest.raises(SecurityError): validator.validate_file_path("../../../etc/passwd") def test_security_metrics_aggregation_telemetry( self, mock_telemetry_service, mock_metrics_orchestrator, tmp_path ): """Test security metrics aggregation with telemetry.""" validator = InputValidator() # Simulate multiple validation operations validation_results = { "successful_validations": 0, "failed_validations": 0, "blocked_path_traversals": 0, } # Test successful validations - create temporary files safe_files = [] for i, ext in enumerate(["py", "js", "go"]): test_file = tmp_path / f"path{i+1}.{ext}" test_file.write_text(f"# Test file {i+1}") safe_files.append(test_file) for test_file in safe_files: validated = validator.validate_file_path(str(test_file)) assert validated == test_file.resolve() validation_results["successful_validations"] += 1 # Test failed validations dangerous_paths = [ "../../../etc/passwd", "../../config/secrets.yaml", ] for path in dangerous_paths: try: validator.validate_file_path(path) except SecurityError: validation_results["failed_validations"] += 1 validation_results["blocked_path_traversals"] += 1 # Record aggregated security metrics mock_metrics_orchestrator.collect_security_metrics(validation_results) # Verify metrics assert validation_results["successful_validations"] == 3 assert validation_results["failed_validations"] == 2 assert validation_results["blocked_path_traversals"] == 2 # Verify telemetry collection was called mock_metrics_orchestrator.collect_security_metrics.assert_called_once_with( validation_results ) @pytest.mark.asyncio async def test_async_security_telemetry_integration( self, mock_telemetry_service, mock_metrics_orchestrator, sample_threat_match, tmp_path, ): """Test asynchronous security and telemetry integration.""" validator = InputValidator() # Create temporary test files for successful scans async1_file = tmp_path / "async1.py" async1_file.write_text("# Async test file 1") async2_file = tmp_path / "async2.py" async2_file.write_text("# Async test file 2") # Simulate async scan operations with security validation async def secure_scan_operation(file_path: str, scan_id: str): """Simulate secure scan operation.""" start_time = 
time.time() # Validate file path first try: validated_path = validator.validate_file_path(file_path) security_passed = True security_error = None except (SecurityError, FileNotFoundError) as e: validated_path = None security_passed = False security_error = str(e) # Simulate scan processing time await asyncio.sleep(0.01) duration_ms = (time.time() - start_time) * 1000 return { "scan_id": scan_id, "file_path": file_path, "validated_path": validated_path, "security_passed": security_passed, "security_error": security_error, "duration_ms": duration_ms, "threats": [sample_threat_match] if security_passed else [], } # Run multiple async scans - mix of valid files and security violation scan_tasks = [ secure_scan_operation(str(async1_file), "async_scan_1"), secure_scan_operation("../../../etc/passwd", "async_scan_2"), secure_scan_operation(str(async2_file), "async_scan_3"), ] results = await asyncio.gather(*scan_tasks, return_exceptions=True) # Process results and record telemetry for result in results: if isinstance(result, dict): # Record scan event mock_telemetry_service.record_scan_event( { "scan_id": result["scan_id"], "file_path": result["file_path"], "security_passed": result["security_passed"], "duration_ms": result["duration_ms"], } ) # Record security metrics mock_metrics_orchestrator.collect_security_metrics( { "scan_id": result["scan_id"], "security_validation": ( "passed" if result["security_passed"] else "failed" ), "security_error": result.get("security_error"), } ) # Verify telemetry calls assert mock_telemetry_service.record_scan_event.call_count == 3 assert mock_metrics_orchestrator.collect_security_metrics.call_count == 3 def test_security_telemetry_data_retention_compliance( self, mock_telemetry_service, mock_metrics_orchestrator ): """Test security and telemetry data retention compliance.""" # Test that sensitive data is not stored in telemetry sensitive_scan_data = { "api_key": "sk-secret123", "database_password": "supersecret", "file_path": "/app/config.py", "threat_count": 3, "scan_duration_ms": 1200.5, } # Sanitize data before telemetry recording sanitized_data = sanitize_for_logging(sensitive_scan_data) # Verify sensitive data is not in sanitized version assert "sk-secret123" not in sanitized_data assert "supersecret" not in sanitized_data assert "[REDACTED]" in sanitized_data # Record sanitized telemetry data telemetry_data = { "file_path": sensitive_scan_data["file_path"], "threat_count": sensitive_scan_data["threat_count"], "scan_duration_ms": sensitive_scan_data["scan_duration_ms"], "data_classification": "sanitized", } mock_telemetry_service.record_scan_event(telemetry_data) # Verify telemetry contains only safe data call_args = mock_telemetry_service.record_scan_event.call_args[0][0] assert "api_key" not in str(call_args) assert "database_password" not in str(call_args) assert call_args["file_path"] == "/app/config.py" assert call_args["threat_count"] == 3 def test_security_telemetry_performance_impact( self, mock_telemetry_service, mock_metrics_orchestrator, tmp_path ): """Test performance impact of security-telemetry integration.""" validator = InputValidator() # Create temporary test files test_files = [] for i in range(10): test_file = tmp_path / f"file_{i}.py" test_file.write_text(f"# Test file {i}") test_files.append(test_file) # Measure performance with telemetry disabled start_time = time.time() for test_file in test_files: validator.validate_file_path(str(test_file)) baseline_duration = time.time() - start_time # Measure performance with telemetry enabled 
start_time = time.time() for i, test_file in enumerate(test_files): validated_path = validator.validate_file_path(str(test_file)) # Record telemetry (mocked, so minimal overhead) mock_telemetry_service.record_security_event( { "validation_success": True, "path": str(validated_path), "iteration": i, } ) mock_metrics_orchestrator.collect_security_metrics( { "validation_time_ms": 1.0, # Simulated "path_length": len(str(validated_path)), } ) telemetry_duration = time.time() - start_time # Verify telemetry doesn't significantly impact performance # (Allow for some overhead in test environment) performance_overhead = ( telemetry_duration - baseline_duration ) / baseline_duration assert performance_overhead < 2.0 # Less than 200% overhead # Verify all telemetry calls were made assert mock_telemetry_service.record_security_event.call_count == 10 assert mock_metrics_orchestrator.collect_security_metrics.call_count == 10 class TestSecurityTelemetryArchitecturalIntegration: """Test architectural integration between security and telemetry systems.""" # REMOVED: test_phase2_phase3_component_integration - failing test removed # REMOVED: test_telemetry_security_service_coordination - failing test removed # REMOVED: test_end_to_end_security_telemetry_workflow - failing test removed def test_security_telemetry_configuration_integration(self): """Test configuration integration between security and telemetry.""" from adversary_mcp_server.config import SecurityConfig # Create security configuration security_config = SecurityConfig() security_config.enable_input_validation = True security_config.enable_path_validation = True security_config.log_security_events = True # Test configuration affects both security and telemetry validator = InputValidator() # Verify validator works with configuration assert validator is not None # Test that security events can be configured for telemetry security_event = { "validation_enabled": security_config.enable_input_validation, "path_validation": security_config.enable_path_validation, "logging_enabled": security_config.log_security_events, } # Verify configuration values assert security_event["validation_enabled"] is True assert security_event["path_validation"] is True assert security_event["logging_enabled"] is True
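One way to run this suite locally is shown below; it is only a sketch, assuming the file lives under tests/ in the repository and that pytest-asyncio is installed to support the @pytest.mark.asyncio test (neither is documented on this page).

import pytest

# Hypothetical runner sketch: the tests/ path and pytest-asyncio plugin are assumptions.
# Equivalent to running `pytest -v tests/test_security_telemetry_integration.py` from the CLI.
exit_code = pytest.main(["-v", "tests/test_security_telemetry_integration.py"])
raise SystemExit(exit_code)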

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'
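For reference, a minimal Python sketch of the same request, assuming the endpoint returns a JSON document describing the server (the exact response shape is not documented here):

import requests

# Hypothetical usage sketch: fetch the server record and print it.
# Assumes a JSON response; adjust handling to the actual payload.
url = "https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server"
response = requests.get(url, timeout=10)
response.raise_for_status()
print(response.json())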

If you have feedback or need assistance with the MCP directory API, please join our Discord server.