Skip to main content
Glama

Adversary MCP Server

by brettbergin
test_telemetry_adapter_coverage.py26.4 kB
"""Comprehensive tests for telemetry_adapter.py to increase coverage.""" from datetime import datetime from unittest.mock import Mock, patch from adversary_mcp_server.application.adapters.telemetry_adapter import TelemetryAdapter from adversary_mcp_server.domain.value_objects.severity_level import SeverityLevel class TestTelemetryAdapter: """Tests for TelemetryAdapter class.""" def test_init_without_telemetry_service(self): """Test TelemetryAdapter initialization without telemetry service.""" adapter = TelemetryAdapter() assert adapter._telemetry_service is None def test_init_with_telemetry_service(self): """Test TelemetryAdapter initialization with telemetry service.""" mock_service = Mock() adapter = TelemetryAdapter(telemetry_service=mock_service) assert adapter._telemetry_service == mock_service def test_record_scan_completion_no_telemetry_service(self): """Test record_scan_completion when no telemetry service is available.""" adapter = TelemetryAdapter() mock_scan_result = Mock() # Should not crash and return early adapter.record_scan_completion(mock_scan_result) # No assertion needed - just ensuring it doesn't crash def test_record_scan_completion_success(self): """Test successful scan completion recording.""" mock_service = Mock() adapter = TelemetryAdapter(telemetry_service=mock_service) # Create mock threat mock_threat = Mock() mock_threat.rule_id = "test-rule" mock_threat.rule_name = "Test Rule" mock_threat.category = "injection" mock_threat.severity = SeverityLevel.HIGH mock_threat.file_path = "test.py" mock_threat.line_number = 10 mock_threat.confidence = Mock() mock_threat.confidence.get_value.return_value = 0.8 mock_threat.source_scanner = "semgrep" mock_threat.is_false_positive = False # Create mock scan result mock_scan_result = Mock() mock_scan_result.scan_id = "scan-123" mock_scan_result.threats = [mock_threat] mock_scan_result.metadata = { "scan_type": "file", "target_info": {"path": "/test/file.py", "language": "python"}, "execution_stats": {"total_duration_ms": 1500}, } with patch( "adversary_mcp_server.application.adapters.telemetry_adapter.safe_convert_to_input_model" ) as mock_convert: mock_metadata_input = Mock() mock_metadata_input.scan_type = "file" mock_metadata_input.target_info = { "path": "/test/file.py", "language": "python", } mock_metadata_input.execution_stats = {"total_duration_ms": 1500} mock_convert.return_value = mock_metadata_input adapter.record_scan_completion(mock_scan_result) # Verify scan event was recorded mock_service.record_scan_event.assert_called_once_with( scan_id="scan-123", scan_type="file", target_path="/test/file.py", language="python", threats_found=1, duration_ms=1500, success=True, ) # Verify threat detection was recorded mock_service.record_threat_detection.assert_called_once() def test_record_scan_completion_no_scan_id(self): """Test scan completion recording with no scan ID.""" mock_service = Mock() adapter = TelemetryAdapter(telemetry_service=mock_service) # Create mock scan result without scan_id mock_scan_result = Mock() mock_scan_result.scan_id = None mock_scan_result.threats = [] mock_scan_result.metadata = {} with patch( "adversary_mcp_server.application.adapters.telemetry_adapter.safe_convert_to_input_model" ) as mock_convert: mock_metadata_input = Mock() mock_metadata_input.scan_type = "unknown" mock_metadata_input.target_info = {} mock_metadata_input.execution_stats = {} mock_convert.return_value = mock_metadata_input adapter.record_scan_completion(mock_scan_result) # Should use "unknown" as scan_id mock_service.record_scan_event.assert_called_once() args, kwargs = mock_service.record_scan_event.call_args assert kwargs["scan_id"] == "unknown" def test_record_scan_completion_no_metadata(self): """Test scan completion recording with no metadata.""" mock_service = Mock() adapter = TelemetryAdapter(telemetry_service=mock_service) # Create mock scan result without metadata mock_scan_result = Mock() mock_scan_result.scan_id = "scan-123" mock_scan_result.threats = [] mock_scan_result.metadata = None with ( patch( "adversary_mcp_server.application.adapters.telemetry_adapter.safe_convert_to_input_model" ) as mock_convert, patch( "adversary_mcp_server.application.adapters.telemetry_adapter.MetadataInput" ) as mock_metadata_class, ): mock_metadata_input = Mock() mock_metadata_input.scan_type = "unknown" mock_metadata_input.target_info = {} mock_metadata_input.execution_stats = {} mock_metadata_class.return_value = mock_metadata_input adapter.record_scan_completion(mock_scan_result) # Should create default MetadataInput mock_metadata_class.assert_called_once() mock_service.record_scan_event.assert_called_once() def test_record_scan_completion_exception_handling(self): """Test exception handling in record_scan_completion.""" mock_service = Mock() mock_service.record_scan_event.side_effect = Exception("Service error") adapter = TelemetryAdapter(telemetry_service=mock_service) mock_scan_result = Mock() mock_scan_result.scan_id = "scan-123" mock_scan_result.threats = [] mock_scan_result.metadata = {} with patch( "adversary_mcp_server.application.adapters.telemetry_adapter.safe_convert_to_input_model" ): # Should not crash despite exception adapter.record_scan_completion(mock_scan_result) def test_record_threat_detection_no_telemetry_service(self): """Test record_threat_detection when no telemetry service is available.""" adapter = TelemetryAdapter() mock_threat = Mock() # Should not crash and return early adapter.record_threat_detection(mock_threat, "scan-123") def test_record_threat_detection_no_threat(self): """Test record_threat_detection with None threat.""" mock_service = Mock() adapter = TelemetryAdapter(telemetry_service=mock_service) # Should not crash and return early adapter.record_threat_detection(None, "scan-123") mock_service.record_threat_detection.assert_not_called() def test_record_threat_detection_success(self): """Test successful threat detection recording.""" mock_service = Mock() adapter = TelemetryAdapter(telemetry_service=mock_service) # Create mock threat mock_threat = Mock() mock_threat.rule_id = "test-rule" mock_threat.rule_name = "Test Rule" mock_threat.category = "injection" mock_threat.severity = SeverityLevel.HIGH mock_threat.file_path = "test.py" mock_threat.line_number = 10 mock_threat.confidence = Mock() mock_threat.confidence.get_value.return_value = 0.8 mock_threat.source_scanner = "semgrep" mock_threat.is_false_positive = False adapter.record_threat_detection(mock_threat, "scan-123") mock_service.record_threat_detection.assert_called_once_with( scan_id="scan-123", rule_id="test-rule", rule_name="Test Rule", category="injection", severity="high", file_path="test.py", line_number=10, confidence=0.8, scanner="semgrep", is_false_positive=False, ) def test_record_threat_detection_exception_handling(self): """Test exception handling in record_threat_detection.""" mock_service = Mock() mock_service.record_threat_detection.side_effect = Exception("Service error") adapter = TelemetryAdapter(telemetry_service=mock_service) mock_threat = Mock() mock_threat.rule_id = "test-rule" mock_threat.rule_name = "Test Rule" mock_threat.category = "injection" mock_threat.severity = SeverityLevel.HIGH mock_threat.file_path = "test.py" mock_threat.line_number = 10 mock_threat.confidence = Mock() mock_threat.confidence.get_value.return_value = 0.8 mock_threat.source_scanner = "semgrep" mock_threat.is_false_positive = False # Should not crash despite exception adapter.record_threat_detection(mock_threat, "scan-123") def test_record_scan_performance_no_telemetry_service(self): """Test record_scan_performance when no telemetry service is available.""" adapter = TelemetryAdapter() mock_scan_result = Mock() # Should not crash and return early adapter.record_scan_performance(mock_scan_result) def test_record_scan_performance_no_metadata(self): """Test record_scan_performance with no metadata.""" mock_service = Mock() adapter = TelemetryAdapter(telemetry_service=mock_service) mock_scan_result = Mock() mock_scan_result.metadata = None # Should return early without calling service adapter.record_scan_performance(mock_scan_result) mock_service.record_performance_metrics.assert_not_called() def test_record_scan_performance_success(self): """Test successful scan performance recording.""" mock_service = Mock() adapter = TelemetryAdapter(telemetry_service=mock_service) mock_scan_result = Mock() mock_scan_result.scan_id = "test-scan-123" mock_scan_result.threats = [] mock_scan_result.metadata = { "execution_stats": { "total_duration_ms": 2500, "semgrep_duration_ms": 1000, "llm_duration_ms": 1500, }, "target_info": {"size": 1024}, } with patch( "adversary_mcp_server.application.adapters.telemetry_adapter.safe_convert_to_input_model" ) as mock_convert: mock_metadata_input = Mock() mock_metadata_input.execution_stats = { "total_duration_ms": 2500, "semgrep_duration_ms": 1000, "llm_duration_ms": 1500, } mock_metadata_input.target_info = {"size": 1024} mock_convert.return_value = mock_metadata_input adapter.record_scan_performance(mock_scan_result) mock_service.record_performance_metrics.assert_called_once_with( scan_id="test-scan-123", total_duration_ms=2500, component_timings={ "total_duration_ms": 2500, "semgrep_duration_ms": 1000, "llm_duration_ms": 1500, }, file_size_bytes=1024, threat_count=0, ) def test_record_scan_performance_exception_handling(self): """Test exception handling in record_scan_performance.""" mock_service = Mock() mock_service.record_performance_metrics.side_effect = Exception("Service error") adapter = TelemetryAdapter(telemetry_service=mock_service) mock_scan_result = Mock() mock_scan_result.metadata = {"execution_stats": {}} with patch( "adversary_mcp_server.application.adapters.telemetry_adapter.safe_convert_to_input_model" ): # Should not crash despite exception adapter.record_scan_performance(mock_scan_result) def test_record_validation_results_no_telemetry_service(self): """Test record_validation_results when no telemetry service is available.""" adapter = TelemetryAdapter() # Should not crash and return early adapter.record_validation_results([], [], "scan-123") def test_record_validation_results_empty_threats(self): """Test record_validation_results with empty threat lists.""" mock_service = Mock() adapter = TelemetryAdapter(telemetry_service=mock_service) # Should return early without calling service adapter.record_validation_results([], [], "scan-123") mock_service.record_validation_event.assert_not_called() def test_record_validation_results_success(self): """Test successful validation results recording.""" mock_service = Mock() adapter = TelemetryAdapter(telemetry_service=mock_service) # Create original threats original_threat1 = Mock() original_threat1.rule_id = "rule-1" original_threat1.confidence = Mock() original_threat1.confidence.get_value.return_value = 0.6 original_threat1.severity = SeverityLevel.HIGH original_threat2 = Mock() original_threat2.rule_id = "rule-2" original_threat2.confidence = Mock() original_threat2.confidence.get_value.return_value = 0.7 original_threat2.severity = SeverityLevel.MEDIUM # Create validated threats (one is filtered out as false positive) validated_threat = Mock() validated_threat.rule_id = "rule-1" validated_threat.confidence = Mock() validated_threat.confidence.get_value.return_value = 0.9 # Improved confidence validated_threat.severity = SeverityLevel.CRITICAL # Adjusted severity adapter.record_validation_results( [original_threat1, original_threat2], [validated_threat], "scan-123" ) mock_service.record_validation_event.assert_called_once_with( scan_id="scan-123", original_threat_count=2, validated_threat_count=1, false_positive_count=1, confidence_improvements=1, severity_adjustments=1, ) def test_record_validation_results_exception_handling(self): """Test exception handling in record_validation_results.""" mock_service = Mock() mock_service.record_validation_event.side_effect = Exception("Service error") adapter = TelemetryAdapter(telemetry_service=mock_service) mock_threat = Mock() mock_threat.rule_id = "test" mock_threat.confidence = Mock() mock_threat.confidence.get_value.return_value = 0.5 mock_threat.severity = SeverityLevel.LOW # Should not crash despite exception adapter.record_validation_results([mock_threat], [mock_threat], "scan-123") class TestDomainTelemetryAdapter: """Tests for DomainTelemetryAdapter class.""" def test_extract_scan_metrics_basic(self): """Test basic scan metrics extraction.""" from adversary_mcp_server.application.adapters.telemetry_adapter import ( DomainTelemetryAdapter, ) adapter = DomainTelemetryAdapter() # Create mock scan result mock_scan_result = Mock() mock_scan_result.scan_request = Mock() mock_scan_result.scan_request.context = Mock() mock_scan_result.scan_request.context.metadata = Mock() mock_scan_result.scan_request.context.metadata.scan_id = "scan-123" mock_scan_result.scan_request.context.metadata.scan_type = "file" mock_scan_result.scan_request.context.metadata.timestamp = datetime.now() mock_scan_result.scan_request.context.metadata.requester = "test-user" mock_scan_result.scan_request.context.metadata.timeout_seconds = 30 mock_scan_result.scan_request.context.target_path = "test.py" mock_scan_result.scan_request.context.language = "python" mock_scan_result.scan_request.enable_semgrep = True mock_scan_result.scan_request.enable_llm = False mock_scan_result.scan_request.enable_validation = True mock_scan_result.scan_request.severity_threshold = SeverityLevel.MEDIUM mock_scan_result.get_statistics.return_value = { "total_threats": 3, "by_severity": {"high": 1, "medium": 2}, "by_category": {"injection": 2, "xss": 1}, } mock_scan_result.get_active_scanners.return_value = ["semgrep"] mock_scan_result.has_critical_threats.return_value = False mock_scan_result.filter_by_confidence.return_value = Mock(threats=[]) mock_scan_result.threats = [] mock_scan_result.scan_metadata = {"scan_duration_ms": 1500} with ( patch.object(adapter, "_get_file_size", return_value=1024), patch.object(adapter, "_extract_scanner_metrics", return_value={}), patch.object(adapter, "_extract_validation_metrics", return_value=None), ): metrics = adapter.extract_scan_metrics(mock_scan_result) assert metrics["scan_id"] == "scan-123" assert metrics["scan_type"] == "file" assert metrics["requester"] == "test-user" assert metrics["total_threats"] == 3 assert metrics["file_size_bytes"] == 1024 def test_extract_threat_metrics_basic(self): """Test basic threat metrics extraction.""" from adversary_mcp_server.application.adapters.telemetry_adapter import ( DomainTelemetryAdapter, ) adapter = DomainTelemetryAdapter() # Create mock threat mock_threat = Mock() mock_threat.rule_id = "test-rule" mock_threat.rule_name = "Test Rule" mock_threat.get_fingerprint.return_value = "fingerprint-123" mock_threat.severity = ( SeverityLevel.HIGH ) # Use actual SeverityLevel instead of Mock mock_threat.category = "injection" mock_threat.confidence = Mock() mock_threat.confidence.get_decimal.return_value = 0.8 mock_threat.confidence.get_quality_level.return_value = "high" mock_threat.file_path = "test.py" mock_threat.line_number = 10 mock_threat.column_number = 5 mock_threat.source_scanner = "semgrep" mock_threat.metadata = {} mock_threat.is_false_positive = False mock_threat.exploit_examples = [] mock_threat.remediation_advice = "Fix this issue" mock_threat.code_snippet = "vulnerable code" with patch.object( adapter, "_extract_semgrep_threat_metrics", return_value={"semgrep_rule": "test"}, ): metrics = adapter.extract_threat_metrics([mock_threat]) assert len(metrics) == 1 threat_metric = metrics[0] assert threat_metric["rule_id"] == "test-rule" assert threat_metric["severity"] == "high" assert threat_metric["confidence"] == 0.8 assert threat_metric["source_scanner"] == "semgrep" def test_extract_performance_metrics_basic(self): """Test basic performance metrics extraction.""" from adversary_mcp_server.application.adapters.telemetry_adapter import ( DomainTelemetryAdapter, ) adapter = DomainTelemetryAdapter() # Create mock scan result mock_scan_result = Mock() mock_scan_result.scan_metadata = { "scan_duration_ms": 2000, "semgrep_duration_ms": 800, "llm_duration_ms": 1200, "validation_duration_ms": 300, } mock_scan_result.scan_request = Mock() mock_scan_result.scan_request.enable_semgrep = True mock_scan_result.scan_request.enable_llm = True mock_scan_result.scan_request.enable_validation = True mock_scan_result.threats = [] with ( patch.object(adapter, "_calculate_throughput", return_value=1.5), patch.object(adapter, "_estimate_lines_scanned", return_value=100), patch.object(adapter, "_count_files_processed", return_value=1), ): metrics = adapter.extract_performance_metrics(mock_scan_result) assert metrics["total_scan_time_ms"] == 2000 assert metrics["scanner_performance"]["semgrep"]["duration_ms"] == 800 assert metrics["scanner_performance"]["llm"]["duration_ms"] == 1200 assert metrics["validation_performance"]["duration_ms"] == 300 assert metrics["throughput"]["threats_per_second"] == 1.5 def test_extract_quality_metrics_no_threats(self): """Test quality metrics extraction with no threats.""" from adversary_mcp_server.application.adapters.telemetry_adapter import ( DomainTelemetryAdapter, ) adapter = DomainTelemetryAdapter() # Create mock scan result with no threats mock_scan_result = Mock() mock_scan_result.threats = [] metrics = adapter.extract_quality_metrics(mock_scan_result) assert metrics["security_score"] == 100.0 assert metrics["quality_rating"] == "A" assert metrics["risk_level"] == "low" def test_extract_quality_metrics_with_threats(self): """Test quality metrics extraction with threats.""" from adversary_mcp_server.application.adapters.telemetry_adapter import ( DomainTelemetryAdapter, ) adapter = DomainTelemetryAdapter() # Create mock threats critical_threat = Mock() critical_threat.confidence = Mock() critical_threat.confidence.get_decimal.return_value = 0.9 critical_threat.confidence.is_very_low.return_value = False critical_threat.confidence.is_low.return_value = False critical_threat.confidence.is_medium.return_value = False critical_threat.confidence.is_high.return_value = True critical_threat.confidence.is_very_high.return_value = False critical_threat.confidence.is_actionable.return_value = True high_threat = Mock() high_threat.confidence = Mock() high_threat.confidence.get_decimal.return_value = 0.8 high_threat.confidence.is_very_low.return_value = False high_threat.confidence.is_low.return_value = False high_threat.confidence.is_medium.return_value = False high_threat.confidence.is_high.return_value = True high_threat.confidence.is_very_high.return_value = False high_threat.confidence.is_actionable.return_value = True # Create mock scan result mock_scan_result = Mock() mock_scan_result.threats = [critical_threat, high_threat] def mock_get_threats_by_severity(severity): if severity == SeverityLevel.from_string("critical"): return [critical_threat] elif severity == SeverityLevel.from_string("high"): return [high_threat] elif severity == SeverityLevel.from_string("medium"): return [] elif severity == SeverityLevel.from_string("low"): return [] else: return [] mock_scan_result.get_threats_by_severity.side_effect = ( mock_get_threats_by_severity ) metrics = adapter.extract_quality_metrics(mock_scan_result) assert metrics["security_score"] == 60 # 100 - (1*25 + 1*15) = 60 assert metrics["quality_rating"] == "D" assert metrics["risk_level"] == "critical" assert abs(metrics["average_confidence"] - 0.85) < 0.01 # (0.9 + 0.8) / 2 assert metrics["actionable_threats"] == 2 def test_private_helper_methods(self): """Test private helper methods.""" from adversary_mcp_server.application.adapters.telemetry_adapter import ( DomainTelemetryAdapter, ) adapter = DomainTelemetryAdapter() # Test _get_category_breakdown mock_threat1 = Mock() mock_threat1.category = "injection" mock_threat2 = Mock() mock_threat2.category = "injection" mock_threat3 = Mock() mock_threat3.category = "xss" breakdown = adapter._get_category_breakdown( [mock_threat1, mock_threat2, mock_threat3] ) assert breakdown["injection"] == 2 assert breakdown["xss"] == 1 def test_file_size_with_content(self): """Test _get_file_size with content.""" from adversary_mcp_server.application.adapters.telemetry_adapter import ( DomainTelemetryAdapter, ) adapter = DomainTelemetryAdapter() mock_scan_result = Mock() mock_scan_result.scan_request = Mock() mock_scan_result.scan_request.context = Mock() mock_scan_result.scan_request.context.content = "test content" size = adapter._get_file_size(mock_scan_result) assert size == len(b"test content") def test_file_size_with_file_path(self): """Test _get_file_size with file path.""" from adversary_mcp_server.application.adapters.telemetry_adapter import ( DomainTelemetryAdapter, ) adapter = DomainTelemetryAdapter() mock_scan_result = Mock() mock_scan_result.scan_request = Mock() mock_scan_result.scan_request.context = Mock() mock_scan_result.scan_request.context.content = None mock_scan_result.scan_request.context.target_path = Mock() mock_scan_result.scan_request.context.target_path.exists.return_value = True mock_scan_result.scan_request.context.target_path.is_file.return_value = True mock_scan_result.scan_request.context.target_path.get_size_bytes.return_value = ( 2048 ) size = adapter._get_file_size(mock_scan_result) assert size == 2048

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server