Skip to main content
Glama

Adversary MCP Server

by brettbergin
test_llm_adapter.py25.8 kB
"""Tests for LLMScanStrategy adapter implementation.""" import tempfile from pathlib import Path from unittest.mock import AsyncMock, Mock, patch import pytest from adversary_mcp_server.application.adapters.llm_adapter import LLMScanStrategy from adversary_mcp_server.domain.entities.scan_request import ScanRequest from adversary_mcp_server.domain.interfaces import ScanError from adversary_mcp_server.domain.value_objects.severity_level import SeverityLevel class TestLLMScanStrategy: """Test the LLMScanStrategy adapter.""" @pytest.fixture def mock_llm_scanner(self): """Create a mock LLMScanner for testing.""" scanner = Mock() scanner.analyze_file = AsyncMock() scanner.analyze_directory = AsyncMock() scanner.analyze_code = Mock() scanner.analyze_file_with_context = AsyncMock() scanner.analyze_project_with_session = AsyncMock() scanner.is_session_aware_available = Mock(return_value=True) scanner.session_manager = Mock() scanner.session_manager.analyze_changes_incrementally = AsyncMock() return scanner @pytest.fixture def llm_strategy(self, mock_llm_scanner): """Create LLMScanStrategy with mocked scanner.""" return LLMScanStrategy(llm_scanner=mock_llm_scanner) @pytest.fixture def llm_strategy_no_scanner(self): """Create LLMScanStrategy without scanner.""" # Mock the credential manager to ensure it fails with patch( "adversary_mcp_server.credentials.get_credential_manager", side_effect=Exception("No credentials"), ): strategy = LLMScanStrategy(llm_scanner=None) # Ensure scanner is actually None strategy._scanner = None return strategy @pytest.fixture def sample_scan_request(self): """Create a sample scan request for testing.""" with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: f.write("print('hello world')\n") f.flush() request = ScanRequest.for_file_scan( file_path=f.name, requester="test", enable_semgrep=False, enable_llm=True, enable_validation=False, severity_threshold="medium", ) yield request # Cleanup Path(f.name).unlink(missing_ok=True) @pytest.fixture def sample_llm_finding(self): """Create a sample LLM finding.""" finding = Mock() finding.finding_type = "vulnerability" finding.severity = "high" finding.description = "Test vulnerability" finding.line_number = 10 finding.code_snippet = "test code" finding.explanation = "Test explanation" finding.recommendation = "Test recommendation" finding.confidence = 0.8 return finding @pytest.fixture def sample_llm_result(self): """Create a sample LLM analysis result.""" return { "finding_id": "test-llm-1", "title": "Test Security Issue", "description": "Test vulnerability description", "severity": "high", "line_number": 10, "column_number": 5, "file_path": "/test/file.py", "code_snippet": "vulnerable code", "confidence": 0.8, "reasoning": "AI detected pattern", # "remediation": "Fix the vulnerability", # Remove to avoid method call "category": "injection", } def test_initialization_with_scanner(self, mock_llm_scanner): """Test initialization with provided scanner.""" strategy = LLMScanStrategy(llm_scanner=mock_llm_scanner, enable_sessions=True) assert strategy._scanner == mock_llm_scanner assert strategy.enable_sessions is True def test_initialization_without_scanner_success(self): """Test initialization without scanner with successful credential setup.""" mock_credential_manager = Mock() mock_llm_scanner = Mock() with ( patch( "adversary_mcp_server.credentials.get_credential_manager", return_value=mock_credential_manager, ), patch( "adversary_mcp_server.application.adapters.llm_adapter.LLMScanner", return_value=mock_llm_scanner, ), ): strategy = LLMScanStrategy() assert strategy._scanner == mock_llm_scanner def test_initialization_without_scanner_failure(self): """Test initialization without scanner with credential failure.""" with patch( "adversary_mcp_server.credentials.get_credential_manager", side_effect=Exception("Credential error"), ): strategy = LLMScanStrategy() assert strategy._scanner is None def test_get_strategy_name_basic(self, llm_strategy): """Test getting strategy name without session awareness.""" llm_strategy._scanner.is_session_aware_available.return_value = False assert llm_strategy.get_strategy_name() == "llm_ai_analysis" def test_get_strategy_name_session_aware(self, llm_strategy): """Test getting strategy name with session awareness.""" llm_strategy._scanner.is_session_aware_available.return_value = True assert llm_strategy.get_strategy_name() == "llm_ai_analysis_session_aware" def test_can_scan_no_scanner(self, llm_strategy_no_scanner): """Test can_scan with no scanner available.""" context = Mock() context.metadata.scan_type = "file" context.content = None # The method checks if scanner is None first, so it should return False result = llm_strategy_no_scanner.can_scan(context) assert result is False def test_can_scan_file(self, llm_strategy): """Test can_scan for file context.""" context = Mock() context.metadata.scan_type = "file" context.content = None assert llm_strategy.can_scan(context) is True def test_can_scan_directory(self, llm_strategy): """Test can_scan for directory context.""" context = Mock() context.metadata.scan_type = "directory" context.content = None assert llm_strategy.can_scan(context) is True def test_can_scan_code(self, llm_strategy): """Test can_scan for code context.""" context = Mock() context.metadata.scan_type = "code" context.content = "print('hello')" # Small content assert llm_strategy.can_scan(context) is True def test_can_scan_code_too_large(self, llm_strategy): """Test can_scan for large code content.""" context = Mock() context.metadata.scan_type = "code" context.content = "x" * 60000 # Over 50KB limit assert llm_strategy.can_scan(context) is False def test_can_scan_diff(self, llm_strategy): """Test can_scan for diff context.""" context = Mock() context.metadata.scan_type = "diff" context.content = None assert llm_strategy.can_scan(context) is True def test_can_scan_incremental(self, llm_strategy): """Test can_scan for incremental context.""" context = Mock() context.metadata.scan_type = "incremental" context.content = None assert llm_strategy.can_scan(context) is True def test_can_scan_unsupported(self, llm_strategy): """Test can_scan for unsupported context.""" context = Mock() context.metadata.scan_type = "unknown" assert llm_strategy.can_scan(context) is False def test_get_supported_languages(self, llm_strategy): """Test getting supported languages.""" with patch( "adversary_mcp_server.application.adapters.llm_adapter.LanguageMapper.get_supported_languages", return_value=["python", "javascript", "java"], ): languages = llm_strategy.get_supported_languages() assert "python" in languages assert "javascript" in languages assert "java" in languages @pytest.mark.asyncio async def test_execute_scan_no_scanner( self, llm_strategy_no_scanner, sample_scan_request ): """Test executing scan with no scanner.""" result = await llm_strategy_no_scanner.execute_scan(sample_scan_request) assert len(result.threats) == 0 @pytest.mark.asyncio async def test_execute_scan_file( self, llm_strategy, sample_scan_request, sample_llm_finding ): """Test executing file scan.""" # Setup mocks llm_strategy._scanner.analyze_file.return_value = [sample_llm_finding] llm_strategy._scanner.is_session_aware_available.return_value = False result = await llm_strategy.execute_scan(sample_scan_request) assert len(result.threats) >= 0 # May be filtered by confidence assert result.scan_metadata["scanner"] == "llm_ai_analysis" @pytest.mark.asyncio async def test_execute_scan_directory(self, llm_strategy, sample_llm_finding): """Test executing directory scan.""" # Create directory scan request with tempfile.TemporaryDirectory() as temp_dir: request = ScanRequest.for_directory_scan( directory_path=temp_dir, requester="test" ) # Setup mocks llm_strategy._scanner.analyze_directory.return_value = [sample_llm_finding] llm_strategy._scanner.is_session_aware_available.return_value = False result = await llm_strategy.execute_scan(request) assert result.scan_metadata["scanner"] == "llm_ai_analysis" @pytest.mark.asyncio async def test_execute_scan_code(self, llm_strategy, sample_llm_finding): """Test executing code scan.""" # Create code scan request request = ScanRequest.for_code_scan( code="print('hello')", language="python", requester="test" ) # Setup mocks llm_strategy._scanner.analyze_code.return_value = [sample_llm_finding] llm_strategy._scanner.is_session_aware_available.return_value = False result = await llm_strategy.execute_scan(request) assert result.scan_metadata["scanner"] == "llm_ai_analysis" @pytest.mark.asyncio async def test_execute_scan_with_session( self, llm_strategy, sample_scan_request, sample_llm_finding ): """Test executing scan with session awareness.""" # Setup mocks for session-aware analysis llm_strategy._scanner.is_session_aware_available.return_value = True llm_strategy._scanner.analyze_file_with_context.return_value = [ sample_llm_finding ] # Mock project context detection with patch.object(llm_strategy, "_has_project_context", return_value=True): result = await llm_strategy.execute_scan(sample_scan_request) assert result.scan_metadata["session_aware"] is True llm_strategy._scanner.analyze_file_with_context.assert_called_once() @pytest.mark.asyncio async def test_execute_scan_error(self, llm_strategy, sample_scan_request): """Test scan execution with error.""" # Setup scanner to raise exception llm_strategy._scanner.analyze_file.side_effect = Exception("Scanner error") llm_strategy._scanner.is_session_aware_available.return_value = False with pytest.raises(ScanError, match="LLM scan failed"): await llm_strategy.execute_scan(sample_scan_request) @pytest.mark.asyncio async def test_analyze_file(self, llm_strategy, sample_llm_finding): """Test file analysis delegation.""" llm_strategy._scanner.analyze_file.return_value = [sample_llm_finding] result = await llm_strategy._analyze_file("/test/file.py", "python") llm_strategy._scanner.analyze_file.assert_called_once_with( "/test/file.py", "python" ) assert len(result) == 1 @pytest.mark.asyncio async def test_analyze_directory(self, llm_strategy, sample_llm_finding): """Test directory analysis delegation.""" llm_strategy._scanner.analyze_directory.return_value = [sample_llm_finding] result = await llm_strategy._analyze_directory("/test/dir") llm_strategy._scanner.analyze_directory.assert_called_once_with("/test/dir") assert len(result) == 1 @pytest.mark.asyncio async def test_analyze_code(self, llm_strategy, sample_llm_finding): """Test code analysis delegation.""" llm_strategy._scanner.analyze_code.return_value = [sample_llm_finding] result = await llm_strategy._analyze_code("print('hello')", "python") llm_strategy._scanner.analyze_code.assert_called_once_with( "print('hello')", "<code>", "python" ) assert len(result) == 1 @pytest.mark.asyncio async def test_analyze_with_session_file(self, llm_strategy, sample_llm_finding): """Test session-aware file analysis.""" context = Mock() context.metadata.scan_type = "file" context.target_path = "/test/file.py" context.metadata.analysis_focus = "security" llm_strategy._scanner.analyze_file_with_context.return_value = [ sample_llm_finding ] result = await llm_strategy._analyze_with_session(context) assert len(result) == 1 llm_strategy._scanner.analyze_file_with_context.assert_called_once() @pytest.mark.asyncio async def test_analyze_with_session_directory( self, llm_strategy, sample_llm_finding ): """Test session-aware directory analysis.""" context = Mock() context.metadata.scan_type = "directory" context.target_path = "/test/dir" context.metadata.analysis_focus = "security" llm_strategy._scanner.analyze_project_with_session.return_value = [ sample_llm_finding ] result = await llm_strategy._analyze_with_session(context) assert len(result) == 1 llm_strategy._scanner.analyze_project_with_session.assert_called_once() @pytest.mark.asyncio async def test_analyze_with_session_incremental( self, llm_strategy, sample_llm_finding ): """Test session-aware incremental analysis.""" context = Mock() context.metadata.scan_type = "incremental" context.metadata.changed_files = ["/test/file1.py", "/test/file2.py"] context.metadata.session_id = "test-session" context.metadata.commit_hash = "abc123" context.metadata.change_context = "feature branch" llm_strategy._scanner.session_manager.analyze_changes_incrementally.return_value = [ sample_llm_finding ] result = await llm_strategy._analyze_with_session(context) assert len(result) == 1 llm_strategy._scanner.session_manager.analyze_changes_incrementally.assert_called_once() @pytest.mark.asyncio async def test_analyze_with_session_incremental_no_session_manager( self, llm_strategy ): """Test incremental analysis without session manager.""" context = Mock() context.metadata.scan_type = "incremental" context.metadata.changed_files = ["/test/file1.py"] context.metadata.session_id = "test-session" llm_strategy._scanner.session_manager = None result = await llm_strategy._analyze_with_session(context) assert len(result) == 0 @pytest.mark.asyncio async def test_analyze_with_session_incremental_missing_metadata( self, llm_strategy ): """Test incremental analysis with missing metadata.""" context = Mock() context.metadata.scan_type = "incremental" # Missing required metadata - simulate hasattr returning False context.metadata.changed_files = None context.metadata.session_id = None with patch( "builtins.hasattr", side_effect=lambda obj, attr: attr not in ["changed_files", "session_id"], ): result = await llm_strategy._analyze_with_session(context) assert len(result) == 0 @pytest.mark.asyncio async def test_analyze_with_session_unsupported(self, llm_strategy): """Test session analysis with unsupported scan type.""" context = Mock() context.metadata.scan_type = "unknown" result = await llm_strategy._analyze_with_session(context) assert len(result) == 0 def test_has_project_context_file_exists(self, llm_strategy): """Test project context detection for existing file.""" with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as f: context = Mock() context.metadata.scan_type = "file" context.target_path = f.name # Always has project context for file scans now result = llm_strategy._has_project_context(context) assert result is True Path(f.name).unlink() def test_has_project_context_file_no_project(self, llm_strategy): """Test project context detection for file without project.""" with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as f: context = Mock() context.metadata.scan_type = "file" context.target_path = f.name # Always has project context for file scans now result = llm_strategy._has_project_context(context) assert result is True Path(f.name).unlink() def test_has_project_context_directory(self, llm_strategy): """Test project context detection for directory.""" with tempfile.TemporaryDirectory() as temp_dir: context = Mock() context.metadata.scan_type = "directory" context.target_path = temp_dir result = llm_strategy._has_project_context(context) assert result is True def test_has_project_context_nonexistent(self, llm_strategy): """Test project context detection for nonexistent path.""" context = Mock() context.metadata.scan_type = "file" context.target_path = "/nonexistent/file.py" result = llm_strategy._has_project_context(context) assert result is False # Removed project root detection tests - functionality no longer exists def test_convert_to_domain_threats( self, llm_strategy, sample_scan_request, sample_llm_result ): """Test converting LLM results to domain threats.""" results = [sample_llm_result] with patch("builtins.print"): # Suppress warning prints threats = llm_strategy._convert_to_domain_threats( results, sample_scan_request ) assert len(threats) == 1 # Should convert successfully now threat = threats[0] assert threat.source_scanner == "llm" assert threat.rule_id == "test-llm-1" def test_convert_to_domain_threats_conversion_error( self, llm_strategy, sample_scan_request ): """Test handling conversion error.""" # Create a result that will cause conversion error during ThreatMatch creation # Use invalid confidence score that will cause ConfidenceScore validation to fail bad_result = { "finding_id": "test-llm-1", "title": "Test Security Issue", "description": "Test vulnerability description", "severity": "high", "line_number": 10, "column_number": 5, "file_path": "/test/file.py", "code_snippet": "vulnerable code", "confidence": "invalid_confidence", # This will cause ConfidenceScore to fail "category": "injection", } with patch("builtins.print"): # Suppress warning print threats = llm_strategy._convert_to_domain_threats( [bad_result], sample_scan_request ) # Should continue processing and return empty list when conversion fails assert len(threats) == 0 def test_convert_to_domain_threats_none_file_path( self, llm_strategy, sample_scan_request ): """Test handling None file_path with graceful fallback.""" result_with_none_path = { "finding_id": "test-llm-none", "title": "Test Security Issue", "description": "Test vulnerability description", "severity": "high", "line_number": 10, "column_number": 5, "file_path": None, # This should now be handled gracefully "code_snippet": "vulnerable code", "confidence": 0.8, "category": "injection", } threats = llm_strategy._convert_to_domain_threats( [result_with_none_path], sample_scan_request ) # Should successfully create threat with fallback file path assert len(threats) == 1 threat = threats[0] assert threat.rule_id == "test-llm-none" assert str(threat.file_path) == str(sample_scan_request.context.target_path) def test_map_severity(self, llm_strategy): """Test severity mapping from LLM to domain.""" assert llm_strategy._map_severity("critical") == SeverityLevel.from_string( "critical" ) assert llm_strategy._map_severity("high") == SeverityLevel.from_string("high") assert llm_strategy._map_severity("medium") == SeverityLevel.from_string( "medium" ) assert llm_strategy._map_severity("low") == SeverityLevel.from_string("low") assert llm_strategy._map_severity("info") == SeverityLevel.from_string("low") assert llm_strategy._map_severity("warning") == SeverityLevel.from_string( "medium" ) assert llm_strategy._map_severity("error") == SeverityLevel.from_string("high") assert llm_strategy._map_severity("unknown") == SeverityLevel.from_string( "medium" ) def test_determine_category_explicit(self, llm_strategy): """Test category determination with explicit category.""" result = {"category": "explicit_category"} assert llm_strategy._determine_category(result) == "explicit_category" def test_determine_category_injection(self, llm_strategy): """Test category determination for injection.""" result = {"description": "SQL injection vulnerability"} assert llm_strategy._determine_category(result) == "injection" def test_determine_category_xss(self, llm_strategy): """Test category determination for XSS.""" result = {"title": "Cross-site scripting issue"} assert llm_strategy._determine_category(result) == "xss" def test_determine_category_crypto(self, llm_strategy): """Test category determination for cryptography.""" result = {"description": "Weak encryption algorithm"} assert llm_strategy._determine_category(result) == "cryptography" def test_determine_category_auth(self, llm_strategy): """Test category determination for authentication.""" result = {"description": "Authentication bypass"} assert llm_strategy._determine_category(result) == "authentication" def test_determine_category_path_traversal(self, llm_strategy): """Test category determination for path traversal.""" result = {"description": "Path traversal vulnerability"} assert llm_strategy._determine_category(result) == "path_traversal" def test_determine_category_information_disclosure(self, llm_strategy): """Test category determination for information disclosure.""" result = {"description": "Information disclosure issue"} assert llm_strategy._determine_category(result) == "information_disclosure" def test_determine_category_memory_safety(self, llm_strategy): """Test category determination for memory safety.""" result = {"description": "Buffer overflow vulnerability"} assert llm_strategy._determine_category(result) == "memory_safety" def test_determine_category_dos(self, llm_strategy): """Test category determination for denial of service.""" result = {"description": "Denial of service vulnerability"} assert llm_strategy._determine_category(result) == "denial_of_service" def test_determine_category_default(self, llm_strategy): """Test category determination with default fallback.""" result = {"description": "Unknown security issue"} assert llm_strategy._determine_category(result) == "security" def test_apply_severity_filter(self, llm_strategy): """Test severity filtering.""" # Create mock threats with severity that meets/doesn't meet threshold high_threat = Mock() high_threat.severity.meets_threshold.return_value = True low_threat = Mock() low_threat.severity.meets_threshold.return_value = False threats = [high_threat, low_threat] threshold = SeverityLevel.from_string("medium") filtered = llm_strategy._apply_severity_filter(threats, threshold) assert len(filtered) == 1 assert filtered[0] == high_threat def test_apply_severity_filter_no_threshold(self, llm_strategy): """Test severity filtering with no threshold.""" threats = [Mock(), Mock()] filtered = llm_strategy._apply_severity_filter(threats, None) assert len(filtered) == 2

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server