Adversary MCP Server

by brettbergin
threat_match.py (18.9 kB)
"""ThreatMatch domain entity with domain logic.""" import uuid from dataclasses import dataclass, field from typing import Any from ..exceptions import ValidationError from ..utils import merge_scanner_names from ..value_objects.confidence_score import ConfidenceScore from ..value_objects.file_path import FilePath from ..value_objects.severity_level import SeverityLevel @dataclass class ThreatMatch: """ Domain entity representing a detected security threat with rich behavior and validation. Encapsulates all information about a security vulnerability detection, including metadata, location information, and business logic for threat analysis and processing. """ # Core identification rule_id: str rule_name: str description: str category: str # Will be enhanced with domain categories later severity: SeverityLevel # Location information file_path: FilePath line_number: int column_number: int = 0 # Content and context code_snippet: str = "" function_name: str | None = None # Security metadata exploit_examples: list[str] = field(default_factory=list) remediation: str = "" references: list[str] = field(default_factory=list) cwe_id: str | None = None owasp_category: str | None = None # Analysis metadata confidence: ConfidenceScore = field(default_factory=lambda: ConfidenceScore(1.0)) source_scanner: str = "unknown" # Scanner source: "semgrep", "llm", "rules" is_false_positive: bool = False metadata: dict[str, Any] = field(default_factory=dict) # Session-aware metadata # Unique identification uuid: str = field(default_factory=lambda: str(uuid.uuid4())) def __post_init__(self): """Validate threat match after initialization.""" self._validate_threat() @classmethod def create_semgrep_threat( cls, rule_id: str, rule_name: str, description: str, category: str, severity: str, file_path: str, line_number: int, column_number: int = 0, code_snippet: str = "", **kwargs, ) -> "ThreatMatch": """Create a threat match from Semgrep detection.""" return cls( rule_id=rule_id, rule_name=rule_name, description=description, category=category, severity=SeverityLevel.from_string(severity), file_path=FilePath.from_string(file_path), line_number=line_number, column_number=column_number, code_snippet=code_snippet, source_scanner="semgrep", confidence=ConfidenceScore(0.9), # Semgrep is generally reliable **kwargs, ) @classmethod def create_llm_threat( cls, rule_id: str, rule_name: str, description: str, category: str, severity: str, file_path: str, line_number: int, confidence: float = 0.7, code_snippet: str = "", **kwargs, ) -> "ThreatMatch": """Create a threat match from LLM analysis.""" return cls( rule_id=rule_id, rule_name=rule_name, description=description, category=category, severity=SeverityLevel.from_string(severity), file_path=FilePath.from_string(file_path), line_number=line_number, code_snippet=code_snippet, source_scanner="llm", confidence=ConfidenceScore(confidence), **kwargs, ) @classmethod def create_rules_threat( cls, rule_id: str, rule_name: str, description: str, category: str, severity: str, file_path: str, line_number: int, **kwargs, ) -> "ThreatMatch": """Create a threat match from rules-based detection.""" return cls( rule_id=rule_id, rule_name=rule_name, description=description, category=category, severity=SeverityLevel.from_string(severity), file_path=FilePath.from_string(file_path), line_number=line_number, source_scanner="rules", confidence=ConfidenceScore(0.8), # Rules are generally reliable **kwargs, ) def _validate_threat(self) -> None: """Validate the threat match according to business 
rules.""" if not self.rule_id.strip(): raise ValidationError("Rule ID cannot be empty") if not self.rule_name.strip(): raise ValidationError("Rule name cannot be empty") if not self.description.strip(): raise ValidationError("Description cannot be empty") if self.line_number < 1: raise ValidationError("Line number must be positive") if self.column_number < 0: raise ValidationError("Column number cannot be negative") # Validate source scanner is known (allow combinations) valid_sources = {"semgrep", "llm", "rules", "unknown"} # Split combined sources and check each part source_parts = self.source_scanner.split("+") for part in source_parts: clean_part = part.strip() if clean_part not in valid_sources: raise ValidationError( f"Invalid source_scanner part: {clean_part}. Must be one of: {valid_sources}" ) def get_fingerprint(self) -> str: """ Generate a unique fingerprint for this finding. Used to identify the same logical finding across multiple scans to preserve UUIDs and false positive markings. Returns: Unique fingerprint string based on file_path and line_number (location-based) """ normalized_path = ( str(self.file_path.path.resolve()) if not self.file_path._is_virtual else str(self.file_path) ) return f"{normalized_path}:{self.line_number}" def get_display_location(self) -> str: """Get human-readable location string.""" if self.column_number > 0: return f"{self.file_path.name}:{self.line_number}:{self.column_number}" else: return f"{self.file_path.name}:{self.line_number}" def get_full_location(self) -> str: """Get full path location string.""" if self.column_number > 0: return f"{self.file_path}:{self.line_number}:{self.column_number}" else: return f"{self.file_path}:{self.line_number}" def is_high_severity(self) -> bool: """Check if this threat is high or critical severity.""" return self.severity.is_actionable() def is_confident(self, threshold: ConfidenceScore | None = None) -> bool: """Check if this threat meets confidence threshold.""" if threshold is None: threshold = ConfidenceScore.default_threshold() return self.confidence.meets_threshold(threshold) def is_actionable( self, confidence_threshold: ConfidenceScore | None = None ) -> bool: """Check if this threat is actionable (high severity and confident).""" return self.is_high_severity() and self.is_confident(confidence_threshold) def should_be_reported( self, severity_threshold: SeverityLevel | None = None ) -> bool: """Check if this threat should be included in reports.""" if self.is_false_positive: return False if severity_threshold is None: severity_threshold = SeverityLevel.get_default_threshold() return self.severity.meets_threshold(severity_threshold) def get_risk_score(self) -> float: """Calculate a numeric risk score for prioritization (0.0-10.0).""" # Base score from severity severity_weight = self.severity.get_priority_weight() # 0.0-1.0 # Confidence adjustment confidence_weight = self.confidence.get_decimal() # 0.0-1.0 # Source reliability adjustment source_weights = { "semgrep": 1.0, "rules": 0.9, "llm": 0.8, "unknown": 0.5, } source_weight = source_weights.get(self.source_scanner, 0.5) # Calculate final score (0.0-10.0) risk_score = severity_weight * confidence_weight * source_weight * 10.0 return round(risk_score, 2) def get_priority_category(self) -> str: """Get priority category for triage.""" risk_score = self.get_risk_score() if risk_score >= 8.0: return "Critical" elif risk_score >= 6.0: return "High" elif risk_score >= 4.0: return "Medium" elif risk_score >= 2.0: return "Low" else: return "Informational" 

    def mark_as_false_positive(self, reason: str = "") -> "ThreatMatch":
        """Create a new instance marked as false positive."""
        # Create a copy with false positive marking
        new_threat = ThreatMatch(
            rule_id=self.rule_id,
            rule_name=self.rule_name,
            description=self.description,
            category=self.category,
            severity=self.severity,
            file_path=self.file_path,
            line_number=self.line_number,
            column_number=self.column_number,
            code_snippet=self.code_snippet,
            function_name=self.function_name,
            exploit_examples=self.exploit_examples.copy(),
            remediation=self.remediation,
            references=self.references.copy(),
            cwe_id=self.cwe_id,
            owasp_category=self.owasp_category,
            confidence=self.confidence,
            source_scanner=self.source_scanner,
            is_false_positive=True,
            uuid=self.uuid,  # Keep same UUID
        )
        return new_threat

    def update_confidence(self, new_confidence: ConfidenceScore) -> "ThreatMatch":
        """Create a new instance with updated confidence score."""
        return ThreatMatch(
            rule_id=self.rule_id,
            rule_name=self.rule_name,
            description=self.description,
            category=self.category,
            severity=self.severity,
            file_path=self.file_path,
            line_number=self.line_number,
            column_number=self.column_number,
            code_snippet=self.code_snippet,
            function_name=self.function_name,
            exploit_examples=self.exploit_examples.copy(),
            remediation=self.remediation,
            references=self.references.copy(),
            cwe_id=self.cwe_id,
            owasp_category=self.owasp_category,
            confidence=new_confidence,
            source_scanner=self.source_scanner,
            is_false_positive=self.is_false_positive,
            uuid=self.uuid,  # Keep same UUID
        )

    def enhance_with_validation_result(
        self, validation_confidence: ConfidenceScore, validation_reason: str = ""
    ) -> "ThreatMatch":
        """Create enhanced version with validation results."""
        # Combine original confidence with validation confidence
        combined_confidence = self.confidence.combine_with(
            validation_confidence, weight=0.7
        )

        return ThreatMatch(
            rule_id=self.rule_id,
            rule_name=self.rule_name,
            description=self.description,
            category=self.category,
            severity=self.severity,
            file_path=self.file_path,
            line_number=self.line_number,
            column_number=self.column_number,
            code_snippet=self.code_snippet,
            function_name=self.function_name,
            exploit_examples=self.exploit_examples.copy(),
            remediation=self.remediation
            + (f"\n\nValidation: {validation_reason}" if validation_reason else ""),
            references=self.references.copy(),
            cwe_id=self.cwe_id,
            owasp_category=self.owasp_category,
            confidence=combined_confidence,
            source_scanner=f"{self.source_scanner}+validation",
            is_false_positive=self.is_false_positive,
            uuid=self.uuid,  # Keep same UUID
        )

    def add_exploit_example(self, exploit: str) -> "ThreatMatch":
        """Create new instance with additional exploit example."""
        if exploit in self.exploit_examples:
            return self  # Already exists

        new_exploits = self.exploit_examples.copy()
        new_exploits.append(exploit)

        return ThreatMatch(
            rule_id=self.rule_id,
            rule_name=self.rule_name,
            description=self.description,
            category=self.category,
            severity=self.severity,
            file_path=self.file_path,
            line_number=self.line_number,
            column_number=self.column_number,
            code_snippet=self.code_snippet,
            function_name=self.function_name,
            exploit_examples=new_exploits,
            remediation=self.remediation,
            references=self.references.copy(),
            cwe_id=self.cwe_id,
            owasp_category=self.owasp_category,
            confidence=self.confidence,
            source_scanner=self.source_scanner,
            is_false_positive=self.is_false_positive,
            uuid=self.uuid,
        )

    def is_similar_to(
        self, other: "ThreatMatch", proximity_threshold: int = 5
    ) -> bool:
        """Check if this threat is similar to another (for deduplication)."""
        if not isinstance(other, ThreatMatch):
            return False

        # Same file - required
        if self.file_path != other.file_path:
            return False

        # Same rule ID OR same category - at least one must match
        if self.rule_id != other.rule_id and self.category != other.category:
            return False

        # Lines are close together
        line_diff = abs(self.line_number - other.line_number)
        return line_diff <= proximity_threshold

    def merge_with(self, other: "ThreatMatch") -> "ThreatMatch":
        """Merge this threat with another similar threat."""
        if not self.is_similar_to(other):
            raise ValueError("Cannot merge dissimilar threats")

        # Use the higher confidence
        best_confidence = max(self.confidence, other.confidence)

        # Use the higher severity
        best_severity = max(self.severity, other.severity)

        # Combine exploit examples
        combined_exploits = list(set(self.exploit_examples + other.exploit_examples))

        # Combine references
        combined_references = list(set(self.references + other.references))

        # Use better code snippet (longer one)
        best_snippet = (
            self.code_snippet
            if len(self.code_snippet) > len(other.code_snippet)
            else other.code_snippet
        )

        # Combine remediation
        combined_remediation = self.remediation
        if other.remediation and other.remediation not in self.remediation:
            combined_remediation += f"\n\n{other.remediation}"

        return ThreatMatch(
            rule_id=self.rule_id,
            rule_name=self.rule_name,
            description=self.description,
            category=self.category,
            severity=best_severity,
            file_path=self.file_path,
            line_number=min(self.line_number, other.line_number),  # Use earlier line
            column_number=self.column_number,
            code_snippet=best_snippet,
            function_name=self.function_name or other.function_name,
            exploit_examples=combined_exploits,
            remediation=combined_remediation,
            references=combined_references,
            cwe_id=self.cwe_id or other.cwe_id,
            owasp_category=self.owasp_category or other.owasp_category,
            confidence=best_confidence,
            source_scanner=merge_scanner_names(
                self.source_scanner, other.source_scanner
            ),
            is_false_positive=self.is_false_positive and other.is_false_positive,
            uuid=self.uuid,  # Keep original UUID
        )

    def to_summary_dict(self) -> dict:
        """Convert to dictionary for logging/reporting."""
        return {
            "uuid": self.uuid,
            "rule_id": self.rule_id,
            "rule_name": self.rule_name,
            "category": self.category,
            "severity": str(self.severity),
            "confidence": str(self.confidence),
            "location": self.get_display_location(),
            "risk_score": self.get_risk_score(),
            "priority": self.get_priority_category(),
            "source": self.source_scanner,
            "is_false_positive": self.is_false_positive,
        }

    def mark_false_positive(self) -> "ThreatMatch":
        """Create a copy marked as false positive."""
        return self.mark_as_false_positive()

    def add_metadata(self, metadata_dict: dict[str, Any]) -> "ThreatMatch":
        """Add metadata to the threat match and return a new instance."""
        new_metadata = {**self.metadata, **metadata_dict}
        return ThreatMatch(
            rule_id=self.rule_id,
            rule_name=self.rule_name,
            description=self.description,
            category=self.category,
            severity=self.severity,
            file_path=self.file_path,
            line_number=self.line_number,
            column_number=self.column_number,
            code_snippet=self.code_snippet,
            function_name=self.function_name,
            exploit_examples=self.exploit_examples.copy(),
            remediation=self.remediation,
            references=self.references.copy(),
            cwe_id=self.cwe_id,
            owasp_category=self.owasp_category,
            confidence=self.confidence,
            source_scanner=self.source_scanner,
            is_false_positive=self.is_false_positive,
            metadata=new_metadata,
            uuid=self.uuid,
        )

    def to_detailed_dict(self) -> dict:
        """Convert to detailed dictionary for full reporting."""
        return {
            **self.to_summary_dict(),
            "description": self.description,
            "file_path": str(self.file_path),
            "line_number": self.line_number,
            "column_number": self.column_number,
            "code_snippet": self.code_snippet,
            "function_name": self.function_name,
            "exploit_examples": self.exploit_examples,
            "remediation": self.remediation,
            "references": self.references,
            "cwe_id": self.cwe_id,
            "owasp_category": self.owasp_category,
            "fingerprint": self.get_fingerprint(),
        }
