Bug Bounty MCP Server

by SlanyCukr
deduplication.py
"""Deduplication utilities for bug bounty findings. This module provides functionality to deduplicate findings using hash keys and track duplicate statistics across various finding types. """ import hashlib import logging from typing import Any logger = logging.getLogger(__name__) class FindingDeduplicator: """Handles deduplication of bug bounty findings using hash-based keys.""" def __init__(self): """Initialize the deduplicator with empty tracking.""" self.seen_hashes: set[str] = set() self.duplicate_stats: dict[str, Any] = { "total_processed": 0, "duplicates_found": 0, "unique_findings": 0, "by_type": {}, } def _generate_hash_key( self, finding_type: str, host_or_url: str, evidence_key: str ) -> str: """Generate a hash key for deduplication. Args: finding_type: Type of finding (subdomain, port, vuln, endpoint, param) host_or_url: Host or URL associated with the finding evidence_key: Key evidence that identifies the finding Returns: SHA256 hash string for deduplication """ # Normalize inputs for consistent hashing normalized_host = host_or_url.lower().strip() normalized_type = finding_type.lower().strip() normalized_evidence = str(evidence_key).lower().strip() # Create composite key composite_key = f"{normalized_host}|{normalized_type}|{normalized_evidence}" # Generate SHA256 hash return hashlib.sha256(composite_key.encode("utf-8")).hexdigest() def _extract_finding_components( self, finding: dict[str, Any] ) -> tuple[str, str, str]: """Extract components needed for hash generation from a finding. Args: finding: Dictionary containing finding data Returns: Tuple of (finding_type, host_or_url, evidence_key) """ # Determine finding type finding_type = finding.get("type", "unknown") if not finding_type and "tool" in finding: # Infer type from tool tool = finding["tool"].lower() if tool in ["subfinder", "amass", "dnsenum"]: finding_type = "subdomain" elif tool in ["nmap", "masscan", "rustscan"]: finding_type = "port" elif tool in ["nuclei", "nikto", "sqlmap"]: finding_type = "vulnerability" elif tool in ["ffuf", "gobuster", "dirb", "feroxbuster"]: finding_type = "endpoint" elif tool in ["arjun", "paramspider"]: finding_type = "parameter" else: finding_type = "unknown" # Extract host/URL host_or_url = ( finding.get("host") or finding.get("url") or finding.get("target") or finding.get("domain") or "unknown" ) # Extract evidence key based on finding type evidence_key = "" if finding_type == "subdomain": evidence_key = finding.get("subdomain", finding.get("host", "")) elif finding_type == "port": port = finding.get("port", "") service = finding.get("service", "") evidence_key = f"{port}:{service}" elif finding_type == "vulnerability": vuln_id = finding.get("template_id", finding.get("id", "")) vuln_name = finding.get("name", finding.get("title", "")) evidence_key = f"{vuln_id}:{vuln_name}" elif finding_type == "endpoint": path = finding.get("path", finding.get("uri", "")) status = finding.get("status_code", finding.get("status", "")) evidence_key = f"{path}:{status}" elif finding_type == "parameter": param_name = finding.get("parameter", finding.get("param", "")) param_type = finding.get("param_type", "") evidence_key = f"{param_name}:{param_type}" else: # Fallback: use any available identifying information evidence_key = ( finding.get("name") or finding.get("title") or finding.get("description", "")[:50] or str(finding)[:50] ) return finding_type, host_or_url, evidence_key def is_duplicate(self, finding: dict[str, Any]) -> bool: """Check if a finding is a duplicate. 
Args: finding: Dictionary containing finding data Returns: True if the finding is a duplicate, False otherwise """ try: finding_type, host_or_url, evidence_key = self._extract_finding_components( finding ) hash_key = self._generate_hash_key(finding_type, host_or_url, evidence_key) self.duplicate_stats["total_processed"] += 1 # Track by type if finding_type not in self.duplicate_stats["by_type"]: self.duplicate_stats["by_type"][finding_type] = { "total": 0, "duplicates": 0, "unique": 0, } self.duplicate_stats["by_type"][finding_type]["total"] += 1 if hash_key in self.seen_hashes: self.duplicate_stats["duplicates_found"] += 1 self.duplicate_stats["by_type"][finding_type]["duplicates"] += 1 logger.debug(f"Duplicate found: {finding_type} - {hash_key[:16]}...") return True else: self.seen_hashes.add(hash_key) self.duplicate_stats["unique_findings"] += 1 self.duplicate_stats["by_type"][finding_type]["unique"] += 1 logger.debug(f"New finding: {finding_type} - {hash_key[:16]}...") return False except Exception as e: logger.error(f"Error checking for duplicate: {str(e)}") # On error, consider it not a duplicate to avoid losing data return False def deduplicate_findings( self, findings: list[dict[str, Any]] ) -> list[dict[str, Any]]: """Deduplicate a list of findings. Args: findings: List of finding dictionaries Returns: List of unique findings """ if not findings: return [] unique_findings = [] for finding in findings: if not self.is_duplicate(finding): unique_findings.append(finding) logger.info( "Deduplication complete: %d unique out of %d total", len(unique_findings), len(findings), ) return unique_findings def get_stats(self) -> dict[str, Any]: """Get deduplication statistics. Returns: Dictionary containing deduplication statistics """ return self.duplicate_stats.copy() def reset_stats(self) -> None: """Reset deduplication statistics and seen hashes.""" self.seen_hashes.clear() self.duplicate_stats = { "total_processed": 0, "duplicates_found": 0, "unique_findings": 0, "by_type": {}, } logger.info("Deduplication stats reset") # Global deduplicator instance for convenience _global_deduplicator = FindingDeduplicator() def deduplicate_findings(findings: list[dict[str, Any]]) -> list[dict[str, Any]]: """Convenience function to deduplicate findings using global instance. Args: findings: List of finding dictionaries Returns: List of unique findings """ return _global_deduplicator.deduplicate_findings(findings) def is_duplicate_finding(finding: dict[str, Any]) -> bool: """Convenience function to check if finding is duplicate using global instance. Args: finding: Dictionary containing finding data Returns: True if the finding is a duplicate, False otherwise """ return _global_deduplicator.is_duplicate(finding) def get_deduplication_stats() -> dict[str, Any]: """Get deduplication statistics from global instance. Returns: Dictionary containing deduplication statistics """ return _global_deduplicator.get_stats() def reset_deduplication_stats() -> None: """Reset deduplication statistics in global instance.""" _global_deduplicator.reset_stats() def create_finding_hash(finding_type: str, host_or_url: str, evidence_key: str) -> str: """Create a hash for manual deduplication. Args: finding_type: Type of finding host_or_url: Host or URL evidence_key: Evidence key Returns: SHA256 hash string """ return _global_deduplicator._generate_hash_key( finding_type, host_or_url, evidence_key )
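
A minimal usage sketch, assuming the file above is importable as `deduplication`; the finding dictionaries are hypothetical examples of tool output, not captured from a real scan:

# Hypothetical usage sketch -- the finding dicts are illustrative, not
# real tool output from the server.
from deduplication import (
    FindingDeduplicator,
    deduplicate_findings,
    get_deduplication_stats,
)

findings = [
    {"type": "subdomain", "host": "api.example.com", "subdomain": "api.example.com"},
    # Same subdomain, different casing: normalization flags it as a duplicate.
    {"type": "subdomain", "host": "API.EXAMPLE.COM", "subdomain": "api.example.com"},
    # No "type" key: the type is inferred from the tool name ("nmap" -> "port").
    {"tool": "nmap", "host": "api.example.com", "port": 443, "service": "https"},
]

# Module-level convenience functions share one global deduplicator...
unique = deduplicate_findings(findings)
print(len(unique))                                    # 2
print(get_deduplication_stats()["duplicates_found"])  # 1

# ...or use a dedicated instance to keep per-scan state isolated.
dedup = FindingDeduplicator()
for finding in findings:
    if not dedup.is_duplicate(finding):
        print("new finding:", finding.get("host"))

Because the composite key is host|type|evidence and the tool name is not part of it, the same subdomain reported by two different tools collapses into one finding, while identical evidence on two different hosts is kept separate.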
