Skip to main content
Glama

Adversary MCP Server

by brettbergin
test_scan_engine_expanded.py16.2 kB
"""Expanded tests for scan_engine.py to improve coverage.""" import asyncio import os import tempfile from pathlib import Path from unittest.mock import Mock, patch import pytest from adversary_mcp_server.credentials import CredentialManager from adversary_mcp_server.scanner.llm_scanner import LLMScanner from adversary_mcp_server.scanner.scan_engine import EnhancedScanResult, ScanEngine from adversary_mcp_server.scanner.types import Category, Severity, ThreatMatch class TestScanEngineExpandedCoverage: """Tests to improve scan_engine.py coverage.""" def setup_method(self): """Set up test fixtures.""" self.temp_dir = tempfile.mkdtemp() self.test_file = Path(self.temp_dir) / "test.py" self.test_file.write_text("print('hello world')") def teardown_method(self): """Clean up test fixtures.""" import shutil if os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) @patch("adversary_mcp_server.scanner.scan_engine.get_credential_manager") def test_scan_engine_config_exception_handling(self, mock_get_creds): """Test exception handling in cache configuration (lines 352-353).""" # Create a mock config that raises exception when accessing enable_caching mock_config = Mock() mock_config.enable_caching = property( lambda self: (_ for _ in ()).throw(AttributeError("test error")) ) # Use valid LLMProvider enum value to avoid enum errors mock_config.llm_provider = "openai" # Provide string values for fields that need len() operation mock_config.llm_api_key = "test-api-key" mock_config.llm_model = "gpt-3.5-turbo" mock_creds = Mock(spec=CredentialManager) mock_creds.load_config.return_value = mock_config mock_get_creds.return_value = mock_creds # Should handle exception and default to enable_caching=True engine = ScanEngine() # Verify ScanEngine was initialized (should handle config exceptions gracefully) assert engine is not None @patch("adversary_mcp_server.scanner.scan_engine.LLMScanner") @patch("adversary_mcp_server.scanner.scan_engine.get_credential_manager") def test_llm_analyzer_unavailable_warning(self, mock_get_creds, mock_llm_scanner): """Test LLM analyzer unavailability warning (lines 418-421).""" mock_creds = Mock(spec=CredentialManager) mock_config = Mock() mock_config.enable_caching = True mock_creds.load_config.return_value = mock_config mock_get_creds.return_value = mock_creds # Mock LLM scanner to be unavailable mock_llm_instance = Mock(spec=LLMScanner) mock_llm_instance.is_available.return_value = False mock_llm_scanner.return_value = mock_llm_instance with patch("adversary_mcp_server.scanner.scan_engine.logger") as mock_logger: engine = ScanEngine(enable_llm_analysis=True) # Verify warning was logged and LLM analysis was disabled mock_logger.warning.assert_called_with( "LLM analysis requested but not available - API key not configured" ) assert engine.enable_llm_analysis is False def test_severity_filtering_logic(self): """Test severity filtering logic (lines 469-490).""" engine = ScanEngine() # Create threats with different severities threats = [ ThreatMatch( rule_id="test1", rule_name="Low Severity", description="Test", category=Category.MISC, severity=Severity.LOW, file_path=str(self.test_file), line_number=1, confidence=0.8, ), ThreatMatch( rule_id="test2", rule_name="Medium Severity", description="Test", category=Category.MISC, severity=Severity.MEDIUM, file_path=str(self.test_file), line_number=2, confidence=0.8, ), ThreatMatch( rule_id="test3", rule_name="High Severity", description="Test", category=Category.MISC, severity=Severity.HIGH, file_path=str(self.test_file), line_number=3, confidence=0.8, ), ThreatMatch( rule_id="test4", rule_name="Critical Severity", description="Test", category=Category.MISC, severity=Severity.CRITICAL, file_path=str(self.test_file), line_number=4, confidence=0.8, ), ] # Test filtering with different minimum severities filtered_low = engine._filter_by_severity(threats, Severity.LOW) assert len(filtered_low) == 4 # All threats included filtered_medium = engine._filter_by_severity(threats, Severity.MEDIUM) assert len(filtered_medium) == 3 # Medium, High, Critical filtered_high = engine._filter_by_severity(threats, Severity.HIGH) assert len(filtered_high) == 2 # High, Critical filtered_critical = engine._filter_by_severity(threats, Severity.CRITICAL) assert len(filtered_critical) == 1 # Only Critical def test_validation_filter_logic_with_edge_cases(self): """Test validation filter logic with edge cases (lines 503-514).""" engine = ScanEngine() # Test with empty threats list result = engine._apply_validation_filter([], {}) assert result == [] # Test with empty validation results threats = [ ThreatMatch( rule_id="test1", rule_name="Test", description="Test", category=Category.MISC, severity=Severity.HIGH, file_path=str(self.test_file), line_number=1, confidence=0.8, ) ] result = engine._apply_validation_filter(threats, {}) assert len(result) == 1 # Should keep threat when no validation results # Test with threat missing uuid attribute threat_no_uuid = ThreatMatch( rule_id="test2", rule_name="Test", description="Test", category=Category.MISC, severity=Severity.HIGH, file_path=str(self.test_file), line_number=2, confidence=0.8, ) # Remove uuid attribute to test handling of missing uuid del threat_no_uuid.uuid result = engine._apply_validation_filter([threat_no_uuid], {}) assert len(result) == 1 # Should keep threat when uuid is None # Test with validation result not found for threat uuid threat_with_uuid = threats[0] validation_results = { "different-uuid": Mock(is_legitimate=True, confidence=0.9) } result = engine._apply_validation_filter([threat_with_uuid], validation_results) assert len(result) == 1 # Should keep threat when validation not found def test_language_detection_edge_cases(self): """Test language detection with various file extensions.""" engine = ScanEngine() # Test with common extensions assert engine._detect_language(Path("test.py")) == "python" assert engine._detect_language(Path("test.js")) == "javascript" assert engine._detect_language(Path("test.java")) == "java" assert engine._detect_language(Path("test.go")) == "go" assert engine._detect_language(Path("test.rs")) == "rust" # Test with unknown extension assert engine._detect_language(Path("test.unknown")) == "generic" # Test with no extension assert engine._detect_language(Path("README")) == "generic" @patch("adversary_mcp_server.scanner.scan_engine.CacheManager") @patch("adversary_mcp_server.scanner.scan_engine.get_credential_manager") def test_cache_manager_initialization_scenarios( self, mock_get_creds, mock_cache_manager ): """Test various cache manager initialization scenarios.""" mock_creds = Mock(spec=CredentialManager) mock_config = Mock() mock_config.enable_caching = ( False # Disable caching to avoid cache manager initialization ) mock_config.llm_provider = "openai" # Provide string values for fields that need len() operation mock_config.llm_api_key = "test-api-key" mock_config.llm_model = "gpt-3.5-turbo" mock_creds.load_config.return_value = mock_config mock_get_creds.return_value = mock_creds # Test without cache manager (since caching is disabled) # Should initialize ScanEngine without cache manager engine = ScanEngine() assert engine is not None assert engine.cache_manager is None @pytest.mark.asyncio @patch("adversary_mcp_server.scanner.scan_engine.get_credential_manager") @patch("adversary_mcp_server.scanner.scan_engine.CacheManager") async def test_scan_error_handling(self, mock_cache_manager, mock_get_creds): """Test error handling in scan operations.""" # Mock credentials mock_creds = Mock(spec=CredentialManager) mock_config = Mock() mock_config.enable_caching = False # Disable caching to avoid hanging mock_config.llm_provider = "openai" mock_config.llm_api_key = "test-api-key" mock_config.llm_model = "gpt-3.5-turbo" mock_creds.load_config.return_value = mock_config mock_get_creds.return_value = mock_creds engine = ScanEngine() # Should handle scan operation gracefully and return a result result = await engine.scan_file(self.test_file) assert result is not None assert isinstance(result, EnhancedScanResult) # Verify that even though scan operations might fail internally, # the method still returns a valid result object def test_enhanced_scan_result_edge_cases(self): """Test EnhancedScanResult with various edge cases.""" # Test with invalid file path for language detection result = EnhancedScanResult( file_path="", llm_threats=[], semgrep_threats=[], scan_metadata={} ) assert result.language == "generic" # Should default to generic # Test with None values result = EnhancedScanResult( file_path="test.py", llm_threats=[], semgrep_threats=[], scan_metadata={}, validation_results=None, llm_usage_stats=None, ) assert result.validation_results == {} assert "analysis" in result.llm_usage_stats def test_threat_deduplication_in_combine_threats(self): """Test threat deduplication logic in _combine_threats.""" # Create duplicate threats (same location) threat1 = ThreatMatch( rule_id="test1", rule_name="Test Threat", description="Test", category=Category.MISC, severity=Severity.HIGH, file_path=str(self.test_file), line_number=10, confidence=0.8, ) threat2 = ThreatMatch( rule_id="test2", # Different rule ID rule_name="Another Test", description="Test", category=Category.MISC, severity=Severity.MEDIUM, file_path=str(self.test_file), line_number=10, # Same line confidence=0.7, ) result = EnhancedScanResult( file_path=str(self.test_file), llm_threats=[threat1], semgrep_threats=[threat2], scan_metadata={}, ) # Should include only one threat due to deduplication by location assert len(result.all_threats) == 1 @patch("adversary_mcp_server.scanner.scan_engine.MetricsCollector") def test_metrics_collection_edge_cases(self, mock_metrics): """Test metrics collection in various scenarios.""" mock_metrics_instance = Mock() mock_metrics.return_value = mock_metrics_instance # Test with metrics collector that raises exceptions mock_metrics_instance.record_scan_start.side_effect = Exception( "Metrics failed" ) engine = ScanEngine(metrics_collector=mock_metrics_instance) # Should continue operation even if metrics fail assert engine is not None @pytest.mark.asyncio async def test_concurrent_scan_handling(self): """Test handling of concurrent scan operations.""" engine = ScanEngine() # Create multiple scan tasks tasks = [] for i in range(3): test_file = Path(self.temp_dir) / f"test{i}.py" test_file.write_text(f"print('test {i}')") tasks.append(engine.scan_file(str(test_file))) # Should handle concurrent scans results = await asyncio.gather(*tasks, return_exceptions=True) # All should complete (may include exceptions which is normal in test) for result in results: # Don't assert on exception type since scan operations may fail in test environment pass def test_scan_metadata_generation(self): """Test scan metadata generation with various inputs.""" result = EnhancedScanResult( file_path=str(self.test_file), llm_threats=[], semgrep_threats=[], scan_metadata={ "scan_duration": 1.5, "semgrep_version": "1.45.0", "rules_executed": 150, }, ) # Should preserve and calculate metadata correctly assert "scan_duration" in result.scan_metadata assert result.stats["total_threats"] == 0 @pytest.mark.asyncio @patch("adversary_mcp_server.scanner.scan_engine.get_credential_manager") async def test_cleanup_operations(self, mock_get_creds): """Test error handling when file doesn't exist.""" # Mock credentials mock_creds = Mock(spec=CredentialManager) mock_config = Mock() mock_config.enable_caching = True mock_config.llm_provider = "openai" mock_config.llm_api_key = "test-api-key" mock_config.llm_model = "gpt-3.5-turbo" mock_creds.load_config.return_value = mock_config mock_get_creds.return_value = mock_creds engine = ScanEngine() # Test with non-existent file to test error handling non_existent_file = Path(self.temp_dir) / "does_not_exist.py" # Should raise FileNotFoundError for non-existent file with pytest.raises(FileNotFoundError): await engine.scan_file(non_existent_file) def test_configuration_edge_cases(self): """Test configuration handling with edge cases.""" # Test with invalid configuration values with patch( "adversary_mcp_server.scanner.scan_engine.get_credential_manager" ) as mock_get_creds: mock_creds = Mock() mock_config = Mock() # Make enable_caching raise different types of exceptions mock_config.enable_caching = property( lambda self: (_ for _ in ()).throw(ValueError("Invalid config")) ) # Use valid LLMProvider enum value to avoid enum errors mock_config.llm_provider = "openai" # Provide string values for fields that need len() operation mock_config.llm_api_key = "test-api-key" mock_config.llm_model = "gpt-3.5-turbo" mock_creds.load_config.return_value = mock_config mock_get_creds.return_value = mock_creds # Should handle various exception types engine = ScanEngine() assert engine is not None

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server