Skip to main content
Glama

Adversary MCP Server

by brettbergin
false_positive_manager.py13.9 kB
"""False positive management for tracking and suppressing vulnerability findings.""" import json from datetime import datetime from pathlib import Path from typing import Any from ..logger import get_logger logger = get_logger("false_positive_manager") class FalsePositiveManager: """Manager for tracking and handling false positive vulnerability findings. This class manages false positives by storing them directly within .adversary.json files alongside the threats they represent. """ def __init__(self, adversary_file_path: str): """Initialize false positive manager. Args: adversary_file_path: Path to the .adversary.json file to manage. """ self.adversary_file = Path(adversary_file_path) # Performance optimization: Cache for false positive lookups self._fp_cache: dict[str, dict[str, Any] | None] = {} self._cache_file_mtime: float = 0 def _invalidate_cache_if_needed(self) -> None: """Invalidate cache if .adversary.json file was modified.""" try: if self.adversary_file.exists(): current_mtime = self.adversary_file.stat().st_mtime if current_mtime != self._cache_file_mtime: logger.debug( f"Cache invalidated: {self.adversary_file} was modified" ) self._fp_cache.clear() self._cache_file_mtime = 0 except OSError: # File might have been deleted or is inaccessible if self._cache_file_mtime > 0: logger.debug( f"Cache invalidated: {self.adversary_file} is no longer accessible" ) self._fp_cache.clear() self._cache_file_mtime = 0 def _build_false_positive_cache(self) -> None: """Build cache of all false positive UUIDs and their metadata.""" if self._fp_cache: return # Cache already built and valid logger.debug("Building false positive cache...") start_time = datetime.now() # Load from .adversary.json file in project root try: if self.adversary_file.exists(): # Track file modification time for cache invalidation self._cache_file_mtime = self.adversary_file.stat().st_mtime data = self._load_adversary_json() if data and "threats" in data: for threat in data["threats"]: threat_uuid = threat.get("uuid") if threat_uuid and threat.get("is_false_positive"): self._fp_cache[threat_uuid] = { "uuid": threat_uuid, "reason": threat.get("false_positive_reason", ""), "marked_date": threat.get( "false_positive_marked_date", "" ), "last_updated": threat.get( "false_positive_last_updated", "" ), "marked_by": threat.get( "false_positive_marked_by", "system" ), "source": "project", } except Exception as e: logger.warning(f"Failed to load .adversary.json for cache: {e}") build_time = (datetime.now() - start_time).total_seconds() logger.info( f"False positive cache built: {len(self._fp_cache)} entries in {build_time:.3f}s" ) def _load_adversary_json(self) -> dict[str, Any] | None: """Load and parse the .adversary.json file. Returns: Parsed JSON data or None if file cannot be loaded """ if not self.adversary_file.exists(): return None try: with open(self.adversary_file, encoding="utf-8") as f: return json.load(f) except (OSError, json.JSONDecodeError) as e: logger.warning( f"Failed to load .adversary.json file {self.adversary_file}: {e}" ) return None def _save_adversary_json(self, data: dict[str, Any]) -> bool: """Save data to the .adversary.json file. Args: data: Data to save Returns: True if saved successfully, False otherwise """ try: with open(self.adversary_file, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) return True except OSError as e: logger.error( f"Failed to save .adversary.json file {self.adversary_file}: {e}" ) return False def get_false_positive_details(self, finding_uuid: str) -> dict[str, Any] | None: """Get complete false positive details for a finding. Args: finding_uuid: UUID of the finding to check Returns: False positive metadata dict if marked, None otherwise """ # Use cached lookup for performance self._invalidate_cache_if_needed() self._build_false_positive_cache() return self._fp_cache.get(finding_uuid) def mark_false_positive( self, finding_uuid: str, reason: str = "", marked_by: str = "user", ) -> bool: """Mark a finding as a false positive in .adversary.json file. Args: finding_uuid: UUID of the finding to mark reason: Reason for marking as false positive marked_by: Who marked it as false positive Returns: True if marked successfully, False if finding not found """ # Ensure cache is up-to-date before loading data self._invalidate_cache_if_needed() # Load .adversary.json data data = self._load_adversary_json() if not data or "threats" not in data: logger.warning(f"No .adversary.json file found for {finding_uuid}") return False # Find and update the threat updated = False for threat in data["threats"]: if threat.get("uuid") == finding_uuid: # Create or update false positive fields current_time = datetime.now().isoformat() threat["is_false_positive"] = True threat["false_positive_reason"] = reason threat["false_positive_marked_by"] = marked_by threat["false_positive_last_updated"] = current_time # Set marked_date if not already set if not threat.get("false_positive_marked_date"): threat["false_positive_marked_date"] = current_time if self._save_adversary_json(data): updated = True # Invalidate cache since we modified the data self._fp_cache.clear() self._cache_file_mtime = 0 adversary_file_abs = str(self.adversary_file.resolve()) logger.info( f"Marked threat {finding_uuid} as false positive in {adversary_file_abs}" ) else: adversary_file_abs = str(self.adversary_file.resolve()) logger.error( f"Failed to save false positive update to {adversary_file_abs}" ) break if not updated: logger.warning(f"Threat {finding_uuid} not found in .adversary.json") return updated def unmark_false_positive(self, finding_uuid: str) -> bool: """Remove false positive marking from a finding. Args: finding_uuid: UUID of the finding to unmark Returns: True if finding was unmarked, False if not found """ # Ensure cache is up-to-date before loading data self._invalidate_cache_if_needed() # Load .adversary.json file data = self._load_adversary_json() if not data or "threats" not in data: logger.warning( f"No .adversary.json file found for unmarking {finding_uuid}" ) return False updated = False for threat in data["threats"]: if threat.get("uuid") == finding_uuid and threat.get("is_false_positive"): # Remove false positive marking threat["is_false_positive"] = False threat["false_positive_reason"] = None threat["false_positive_marked_date"] = None threat["false_positive_last_updated"] = None threat["false_positive_marked_by"] = None if self._save_adversary_json(data): updated = True # Invalidate cache since we modified the data self._fp_cache.clear() self._cache_file_mtime = 0 adversary_file_abs = str(self.adversary_file.resolve()) logger.info( f"Unmarked threat {finding_uuid} as false positive in {adversary_file_abs}" ) else: adversary_file_abs = str(self.adversary_file.resolve()) logger.error( f"Failed to save false positive removal to {adversary_file_abs}" ) break return updated def is_false_positive(self, finding_uuid: str) -> bool: """Check if a finding is marked as false positive. Args: finding_uuid: UUID of the finding to check Returns: True if marked as false positive, False otherwise """ return self.get_false_positive_details(finding_uuid) is not None def get_false_positives(self) -> list[dict[str, Any]]: """Get all false positive findings from .adversary.json file. Returns: List of false positive findings """ false_positives = [] # Ensure cache is up-to-date before loading data self._invalidate_cache_if_needed() # Get false positives from .adversary.json file data = self._load_adversary_json() if data and "threats" in data: for threat in data["threats"]: if threat.get("is_false_positive"): fp_data = { "uuid": threat.get("uuid"), "reason": threat.get("false_positive_reason", ""), "marked_date": threat.get("false_positive_marked_date", ""), "last_updated": threat.get("false_positive_last_updated", ""), "marked_by": threat.get("false_positive_marked_by", "system"), "source": "project", "file_source": str(self.adversary_file), } false_positives.append(fp_data) return false_positives def get_false_positive_uuids(self) -> set[str]: """Get set of all false positive UUIDs for quick lookup. Returns: Set of false positive UUIDs """ false_positives = self.get_false_positives() return {fp["uuid"] for fp in false_positives} def filter_false_positives(self, threats: list) -> list: """Filter out false positives from a list of threat matches. Args: threats: List of ThreatMatch objects Returns: List of threats with false positives filtered out """ false_positive_uuids = self.get_false_positive_uuids() filtered_threats = [] for threat in threats: if hasattr(threat, "uuid") and threat.uuid in false_positive_uuids: # Mark as false positive but keep in results for transparency if hasattr(threat, "is_false_positive"): threat.is_false_positive = True filtered_threats.append(threat) return filtered_threats def clear_all_false_positives(self) -> None: """Clear all false positive markings from .adversary.json file.""" # Ensure cache is up-to-date before loading data self._invalidate_cache_if_needed() # Clear from .adversary.json file data = self._load_adversary_json() if not data or "threats" not in data: logger.info("No .adversary.json file found to clear false positives from") return updated = False for threat in data["threats"]: if threat.get("is_false_positive"): threat["is_false_positive"] = False threat["false_positive_reason"] = None threat["false_positive_marked_date"] = None threat["false_positive_last_updated"] = None threat["false_positive_marked_by"] = None updated = True if updated: self._save_adversary_json(data) adversary_file_abs = str(self.adversary_file.resolve()) logger.info(f"Cleared false positives from {adversary_file_abs}") # Invalidate cache since we cleared all data self._fp_cache.clear() self._cache_file_mtime = 0 def export_false_positives(self, output_path: Path) -> None: """Export false positives to a file. Args: output_path: Path to export file """ false_positives = self.get_false_positives() data = {"false_positives": false_positives, "version": "2.0"} with open(output_path, "w") as f: json.dump(data, f, indent=2)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server