Skip to main content
Glama

Adversary MCP Server

by brettbergin
test_semgrep_scanner.py (69.5 kB)
"""Tests for SemgrepScanner module.""" import os import subprocess import sys import time from unittest.mock import AsyncMock, MagicMock, mock_open, patch import pytest # Add the src directory to the path to import modules sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) from adversary_mcp_server.scanner.semgrep_scanner import SemgrepScanner from adversary_mcp_server.scanner.types import Category, Severity, ThreatMatch class TestSemgrepScanner: """Test SemgrepScanner class.""" def setup_method(self): """Set up test fixtures.""" self.scanner = SemgrepScanner() def teardown_method(self): """Clean up test fixtures.""" self.threat_engine = None self.scanner = None @patch("adversary_mcp_server.scanner.semgrep_scanner._SEMGREP_AVAILABLE", True) def test_check_semgrep_available_success(self): """Test successful Semgrep availability check.""" scanner = SemgrepScanner() assert scanner.is_available() is True @patch("adversary_mcp_server.scanner.semgrep_scanner._SEMGREP_AVAILABLE", False) def test_check_semgrep_available_failure(self): """Test failed Semgrep availability check.""" scanner = SemgrepScanner() assert scanner.is_available() is False def test_get_status_when_available(self): """Test get_status when Semgrep is available.""" with patch("subprocess.run") as mock_run: # Mock successful semgrep --version call mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "1.128.1" mock_run.return_value = mock_result status = self.scanner.get_status() assert status["available"] is True assert status["installation_status"] == "available" assert "1.128.1" in status["version"] assert "semgrep_path" in status assert status["has_pro_features"] is False # Conservative assumption def test_get_status_when_not_available(self): """Test get_status when Semgrep is not found.""" with patch( "subprocess.run", side_effect=FileNotFoundError("Semgrep not found") ): status = self.scanner.get_status() assert status["available"] is False assert 
status["installation_status"] == "not_installed" assert "Semgrep not found in PATH" in status["error"] assert "Install semgrep" in status["installation_guidance"] def test_get_status_timeout_handling(self): """Test get_status handles timeout properly.""" import subprocess with patch( "subprocess.run", side_effect=subprocess.TimeoutExpired("semgrep", 5) ): status = self.scanner.get_status() assert status["available"] is False assert status["installation_status"] == "not_installed" assert "Semgrep not found in PATH" in status["error"] def test_get_status_virtual_environment_priority(self): """Test get_status checks virtual environment first.""" with ( patch("subprocess.run") as mock_run, patch("sys.executable", "/some/venv/bin/python"), ): # Mock venv semgrep available mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "1.128.1 (venv)" mock_run.return_value = mock_result status = self.scanner.get_status() # Should call semgrep from virtual environment first mock_run.assert_called_with( ["/some/venv/bin/semgrep", "--version"], capture_output=True, text=True, timeout=5, ) assert status["available"] is True assert status["semgrep_path"] == "/some/venv/bin/semgrep" def test_get_status_fallback_to_system_path(self): """Test get_status falls back to system PATH when venv semgrep not found.""" with ( patch("subprocess.run") as mock_run, patch("sys.executable", "/some/venv/bin/python"), ): def mock_subprocess_run(cmd, **kwargs): if "/some/venv/bin/semgrep" in cmd: # Venv semgrep not found raise FileNotFoundError("Venv semgrep not found") elif "semgrep" in cmd: # System semgrep found mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "1.128.1 (system)" return mock_result else: raise FileNotFoundError("Command not found") mock_run.side_effect = mock_subprocess_run status = self.scanner.get_status() assert status["available"] is True assert status["semgrep_path"] == "semgrep" assert "system" in status["version"] def 
test_get_semgrep_env_info_with_api_key_from_credential_manager(self): """Test environment info with API key from credential manager (Semgrep Pro).""" mock_credential_manager = MagicMock() mock_credential_manager.get_semgrep_api_key.return_value = ( "sk-test-api-key-12345" ) scanner = SemgrepScanner(credential_manager=mock_credential_manager) env_info = scanner._get_semgrep_env_info() assert env_info["has_token"] == "true" assert env_info["semgrep_user_agent"] == "adversary-mcp-server" mock_credential_manager.get_semgrep_api_key.assert_called_once() def test_get_semgrep_env_info_without_api_key_from_credential_manager(self): """Test environment info without API key from credential manager (free tier).""" mock_credential_manager = MagicMock() mock_credential_manager.get_semgrep_api_key.return_value = None scanner = SemgrepScanner(credential_manager=mock_credential_manager) env_info = scanner._get_semgrep_env_info() assert env_info["has_token"] == "false" assert env_info["semgrep_user_agent"] == "adversary-mcp-server" mock_credential_manager.get_semgrep_api_key.assert_called_once() def test_get_semgrep_env_info_without_credential_manager(self): """Test environment info without credential manager (free tier fallback).""" scanner = SemgrepScanner(credential_manager=None) env_info = scanner._get_semgrep_env_info() assert env_info["has_token"] == "false" assert env_info["semgrep_user_agent"] == "adversary-mcp-server" def test_map_semgrep_severity(self): """Test Semgrep severity mapping.""" assert self.scanner._map_semgrep_severity("error") == Severity.CRITICAL assert self.scanner._map_semgrep_severity("critical") == Severity.CRITICAL assert self.scanner._map_semgrep_severity("warning") == Severity.HIGH assert self.scanner._map_semgrep_severity("info") == Severity.MEDIUM assert self.scanner._map_semgrep_severity("unknown") == Severity.LOW def test_map_semgrep_category(self): """Test Semgrep category mapping.""" # Test SQL injection assert ( 
self.scanner._map_semgrep_category("sql-injection", "SQL issue") == Category.INJECTION ) assert ( self.scanner._map_semgrep_category("sqli-test", "SQL problem") == Category.INJECTION ) # Test XSS assert ( self.scanner._map_semgrep_category("xss-vulnerability", "XSS issue") == Category.XSS ) assert ( self.scanner._map_semgrep_category("cross-site-scripting", "XSS") == Category.XSS ) # Test auth assert ( self.scanner._map_semgrep_category("auth-bypass", "Auth issue") == Category.AUTHENTICATION ) assert ( self.scanner._map_semgrep_category("jwt-vulnerability", "JWT") == Category.AUTHENTICATION ) # Test crypto assert ( self.scanner._map_semgrep_category("crypto-weakness", "Crypto") == Category.CRYPTOGRAPHY ) assert ( self.scanner._map_semgrep_category("weak-hash", "Hash") == Category.CRYPTOGRAPHY ) # Test default assert ( self.scanner._map_semgrep_category("unknown-rule", "Unknown") == Category.VALIDATION ) def test_convert_semgrep_finding_to_threat(self): """Test conversion of Semgrep finding to ThreatMatch.""" semgrep_finding = { "check_id": "python.lang.security.audit.dangerous-eval.dangerous-eval", "message": "Found 'eval' which can execute arbitrary code", "metadata": { "severity": "error", "cwe": ["CWE-95"], "owasp": "A03:2021", "references": ["https://example.com/eval-security"], }, "start": {"line": 15}, "end": {"line": 15}, "extra": {"lines": "eval(user_input)"}, } threat = self.scanner._convert_semgrep_finding_to_threat( semgrep_finding, "test.py" ) assert ( threat.rule_id == "semgrep-python.lang.security.audit.dangerous-eval.dangerous-eval" ) assert ( threat.rule_name == "Semgrep: python.lang.security.audit.dangerous-eval.dangerous-eval" ) assert threat.description == "Found 'eval' which can execute arbitrary code" assert threat.severity == Severity.CRITICAL assert threat.file_path == "test.py" assert threat.line_number == 15 assert threat.code_snippet == "eval(user_input)" # Confidence should be dynamic and higher than base due to security rule + severity 
assert 0.8 <= threat.confidence <= 0.95 assert threat.cwe_id == "CWE-95" assert threat.owasp_category == "A03:2021" assert threat.references == ["https://example.com/eval-security"] def test_get_file_extension(self): """Test file extension mapping.""" assert self.scanner._get_file_extension("python") == ".py" assert self.scanner._get_file_extension("javascript") == ".js" assert self.scanner._get_file_extension("typescript") == ".ts" @pytest.mark.asyncio async def test_scan_code_unavailable(self): """Test code scanning when Semgrep is unavailable.""" with patch.object(self.scanner, "is_available", return_value=False): with patch.object( self.scanner, "_perform_scan", side_effect=FileNotFoundError("semgrep not found"), ): source_code = "eval(user_input)" threats = await self.scanner.scan_code(source_code, "test.py", "python") assert threats == [] @pytest.mark.asyncio async def test_scan_file_unavailable(self): """Test file scanning when Semgrep is unavailable.""" with patch.object(self.scanner, "is_available", return_value=False): threats = await self.scanner.scan_file("test.py", "python") assert threats == [] class TestSemgrepScannerIntegration: """Integration tests for SemgrepScanner.""" def setup_method(self): """Set up test fixtures.""" self.scanner = SemgrepScanner() def teardown_method(self): """Clean up test fixtures.""" self.threat_engine = None self.scanner = None @pytest.mark.asyncio async def test_filter_by_severity_method(self): """Test the _filter_by_severity method directly.""" # Create test threats with different severities threats = [ ThreatMatch( rule_id="rule1", rule_name="Rule 1", description="Test", category=Category.INJECTION, severity=Severity.LOW, file_path="test.py", line_number=1, ), ThreatMatch( rule_id="rule2", rule_name="Rule 2", description="Test", category=Category.INJECTION, severity=Severity.MEDIUM, file_path="test.py", line_number=2, ), ThreatMatch( rule_id="rule3", rule_name="Rule 3", description="Test", category=Category.INJECTION, 
severity=Severity.HIGH, file_path="test.py", line_number=3, ), ThreatMatch( rule_id="rule4", rule_name="Rule 4", description="Test", category=Category.INJECTION, severity=Severity.CRITICAL, file_path="test.py", line_number=4, ), ] # Filter with MEDIUM threshold filtered = self.scanner._filter_by_severity(threats, Severity.MEDIUM) assert len(filtered) == 3 severities = [t.severity for t in filtered] assert Severity.LOW not in severities assert Severity.MEDIUM in severities assert Severity.HIGH in severities assert Severity.CRITICAL in severities # Filter with HIGH threshold filtered = self.scanner._filter_by_severity(threats, Severity.HIGH) assert len(filtered) == 2 severities = [t.severity for t in filtered] assert Severity.LOW not in severities assert Severity.MEDIUM not in severities assert Severity.HIGH in severities assert Severity.CRITICAL in severities @pytest.mark.asyncio async def test_severity_extraction_multiple_locations(self): """Test severity extraction from different locations in semgrep output.""" # Test severity in metadata.severity finding1 = { "check_id": "test.rule", "message": "Test message", "metadata": {"severity": "warning"}, "start": {"line": 1}, } threat1 = self.scanner._convert_semgrep_finding_to_threat(finding1, "test.py") assert threat1.severity == Severity.HIGH # Test severity in extra.severity finding2 = { "check_id": "test.rule", "message": "Test message", "extra": {"severity": "error"}, "start": {"line": 1}, } threat2 = self.scanner._convert_semgrep_finding_to_threat(finding2, "test.py") assert threat2.severity == Severity.CRITICAL # Test severity in top-level finding3 = { "check_id": "test.rule", "message": "Test message", "severity": "critical", "start": {"line": 1}, } threat3 = self.scanner._convert_semgrep_finding_to_threat(finding3, "test.py") assert threat3.severity == Severity.CRITICAL # Test fallback to default finding4 = { "check_id": "test.rule", "message": "Test message", "start": {"line": 1}, } threat4 = 
self.scanner._convert_semgrep_finding_to_threat(finding4, "test.py") assert ( threat4.severity == Severity.HIGH ) # Default is WARNING which maps to HIGH @pytest.mark.asyncio async def test_semgrep_severity_mapping_comprehensive(self): """Test comprehensive semgrep severity mapping.""" test_cases = [ ("error", Severity.CRITICAL), ("critical", Severity.CRITICAL), ("warning", Severity.HIGH), ("info", Severity.MEDIUM), ("low", Severity.LOW), ("unknown", Severity.LOW), # Fallback case ("", Severity.LOW), # Empty string fallback ] for semgrep_severity, expected_severity in test_cases: result = self.scanner._map_semgrep_severity(semgrep_severity) assert ( result == expected_severity ), f"Failed for severity: {semgrep_severity}" @pytest.mark.asyncio async def test_category_mapping_edge_cases(self): """Test category mapping with edge cases.""" test_cases = [ ("sql-injection", "SQL injection detected", Category.INJECTION), ("xss-stored", "Cross-site scripting found", Category.XSS), ("authentication-bypass", "Auth bypass", Category.AUTHENTICATION), ("crypto-weak", "Weak cryptography", Category.CRYPTOGRAPHY), ("path-traversal", "Directory traversal", Category.PATH_TRAVERSAL), ("rce-command", "Remote code execution", Category.RCE), ("ssrf-request", "Server-side request forgery", Category.SSRF), ("deserial-pickle", "Insecure deserialization", Category.DESERIALIZATION), ("secret-key", "Hardcoded secret", Category.SECRETS), ("csrf-missing", "CSRF protection missing", Category.CSRF), ("dos-regex", "ReDoS vulnerability", Category.DOS), ("config-debug", "Debug mode enabled", Category.CONFIGURATION), ("log-injection", "Log injection", Category.INJECTION), ("log-format", "Log format issue", Category.LOGGING), ("input-validation", "Input validation missing", Category.VALIDATION), ( "unknown-rule", "Unknown rule type", Category.VALIDATION, ), # Default fallback ] for rule_id, message, expected_category in test_cases: result = self.scanner._map_semgrep_category(rule_id, message) assert 
result == expected_category, f"Failed for rule_id: {rule_id}" @pytest.mark.asyncio async def test_get_file_extension_mapping(self): """Test file extension mapping for different languages.""" assert self.scanner._get_file_extension("python") == ".py" assert self.scanner._get_file_extension("javascript") == ".js" assert self.scanner._get_file_extension("typescript") == ".ts" @pytest.mark.asyncio async def test_scan_code_with_semgrep_unavailable(self): """Test scan_code when semgrep is not available.""" # Create scanner with semgrep unavailable with patch.object(self.scanner, "is_available", return_value=False): threats = await self.scanner.scan_code("test code", "test.py", "python") assert threats == [] @pytest.mark.asyncio async def test_scan_file_with_semgrep_unavailable(self): """Test scan_file when semgrep is not available.""" with patch.object(self.scanner, "is_available", return_value=False): threats = await self.scanner.scan_file("test.py", "python") assert threats == [] @pytest.mark.asyncio async def test_scan_directory_with_semgrep_unavailable(self): """Test scan_directory when semgrep is not available.""" with patch.object(self.scanner, "is_available", return_value=False): threats = await self.scanner.scan_directory("/test/dir") assert threats == [] class TestSemgrepScannerEdgeCases: """Test edge cases and error conditions for SemgrepScanner.""" def setup_method(self): """Set up test fixtures.""" self.scanner = SemgrepScanner() @patch("adversary_mcp_server.scanner.semgrep_scanner.subprocess") def test_module_availability_exception_handling(self, mock_subprocess): """Test module availability check exception handling.""" # Mock subprocess to raise an exception during module import mock_subprocess.run.side_effect = Exception("Unexpected error") # Import the module again to trigger the exception path import adversary_mcp_server.scanner.semgrep_scanner as scanner_module # Reload to test exception handling with patch.object(scanner_module, "_SEMGREP_AVAILABLE", 
False): scanner = SemgrepScanner() assert not scanner.is_available() @pytest.mark.asyncio async def test_find_semgrep_file_not_found_and_timeout(self): """Test _find_semgrep with FileNotFoundError and timeout.""" scanner = SemgrepScanner() scanner._semgrep_path = None # Clear cache with patch("asyncio.create_subprocess_exec") as mock_create: # All attempts fail - FileNotFoundError and then timeout mock_create.side_effect = [ FileNotFoundError("semgrep not found"), FileNotFoundError("semgrep not found"), FileNotFoundError("semgrep not found"), FileNotFoundError("semgrep not found"), FileNotFoundError("semgrep not found"), ] with pytest.raises(RuntimeError, match="Semgrep not found"): await scanner._find_semgrep() @pytest.mark.asyncio async def test_find_semgrep_success_after_failures(self): """Test _find_semgrep succeeds after some failures.""" scanner = SemgrepScanner() scanner._semgrep_path = None # Clear cache with patch("asyncio.create_subprocess_exec") as mock_create: # First attempt fails, second succeeds mock_proc_fail = AsyncMock() mock_proc_success = AsyncMock() mock_proc_success.returncode = 0 mock_create.side_effect = [ FileNotFoundError("first path fails"), mock_proc_success, ] with patch("asyncio.wait_for", return_value=None): result = await scanner._find_semgrep() assert result == "semgrep" assert scanner._semgrep_path == "semgrep" @pytest.mark.skip(reason="ScanResult class removed during cache migration") def test_cache_validation_edge_cases(self): """Test cache validation edge cases.""" # scanner = SemgrepScanner(cache_ttl=60) # # # Test invalid cache - different hash # old_result = ScanResult.__new__(ScanResult) # old_result.findings = [] # old_result.timestamp = time.time() # old_result.file_hash = "old_hash" # # assert not scanner._is_cache_valid(old_result, "new_hash") # # # Test expired cache - same hash but old timestamp # old_result.file_hash = "same_hash" # old_result.timestamp = time.time() - 120 # 2 minutes ago, TTL is 60s # # assert not 
scanner._is_cache_valid(old_result, "same_hash") # # # Test valid cache # fresh_result = ScanResult.__new__(ScanResult) # fresh_result.findings = [] # fresh_result.timestamp = time.time() # fresh_result.file_hash = "current_hash" # # assert scanner._is_cache_valid(fresh_result, "current_hash") pass def test_convert_finding_error_handling(self): """Test error handling in _convert_semgrep_finding_to_threat.""" scanner = SemgrepScanner() # Test malformed finding that causes exception malformed_finding = { "check_id": None, # This could cause issues "start": {"line": "not_a_number"}, # Invalid line number "metadata": "not_a_dict", # Invalid metadata } # Should return error threat instead of crashing threat = scanner._convert_semgrep_finding_to_threat( malformed_finding, "test.py" ) assert threat.rule_id == "semgrep_conversion_error" assert threat.rule_name == "Semgrep Finding Conversion Error" assert "Failed to convert Semgrep finding" in threat.description assert threat.severity == Severity.LOW assert threat.file_path == "test.py" def test_convert_finding_cwe_handling(self): """Test CWE handling in finding conversion.""" scanner = SemgrepScanner() # Test with CWE as list finding_with_cwe_list = { "check_id": "test.rule", "message": "Test message", "start": {"line": 1}, "metadata": {"cwe": ["CWE-89", "CWE-95"]}, } threat = scanner._convert_semgrep_finding_to_threat( finding_with_cwe_list, "test.py" ) assert threat.cwe_id == "CWE-89" # First one in list # Test with empty CWE list finding_with_empty_cwe = { "check_id": "test.rule", "message": "Test message", "start": {"line": 1}, "metadata": {"cwe": []}, } threat = scanner._convert_semgrep_finding_to_threat( finding_with_empty_cwe, "test.py" ) assert threat.cwe_id is None # Test with CWE as string finding_with_cwe_string = { "check_id": "test.rule", "message": "Test message", "start": {"line": 1}, "metadata": {"cwe": "CWE-79"}, } threat = scanner._convert_semgrep_finding_to_threat( finding_with_cwe_string, "test.py" ) 
assert threat.cwe_id == "CWE-79" @pytest.mark.asyncio @pytest.mark.skip(reason="_get_cache_key method removed during cache migration") async def test_scan_code_cache_hit_path(self): """Test scan_code cache hit execution path.""" scanner = SemgrepScanner() # Pre-populate cache cache_key = scanner._get_cache_key("test code", "test.py", "python") file_hash = scanner._get_file_hash("test code") cached_findings = [ { "check_id": "test.rule", "message": "Cached finding", "start": {"line": 1}, "metadata": {"severity": "warning"}, } ] # scanner._cache[cache_key] = ScanResult.__new__(ScanResult) # scanner._cache[cache_key].findings = cached_findings # scanner._cache[cache_key].timestamp = time.time() # scanner._cache[cache_key].file_hash = file_hash # Test cache hit with severity filtering threats = await scanner.scan_code( "test code", "test.py", "python", severity_threshold=Severity.MEDIUM ) assert len(threats) == 1 assert threats[0].description == "Cached finding" @pytest.mark.asyncio async def test_scan_code_conversion_error_handling(self): """Test scan_code handling of conversion errors.""" scanner = SemgrepScanner() with patch.object(scanner, "_perform_scan") as mock_perform: # Mock findings that will cause conversion errors mock_perform.return_value = [ {"malformed": "finding"}, # Will cause conversion error { "check_id": "good.rule", "message": "Good finding", "start": {"line": 1}, }, ] with patch.object( scanner, "_convert_semgrep_finding_to_threat" ) as mock_convert: # First call raises exception, second succeeds mock_convert.side_effect = [ Exception("Conversion failed"), ThreatMatch( rule_id="good.rule", rule_name="Good Rule", description="Good finding", category=Category.VALIDATION, severity=Severity.MEDIUM, file_path="test.py", line_number=1, ), ] threats = await scanner.scan_code("test", "test.py", "python") # Should get one threat (second one that succeeded) assert len(threats) == 1 assert threats[0].rule_id == "good.rule" @pytest.mark.asyncio async def 
test_scan_file_not_found_error(self): """Test scan_file with file not found.""" # Import the correct scanner class from adversary_mcp_server.scanner.semgrep_scanner import OptimizedSemgrepScanner scanner = OptimizedSemgrepScanner() # Expect that scanning a non-existent file will raise an error with pytest.raises( Exception ): # Broaden the exception type since different errors can occur await scanner.scan_file("nonexistent.py") @pytest.mark.asyncio async def test_scan_file_unicode_decode_error(self): """Test scan_file with unicode decode error.""" scanner = SemgrepScanner() with patch.object(scanner, "is_available", return_value=True): with patch("os.path.isfile", return_value=True): with patch("builtins.open", mock_open()) as mock_file: mock_file.side_effect = UnicodeDecodeError( "utf-8", b"", 0, 1, "invalid start byte" ) threats = await scanner.scan_file("binary.py", "python") assert threats == [] # Should return empty list for binary files @pytest.mark.asyncio @pytest.mark.skip(reason="_get_cache_key method removed during cache migration") async def test_scan_file_cache_hit(self): """Test scan_file cache hit path.""" scanner = SemgrepScanner() with patch.object(scanner, "is_available", return_value=True): with patch("os.path.isfile", return_value=True): with patch("builtins.open", mock_open(read_data="test code")): # Pre-populate cache cache_key = scanner._get_cache_key("test code", "test.py", "python") file_hash = scanner._get_file_hash("test code") cached_findings = [ { "check_id": "cached.rule", "message": "Cached finding", "start": {"line": 5}, } ] # scanner._cache[cache_key] = ScanResult.__new__(ScanResult) # scanner._cache[cache_key].findings = cached_findings # scanner._cache[cache_key].timestamp = time.time() # scanner._cache[cache_key].file_hash = file_hash threats = await scanner.scan_file("test.py", "python") assert len(threats) == 1 assert threats[0].description == "Cached finding" assert threats[0].line_number == 5 @pytest.mark.asyncio async def 
test_scan_file_with_severity_threshold(self): """Test scan_file with severity threshold filtering.""" scanner = SemgrepScanner() with patch.object(scanner, "is_available", return_value=True): with patch("os.path.isfile", return_value=True): with patch("builtins.open", mock_open(read_data="test code")): with patch.object(scanner, "_perform_scan") as mock_perform: mock_perform.return_value = [ { "check_id": "low.rule", "message": "Low severity", "start": {"line": 1}, "metadata": {"severity": "info"}, }, { "check_id": "high.rule", "message": "High severity", "start": {"line": 2}, "metadata": {"severity": "error"}, }, ] threats = await scanner.scan_file( "test.py", "python", severity_threshold=Severity.HIGH ) # Should only get the high severity threat assert len(threats) == 1 assert threats[0].description == "High severity" @pytest.mark.asyncio async def test_scan_directory_not_found(self): """Test scan_directory with directory not found.""" scanner = SemgrepScanner() with patch.object(scanner, "is_available", return_value=True): with patch("os.path.isdir", return_value=False): with pytest.raises(FileNotFoundError, match="Directory not found"): await scanner.scan_directory("/nonexistent/dir") @pytest.mark.asyncio @pytest.mark.skip(reason="_get_cache_key method removed during cache migration") async def test_scan_directory_cache_hit(self): """Test scan_directory cache hit path.""" scanner = SemgrepScanner() with patch.object(scanner, "is_available", return_value=True): with patch("os.path.isdir", return_value=True): with patch.object( scanner, "_get_directory_hash", return_value="test_hash" ): # Pre-populate cache cache_key = scanner._get_cache_key("", "/test/dir", "directory") cached_findings = [ { "check_id": "dir.rule", "message": "Directory finding", "path": "/test/dir/file.py", "start": {"line": 10}, } ] # scanner._cache[cache_key] = ScanResult.__new__(ScanResult) # scanner._cache[cache_key].findings = cached_findings # scanner._cache[cache_key].timestamp = 
time.time() # scanner._cache[cache_key].file_hash = "test_hash" threats = await scanner.scan_directory("/test/dir") assert len(threats) == 1 assert threats[0].description == "Directory finding" assert threats[0].file_path == "/test/dir/file.py" @pytest.mark.asyncio async def test_scan_directory_with_filtering(self): """Test scan_directory with severity filtering.""" scanner = SemgrepScanner() with patch.object(scanner, "is_available", return_value=True): with patch("os.path.isdir", return_value=True): with patch.object(scanner, "_perform_directory_scan") as mock_scan: mock_scan.return_value = [ { "check_id": "info.rule", "message": "Info level", "path": "/test/file1.py", "start": {"line": 1}, "metadata": {"severity": "info"}, }, { "check_id": "critical.rule", "message": "Critical level", "path": "/test/file2.py", "start": {"line": 1}, "metadata": {"severity": "error"}, }, ] threats = await scanner.scan_directory( "/test/dir", severity_threshold=Severity.HIGH ) # Should only get critical threat assert len(threats) == 1 assert threats[0].description == "Critical level" def test_get_directory_hash_os_error(self): """Test _get_directory_hash with OSError.""" scanner = SemgrepScanner() with patch("os.stat", side_effect=OSError("Access denied")): # Should fallback to timestamp-based hash hash1 = scanner._get_directory_hash("/test/dir") hash2 = scanner._get_directory_hash("/test/dir") # Hashes should be different (timestamp-based) assert hash1 != hash2 assert len(hash1) == 64 # SHA256 hex length @pytest.mark.asyncio async def test_perform_scan_json_decode_error(self): """Test _perform_scan with JSON parse error.""" scanner = SemgrepScanner() with patch.object(scanner, "_find_semgrep", return_value="semgrep"): with patch("tempfile.NamedTemporaryFile") as mock_temp: mock_file = MagicMock() mock_file.name = "/tmp/test.py" mock_temp.return_value.__enter__.return_value = mock_file with patch("asyncio.create_subprocess_exec") as mock_create: mock_proc = AsyncMock() 
mock_proc.returncode = 0 mock_proc.communicate.return_value = (b"invalid json", b"") mock_create.return_value = mock_proc findings = await scanner._perform_scan("code", "test.py", "python") assert findings == [] @pytest.mark.asyncio async def test_perform_scan_nonzero_returncode(self): """Test _perform_scan with non-zero return code.""" scanner = SemgrepScanner() with patch.object(scanner, "_find_semgrep", return_value="semgrep"): with patch("tempfile.NamedTemporaryFile") as mock_temp: mock_file = MagicMock() mock_file.name = "/tmp/test.py" mock_temp.return_value.__enter__.return_value = mock_file with patch("asyncio.create_subprocess_exec") as mock_create: mock_proc = AsyncMock() mock_proc.returncode = 1 mock_proc.communicate.return_value = (b"", b"Error occurred") mock_create.return_value = mock_proc findings = await scanner._perform_scan("code", "test.py", "python") assert findings == [] @pytest.mark.asyncio async def test_perform_scan_temp_file_cleanup(self): """Test _perform_scan cleans up temp file even on errors.""" scanner = SemgrepScanner() with patch.object(scanner, "_find_semgrep", return_value="semgrep"): with patch("tempfile.NamedTemporaryFile") as mock_temp: mock_file = MagicMock() mock_file.name = "/tmp/test.py" mock_temp.return_value.__enter__.return_value = mock_file mock_temp.return_value.__exit__.return_value = False with patch("os.unlink") as mock_unlink: with patch("asyncio.create_subprocess_exec") as mock_create: mock_proc = AsyncMock() mock_proc.returncode = 0 mock_proc.communicate.return_value = (b"invalid json", b"") mock_create.return_value = mock_proc # Simulate JSON decode error which is caught findings = await scanner._perform_scan( "code", "test.py", "python" ) assert findings == [] mock_unlink.assert_called_with("/tmp/test.py") @pytest.mark.asyncio async def test_perform_scan_unlink_oserror(self): """Test _perform_scan handles OSError during temp file cleanup.""" scanner = SemgrepScanner() with patch.object(scanner, "_find_semgrep", 
return_value="semgrep"): with patch("tempfile.NamedTemporaryFile") as mock_temp: mock_file = MagicMock() mock_file.name = "/tmp/test.py" mock_temp.return_value.__enter__.return_value = mock_file with patch("asyncio.create_subprocess_exec") as mock_create: mock_proc = AsyncMock() mock_proc.returncode = 0 mock_proc.communicate.return_value = (b'{"results": []}', b"") mock_create.return_value = mock_proc with patch("os.unlink", side_effect=OSError("Delete failed")): # Should not raise exception even if unlink fails findings = await scanner._perform_scan( "code", "test.py", "python" ) assert findings == [] @pytest.mark.asyncio async def test_perform_directory_scan_process_termination(self): """Test _perform_directory_scan handles process termination correctly.""" scanner = SemgrepScanner() with patch.object(scanner, "_find_semgrep", return_value="semgrep"): with patch("asyncio.create_subprocess_exec") as mock_create: mock_proc = AsyncMock() mock_proc.returncode = None # Process still running mock_proc.communicate.return_value = (b'{"results": []}', b"") mock_create.return_value = mock_proc findings = await scanner._perform_directory_scan("/test/dir", True) assert findings == [] mock_proc.terminate.assert_called_once() @pytest.mark.asyncio async def test_perform_directory_scan_json_error(self): """Test _perform_directory_scan with JSON parse error.""" scanner = SemgrepScanner() with patch.object(scanner, "_find_semgrep", return_value="semgrep"): with patch("asyncio.create_subprocess_exec") as mock_create: mock_proc = AsyncMock() mock_proc.returncode = 0 mock_proc.communicate.return_value = (b"malformed json", b"") mock_create.return_value = mock_proc findings = await scanner._perform_directory_scan("/test/dir", True) assert findings == [] @pytest.mark.asyncio async def test_perform_directory_scan_recursive_flag(self): """Test _perform_directory_scan handles recursive flag.""" scanner = SemgrepScanner() with patch.object(scanner, "_find_semgrep", return_value="semgrep"): 
with patch("asyncio.create_subprocess_exec") as mock_create: mock_proc = AsyncMock() mock_proc.returncode = 0 mock_proc.communicate.return_value = (b'{"results": []}', b"") mock_create.return_value = mock_proc # Test non-recursive scan await scanner._perform_directory_scan("/test/dir", False) # Should include --max-depth=1 in command call_args = mock_create.call_args[0] # Get positional args assert "--max-depth=1" in call_args def test_get_extension_for_language_edge_cases(self): """Test _get_extension_for_language with edge cases.""" scanner = SemgrepScanner() # Test None language assert scanner._get_extension_for_language(None) == ".txt" # Test unknown language assert scanner._get_extension_for_language("unknown") == ".txt" # Test case sensitivity assert scanner._get_extension_for_language("PYTHON") == ".py" assert scanner._get_extension_for_language("JavaScript") == ".js" def test_get_clean_env(self): """Test _get_clean_env removes semgrep metrics vars.""" scanner = SemgrepScanner() with patch.dict( os.environ, { "SEMGREP_SEND_METRICS": "1", "SEMGREP_METRICS_ON": "true", "OTHER_VAR": "keep_this", "SEMGREP_SOME_OTHER": "keep_this_too", }, ): env = scanner._get_clean_env() # Should remove metrics-related vars assert "SEMGREP_SEND_METRICS" not in env assert "SEMGREP_METRICS_ON" not in env # Should keep other vars assert env["OTHER_VAR"] == "keep_this" assert env["SEMGREP_SOME_OTHER"] == "keep_this_too" # Should set user agent assert env["SEMGREP_USER_AGENT_APPEND"] == "adversary-mcp-server" @pytest.mark.skip(reason="_cache attribute removed during cache migration") def test_clear_cache(self): """Test clear_cache functionality.""" scanner = SemgrepScanner() # Add some cache entries scanner._cache["key1"] = "value1" scanner._cache["key2"] = "value2" assert len(scanner._cache) == 2 scanner.clear_cache() assert len(scanner._cache) == 0 @pytest.mark.skip(reason="ScanResult class removed during cache migration") def test_get_cache_stats(self): """Test get_cache_stats 
functionality.""" # scanner = SemgrepScanner(cache_ttl=300) # # # Add cache entries # result1 = ScanResult.__new__(ScanResult) # result1.findings = [{"rule": "test1"}, {"rule": "test2"}] # result1.timestamp = time.time() - 60 # 1 minute ago # # result2 = ScanResult.__new__(ScanResult) # result2.findings = [{"rule": "test3"}] # result2.timestamp = time.time() - 10 # 10 seconds ago # # scanner._cache["key1"] = result1 # scanner._cache["key2"] = result2 # # stats = scanner.get_cache_stats() pass # assert stats["cache_size"] == 2 # assert stats["cache_ttl"] == 300 # assert len(stats["entries"]) == 2 # # # Check entry details # for entry in stats["entries"]: # assert "key" in entry # assert "findings_count" in entry # assert "age_seconds" in entry # assert entry["key"].endswith("...") # Truncated key # # # Check findings counts # findings_counts = [entry["findings_count"] for entry in stats["entries"]] # assert 2 in findings_counts # result1 has 2 findings # assert 1 in findings_counts # result2 has 1 finding class TestSemgrepScannerCompatibility: """Test compatibility methods and aliases.""" def setup_method(self): """Set up test fixtures.""" self.scanner = SemgrepScanner() def test_semgrep_scanner_alias(self): """Test that SemgrepScanner alias works.""" from adversary_mcp_server.scanner.semgrep_scanner import ( OptimizedSemgrepScanner, SemgrepScanner, ) # Should be the same class assert SemgrepScanner is OptimizedSemgrepScanner # Should create same type of instance scanner1 = SemgrepScanner() scanner2 = OptimizedSemgrepScanner() assert type(scanner1) is type(scanner2) def test_get_file_extension_compatibility(self): """Test _get_file_extension compatibility method.""" scanner = SemgrepScanner() # Test with Language enum assert scanner._get_file_extension("python") == ".py" assert scanner._get_file_extension("javascript") == ".js" # Test with string-like object class MockLanguage: value = "typescript" mock_lang = MockLanguage() assert 
scanner._get_file_extension(mock_lang) == ".ts" def test_initialization_with_optional_params(self): """Test initialization with optional threat_engine and credential_manager.""" mock_threat_engine = MagicMock() mock_credential_manager = MagicMock() scanner = SemgrepScanner( config="custom-config", cache_ttl=600, threat_engine=mock_threat_engine, credential_manager=mock_credential_manager, ) assert scanner.config == "custom-config" assert scanner.cache_ttl == 600 assert scanner.threat_engine is mock_threat_engine assert scanner.credential_manager is mock_credential_manager def test_get_clean_env_with_api_key_from_credential_manager(self): """Test _get_clean_env sets SEMGREP_APP_TOKEN from credential manager (Pro tier).""" mock_credential_manager = MagicMock() mock_credential_manager.get_semgrep_api_key.return_value = ( "sk-test-api-key-12345" ) scanner = SemgrepScanner(credential_manager=mock_credential_manager) with patch.dict(os.environ, {"EXISTING_VAR": "value"}, clear=True): env = scanner._get_clean_env() # Should set API token from credential manager assert env["SEMGREP_APP_TOKEN"] == "sk-test-api-key-12345" assert env["SEMGREP_USER_AGENT_APPEND"] == "adversary-mcp-server" mock_credential_manager.get_semgrep_api_key.assert_called_once() def test_get_clean_env_without_api_key_from_credential_manager(self): """Test _get_clean_env without API key from credential manager (free tier).""" mock_credential_manager = MagicMock() mock_credential_manager.get_semgrep_api_key.return_value = None scanner = SemgrepScanner(credential_manager=mock_credential_manager) with patch.dict(os.environ, {"SEMGREP_APP_TOKEN": "old_env_token"}, clear=True): env = scanner._get_clean_env() # Should remove any existing env var token assert "SEMGREP_APP_TOKEN" not in env assert env["SEMGREP_USER_AGENT_APPEND"] == "adversary-mcp-server" mock_credential_manager.get_semgrep_api_key.assert_called_once() def test_get_clean_env_without_credential_manager(self): """Test _get_clean_env without 
credential manager (no token management).""" scanner = SemgrepScanner(credential_manager=None) with patch.dict( os.environ, {"SEMGREP_APP_TOKEN": "env_token", "OTHER_VAR": "value"}, clear=True, ): env = scanner._get_clean_env() # Should preserve existing environment token when no credential manager assert env["SEMGREP_APP_TOKEN"] == "env_token" assert env["SEMGREP_USER_AGENT_APPEND"] == "adversary-mcp-server" def test_get_clean_env_removes_metrics_vars(self): """Test _get_clean_env removes SEMGREP metrics variables.""" mock_credential_manager = MagicMock() mock_credential_manager.get_semgrep_api_key.return_value = "test-key" scanner = SemgrepScanner(credential_manager=mock_credential_manager) test_env = { "SEMGREP_METRICS_ON": "true", "SEMGREP_ANONYMOUS_METRICS": "false", "SEMGREP_OTHER_VAR": "keep", "REGULAR_VAR": "keep", } with patch.dict(os.environ, test_env, clear=True): env = scanner._get_clean_env() # Should remove metrics variables assert "SEMGREP_METRICS_ON" not in env assert "SEMGREP_ANONYMOUS_METRICS" not in env # Should keep non-metrics SEMGREP vars and regular vars assert env["SEMGREP_OTHER_VAR"] == "keep" assert env["REGULAR_VAR"] == "keep" # Should set our custom vars assert env["SEMGREP_APP_TOKEN"] == "test-key" assert env["SEMGREP_USER_AGENT_APPEND"] == "adversary-mcp-server" class TestSemgrepScannerAdvancedCoverage: """Additional tests to improve coverage for missing lines.""" def setup_method(self): """Set up test fixtures.""" self.scanner = SemgrepScanner() def test_module_level_exception_during_availability_check(self): """Test module-level exception handling during semgrep availability check.""" # This tests lines 47 and 51 - exception handling in module initialization with patch("subprocess.run") as mock_run: # Mock subprocess.run to raise TimeoutExpired (line 47) mock_run.side_effect = subprocess.TimeoutExpired("semgrep", 5) # Re-import module to trigger the exception path import adversary_mcp_server.scanner.semgrep_scanner as scanner_module 
# Force module reload to test exception handling with patch.object(scanner_module, "_SEMGREP_AVAILABLE", False): scanner = SemgrepScanner() assert not scanner.is_available() def test_advanced_cache_manager_initialization_failure(self): """Test advanced cache manager initialization failure (lines 106-107).""" mock_credential_manager = MagicMock() mock_config = MagicMock() mock_config.enable_caching = True mock_config.cache_max_size_mb = 100 mock_config.cache_max_age_hours = 24 mock_credential_manager.load_config.return_value = mock_config # Mock CacheManager to raise exception during initialization with patch( "adversary_mcp_server.scanner.semgrep_scanner.CacheManager" ) as mock_cache_manager: mock_cache_manager.side_effect = ValueError("Cache initialization failed") with patch( "adversary_mcp_server.scanner.semgrep_scanner.get_app_cache_dir", return_value="/tmp/cache", ): # This should trigger the exception handling in lines 106-107 scanner = SemgrepScanner(credential_manager=mock_credential_manager) # Scanner should still initialize despite cache manager failure assert scanner.credential_manager is mock_credential_manager assert ( scanner.cache_manager is None ) # Should remain None due to exception @pytest.mark.asyncio async def test_find_semgrep_process_termination_handling(self): """Test process termination handling in _find_semgrep (lines 158, 162).""" scanner = SemgrepScanner() scanner._semgrep_path = None # Clear cache with patch("asyncio.create_subprocess_exec") as mock_create: mock_proc = AsyncMock() mock_proc.returncode = None # Process still running mock_create.return_value = mock_proc # Mock wait_for to raise TimeoutError, then process termination with patch("asyncio.wait_for", side_effect=TimeoutError()): # Mock terminate and wait to test lines 158-162 mock_proc.terminate = AsyncMock() mock_proc.wait = AsyncMock() try: await scanner._find_semgrep() except RuntimeError: pass # Expected when no valid semgrep found # Process should be terminated due to 
timeout (called for each path tried) assert mock_proc.terminate.call_count > 0 @pytest.mark.asyncio async def test_find_semgrep_process_lookup_error_handling(self): """Test ProcessLookupError handling during process termination.""" scanner = SemgrepScanner() scanner._semgrep_path = None with patch("asyncio.create_subprocess_exec") as mock_create: mock_proc = AsyncMock() mock_proc.returncode = None mock_create.return_value = mock_proc with patch("asyncio.wait_for", side_effect=TimeoutError()): # Mock terminate to succeed but wait to raise ProcessLookupError mock_proc.terminate = AsyncMock() mock_proc.wait = AsyncMock( side_effect=ProcessLookupError("Process not found") ) try: await scanner._find_semgrep() except RuntimeError: pass # Expected # Should handle ProcessLookupError gracefully (called for multiple paths) assert mock_proc.terminate.call_count > 0 assert mock_proc.wait.call_count > 0 @pytest.mark.asyncio async def test_scan_code_with_advanced_cache_manager(self): """Test scan_code with advanced cache manager integration.""" mock_cache_manager = MagicMock() mock_hasher = MagicMock() mock_hasher.hash_content.return_value = "test_hash" mock_cache_manager.get_hasher.return_value = mock_hasher mock_cache_manager.get.return_value = None # Cache miss mock_cache_manager.put = MagicMock() # Mock credential manager to enable caching mock_credential_manager = MagicMock() mock_config = MagicMock() mock_config.enable_caching = True mock_credential_manager.load_config.return_value = mock_config scanner = SemgrepScanner( cache_manager=mock_cache_manager, credential_manager=mock_credential_manager ) with patch.object(scanner, "is_available", return_value=True): with patch.object(scanner, "_perform_scan", return_value=[]): await scanner.scan_code("test code", "test.py", "python") # Should interact with advanced cache manager when caching is enabled mock_cache_manager.get.assert_called() mock_cache_manager.put.assert_called() @pytest.mark.asyncio 
@pytest.mark.skip(reason="_cache attribute removed during cache migration") async def test_scan_file_with_legacy_cache_functionality(self): """Test scan_file with legacy cache functionality (scan_file doesn't use advanced cache manager).""" scanner = SemgrepScanner() # Pre-populate legacy cache with a result scanner._cache["test_cache_key"] = MagicMock() scanner._cache["test_cache_key"].findings = [ {"check_id": "test.rule", "message": "Test finding", "start": {"line": 1}} ] scanner._cache["test_cache_key"].timestamp = time.time() scanner._cache["test_cache_key"].file_hash = "test_hash" with patch.object(scanner, "is_available", return_value=True): with patch("os.path.isfile", return_value=True): with patch("builtins.open", mock_open(read_data="test content")): with patch.object( scanner, "_get_cache_key", return_value="test_cache_key" ): with patch.object( scanner, "_get_file_hash", return_value="test_hash" ): with patch.object( scanner, "_is_cache_valid", return_value=True ): threats = await scanner.scan_file("test.py", "python") # Should return cached result assert len(threats) == 1 assert threats[0].rule_id == "semgrep-test.rule" def test_get_extension_for_language_with_none_input(self): """Test _get_extension_for_language with None input.""" scanner = SemgrepScanner() # Test None language input result = scanner._get_extension_for_language(None) assert result == ".txt" # Generic fallback instead of Python-centric def test_get_extension_for_language_with_object_having_value_attribute(self): """Test _get_extension_for_language with object having value attribute.""" scanner = SemgrepScanner() class MockLanguageEnum: value = "javascript" mock_lang = MockLanguageEnum() result = scanner._get_extension_for_language(mock_lang) assert result == ".js" @pytest.mark.asyncio async def test_scan_code_with_semgrep_not_found_exception(self): """Test scan_code when semgrep is not found.""" scanner = SemgrepScanner() with patch.object(scanner, "is_available", return_value=True): 
# Mock _find_semgrep to raise an exception (this tests the exception handling) with patch.object( scanner, "_find_semgrep", side_effect=RuntimeError("Semgrep not found") ): # The scan_code method should catch the RuntimeError and return empty list result = await scanner.scan_code("test code", "test.py", "python") assert result == [] @pytest.mark.asyncio async def test_scan_code_with_temp_file_write_error(self): """Test scan_code with temp file write error.""" scanner = SemgrepScanner() with patch.object(scanner, "is_available", return_value=True): with patch.object(scanner, "_find_semgrep", return_value="semgrep"): with patch("tempfile.NamedTemporaryFile") as mock_temp: mock_file = MagicMock() mock_file.name = "/tmp/test.py" mock_file.write.side_effect = OSError("Write failed") mock_temp.return_value.__enter__.return_value = mock_file # The scan_code method should catch the exception and return empty list result = await scanner.scan_code("test code", "test.py", "python") assert result == [] @pytest.mark.asyncio async def test_scan_directory_with_semgrep_not_found_exception(self): """Test scan_directory when semgrep is not found.""" scanner = SemgrepScanner() with patch.object(scanner, "is_available", return_value=True): with patch("os.path.isdir", return_value=True): with patch.object( scanner, "_find_semgrep", side_effect=RuntimeError("Semgrep not found"), ): # The scan_directory method should catch the exception and return empty list result = await scanner.scan_directory("/test/dir") assert result == [] def test_get_directory_hash_with_multiple_files(self): """Test _get_directory_hash with multiple files.""" scanner = SemgrepScanner() # Mock os.walk to return some files mock_walk_data = [ ("/test/dir", [], ["file1.py", "file2.js"]), ("/test/dir/subdir", [], ["file3.ts"]), ] with patch("os.walk", return_value=mock_walk_data): with patch("os.stat") as mock_stat: # Mock stat to return consistent mtime mock_stat_result = MagicMock() mock_stat_result.st_mtime = 
1234567890.0 mock_stat.return_value = mock_stat_result hash1 = scanner._get_directory_hash("/test/dir") hash2 = scanner._get_directory_hash("/test/dir") # Should be same hash for same directory state assert hash1 == hash2 assert len(hash1) == 64 # SHA256 hex length def test_convert_finding_with_path_field(self): """Test _convert_semgrep_finding_to_threat with 'path' field.""" scanner = SemgrepScanner() # Test finding with 'path' field instead of separate file_path finding = { "check_id": "test.rule", "message": "Test finding", "path": "/project/src/file.py", "start": {"line": 10}, "metadata": {"severity": "warning"}, } # The method signature requires a file_path parameter threat = scanner._convert_semgrep_finding_to_threat( finding, "/project/src/file.py" ) assert threat.file_path == "/project/src/file.py" assert threat.line_number == 10 def test_convert_finding_with_extra_lines_field(self): """Test _convert_semgrep_finding_to_threat with extra.lines field.""" scanner = SemgrepScanner() finding = { "check_id": "test.rule", "message": "Test finding", "start": {"line": 5}, "extra": {"lines": "vulnerable_code_snippet()"}, } threat = scanner._convert_semgrep_finding_to_threat(finding, "test.py") assert threat.code_snippet == "vulnerable_code_snippet()" def test_convert_finding_with_owasp_as_list(self): """Test _convert_semgrep_finding_to_threat with OWASP as list.""" scanner = SemgrepScanner() finding = { "check_id": "test.rule", "message": "Test finding", "start": {"line": 1}, "metadata": {"owasp": ["A01:2021", "A03:2021"]}, } threat = scanner._convert_semgrep_finding_to_threat(finding, "test.py") # The code sets owasp_category directly as the whole list assert threat.owasp_category == ["A01:2021", "A03:2021"] def test_convert_finding_with_references_as_string(self): """Test _convert_semgrep_finding_to_threat with references as string.""" scanner = SemgrepScanner() finding = { "check_id": "test.rule", "message": "Test finding", "start": {"line": 1}, "metadata": 
{"references": "https://example.com/vuln-info"}, } threat = scanner._convert_semgrep_finding_to_threat(finding, "test.py") # The code sets references directly as provided assert threat.references == "https://example.com/vuln-info" @pytest.mark.asyncio @pytest.mark.skip(reason="_cache attribute removed during cache migration") async def test_scan_directory_with_legacy_cache_functionality(self): """Test scan_directory with legacy cache functionality.""" scanner = SemgrepScanner() # Pre-populate legacy cache with a result scanner._cache["test_cache_key"] = MagicMock() scanner._cache["test_cache_key"].findings = [ { "check_id": "test.rule", "message": "Test directory finding", "start": {"line": 5}, "path": "/test/dir/file.py", } ] scanner._cache["test_cache_key"].timestamp = time.time() scanner._cache["test_cache_key"].file_hash = "dir_hash" with patch.object(scanner, "is_available", return_value=True): with patch("os.path.isdir", return_value=True): with patch.object( scanner, "_get_cache_key", return_value="test_cache_key" ): with patch.object( scanner, "_get_directory_hash", return_value="dir_hash" ): with patch.object( scanner, "_is_cache_valid", return_value=True ): threats = await scanner.scan_directory("/test/dir") # Should return cached result assert len(threats) == 1 assert threats[0].description == "Test directory finding" assert threats[0].file_path == "/test/dir/file.py" def test_error_handler_initialization(self): """Test that ErrorHandler is properly initialized.""" scanner = SemgrepScanner() # ErrorHandler should be initialized assert scanner.error_handler is not None assert hasattr(scanner.error_handler, "config") @pytest.mark.skip(reason="_cache attribute removed during cache migration") def test_compatibility_properties(self): """Test backward compatibility properties.""" mock_credential_manager = MagicMock() mock_threat_engine = MagicMock() scanner = SemgrepScanner( config="custom-config", cache_ttl=600, threat_engine=mock_threat_engine, 
credential_manager=mock_credential_manager, ) # Check all properties are accessible assert scanner.config == "custom-config" assert scanner.cache_ttl == 600 assert scanner.threat_engine is mock_threat_engine assert scanner.credential_manager is mock_credential_manager assert isinstance(scanner._cache, dict) @pytest.mark.asyncio async def test_scan_methods_with_error_handler_circuit_breaker(self): """Test scan methods with error handler circuit breaker.""" scanner = SemgrepScanner() # Mock error handler to simulate circuit breaker open from adversary_mcp_server.resilience.types import RecoveryAction, RecoveryResult mock_error_handler = MagicMock() mock_recovery_result = RecoveryResult( success=False, action_taken=RecoveryAction.CIRCUIT_BREAK, result=[], # Empty list for failed scan error_message="Circuit breaker open", ) mock_error_handler.execute_with_recovery = AsyncMock( return_value=mock_recovery_result ) scanner.error_handler = mock_error_handler with patch.object(scanner, "is_available", return_value=True): # Test scan_code with circuit breaker threats = await scanner.scan_code("test", "test.py", "python") assert threats == [] # Test scan_file with circuit breaker with patch("os.path.isfile", return_value=True): with patch("builtins.open", mock_open(read_data="test")): threats = await scanner.scan_file("test.py", "python") assert threats == [] if __name__ == "__main__": pytest.main([__file__])

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.