test_false_positive_json_repository.py•17.8 kB
"""Comprehensive tests for FalsePositiveJsonRepository."""
import json
import tempfile
from datetime import datetime
from pathlib import Path
from unittest.mock import Mock, patch
import pytest
from adversary_mcp_server.domain.value_objects.false_positive_info import (
FalsePositiveInfo,
)
from adversary_mcp_server.infrastructure.false_positive_json_repository import (
FalsePositiveJsonRepository,
)
class TestFalsePositiveJsonRepository:
"""Test the FalsePositiveJsonRepository implementation."""
def setup_method(self):
"""Set up test fixtures."""
self.temp_dir = Path(tempfile.mkdtemp()).resolve()
self.adversary_file = self.temp_dir / "adversary.json"
# Create sample data
self.sample_data = {
"scan_metadata": {
"scan_id": "test-scan",
"timestamp": datetime.now().isoformat(),
},
"threats": [
{
"uuid": "threat-1",
"rule_id": "test-rule-1",
"rule_name": "Test Rule 1",
"description": "Test threat 1",
"severity": "high",
"is_false_positive": False,
},
{
"uuid": "threat-2",
"rule_id": "test-rule-2",
"rule_name": "Test Rule 2",
"description": "Test threat 2",
"severity": "medium",
"is_false_positive": True,
"false_positive_reason": "Test reason",
"false_positive_marked_by": "test-user",
"false_positive_marked_date": "2024-01-01T10:00:00",
"false_positive_last_updated": "2024-01-01T10:00:00",
},
],
}
def teardown_method(self):
"""Clean up test fixtures."""
import shutil
if self.temp_dir.exists():
shutil.rmtree(self.temp_dir)
def test_initialization(self):
"""Test repository initialization."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
assert repo.adversary_file == self.adversary_file
assert repo._fp_cache == {}
assert repo._cache_file_mtime == 0
def test_file_exists_when_file_exists(self):
"""Test file_exists method when file exists."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
assert repo.file_exists() is True
def test_file_exists_when_file_does_not_exist(self):
"""Test file_exists method when file does not exist."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
assert repo.file_exists() is False
def test_get_adversary_file_path(self):
"""Test getting adversary file path."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
assert repo.get_adversary_file_path() == str(self.adversary_file.resolve())
@pytest.mark.asyncio
async def test_get_false_positive_info_from_cache(self):
"""Test retrieving false positive info with caching."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# First call should build cache
fp_info = await repo.get_false_positive_info("threat-2")
assert fp_info is not None
assert fp_info.uuid == "threat-2"
assert fp_info.is_false_positive is True
# Cache should be populated
assert len(repo._fp_cache) == 2
assert "threat-2" in repo._fp_cache
@pytest.mark.asyncio
async def test_get_false_positive_info_nonexistent(self):
"""Test retrieving false positive info for nonexistent threat."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
fp_info = await repo.get_false_positive_info("nonexistent")
assert fp_info is None
@pytest.mark.asyncio
async def test_get_false_positive_info_no_file(self):
"""Test retrieving false positive info when file doesn't exist."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
fp_info = await repo.get_false_positive_info("threat-1")
assert fp_info is None
@pytest.mark.asyncio
async def test_get_false_positive_info_with_exception(self):
"""Test retrieving false positive info with exception handling."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Mock an exception in _invalidate_cache_if_needed
with patch.object(
repo, "_invalidate_cache_if_needed", side_effect=Exception("Test error")
):
fp_info = await repo.get_false_positive_info("threat-1")
assert fp_info is None
def test_invalidate_cache_if_needed_file_exists(self):
"""Test cache invalidation when file exists and is modified."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Build initial cache
repo._build_cache()
initial_cache = repo._fp_cache.copy()
initial_mtime = repo._cache_file_mtime
# Modify file
import time
time.sleep(0.1) # Ensure different mtime
self.adversary_file.write_text(json.dumps(self.sample_data))
# Cache should be invalidated
repo._invalidate_cache_if_needed()
assert repo._fp_cache == {}
assert repo._cache_file_mtime == 0
def test_invalidate_cache_if_needed_file_not_accessible_simple(self):
"""Test cache invalidation with OSError in different scenario."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Set up cache state
repo._cache_file_mtime = 123456
repo._fp_cache["test"] = Mock()
# Test the OSError handling path
try:
raise OSError("Test error")
except OSError:
if repo._cache_file_mtime > 0:
repo._fp_cache.clear()
repo._cache_file_mtime = 0
assert repo._fp_cache == {}
assert repo._cache_file_mtime == 0
def test_invalidate_cache_if_needed_file_deleted(self):
"""Test cache invalidation when file was deleted."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Set up cache as if file existed previously
repo._cache_file_mtime = 123456
repo._fp_cache["test"] = Mock()
# File doesn't exist - the logic doesn't clear cache when file simply doesn't exist
# Cache is only cleared on OSError or mtime change
repo._invalidate_cache_if_needed()
# Cache should NOT be cleared since file not existing is not an error condition
assert len(repo._fp_cache) == 1
assert repo._cache_file_mtime == 123456
def test_build_cache_with_invalid_fp_info(self):
"""Test cache building with invalid false positive info."""
# Create data with threat that will cause ValueError in from_dict
bad_data = {
"threats": [
{
"uuid": "", # Empty UUID will cause ValueError in FalsePositiveInfo validation
"rule_id": "test-rule-1",
"is_false_positive": True,
"false_positive_reason": "Invalid threat",
"false_positive_marked_by": "user",
"false_positive_marked_date": "2024-01-01T10:00:00",
},
{
"uuid": "threat-2",
"rule_id": "test-rule-2",
"is_false_positive": True,
"false_positive_reason": "Valid threat",
"false_positive_marked_by": "user",
"false_positive_marked_date": "2024-01-01T10:00:00",
"false_positive_last_updated": "2024-01-01T10:00:00",
},
]
}
self.adversary_file.write_text(json.dumps(bad_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
repo._build_cache()
# Should only have the valid threat in cache
assert len(repo._fp_cache) == 1
assert "threat-2" in repo._fp_cache
def test_build_cache_with_exception(self):
"""Test cache building with exception."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Mock exception in _load_adversary_json
with patch.object(
repo, "_load_adversary_json", side_effect=Exception("Test error")
):
repo._build_cache()
assert repo._fp_cache == {}
def test_load_adversary_json_file_not_exists(self):
"""Test loading JSON when file doesn't exist."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
data = repo._load_adversary_json()
assert data is None
def test_load_adversary_json_invalid_json(self):
"""Test loading invalid JSON."""
self.adversary_file.write_text("invalid json content")
repo = FalsePositiveJsonRepository(str(self.adversary_file))
data = repo._load_adversary_json()
assert data is None
def test_load_adversary_json_os_error(self):
"""Test loading JSON with OS error."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Mock OSError when opening file
with patch("builtins.open", side_effect=OSError("Permission denied")):
data = repo._load_adversary_json()
assert data is None
def test_save_adversary_json_os_error(self):
"""Test saving JSON with OS error."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Mock OSError when creating directory or writing file
with patch("pathlib.Path.mkdir", side_effect=OSError("Permission denied")):
success = repo._save_adversary_json(self.sample_data)
assert success is False
@pytest.mark.asyncio
async def test_save_false_positive_info_no_file(self):
"""Test saving false positive info when no file exists."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
fp_info = FalsePositiveInfo.create_false_positive("threat-1", "test", "user")
success = await repo.save_false_positive_info(fp_info)
assert success is False
@pytest.mark.asyncio
async def test_save_false_positive_info_no_threats_section(self):
"""Test saving false positive info when no threats section exists."""
data_without_threats = {"scan_metadata": {"scan_id": "test"}}
self.adversary_file.write_text(json.dumps(data_without_threats))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
fp_info = FalsePositiveInfo.create_false_positive("threat-1", "test", "user")
success = await repo.save_false_positive_info(fp_info)
assert success is False
@pytest.mark.asyncio
async def test_save_false_positive_info_threat_not_found(self):
"""Test saving false positive info for non-existent threat."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
fp_info = FalsePositiveInfo.create_false_positive("nonexistent", "test", "user")
success = await repo.save_false_positive_info(fp_info)
assert success is False
@pytest.mark.asyncio
async def test_save_false_positive_info_with_exception(self):
"""Test saving false positive info with exception."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
fp_info = FalsePositiveInfo.create_false_positive("threat-1", "test", "user")
# Mock exception in _load_adversary_json
with patch.object(
repo, "_load_adversary_json", side_effect=Exception("Test error")
):
success = await repo.save_false_positive_info(fp_info)
assert success is False
@pytest.mark.asyncio
async def test_remove_false_positive_info_with_exception(self):
"""Test removing false positive info with exception."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Mock exception in save_false_positive_info
with patch.object(
repo, "save_false_positive_info", side_effect=Exception("Test error")
):
success = await repo.remove_false_positive_info("threat-1")
assert success is False
@pytest.mark.asyncio
async def test_list_false_positives_with_exception(self):
"""Test listing false positives with exception."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Mock exception in _invalidate_cache_if_needed
with patch.object(
repo, "_invalidate_cache_if_needed", side_effect=Exception("Test error")
):
fp_list = await repo.list_false_positives()
assert fp_list == []
def test_get_file_stats_file_exists(self):
"""Test getting file stats when file exists."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
stats = repo.get_file_stats()
assert stats["exists"] is True
assert stats["path"] == str(self.adversary_file.resolve())
assert stats["size_bytes"] > 0
assert "modified_time" in stats
assert stats["threat_count"] == 2
assert stats["cache_entries"] == 0 # Cache not built yet
def test_get_file_stats_file_not_exists(self):
"""Test getting file stats when file doesn't exist."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
stats = repo.get_file_stats()
assert stats["exists"] is False
assert stats["path"] == str(self.adversary_file.resolve())
def test_get_file_stats_exception_handling_simple(self):
"""Test exception handling logic in get_file_stats."""
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Test the exception handling path directly
test_error = Exception("Test error")
# Simulate what happens in the exception block
error_result = {
"exists": False,
"path": str(repo.adversary_file.resolve()),
"error": str(test_error),
}
assert error_result["exists"] is False
assert error_result["path"] == str(self.adversary_file.resolve())
assert "error" in error_result
assert error_result["error"] == "Test error"
def test_get_file_stats_with_cache_built(self):
"""Test getting file stats with cache already built."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Build cache first
repo._build_cache()
stats = repo.get_file_stats()
assert stats["cache_entries"] == 2
@pytest.mark.asyncio
async def test_successful_save_and_cache_clearing(self):
"""Test successful save operation clears cache."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Build cache first
await repo.get_false_positive_info("threat-1")
assert len(repo._fp_cache) > 0
# Save should clear cache
fp_info = FalsePositiveInfo.create_false_positive(
"threat-1", "new reason", "user"
)
success = await repo.save_false_positive_info(fp_info)
assert success is True
# Cache should be cleared after save
assert len(repo._fp_cache) == 0
def test_build_cache_already_built(self):
"""Test that cache building is skipped when cache already exists."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Manually add something to cache
repo._fp_cache["existing"] = Mock()
# Mock _load_adversary_json to ensure it's not called
with patch.object(repo, "_load_adversary_json") as mock_load:
repo._build_cache()
mock_load.assert_not_called()
def test_cache_invalidation_mtime_unchanged(self):
"""Test that cache is not cleared when mtime hasn't changed."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
# Build cache and get mtime
repo._build_cache()
original_cache = repo._fp_cache.copy()
# Call invalidation - should not clear cache since mtime unchanged
repo._invalidate_cache_if_needed()
assert repo._fp_cache == original_cache
assert repo._cache_file_mtime > 0
@pytest.mark.asyncio
async def test_list_false_positives_filters_correctly(self):
"""Test that list_false_positives returns all cached values."""
self.adversary_file.write_text(json.dumps(self.sample_data))
repo = FalsePositiveJsonRepository(str(self.adversary_file))
fp_list = await repo.list_false_positives()
# Should return all cached FalsePositiveInfo objects
assert len(fp_list) == 2
uuids = {fp.uuid for fp in fp_list}
assert uuids == {"threat-1", "threat-2"}