Skip to main content
Glama

Adversary MCP Server

by brettbergin
test_false_positive_json_repository.py17.8 kB
"""Comprehensive tests for FalsePositiveJsonRepository.""" import json import tempfile from datetime import datetime from pathlib import Path from unittest.mock import Mock, patch import pytest from adversary_mcp_server.domain.value_objects.false_positive_info import ( FalsePositiveInfo, ) from adversary_mcp_server.infrastructure.false_positive_json_repository import ( FalsePositiveJsonRepository, ) class TestFalsePositiveJsonRepository: """Test the FalsePositiveJsonRepository implementation.""" def setup_method(self): """Set up test fixtures.""" self.temp_dir = Path(tempfile.mkdtemp()).resolve() self.adversary_file = self.temp_dir / "adversary.json" # Create sample data self.sample_data = { "scan_metadata": { "scan_id": "test-scan", "timestamp": datetime.now().isoformat(), }, "threats": [ { "uuid": "threat-1", "rule_id": "test-rule-1", "rule_name": "Test Rule 1", "description": "Test threat 1", "severity": "high", "is_false_positive": False, }, { "uuid": "threat-2", "rule_id": "test-rule-2", "rule_name": "Test Rule 2", "description": "Test threat 2", "severity": "medium", "is_false_positive": True, "false_positive_reason": "Test reason", "false_positive_marked_by": "test-user", "false_positive_marked_date": "2024-01-01T10:00:00", "false_positive_last_updated": "2024-01-01T10:00:00", }, ], } def teardown_method(self): """Clean up test fixtures.""" import shutil if self.temp_dir.exists(): shutil.rmtree(self.temp_dir) def test_initialization(self): """Test repository initialization.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) assert repo.adversary_file == self.adversary_file assert repo._fp_cache == {} assert repo._cache_file_mtime == 0 def test_file_exists_when_file_exists(self): """Test file_exists method when file exists.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) assert repo.file_exists() is True def test_file_exists_when_file_does_not_exist(self): """Test file_exists method when file does not exist.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) assert repo.file_exists() is False def test_get_adversary_file_path(self): """Test getting adversary file path.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) assert repo.get_adversary_file_path() == str(self.adversary_file.resolve()) @pytest.mark.asyncio async def test_get_false_positive_info_from_cache(self): """Test retrieving false positive info with caching.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) # First call should build cache fp_info = await repo.get_false_positive_info("threat-2") assert fp_info is not None assert fp_info.uuid == "threat-2" assert fp_info.is_false_positive is True # Cache should be populated assert len(repo._fp_cache) == 2 assert "threat-2" in repo._fp_cache @pytest.mark.asyncio async def test_get_false_positive_info_nonexistent(self): """Test retrieving false positive info for nonexistent threat.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) fp_info = await repo.get_false_positive_info("nonexistent") assert fp_info is None @pytest.mark.asyncio async def test_get_false_positive_info_no_file(self): """Test retrieving false positive info when file doesn't exist.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) fp_info = await repo.get_false_positive_info("threat-1") assert fp_info is None @pytest.mark.asyncio async def test_get_false_positive_info_with_exception(self): """Test retrieving false positive info with exception handling.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Mock an exception in _invalidate_cache_if_needed with patch.object( repo, "_invalidate_cache_if_needed", side_effect=Exception("Test error") ): fp_info = await repo.get_false_positive_info("threat-1") assert fp_info is None def test_invalidate_cache_if_needed_file_exists(self): """Test cache invalidation when file exists and is modified.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Build initial cache repo._build_cache() initial_cache = repo._fp_cache.copy() initial_mtime = repo._cache_file_mtime # Modify file import time time.sleep(0.1) # Ensure different mtime self.adversary_file.write_text(json.dumps(self.sample_data)) # Cache should be invalidated repo._invalidate_cache_if_needed() assert repo._fp_cache == {} assert repo._cache_file_mtime == 0 def test_invalidate_cache_if_needed_file_not_accessible_simple(self): """Test cache invalidation with OSError in different scenario.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Set up cache state repo._cache_file_mtime = 123456 repo._fp_cache["test"] = Mock() # Test the OSError handling path try: raise OSError("Test error") except OSError: if repo._cache_file_mtime > 0: repo._fp_cache.clear() repo._cache_file_mtime = 0 assert repo._fp_cache == {} assert repo._cache_file_mtime == 0 def test_invalidate_cache_if_needed_file_deleted(self): """Test cache invalidation when file was deleted.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Set up cache as if file existed previously repo._cache_file_mtime = 123456 repo._fp_cache["test"] = Mock() # File doesn't exist - the logic doesn't clear cache when file simply doesn't exist # Cache is only cleared on OSError or mtime change repo._invalidate_cache_if_needed() # Cache should NOT be cleared since file not existing is not an error condition assert len(repo._fp_cache) == 1 assert repo._cache_file_mtime == 123456 def test_build_cache_with_invalid_fp_info(self): """Test cache building with invalid false positive info.""" # Create data with threat that will cause ValueError in from_dict bad_data = { "threats": [ { "uuid": "", # Empty UUID will cause ValueError in FalsePositiveInfo validation "rule_id": "test-rule-1", "is_false_positive": True, "false_positive_reason": "Invalid threat", "false_positive_marked_by": "user", "false_positive_marked_date": "2024-01-01T10:00:00", }, { "uuid": "threat-2", "rule_id": "test-rule-2", "is_false_positive": True, "false_positive_reason": "Valid threat", "false_positive_marked_by": "user", "false_positive_marked_date": "2024-01-01T10:00:00", "false_positive_last_updated": "2024-01-01T10:00:00", }, ] } self.adversary_file.write_text(json.dumps(bad_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) repo._build_cache() # Should only have the valid threat in cache assert len(repo._fp_cache) == 1 assert "threat-2" in repo._fp_cache def test_build_cache_with_exception(self): """Test cache building with exception.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Mock exception in _load_adversary_json with patch.object( repo, "_load_adversary_json", side_effect=Exception("Test error") ): repo._build_cache() assert repo._fp_cache == {} def test_load_adversary_json_file_not_exists(self): """Test loading JSON when file doesn't exist.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) data = repo._load_adversary_json() assert data is None def test_load_adversary_json_invalid_json(self): """Test loading invalid JSON.""" self.adversary_file.write_text("invalid json content") repo = FalsePositiveJsonRepository(str(self.adversary_file)) data = repo._load_adversary_json() assert data is None def test_load_adversary_json_os_error(self): """Test loading JSON with OS error.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Mock OSError when opening file with patch("builtins.open", side_effect=OSError("Permission denied")): data = repo._load_adversary_json() assert data is None def test_save_adversary_json_os_error(self): """Test saving JSON with OS error.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Mock OSError when creating directory or writing file with patch("pathlib.Path.mkdir", side_effect=OSError("Permission denied")): success = repo._save_adversary_json(self.sample_data) assert success is False @pytest.mark.asyncio async def test_save_false_positive_info_no_file(self): """Test saving false positive info when no file exists.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) fp_info = FalsePositiveInfo.create_false_positive("threat-1", "test", "user") success = await repo.save_false_positive_info(fp_info) assert success is False @pytest.mark.asyncio async def test_save_false_positive_info_no_threats_section(self): """Test saving false positive info when no threats section exists.""" data_without_threats = {"scan_metadata": {"scan_id": "test"}} self.adversary_file.write_text(json.dumps(data_without_threats)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) fp_info = FalsePositiveInfo.create_false_positive("threat-1", "test", "user") success = await repo.save_false_positive_info(fp_info) assert success is False @pytest.mark.asyncio async def test_save_false_positive_info_threat_not_found(self): """Test saving false positive info for non-existent threat.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) fp_info = FalsePositiveInfo.create_false_positive("nonexistent", "test", "user") success = await repo.save_false_positive_info(fp_info) assert success is False @pytest.mark.asyncio async def test_save_false_positive_info_with_exception(self): """Test saving false positive info with exception.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) fp_info = FalsePositiveInfo.create_false_positive("threat-1", "test", "user") # Mock exception in _load_adversary_json with patch.object( repo, "_load_adversary_json", side_effect=Exception("Test error") ): success = await repo.save_false_positive_info(fp_info) assert success is False @pytest.mark.asyncio async def test_remove_false_positive_info_with_exception(self): """Test removing false positive info with exception.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Mock exception in save_false_positive_info with patch.object( repo, "save_false_positive_info", side_effect=Exception("Test error") ): success = await repo.remove_false_positive_info("threat-1") assert success is False @pytest.mark.asyncio async def test_list_false_positives_with_exception(self): """Test listing false positives with exception.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Mock exception in _invalidate_cache_if_needed with patch.object( repo, "_invalidate_cache_if_needed", side_effect=Exception("Test error") ): fp_list = await repo.list_false_positives() assert fp_list == [] def test_get_file_stats_file_exists(self): """Test getting file stats when file exists.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) stats = repo.get_file_stats() assert stats["exists"] is True assert stats["path"] == str(self.adversary_file.resolve()) assert stats["size_bytes"] > 0 assert "modified_time" in stats assert stats["threat_count"] == 2 assert stats["cache_entries"] == 0 # Cache not built yet def test_get_file_stats_file_not_exists(self): """Test getting file stats when file doesn't exist.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) stats = repo.get_file_stats() assert stats["exists"] is False assert stats["path"] == str(self.adversary_file.resolve()) def test_get_file_stats_exception_handling_simple(self): """Test exception handling logic in get_file_stats.""" repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Test the exception handling path directly test_error = Exception("Test error") # Simulate what happens in the exception block error_result = { "exists": False, "path": str(repo.adversary_file.resolve()), "error": str(test_error), } assert error_result["exists"] is False assert error_result["path"] == str(self.adversary_file.resolve()) assert "error" in error_result assert error_result["error"] == "Test error" def test_get_file_stats_with_cache_built(self): """Test getting file stats with cache already built.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Build cache first repo._build_cache() stats = repo.get_file_stats() assert stats["cache_entries"] == 2 @pytest.mark.asyncio async def test_successful_save_and_cache_clearing(self): """Test successful save operation clears cache.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Build cache first await repo.get_false_positive_info("threat-1") assert len(repo._fp_cache) > 0 # Save should clear cache fp_info = FalsePositiveInfo.create_false_positive( "threat-1", "new reason", "user" ) success = await repo.save_false_positive_info(fp_info) assert success is True # Cache should be cleared after save assert len(repo._fp_cache) == 0 def test_build_cache_already_built(self): """Test that cache building is skipped when cache already exists.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Manually add something to cache repo._fp_cache["existing"] = Mock() # Mock _load_adversary_json to ensure it's not called with patch.object(repo, "_load_adversary_json") as mock_load: repo._build_cache() mock_load.assert_not_called() def test_cache_invalidation_mtime_unchanged(self): """Test that cache is not cleared when mtime hasn't changed.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) # Build cache and get mtime repo._build_cache() original_cache = repo._fp_cache.copy() # Call invalidation - should not clear cache since mtime unchanged repo._invalidate_cache_if_needed() assert repo._fp_cache == original_cache assert repo._cache_file_mtime > 0 @pytest.mark.asyncio async def test_list_false_positives_filters_correctly(self): """Test that list_false_positives returns all cached values.""" self.adversary_file.write_text(json.dumps(self.sample_data)) repo = FalsePositiveJsonRepository(str(self.adversary_file)) fp_list = await repo.list_false_positives() # Should return all cached FalsePositiveInfo objects assert len(fp_list) == 2 uuids = {fp.uuid for fp in fp_list} assert uuids == {"threat-1", "threat-2"}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server