Skip to main content
Glama

Adversary MCP Server

by brettbergin
scan_engine.py109 kB
"""Enhanced scanner that combines Semgrep and LLM analysis for comprehensive security scanning.""" import asyncio import os import time from pathlib import Path from typing import Any from ..cache import CacheKey, CacheManager, CacheType, SerializableThreatMatch from ..config import get_app_cache_dir from ..config_manager import get_config_manager from ..credentials import CredentialManager, get_credential_manager from ..database.models import AdversaryDatabase from ..logger import get_logger from ..monitoring import MetricsCollector from ..resilience import ErrorHandler, ResilienceConfig from ..telemetry.integration import MetricsCollectionOrchestrator from ..telemetry.service import TelemetryService from .file_filter import FileFilter from .llm_scanner import LLMScanner from .llm_validator import LLMValidator from .semgrep_scanner import SemgrepScanner from .streaming_utils import StreamingFileReader, is_file_too_large from .types import Severity, ThreatMatch logger = get_logger("scan_engine") class EnhancedScanResult: """Result of enhanced scanning combining Semgrep and LLM analysis.""" def __init__( self, file_path: str, llm_threats: list[ThreatMatch], semgrep_threats: list[ThreatMatch], scan_metadata: dict[str, Any], validation_results: dict[str, Any] | None = None, llm_usage_stats: dict[str, Any] | None = None, ): """Initialize enhanced scan result. Args: file_path: Path to the scanned file llm_threats: Threats found by LLM analysis semgrep_threats: Threats found by Semgrep analysis scan_metadata: Metadata about the scan validation_results: Optional validation results from LLM validator llm_usage_stats: Optional LLM usage statistics (tokens, cost, etc.) """ self.file_path = file_path # Auto-detect language from file path self.language = self._detect_language_from_path(file_path) self.llm_threats = llm_threats self.semgrep_threats = semgrep_threats self.scan_metadata = scan_metadata self.validation_results = validation_results or {} self.llm_usage_stats = llm_usage_stats or self._create_default_usage_stats() # Combine and deduplicate threats self.all_threats = self._combine_threats() # Calculate statistics self.stats = self._calculate_stats() def _create_default_usage_stats(self) -> dict[str, Any]: """Create default LLM usage statistics structure. Returns: Dictionary with default usage statistics """ return { "analysis": { "total_tokens": 0, "prompt_tokens": 0, "completion_tokens": 0, "total_cost": 0.0, "currency": "USD", "api_calls": 0, "models_used": [], }, "validation": { "total_tokens": 0, "prompt_tokens": 0, "completion_tokens": 0, "total_cost": 0.0, "currency": "USD", "api_calls": 0, "models_used": [], }, "combined": { "total_tokens": 0, "prompt_tokens": 0, "completion_tokens": 0, "total_cost": 0.0, "currency": "USD", "api_calls": 0, "models_used": [], }, } def _detect_language_from_path(self, file_path: str) -> str: """Detect programming language from file extension using shared mapper. Args: file_path: Path to the file Returns: Programming language name (e.g., 'typescript', 'python', 'javascript') """ from .language_mapping import LanguageMapper return LanguageMapper.detect_language_from_extension(file_path) def _combine_threats(self) -> list[ThreatMatch]: """Combine and deduplicate threats from all sources. Returns: Combined list of unique threats """ combined = [] # Add Semgrep threats first (they're quite precise) for threat in self.semgrep_threats: combined.append(threat) # Add LLM threats that don't duplicate Semgrep findings for threat in self.llm_threats: # Check for similar threats (same line, similar category) is_duplicate = False for existing in combined: if ( abs(threat.line_number - existing.line_number) <= 2 and threat.category == existing.category ): is_duplicate = True break if not is_duplicate: combined.append(threat) # Sort by line number and severity combined.sort(key=lambda t: (t.line_number, t.severity.value)) return combined def _calculate_stats(self) -> dict[str, Any]: """Calculate scan statistics. Returns: Dictionary with scan statistics """ return { "total_threats": len(self.all_threats), "llm_threats": len(self.llm_threats), "semgrep_threats": len(self.semgrep_threats), "unique_threats": len(self.all_threats), "severity_counts": self._count_by_severity(), "category_counts": self._count_by_category(), "sources": { "llm_analysis": len(self.llm_threats) > 0, "semgrep_analysis": len(self.semgrep_threats) > 0, }, } def _count_by_severity(self) -> dict[str, int]: """Count threats by severity level.""" counts = {"low": 0, "medium": 0, "high": 0, "critical": 0} for threat in self.all_threats: counts[threat.severity.value] += 1 return counts def _count_by_category(self) -> dict[str, int]: """Count threats by category.""" counts = {} for threat in self.all_threats: # Handle both string categories and enum categories if hasattr(threat.category, "value"): category = threat.category.value else: category = str(threat.category) counts[category] = counts.get(category, 0) + 1 return counts def get_high_confidence_threats( self, min_confidence: float = 0.8 ) -> list[ThreatMatch]: """Get threats with high confidence scores. Args: min_confidence: Minimum confidence threshold Returns: List of high-confidence threats """ return [t for t in self.all_threats if t.confidence >= min_confidence] def get_critical_threats(self) -> list[ThreatMatch]: """Get critical severity threats. Returns: List of critical threats """ return [t for t in self.all_threats if t.severity == Severity.CRITICAL] def add_llm_usage(self, usage_type: str, cost_breakdown: dict[str, Any]) -> None: """Add LLM usage data to statistics. Args: usage_type: Type of usage ('analysis' or 'validation') cost_breakdown: Cost breakdown from PricingManager """ if usage_type not in self.llm_usage_stats: logger.warning(f"Unknown usage type: {usage_type}") return usage_section = self.llm_usage_stats[usage_type] tokens = cost_breakdown.get("tokens", {}) # Add to specific section usage_section["total_tokens"] += tokens.get("total_tokens", 0) usage_section["prompt_tokens"] += tokens.get("prompt_tokens", 0) usage_section["completion_tokens"] += tokens.get("completion_tokens", 0) usage_section["total_cost"] += cost_breakdown.get("total_cost", 0.0) usage_section["api_calls"] += 1 model = cost_breakdown.get("model") if model and model not in usage_section["models_used"]: usage_section["models_used"].append(model) # Update combined totals combined = self.llm_usage_stats["combined"] combined["total_tokens"] += tokens.get("total_tokens", 0) combined["prompt_tokens"] += tokens.get("prompt_tokens", 0) combined["completion_tokens"] += tokens.get("completion_tokens", 0) combined["total_cost"] += cost_breakdown.get("total_cost", 0.0) combined["api_calls"] += 1 if model and model not in combined["models_used"]: combined["models_used"].append(model) logger.debug( f"Added {usage_type} usage: {tokens.get('total_tokens', 0)} tokens, " f"${cost_breakdown.get('total_cost', 0.0):.6f}" ) def get_validation_summary(self) -> dict[str, Any]: """Get validation summary for this scan result. Returns: Dictionary with validation statistics and metadata """ # Check if validation was performed validation_enabled = self.scan_metadata.get("llm_validation_success", False) if not validation_enabled or not self.validation_results: return { "enabled": False, "total_findings_reviewed": 0, "legitimate_findings": 0, "false_positives_filtered": 0, "false_positive_rate": 0.0, "average_confidence": 0.0, "validation_errors": 0, "status": self.scan_metadata.get("llm_validation_reason", "disabled"), } # Calculate validation statistics from validation_results total_reviewed = len(self.validation_results) legitimate = sum(1 for v in self.validation_results.values() if v.is_legitimate) false_positives = total_reviewed - legitimate # Calculate average confidence avg_confidence = 0.0 if total_reviewed > 0: avg_confidence = ( sum(v.confidence for v in self.validation_results.values()) / total_reviewed ) # Count validation errors validation_errors = sum( 1 for v in self.validation_results.values() if v.validation_error ) return { "enabled": True, "total_findings_reviewed": total_reviewed, "legitimate_findings": legitimate, "false_positives_filtered": false_positives, "false_positive_rate": ( false_positives / total_reviewed if total_reviewed > 0 else 0.0 ), "average_confidence": round(avg_confidence, 3), "validation_errors": validation_errors, "status": "completed", } class ScanEngine: """Scan engine combining Semgrep and LLM analysis.""" def __init__( self, credential_manager: CredentialManager | None = None, cache_manager: CacheManager | None = None, metrics_collector: MetricsCollector | None = None, metrics_orchestrator: MetricsCollectionOrchestrator | None = None, enable_llm_analysis: bool = True, enable_semgrep_analysis: bool = True, enable_llm_validation: bool = True, ): """Initialize enhanced scanner. Args: credential_manager: Credential manager for configuration cache_manager: Optional cache manager for scan results metrics_collector: Optional legacy metrics collector for performance monitoring metrics_orchestrator: Optional telemetry orchestrator for comprehensive tracking enable_llm_analysis: Whether to enable LLM analysis enable_semgrep_analysis: Whether to enable Semgrep analysis enable_llm_validation: Whether to enable LLM validation of findings """ logger.info("=== Initializing ScanEngine ===") self.credential_manager = credential_manager or get_credential_manager() self.cache_manager = cache_manager self.metrics_collector = metrics_collector self.config_manager = get_config_manager() # Initialize telemetry system self.metrics_orchestrator = metrics_orchestrator if self.metrics_orchestrator is None: try: db = AdversaryDatabase() telemetry_service = TelemetryService(db) self.metrics_orchestrator = MetricsCollectionOrchestrator( telemetry_service ) logger.debug("Initialized telemetry system for scan engine") except Exception as e: logger.warning(f"Failed to initialize telemetry system: {e}") self.metrics_orchestrator = None logger.debug("Initialized core components") # Load configuration config = self.credential_manager.load_config() # Initialize cache manager if not provided (be robust to mocked configs) try: enable_caching_flag = bool(getattr(config, "enable_caching", True)) except Exception: enable_caching_flag = True if cache_manager is None and enable_caching_flag: cache_dir = get_app_cache_dir() # Use dynamic limits for cache configuration max_size_mb_num = self.config_manager.dynamic_limits.cache_max_size_mb max_age_hours_num = self.config_manager.dynamic_limits.cache_max_age_hours self.cache_manager = CacheManager( cache_dir=cache_dir, max_size_mb=max_size_mb_num, max_age_hours=max_age_hours_num, metrics_collector=self.metrics_collector, ) logger.info(f"Initialized cache manager for scan engine at {cache_dir}") # Initialize ErrorHandler for scan engine resilience resilience_config = ResilienceConfig( enable_circuit_breaker=True, failure_threshold=self.config_manager.dynamic_limits.circuit_breaker_failure_threshold, recovery_timeout_seconds=self.config_manager.dynamic_limits.recovery_timeout_seconds, enable_retry=True, max_retry_attempts=self.config_manager.dynamic_limits.max_retry_attempts, base_delay_seconds=self.config_manager.dynamic_limits.retry_base_delay, enable_graceful_degradation=True, ) self.error_handler = ErrorHandler(resilience_config) logger.info("Initialized ErrorHandler for scan engine resilience") # Set analysis parameters self.enable_llm_analysis = enable_llm_analysis self.enable_semgrep_analysis = enable_semgrep_analysis self.enable_llm_validation = enable_llm_validation logger.info(f"LLM analysis enabled: {self.enable_llm_analysis}") logger.info(f"Semgrep analysis enabled: {self.enable_semgrep_analysis}") logger.info(f"LLM validation enabled: {self.enable_llm_validation}") # Initialize Semgrep scanner logger.debug("Initializing Semgrep scanner...") self.semgrep_scanner = SemgrepScanner( credential_manager=self.credential_manager, metrics_collector=self.metrics_collector, ) # Check if Semgrep scanning is available and enabled self.enable_semgrep_analysis = ( self.enable_semgrep_analysis and bool(getattr(config, "enable_semgrep_scanning", True)) and self.semgrep_scanner.is_available() ) logger.info(f"Semgrep analysis enabled: {self.enable_semgrep_analysis}") if not self.semgrep_scanner.is_available(): logger.warning( "Semgrep not available - install semgrep for enhanced analysis" ) # Initialize LLM analyzer if enabled - pass shared cache manager self.llm_analyzer = None if self.enable_llm_analysis: logger.debug("Initializing LLM analyzer...") self.llm_analyzer = LLMScanner( self.credential_manager, self.cache_manager, self.metrics_collector ) if not self.llm_analyzer.is_available(): logger.warning( "LLM analysis requested but not available - API key not configured" ) self.enable_llm_analysis = False else: logger.info("LLM analyzer initialized successfully") else: logger.debug("LLM analysis disabled") # Initialize LLM validator if enabled - pass shared cache manager self.llm_validator = None if self.enable_llm_validation: logger.debug("Initializing LLM validator...") self.llm_validator = LLMValidator( self.credential_manager, self.cache_manager, self.metrics_collector ) logger.info("LLM validator initialized successfully") else: logger.debug("LLM validation disabled") logger.info("=== ScanEngine initialization complete ===") def _detect_language(self, file_path: Path) -> str: """Detect programming language from file extension using language mapping. Args: file_path: Path to the file Returns: Programming language name (e.g., 'python', 'javascript') or 'generic' for unknown """ from .language_mapping import LanguageMapper detected_language = LanguageMapper.detect_language_from_extension(file_path) logger.debug(f"Language detected for {file_path}: {detected_language}") return detected_language def _filter_by_severity( self, threats: list[ThreatMatch], min_severity: Severity, ) -> list[ThreatMatch]: """Filter threats by minimum severity level. Args: threats: List of threats to filter min_severity: Minimum severity level Returns: Filtered list of threats """ logger.debug( f"Filtering {len(threats)} threats by severity >= {min_severity.value}" ) severity_order = [ Severity.LOW, Severity.MEDIUM, Severity.HIGH, Severity.CRITICAL, ] min_index = severity_order.index(min_severity) filtered = [ threat for threat in threats if severity_order.index(threat.severity) >= min_index ] logger.debug( f"Severity filtering result: {len(threats)} -> {len(filtered)} threats" ) return filtered def _apply_validation_filter( self, threats: list[ThreatMatch], validation_results: dict[str, Any], confidence_threshold: float = 0.7, ) -> list[ThreatMatch]: """Filter threats using validation results without relying on external return values. Keeps threats that are either not present in validation results or explicitly marked legitimate with sufficient confidence. """ if not threats or not validation_results: return threats filtered: list[ThreatMatch] = [] for threat in threats: threat_uuid = getattr(threat, "uuid", None) if threat_uuid is None: filtered.append(threat) continue validation = validation_results.get(threat_uuid) if validation is None: filtered.append(threat) continue is_legitimate = getattr(validation, "is_legitimate", True) confidence = getattr(validation, "confidence", 1.0) if is_legitimate and confidence >= confidence_threshold: filtered.append(threat) return filtered def get_scanner_stats(self) -> dict[str, Any]: """Get statistics about the enhanced scanner. Returns: Dictionary with scanner statistics """ logger.debug("Generating scanner statistics...") stats = { "llm_analyzer_available": self.llm_analyzer is not None and self.llm_analyzer.is_available(), "semgrep_scanner_available": self.semgrep_scanner.is_available(), "llm_analysis_enabled": self.enable_llm_analysis, "semgrep_analysis_enabled": self.enable_semgrep_analysis, "llm_stats": ( self.llm_analyzer.get_analysis_stats() if self.llm_analyzer else None ), } logger.debug( f"Scanner stats generated - " f"LLM: {stats['llm_analyzer_available']}, " f"Semgrep: {stats['semgrep_scanner_available']}" ) return stats def set_llm_enabled(self, enabled: bool) -> None: """Enable or disable LLM analysis. Args: enabled: Whether to enable LLM analysis """ logger.info(f"Setting LLM analysis enabled: {enabled}") if enabled and not self.llm_analyzer: logger.debug("Creating new LLM analyzer...") self.llm_analyzer = LLMScanner( self.credential_manager, self.cache_manager, self.metrics_collector ) old_state = self.enable_llm_analysis self.enable_llm_analysis = enabled and ( self.llm_analyzer is not None and self.llm_analyzer.is_available() ) if old_state != self.enable_llm_analysis: logger.info( f"LLM analysis state changed: {old_state} -> {self.enable_llm_analysis}" ) else: logger.debug("LLM analysis state unchanged") def reload_configuration(self) -> None: """Reload configuration and reinitialize components.""" logger.info("Reloading scanner configuration...") # Reinitialize LLM analyzer with new configuration if self.enable_llm_analysis: logger.debug("Reinitializing LLM analyzer...") self.llm_analyzer = LLMScanner( self.credential_manager, self.cache_manager, self.metrics_collector ) if not self.llm_analyzer.is_available(): logger.warning( "LLM analysis disabled after reload - API key not configured" ) self.enable_llm_analysis = False else: logger.info("LLM analyzer reinitialized successfully") logger.info("Scanner configuration reload complete") def scan_code_sync( self, source_code: str, file_path: str, use_llm: bool = True, use_semgrep: bool = True, use_validation: bool = True, severity_threshold: Severity | None = None, ) -> EnhancedScanResult: """Synchronous wrapper for scan_code for CLI usage with auto-detected language.""" file_path_abs = str(Path(file_path).resolve()) logger.debug(f"Synchronous code scan wrapper called for: {file_path_abs}") import asyncio return asyncio.run( self.scan_code( source_code=source_code, file_path=file_path, use_llm=use_llm, use_semgrep=use_semgrep, use_validation=use_validation, severity_threshold=severity_threshold, ) ) def scan_directory_sync( self, directory_path: Path, recursive: bool = True, use_llm: bool = True, use_semgrep: bool = True, use_validation: bool = True, severity_threshold: Severity | None = None, ) -> list[EnhancedScanResult]: """Synchronous wrapper for scan_directory for CLI usage.""" directory_path_abs = str(Path(directory_path).resolve()) logger.debug( f"Synchronous directory scan wrapper called for: {directory_path_abs}" ) import asyncio return asyncio.run( self.scan_directory( directory_path=directory_path, recursive=recursive, use_llm=use_llm, use_semgrep=use_semgrep, use_validation=use_validation, severity_threshold=severity_threshold, ) ) def scan_file_sync( self, file_path: Path, use_llm: bool = True, use_semgrep: bool = True, use_validation: bool = True, severity_threshold: Severity | None = None, ) -> EnhancedScanResult: """Synchronous wrapper for scan_file for CLI usage with auto-detected language.""" file_path_abs = str(Path(file_path).resolve()) logger.debug(f"Synchronous file scan wrapper called for: {file_path_abs}") import asyncio return asyncio.run( self.scan_file( file_path=file_path, use_llm=use_llm, use_semgrep=use_semgrep, use_validation=use_validation, severity_threshold=severity_threshold, ) ) async def scan_code( self, source_code: str, file_path: str, use_llm: bool = True, use_semgrep: bool = True, use_validation: bool = True, severity_threshold: Severity | None = None, ) -> EnhancedScanResult: """Scan source code using Semgrep and LLM analysis with auto-detected language. Args: source_code: Source code to scan file_path: Path to the source file (used for language auto-detection) use_llm: Whether to use LLM analysis use_semgrep: Whether to use Semgrep analysis use_validation: Whether to use LLM validation for findings severity_threshold: Minimum severity threshold for filtering Returns: Enhanced scan result """ scan_start_time = time.time() file_path_abs = str(Path(file_path).resolve()) # Record scan start (legacy metrics) if self.metrics_collector: self.metrics_collector.record_scan_start("code", file_count=1) # Use telemetry context manager for comprehensive tracking if self.metrics_orchestrator: logger.debug("Using metrics orchestrator for code scan tracking") with self.metrics_orchestrator.track_scan_execution( trigger_source="scan_engine", scan_type="code", target_path=file_path_abs, semgrep_enabled=use_semgrep, llm_enabled=use_llm, validation_enabled=use_validation, file_count=1, ) as scan_context: logger.debug( "Scan context created, executing scan with telemetry tracking" ) return await self._scan_code_with_context( scan_context, source_code, file_path_abs, use_llm, use_semgrep, use_validation, severity_threshold, ) else: # Fallback without telemetry tracking logger.debug( "No metrics orchestrator available, executing scan without telemetry" ) return await self._scan_code_with_context( None, source_code, file_path_abs, use_llm, use_semgrep, use_validation, severity_threshold, ) async def _scan_code_with_context( self, scan_context, source_code: str, file_path: str, use_llm: bool, use_semgrep: bool, use_validation: bool, severity_threshold: Severity | None, ) -> EnhancedScanResult: """Internal scan code implementation with telemetry context.""" scan_start_time = time.time() file_path_abs = str(Path(file_path).resolve()) # Check cache first if enabled cached_result = await self._get_cached_scan_result( source_code, file_path_abs, use_llm, use_semgrep, use_validation, severity_threshold, ) if cached_result: logger.info(f"Cache hit for scan: {file_path_abs}") return cached_result # Auto-detect language from file path language = self._detect_language(Path(file_path)) logger.info(f"=== Starting code scan for {file_path_abs} ===") logger.debug( f"Scan parameters - Language: {language} (auto-detected), " f"LLM: {use_llm}, Semgrep: {use_semgrep}, " f"Severity threshold: {severity_threshold}" ) scan_metadata = { "file_path": file_path, "language": language, "use_llm": use_llm and self.enable_llm_analysis, "use_semgrep": use_semgrep and self.enable_semgrep_analysis, "source_lines": len(source_code.split("\n")), "source_size": len(source_code), } logger.info( f"Source code stats - Lines: {scan_metadata['source_lines']}, " f"Size: {scan_metadata['source_size']} chars" ) # Initialize threat lists llm_threats = [] semgrep_threats = [] # Perform Semgrep scanning if enabled semgrep_threats = [] logger.debug("Checking Semgrep status...") semgrep_status = self.semgrep_scanner.get_status() scan_metadata["semgrep_status"] = semgrep_status logger.debug(f"Semgrep status: {semgrep_status}") # Store LLM status for consistency with semgrep if self.llm_analyzer: llm_status = self.llm_analyzer.get_status() scan_metadata["llm_status"] = llm_status logger.debug(f"LLM status: {llm_status}") else: scan_metadata["llm_status"] = { "available": False, "installation_status": "not_initialized", "description": "LLM analyzer not initialized", } if use_semgrep and self.enable_semgrep_analysis: if not semgrep_status["available"]: # Semgrep not available - provide detailed status logger.warning(f"Semgrep not available: {semgrep_status['error']}") scan_metadata.update( { "semgrep_scan_success": False, "semgrep_scan_error": semgrep_status["error"], "semgrep_scan_reason": "semgrep_not_available", "semgrep_installation_status": semgrep_status[ "installation_status" ], "semgrep_installation_guidance": semgrep_status[ "installation_guidance" ], } ) else: logger.info("Starting Semgrep scanning...") try: config = self.credential_manager.load_config() logger.debug("Calling Semgrep scanner...") semgrep_threats = await self.semgrep_scanner.scan_code( source_code=source_code, file_path=file_path, language=language, config=config.semgrep_config, rules=config.semgrep_rules, severity_threshold=severity_threshold, ) logger.info( f"Semgrep scan completed - found {len(semgrep_threats)} threats" ) # Record individual threat findings in telemetry for threat in semgrep_threats: scan_context.add_threat_finding(threat, "semgrep") scan_metadata.update( { "semgrep_scan_success": True, "semgrep_scan_reason": "analysis_completed", "semgrep_version": semgrep_status["version"], "semgrep_has_pro_features": semgrep_status.get( "has_pro_features", False ), } ) except Exception as e: logger.error(f"Semgrep scan failed for {file_path_abs}: {e}") logger.debug("Semgrep scan error details", exc_info=True) scan_metadata.update( { "semgrep_scan_success": False, "semgrep_scan_error": str(e), "semgrep_scan_reason": "scan_failed", "semgrep_version": semgrep_status["version"], } ) else: if not use_semgrep: reason = "skipped_intentionally" logger.debug( "Semgrep scanning skipped (already completed at directory level to avoid duplication)" ) else: reason = "not_available" logger.debug("Semgrep scanning not available") scan_metadata.update( { "semgrep_scan_success": False, "semgrep_scan_reason": reason, } ) # Perform LLM analysis if enabled llm_threats = [] if use_llm and self.enable_llm_analysis and self.llm_analyzer: logger.info("Starting LLM analysis...") try: logger.debug("Calling LLM analyzer for code analysis...") llm_findings = self.llm_analyzer.analyze_code( source_code, file_path, language ) # Convert LLM findings to threats for finding in llm_findings: threat = finding.to_threat_match(file_path) llm_threats.append(threat) logger.info( f"LLM analysis completed - found {len(llm_threats)} threats" ) # Record individual threat findings in telemetry for threat in llm_threats: scan_context.add_threat_finding(threat, "llm") scan_metadata["llm_scan_success"] = True scan_metadata["llm_scan_reason"] = "analysis_completed" except Exception as e: logger.error(f"LLM analysis failed for {file_path}: {e}") logger.debug("LLM analysis error details", exc_info=True) scan_metadata["llm_scan_success"] = False scan_metadata["llm_scan_error"] = str(e) scan_metadata["llm_scan_reason"] = "analysis_failed" else: if not use_llm: reason = "disabled_by_user" logger.debug("LLM analysis disabled by user request") elif not self.enable_llm_analysis: reason = "disabled_in_config" logger.debug("LLM analysis disabled in configuration") elif not self.llm_analyzer: reason = "not_initialized" logger.debug("LLM analyzer not initialized") else: reason = "not_available" logger.debug("LLM analysis not available - check configuration") scan_metadata["llm_scan_success"] = False scan_metadata["llm_scan_reason"] = reason # Filter by severity threshold if specified original_counts = { "semgrep": len(semgrep_threats), "llm": len(llm_threats), } if severity_threshold: logger.info(f"Applying severity filter: {severity_threshold.value}") llm_threats = self._filter_by_severity(llm_threats, severity_threshold) semgrep_threats = self._filter_by_severity( semgrep_threats, severity_threshold ) filtered_counts = { "semgrep": len(semgrep_threats), "llm": len(llm_threats), } logger.info( f"Severity filtering results - " f"Semgrep: {original_counts['semgrep']} -> {filtered_counts['semgrep']}, " f"LLM: {original_counts['llm']} -> {filtered_counts['llm']}" ) # Apply LLM validation if enabled validation_results = {} # Debug logging for validation conditions logger.debug("Validation conditions check:") logger.debug(f" use_validation: {use_validation}") logger.debug(f" self.enable_llm_validation: {self.enable_llm_validation}") logger.debug(f" self.llm_validator: {self.llm_validator is not None}") logger.debug(f" self.llm_validator type: {type(self.llm_validator)}") if use_validation and self.enable_llm_validation and self.llm_validator: # Combine all threats for validation all_threats_for_validation = llm_threats + semgrep_threats if all_threats_for_validation: # Check if validator is fully functional or using fallback if self.llm_validator.is_fully_functional(): logger.info( f"Validating {len(all_threats_for_validation)} findings with LLM validator (full mode)" ) scan_metadata["llm_validation_mode"] = "full" else: logger.info( f"Validating {len(all_threats_for_validation)} findings with LLM validator (fallback mode)" ) scan_metadata["llm_validation_mode"] = "fallback" try: validation_results = ( await self.llm_validator._validate_findings_async( findings=all_threats_for_validation, source_code=source_code, file_path=file_path, generate_exploits=True, ) ) # Store validation results in scan context for atomic database updates if ( "scan_context" in locals() and scan_context and validation_results ): scan_context.set_validation_results(validation_results) # Filter false positives based on validation # Call validator (for side effects/mocking in tests) and use its return if valid, # otherwise fall back to internal filter for robustness try: llm_filtered = self.llm_validator.filter_false_positives( llm_threats, validation_results ) except Exception: llm_filtered = None if isinstance(llm_filtered, list): llm_threats = llm_filtered else: llm_threats = self._apply_validation_filter( llm_threats, validation_results ) try: semgrep_filtered = self.llm_validator.filter_false_positives( semgrep_threats, validation_results ) except Exception: semgrep_filtered = None if isinstance(semgrep_filtered, list): semgrep_threats = semgrep_filtered else: semgrep_threats = self._apply_validation_filter( semgrep_threats, validation_results ) # Add validation stats to metadata scan_metadata["llm_validation_success"] = True scan_metadata["llm_validation_stats"] = ( self.llm_validator.get_validation_stats(validation_results) ) logger.info( f"Validation complete - filtered to {len(llm_threats) + len(semgrep_threats)} legitimate findings" ) except Exception as e: logger.error(f"LLM validation failed: {e}") scan_metadata["llm_validation_success"] = False scan_metadata["llm_validation_error"] = str(e) else: logger.debug("Validation conditions not met - entering else clause") scan_metadata["llm_validation_success"] = False if not use_validation: logger.debug("Reason: use_validation=False") scan_metadata["llm_validation_reason"] = "disabled" elif not self.enable_llm_validation: logger.debug("Reason: self.enable_llm_validation=False") scan_metadata["llm_validation_reason"] = "disabled" else: logger.debug("Reason: self.llm_validator is None or falsy") scan_metadata["llm_validation_reason"] = "not_available" # Create enhanced scan result with LLM usage tracking result = EnhancedScanResult( file_path=file_path, llm_threats=llm_threats, semgrep_threats=semgrep_threats, scan_metadata=scan_metadata, validation_results=validation_results, llm_usage_stats=None, # Will be populated by LLM components ) # Cache the scan result if caching is enabled await self._cache_scan_result( result, source_code, file_path_abs, use_llm, use_semgrep, use_validation, severity_threshold, ) logger.info( f"=== Code scan complete for {file_path} - " f"Total threats: {len(result.all_threats)} ===" ) # Record scan completion if self.metrics_collector: duration = time.time() - scan_start_time self.metrics_collector.record_scan_completion( "code", duration, success=True, findings_count=len(result.all_threats) ) return result async def scan_file( self, file_path: Path, use_llm: bool = True, use_semgrep: bool = True, use_validation: bool = True, severity_threshold: Severity | None = None, ) -> EnhancedScanResult: """Scan a single file using enhanced scanning with auto-detected language. Args: file_path: Path to the file to scan (used for language auto-detection) use_llm: Whether to use LLM analysis use_semgrep: Whether to use Semgrep analysis use_validation: Whether to use LLM validation for findings severity_threshold: Minimum severity threshold for filtering Returns: Enhanced scan result """ scan_start_time = time.time() file_path_abs = str(Path(file_path).resolve()) logger.info(f"=== Starting file scan: {file_path_abs} ===") logger.info( f"SCAN ENGINE DEBUG: use_validation={use_validation}, type={type(use_validation)}" ) if not file_path.exists(): logger.error(f"File not found: {file_path_abs}") raise FileNotFoundError(f"File not found: {file_path}") # Record scan start if self.metrics_collector: self.metrics_collector.record_scan_start("file", file_count=1) # Use telemetry context manager for comprehensive tracking if self.metrics_orchestrator: with self.metrics_orchestrator.track_scan_execution( trigger_source="scan_engine", scan_type="file", target_path=file_path_abs, semgrep_enabled=use_semgrep, llm_enabled=use_llm, validation_enabled=use_validation, file_count=1, ) as scan_context: return await self._scan_file_with_context( scan_context, file_path, file_path_abs, use_llm, use_semgrep, use_validation, severity_threshold, ) else: # Fallback without telemetry tracking return await self._scan_file_with_context( None, file_path, file_path_abs, use_llm, use_semgrep, use_validation, severity_threshold, ) async def _scan_file_with_context( self, scan_context, file_path: Path, file_path_abs: str, use_llm: bool, use_semgrep: bool, use_validation: bool, severity_threshold: Severity | None, ) -> EnhancedScanResult: """Internal scan file implementation with telemetry context.""" scan_start_time = time.time() # Check cache if available cache_key = None cached_result = None if self.cache_manager: # Read file content for hashing try: file_content = file_path.read_text(encoding="utf-8", errors="replace") # Create cache key with scan parameters content_hash = self.cache_manager.get_hasher().hash_content( file_content ) metadata = { "use_llm": use_llm and self.enable_llm_analysis, "use_semgrep": use_semgrep and self.enable_semgrep_analysis, "use_validation": use_validation and self.enable_llm_validation, "severity_threshold": ( str(severity_threshold) if severity_threshold else None ), } metadata_hash = self.cache_manager.get_hasher().hash_metadata(metadata) from ..cache.types import CacheKey, CacheType cache_key = CacheKey( cache_type=CacheType.FILE_ANALYSIS, content_hash=content_hash, metadata_hash=metadata_hash, ) cached_result = self.cache_manager.get(cache_key) if cached_result: logger.info(f"Cache hit for file scan: {file_path_abs}") # Add cache metadata to result cached_result.scan_metadata["cache_hit"] = True cached_result.scan_metadata["cache_key"] = str(cache_key) return cached_result else: logger.debug(f"Cache miss for file scan: {file_path_abs}") except Exception as e: logger.warning(f"Cache check failed for {file_path_abs}: {e}") cache_key = None # Auto-detect language from file extension logger.debug(f"Auto-detecting language for: {file_path_abs}") language = self._detect_language(file_path) logger.info(f"Detected language: {language}") scan_metadata = { "file_path": str(file_path), "language": language, "use_llm": use_llm and self.enable_llm_analysis, "use_semgrep": use_semgrep and self.enable_semgrep_analysis, } # Initialize threat lists rules_threats = [] semgrep_threats = [] llm_threats = [] # Perform Semgrep scanning if enabled logger.debug("Checking Semgrep status...") semgrep_status = self.semgrep_scanner.get_status() scan_metadata["semgrep_status"] = semgrep_status logger.debug(f"Semgrep status: {semgrep_status}") # Store LLM status for consistency with semgrep if self.llm_analyzer: llm_status = self.llm_analyzer.get_status() scan_metadata["llm_status"] = llm_status logger.debug(f"LLM status: {llm_status}") else: scan_metadata["llm_status"] = { "available": False, "installation_status": "not_initialized", "description": "LLM analyzer not initialized", } if use_semgrep and self.enable_semgrep_analysis: if not semgrep_status["available"]: # Semgrep not available - provide detailed status logger.warning(f"Semgrep not available: {semgrep_status['error']}") scan_metadata.update( { "semgrep_scan_success": False, "semgrep_scan_error": semgrep_status["error"], "semgrep_scan_reason": "semgrep_not_available", "semgrep_installation_status": semgrep_status[ "installation_status" ], "semgrep_installation_guidance": semgrep_status[ "installation_guidance" ], } ) else: logger.info("Starting Semgrep scanning...") try: config = self.credential_manager.load_config() logger.debug("Calling Semgrep scanner...") semgrep_threats = await self.semgrep_scanner.scan_file( file_path=str(file_path), language=language, config=config.semgrep_config, rules=config.semgrep_rules, severity_threshold=severity_threshold, ) logger.info( f"Semgrep scan completed - found {len(semgrep_threats)} threats" ) # Record individual threat findings in telemetry for threat in semgrep_threats: scan_context.add_threat_finding(threat, "semgrep") scan_metadata.update( { "semgrep_scan_success": True, "semgrep_scan_reason": "analysis_completed", "semgrep_version": semgrep_status["version"], "semgrep_has_pro_features": semgrep_status.get( "has_pro_features", False ), } ) except Exception as e: logger.error(f"Semgrep scan failed for {file_path_abs}: {e}") logger.debug("Semgrep scan error details", exc_info=True) scan_metadata.update( { "semgrep_scan_success": False, "semgrep_scan_error": str(e), "semgrep_scan_reason": "scan_failed", "semgrep_version": semgrep_status["version"], } ) else: if not use_semgrep: reason = "disabled_by_user" logger.debug("Semgrep scanning disabled by user request") else: reason = "not_available" logger.debug("Semgrep scanning not available") scan_metadata.update( { "semgrep_scan_success": False, "semgrep_scan_reason": reason, } ) # Perform LLM analysis if enabled if use_llm and self.enable_llm_analysis and self.llm_analyzer: logger.info("Starting LLM analysis...") try: logger.debug("Calling LLM analyzer for file...") llm_findings = await self.llm_analyzer.analyze_file( file_path=file_path, language=language ) # Convert LLM findings to threats for finding in llm_findings: threat = finding.to_threat_match(str(file_path)) llm_threats.append(threat) logger.info( f"LLM analysis completed - found {len(llm_threats)} threats" ) # Record individual threat findings in telemetry for threat in llm_threats: scan_context.add_threat_finding(threat, "llm") scan_metadata["llm_scan_success"] = True scan_metadata["llm_scan_reason"] = "analysis_completed" except Exception as e: logger.error(f"LLM analysis failed for {file_path_abs}: {e}") logger.debug("LLM analysis error details", exc_info=True) scan_metadata["llm_scan_success"] = False scan_metadata["llm_scan_error"] = str(e) scan_metadata["llm_scan_reason"] = "analysis_failed" else: if not use_llm: reason = "disabled_by_user" logger.debug("LLM analysis disabled by user request") elif not self.enable_llm_analysis: reason = "disabled_in_config" logger.debug("LLM analysis disabled in configuration") elif not self.llm_analyzer: reason = "not_initialized" logger.debug("LLM analyzer not initialized") else: reason = "not_available" logger.debug("LLM analysis not available - check configuration") scan_metadata["llm_scan_success"] = False scan_metadata["llm_scan_reason"] = reason # Filter by severity threshold if specified if severity_threshold: rules_threats = self._filter_by_severity(rules_threats, severity_threshold) llm_threats = self._filter_by_severity(llm_threats, severity_threshold) semgrep_threats = self._filter_by_severity( semgrep_threats, severity_threshold ) # Apply LLM validation if enabled validation_results = {} if use_validation and self.enable_llm_validation and self.llm_validator: # Combine all threats for validation all_threats_for_validation = llm_threats + semgrep_threats if all_threats_for_validation: # Check if validator is fully functional or using fallback if self.llm_validator.is_fully_functional(): logger.info( f"Validating {len(all_threats_for_validation)} findings with LLM validator (full mode)" ) scan_metadata["llm_validation_mode"] = "full" else: logger.info( f"Validating {len(all_threats_for_validation)} findings with LLM validator (fallback mode)" ) scan_metadata["llm_validation_mode"] = "fallback" # Read file content for validation source_code = "" try: reader = StreamingFileReader() chunks = [] async for chunk in reader.read_file_async(file_path): chunks.append(chunk) source_code = "".join(chunks) except Exception as e: logger.error(f"Failed to read file content for validation: {e}") scan_metadata["llm_validation_success"] = False scan_metadata["llm_validation_error"] = ( f"File reading failed: {str(e)}" ) # Continue with empty source_code for validation # Perform validation validation_results = {} # Initialize to avoid NameError if source_code or True: # Allow validation even if file reading failed try: validation_results = ( await self.llm_validator._validate_findings_async( findings=all_threats_for_validation, source_code=source_code, file_path=str(file_path), generate_exploits=True, ) ) logger.debug( f"Validation completed with {len(validation_results)} results" ) # Store validation results in scan context for atomic database updates if ( "scan_context" in locals() and scan_context and validation_results ): scan_context.set_validation_results(validation_results) except Exception as e: logger.error(f"LLM validation processing failed: {e}") scan_metadata["llm_validation_success"] = False scan_metadata["llm_validation_error"] = ( f"Validation processing failed: {str(e)}" ) # Keep validation_results as empty dict for graceful handling validation_results = {} # Apply filtering if validation was successful if validation_results: try: # Filter LLM threats try: llm_filtered = ( self.llm_validator.filter_false_positives( llm_threats, validation_results ) ) except Exception: llm_filtered = None if isinstance(llm_filtered, list): llm_threats = llm_filtered else: llm_threats = self._apply_validation_filter( llm_threats, validation_results ) # Filter Semgrep threats try: semgrep_filtered = ( self.llm_validator.filter_false_positives( semgrep_threats, validation_results ) ) except Exception: semgrep_filtered = None if isinstance(semgrep_filtered, list): semgrep_threats = semgrep_filtered else: semgrep_threats = self._apply_validation_filter( semgrep_threats, validation_results ) logger.info( f"Validation filtering complete - filtered to {len(llm_threats) + len(semgrep_threats)} legitimate findings" ) except Exception as e: logger.error(f"Validation filtering failed: {e}") # Don't reset validation_results here - preserve them for the result # Add validation stats to metadata (separate try-catch to avoid losing validation_results) try: if validation_results and self.llm_validator: scan_metadata["llm_validation_success"] = True scan_metadata["llm_validation_stats"] = ( self.llm_validator.get_validation_stats( validation_results ) ) else: scan_metadata["llm_validation_success"] = False except Exception as e: logger.error(f"Failed to generate validation stats: {e}") scan_metadata["llm_validation_success"] = ( True # Still successful, just no stats ) scan_metadata["llm_validation_stats_error"] = str(e) else: logger.debug("Validation conditions not met - entering else clause") scan_metadata["llm_validation_success"] = False if not use_validation: logger.debug("Reason: use_validation=False") scan_metadata["llm_validation_reason"] = "disabled" elif not self.enable_llm_validation: logger.debug("Reason: self.enable_llm_validation=False") scan_metadata["llm_validation_reason"] = "disabled" else: logger.debug("Reason: self.llm_validator is None or falsy") scan_metadata["llm_validation_reason"] = "not_available" result = EnhancedScanResult( file_path=str(file_path), llm_threats=llm_threats, semgrep_threats=semgrep_threats, scan_metadata=scan_metadata, validation_results=validation_results, llm_usage_stats=None, # Will be populated by LLM components ) logger.info( f"=== File scan complete for {file_path} - " f"Total threats: {len(result.all_threats)} ===" ) # Record scan completion if self.metrics_collector: duration = time.time() - scan_start_time self.metrics_collector.record_scan_completion( "file", duration, success=True, findings_count=len(result.all_threats) ) # Update telemetry context with scan results if scan_context: # Extract timing from metadata (if available) if result.scan_metadata.get("semgrep_duration_ms"): scan_context.semgrep_duration = result.scan_metadata[ "semgrep_duration_ms" ] if result.scan_metadata.get("llm_duration_ms"): scan_context.llm_duration = result.scan_metadata["llm_duration_ms"] if result.scan_metadata.get("validation_duration_ms"): scan_context.validation_duration = result.scan_metadata[ "validation_duration_ms" ] if result.scan_metadata.get("cache_lookup_ms"): scan_context.cache_lookup_duration = result.scan_metadata[ "cache_lookup_ms" ] # Update result counts scan_context.threats_found = len(result.all_threats) scan_context.threats_validated = sum( 1 for t in result.all_threats if hasattr(t, "validated") and t.validated ) scan_context.false_positives_filtered = result.scan_metadata.get( "false_positives_filtered", 0 ) # Mark cache hit if applicable if result.scan_metadata.get("cache_hit", False): scan_context.mark_cache_hit() # Store result in cache if available if self.cache_manager and cache_key: try: # Add cache metadata to the result before storing result.scan_metadata["cache_hit"] = False result.scan_metadata["cache_key"] = str(cache_key) self.cache_manager.put(cache_key, result) logger.debug(f"Cached scan result for: {file_path_abs}") except Exception as e: logger.warning(f"Failed to cache scan result for {file_path_abs}: {e}") return result async def scan_directory( self, directory_path: Path, recursive: bool = True, use_llm: bool = True, use_semgrep: bool = True, use_validation: bool = True, severity_threshold: Severity | None = None, ) -> list[EnhancedScanResult]: """Scan a directory using enhanced scanning with optimized approach. Args: directory_path: Path to the directory to scan recursive: Whether to scan subdirectories use_llm: Whether to use LLM analysis use_semgrep: Whether to use Semgrep analysis use_validation: Whether to use LLM validation for findings severity_threshold: Minimum severity threshold for filtering Returns: List of enhanced scan results """ directory_path_obj = Path(directory_path).resolve() directory_path_abs = str(directory_path_obj) logger.info(f"=== Starting directory scan: {directory_path_abs} ===") logger.debug( f"Directory scan parameters - Recursive: {recursive}, " f"LLM: {use_llm}, Semgrep: {use_semgrep}" ) if not directory_path_obj.exists(): logger.error(f"Directory not found: {directory_path_abs}") raise FileNotFoundError(f"Directory not found: {directory_path_abs}") # Use telemetry context manager for comprehensive tracking if self.metrics_orchestrator: logger.debug("Using metrics orchestrator for directory scan tracking") with self.metrics_orchestrator.track_scan_execution( trigger_source="scan_engine", scan_type="directory", target_path=directory_path_abs, semgrep_enabled=use_semgrep, llm_enabled=use_llm, validation_enabled=use_validation, file_count=0, # Will be updated when files are discovered ) as scan_context: logger.debug( "Scan context created for directory scan with telemetry tracking" ) # Continue with directory scan logic - scan_context is now available for threat recording return await self._execute_directory_scan( directory_path_obj, scan_context, recursive, use_llm, use_semgrep, use_validation, severity_threshold, ) else: # Fallback without telemetry tracking - use None as scan_context logger.debug( "No metrics orchestrator available, running directory scan without telemetry" ) return await self._execute_directory_scan( directory_path_obj, None, recursive, use_llm, use_semgrep, use_validation, severity_threshold, ) async def _execute_directory_scan( self, directory_path_obj: Path, scan_context, # Can be None if telemetry is not available recursive: bool, use_llm: bool, use_semgrep: bool, use_validation: bool, severity_threshold: Severity | None, ) -> list[EnhancedScanResult]: """Execute directory scan with optional telemetry tracking.""" directory_path_abs = str(directory_path_obj) # Initialize file filter with smart exclusions config = self.credential_manager.load_config() file_filter = FileFilter( root_path=directory_path_obj, max_file_size_mb=config.max_file_size_mb, respect_gitignore=True, ) # Find all files to scan with intelligent filtering all_files = [] pattern = "**/*" if recursive else "*" logger.debug(f"Discovering files with pattern: {pattern}") for file_path in directory_path_obj.glob(pattern): if file_path.is_file(): all_files.append(file_path) logger.info(f"Discovered {len(all_files)} total files") # Apply smart filtering files_to_scan = file_filter.filter_files(all_files) logger.info(f"After filtering: {len(files_to_scan)} files to scan") if len(all_files) > len(files_to_scan): logger.info( f"Filtered out {len(all_files) - len(files_to_scan)} files (.gitignore, binary, too large, etc.)" ) # Perform Semgrep scanning once for entire directory if enabled directory_semgrep_threats = {} # Map file_path -> list[ThreatMatch] semgrep_scan_metadata = {} # Always get semgrep status for metadata consistency semgrep_status = self.semgrep_scanner.get_status() if use_semgrep and self.enable_semgrep_analysis: logger.info("Starting directory-level Semgrep scan...") if semgrep_status["available"]: try: logger.debug("Running single Semgrep scan for entire directory") config = self.credential_manager.load_config() # Use semgrep's directory scanning capability all_semgrep_threats = await self.semgrep_scanner.scan_directory( directory_path=directory_path_abs, config=config.semgrep_config, rules=config.semgrep_rules, recursive=recursive, severity_threshold=severity_threshold, ) # Group threats by file path for threat in all_semgrep_threats: file_path = threat.file_path if file_path not in directory_semgrep_threats: directory_semgrep_threats[file_path] = [] directory_semgrep_threats[file_path].append(threat) logger.info( f"Directory Semgrep scan complete: found {len(all_semgrep_threats)} threats across {len(directory_semgrep_threats)} files" ) logger.info( f"[+] Semgrep optimization: Scanned entire directory once instead of {len(files_to_scan)} individual scans" ) # Record individual threat findings in telemetry using scan context if scan_context: for threat in all_semgrep_threats: scan_context.add_threat_finding(threat, "semgrep") semgrep_scan_metadata = { "semgrep_scan_success": True, "semgrep_scan_reason": "directory_analysis_completed", "semgrep_version": semgrep_status["version"], "semgrep_has_pro_features": semgrep_status.get( "has_pro_features", False ), "semgrep_total_threats": len(all_semgrep_threats), "semgrep_files_with_threats": len(directory_semgrep_threats), } except Exception as e: logger.error(f"Directory Semgrep scan failed: {e}") logger.debug("Directory Semgrep scan error details", exc_info=True) semgrep_scan_metadata = { "semgrep_scan_success": False, "semgrep_scan_error": str(e), "semgrep_scan_reason": "directory_scan_failed", "semgrep_version": semgrep_status["version"], } else: logger.warning( f"Semgrep not available for directory scan: {semgrep_status['error']}" ) semgrep_scan_metadata = { "semgrep_scan_success": False, "semgrep_scan_error": semgrep_status["error"], "semgrep_scan_reason": "semgrep_not_available", "semgrep_installation_status": semgrep_status[ "installation_status" ], "semgrep_installation_guidance": semgrep_status[ "installation_guidance" ], } else: if not use_semgrep: reason = "disabled_by_user" logger.info("Directory Semgrep scan disabled by user request") else: reason = "not_available" logger.warning( "Directory Semgrep scan unavailable - Semgrep not found or not configured" ) semgrep_scan_metadata = { "semgrep_scan_success": False, "semgrep_scan_reason": reason, } # Perform LLM analysis for entire directory if enabled directory_llm_threats = {} # Map file_path -> list[ThreatMatch] llm_scan_metadata = {} if use_llm and self.enable_llm_analysis and self.llm_analyzer: logger.info("Starting directory-level LLM analysis...") try: logger.debug( f"Calling LLM analyzer for {len(files_to_scan)} filtered files..." ) all_llm_findings = await self.llm_analyzer.analyze_files( file_paths=files_to_scan, ) # Convert LLM findings to threats and group by file all_llm_threats = [] for finding in all_llm_findings: threat = finding.to_threat_match(finding.file_path) all_llm_threats.append(threat) file_path = finding.file_path logger.debug(f"Processing LLM finding for file: {file_path}") if file_path not in directory_llm_threats: directory_llm_threats[file_path] = [] directory_llm_threats[file_path].append(threat) logger.debug( f"Added threat to directory_llm_threats[{file_path}], now has {len(directory_llm_threats[file_path])} threats" ) logger.info( f"Directory LLM analysis complete: found {len(all_llm_threats)} threats across {len(directory_llm_threats)} files" ) # Record individual threat findings in telemetry using scan context if scan_context: for threat in all_llm_threats: scan_context.add_threat_finding(threat, "llm") llm_scan_metadata = { "llm_scan_success": True, "llm_scan_reason": "directory_analysis_completed", "llm_total_threats": len(all_llm_threats), "llm_files_with_threats": len(directory_llm_threats), } except Exception as e: logger.error(f"Directory LLM analysis failed: {e}") logger.debug("Directory LLM analysis error details", exc_info=True) llm_scan_metadata = { "llm_scan_success": False, "llm_scan_error": str(e), "llm_scan_reason": "directory_analysis_failed", } else: if not use_llm: reason = "disabled_by_user" logger.info("Directory LLM analysis disabled by user request") else: reason = "not_available" logger.warning( "Directory LLM analysis unavailable - no API key configured" ) llm_scan_metadata = { "llm_scan_success": False, "llm_scan_reason": reason, } # Return directory-level scan results directly logger.info( "=== Directory scan complete - returning directory-level results ===" ) # Combine all threats from directory scans all_threats = [] all_threats.extend( all_semgrep_threats if "all_semgrep_threats" in locals() else [] ) all_threats.extend( all_llm_threats if "all_llm_threats" in locals() and use_llm else [] ) # Build file information for proper metrics files_with_threats = set() for threat in all_threats: if hasattr(threat, "file_path"): files_with_threats.add(threat.file_path) # Create file information list for JSON metrics files_info = [] for file_path in files_to_scan: file_threat_count = sum( 1 for t in all_threats if hasattr(t, "file_path") and t.file_path == str(file_path) ) files_info.append( { "file_path": str(file_path), "language": ( self._detect_language(file_path) if file_path.exists() else "generic" ), "threat_count": file_threat_count, "issues_identified": file_threat_count > 0, } ) # Initialize validation results validation_results = None # LLM validation step - same logic as file scanning if use_validation and self.enable_llm_validation and self.llm_validator: # Combine all threats for validation all_threats_for_validation = all_threats if all_threats_for_validation: # Check if validator is fully functional or using fallback if self.llm_validator.is_fully_functional(): logger.info( f"Validating {len(all_threats_for_validation)} findings with LLM validator (full mode) for directory scan" ) semgrep_scan_metadata["llm_validation_mode"] = "full" else: logger.info( f"Validating {len(all_threats_for_validation)} findings with LLM validator (fallback mode) for directory scan" ) semgrep_scan_metadata["llm_validation_mode"] = "fallback" try: # For directory scans, we need to read the directory contents for context # Create a concatenated source code context from all relevant files directory_context = f"Directory scan of: {directory_path_abs}\n" directory_context += ( f"Files scanned: {len(files_to_scan)} files\n\n" ) # Add sample content from files with threats for context context_files_added = 0 max_context_files = 5 # Limit context to avoid huge payloads for file_path in files_to_scan: if context_files_added >= max_context_files: break file_threat_count = sum( 1 for t in all_threats if hasattr(t, "file_path") and t.file_path == str(file_path) ) if file_threat_count > 0: try: file_content = file_path.read_text( encoding="utf-8", errors="replace" ) directory_context += ( f"=== {file_path.name} ===\n{file_content}\n\n" ) context_files_added += 1 except Exception as e: logger.debug( f"Could not read {file_path} for validation context: {e}" ) validation_results = ( await self.llm_validator._validate_findings_async( findings=all_threats_for_validation, source_code=directory_context, file_path=directory_path_abs, generate_exploits=True, ) ) # Store validation results in scan context for atomic database updates if ( "scan_context" in locals() and scan_context and validation_results ): scan_context.set_validation_results(validation_results) # Filter false positives based on validation if "all_llm_threats" in locals(): try: llm_filtered = self.llm_validator.filter_false_positives( all_llm_threats, validation_results ) except Exception: llm_filtered = None if isinstance(llm_filtered, list): all_llm_threats = llm_filtered else: all_llm_threats = self._apply_validation_filter( all_llm_threats, validation_results ) if "all_semgrep_threats" in locals(): try: semgrep_filtered = ( self.llm_validator.filter_false_positives( all_semgrep_threats, validation_results ) ) except Exception: semgrep_filtered = None if isinstance(semgrep_filtered, list): all_semgrep_threats = semgrep_filtered else: all_semgrep_threats = self._apply_validation_filter( all_semgrep_threats, validation_results ) # Update combined threats list after filtering all_threats = [] all_threats.extend( all_semgrep_threats if "all_semgrep_threats" in locals() else [] ) all_threats.extend( all_llm_threats if "all_llm_threats" in locals() and use_llm else [] ) # Add validation stats to metadata semgrep_scan_metadata["llm_validation_success"] = True semgrep_scan_metadata["llm_validation_stats"] = ( self.llm_validator.get_validation_stats(validation_results) ) logger.info( f"Directory validation complete - filtered to {len(all_threats)} legitimate findings" ) except Exception as e: logger.error(f"Directory LLM validation failed: {e}") semgrep_scan_metadata["llm_validation_success"] = False semgrep_scan_metadata["llm_validation_error"] = str(e) else: # No threats to validate semgrep_scan_metadata["llm_validation_success"] = True semgrep_scan_metadata["llm_validation_reason"] = ( "no_threats_to_validate" ) else: logger.debug( "Directory validation conditions not met - entering else clause" ) semgrep_scan_metadata["llm_validation_success"] = False if not use_validation: logger.debug("Reason: use_validation=False") semgrep_scan_metadata["llm_validation_reason"] = "disabled" elif not self.enable_llm_validation: logger.debug("Reason: self.enable_llm_validation=False") semgrep_scan_metadata["llm_validation_reason"] = "disabled" else: logger.debug("Reason: self.llm_validator is None or falsy") semgrep_scan_metadata["llm_validation_reason"] = "not_available" # Recalculate files with threats after validation filtering files_with_threats = set() for threat in all_threats: if hasattr(threat, "file_path"): files_with_threats.add(threat.file_path) # Update files_info with new threat counts after validation for file_info in files_info: file_path_str = file_info["file_path"] file_threat_count = sum( 1 for t in all_threats if hasattr(t, "file_path") and t.file_path == file_path_str ) file_info["threat_count"] = file_threat_count file_info["issues_identified"] = file_threat_count > 0 # Create single directory-level result directory_result = EnhancedScanResult( file_path=directory_path_abs, semgrep_threats=( all_semgrep_threats if "all_semgrep_threats" in locals() else [] ), llm_threats=( all_llm_threats if "all_llm_threats" in locals() and use_llm else [] ), validation_results=validation_results, scan_metadata={ "directory_path": directory_path_abs, "directory_scan": True, "total_files_discovered": len(all_files), "files_filtered_for_scan": len(files_to_scan), "files_with_threats": len(files_with_threats), "files_clean": len(files_to_scan) - len(files_with_threats), "total_threats_found": len(all_threats), "scan_type": "directory_level", "directory_files_info": files_info, # For proper JSON metrics **semgrep_scan_metadata, **llm_scan_metadata, }, llm_usage_stats=None, # TODO: Add LLM usage stats if needed ) logger.info(f"Directory scan found {len(all_threats)} total threats") return [directory_result] async def _process_single_file( self, file_path: Path, directory_semgrep_threats: dict[str, list[ThreatMatch]], directory_llm_threats: dict[str, list[ThreatMatch]], semgrep_scan_metadata: dict[str, Any], llm_scan_metadata: dict[str, Any], semgrep_status: dict[str, Any], use_llm: bool, use_semgrep: bool, use_validation: bool, severity_threshold: Severity | None, semaphore: asyncio.Semaphore, ) -> EnhancedScanResult: """Process a single file for scanning (used in parallel processing). Args: file_path: Path to the file to process directory_semgrep_threats: Threats found by directory-level Semgrep scan directory_llm_threats: Threats found by directory-level LLM scan semgrep_scan_metadata: Metadata from Semgrep scan llm_scan_metadata: Metadata from LLM scan semgrep_status: Semgrep status information use_llm: Whether LLM analysis is enabled use_semgrep: Whether Semgrep analysis is enabled use_validation: Whether LLM validation is enabled severity_threshold: Minimum severity threshold semaphore: Semaphore for concurrency control Returns: EnhancedScanResult for the file """ async with semaphore: # Limit concurrent operations file_path_abs = str(Path(file_path).resolve()) logger.debug(f"Processing file: {file_path_abs}") # Detect language language = self._detect_language(file_path) # Get threats for this file from directory scans file_semgrep_threats = directory_semgrep_threats.get(str(file_path), []) file_llm_threats = directory_llm_threats.get(str(file_path), []) logger.debug( f"File {file_path.name}: {len(file_semgrep_threats)} Semgrep threats, " f"{len(file_llm_threats)} LLM threats from directory scans" ) # Initialize file scan metadata scan_metadata: dict[str, Any] = { "file_path": str(file_path), "language": language, "use_llm": use_llm and self.enable_llm_analysis, "use_semgrep": use_semgrep and self.enable_semgrep_analysis, "directory_scan": True, "parallel_processing": True, "semgrep_source": "directory_scan", "llm_source": "directory_scan", } # Add directory scan metadata scan_metadata.update(semgrep_scan_metadata) scan_metadata.update(llm_scan_metadata) # Add semgrep status scan_metadata["semgrep_status"] = semgrep_status # Add LLM status for consistency if self.llm_analyzer: llm_status = self.llm_analyzer.get_status() scan_metadata["llm_status"] = llm_status else: scan_metadata["llm_status"] = { "available": False, "installation_status": "not_initialized", "description": "LLM analyzer not initialized", } # Filter by severity threshold if specified if severity_threshold: file_llm_threats = self._filter_by_severity( file_llm_threats, severity_threshold ) file_semgrep_threats = self._filter_by_severity( file_semgrep_threats, severity_threshold ) # Apply LLM validation if enabled validation_results = {} if use_validation and self.enable_llm_validation and self.llm_validator: # Combine all threats for validation all_threats_for_validation = file_llm_threats + file_semgrep_threats if all_threats_for_validation: # Check if validator is fully functional or using fallback if self.llm_validator.is_fully_functional(): logger.debug( f"Validating {len(all_threats_for_validation)} findings for {file_path.name} (full mode)" ) scan_metadata["llm_validation_mode"] = "full" else: logger.debug( f"Validating {len(all_threats_for_validation)} findings for {file_path.name} (fallback mode)" ) scan_metadata["llm_validation_mode"] = "fallback" try: # Use streaming for large files, regular read for small files config = self.credential_manager.load_config() try: max_file_size_config = getattr( config, "max_file_size_mb", 10 ) max_file_size_num = ( int(max_file_size_config) if max_file_size_config is not None else 10 ) except Exception: max_file_size_num = 10 if is_file_too_large(file_path, max_size_mb=max_file_size_num): logger.debug( f"Using streaming read for large file: {file_path}" ) streaming_reader = StreamingFileReader() source_code = await streaming_reader.get_file_preview( file_path, preview_size=10000, # 10KB preview for validation ) else: # Read small files using streaming reader reader = StreamingFileReader() chunks = [] async for chunk in reader.read_file_async(file_path): chunks.append(chunk) source_code = "".join(chunks) validation_results = ( await self.llm_validator._validate_findings_async( findings=all_threats_for_validation, source_code=source_code, file_path=str(file_path), generate_exploits=True, ) ) # Filter false positives based on validation file_llm_threats = self.llm_validator.filter_false_positives( file_llm_threats, validation_results ) file_semgrep_threats = ( self.llm_validator.filter_false_positives( file_semgrep_threats, validation_results ) ) # Add validation stats to metadata scan_metadata["llm_validation_success"] = True scan_metadata["llm_validation_stats"] = ( self.llm_validator.get_validation_stats(validation_results) ) except Exception as e: logger.debug(f"LLM validation failed for {file_path.name}: {e}") scan_metadata["llm_validation_success"] = False scan_metadata["llm_validation_error"] = str(e) else: scan_metadata["llm_validation_success"] = False if not use_validation: scan_metadata["llm_validation_reason"] = "disabled" elif not self.enable_llm_validation: scan_metadata["llm_validation_reason"] = "disabled" else: scan_metadata["llm_validation_reason"] = "not_available" # Create result for this file result = EnhancedScanResult( file_path=str(file_path), llm_threats=file_llm_threats, semgrep_threats=file_semgrep_threats, scan_metadata=scan_metadata, validation_results=validation_results, llm_usage_stats=None, # Will be populated by LLM components ) return result async def scan_directory_streaming( self, directory_path: Path, recursive: bool = True, use_llm: bool = True, use_semgrep: bool = True, use_validation: bool = True, severity_threshold: Severity | None = None, batch_size: int = 10, ): """Streaming version of scan_directory that yields results in batches. This method yields EnhancedScanResult objects as they are completed, allowing for progressive processing of large directories without accumulating all results in memory. Args: directory_path: Path to the directory to scan recursive: Whether to scan subdirectories use_llm: Whether to use LLM analysis use_semgrep: Whether to use Semgrep analysis use_validation: Whether to use LLM validation for findings severity_threshold: Minimum severity threshold for filtering batch_size: Number of files to process in each batch Yields: EnhancedScanResult objects as they are completed """ directory_path_obj = Path(directory_path).resolve() directory_path_abs = str(directory_path_obj) logger.info(f"=== Starting streaming directory scan: {directory_path_abs} ===") logger.debug( f"Streaming scan parameters - Recursive: {recursive}, " f"Batch size: {batch_size}, LLM: {use_llm}, Semgrep: {use_semgrep}" ) if not directory_path_obj.exists(): logger.error(f"Directory not found: {directory_path_abs}") raise FileNotFoundError(f"Directory not found: {directory_path_abs}") # Initialize file filter with smart exclusions config = self.credential_manager.load_config() file_filter = FileFilter( root_path=directory_path_obj, max_file_size_mb=config.max_file_size_mb, respect_gitignore=True, ) # Find all files to scan with intelligent filtering all_files = [] pattern = "**/*" if recursive else "*" logger.debug(f"Discovering files with pattern: {pattern}") for file_path in directory_path_obj.glob(pattern): if file_path.is_file(): all_files.append(file_path) logger.info(f"Discovered {len(all_files)} total files") # Apply smart filtering files_to_scan = file_filter.filter_files(all_files) logger.info(f"After filtering: {len(files_to_scan)} files to scan") if len(all_files) > len(files_to_scan): logger.info( f"Filtered out {len(all_files) - len(files_to_scan)} files (.gitignore, binary, too large, etc.)" ) # Handle case when no files to scan if not files_to_scan: logger.info("No files to scan after filtering") return # Perform directory-level Semgrep and LLM scans (same as regular scan_directory) # ... (This would contain the same directory-level scanning logic) # For simplicity in this implementation, we'll process files without # directory-level pre-scanning and yield results as they complete # Create a semaphore to limit concurrent operations try: cpu_count_val = os.cpu_count() or 4 except Exception: cpu_count_val = 4 max_workers = min(32, int(cpu_count_val) + 4, len(files_to_scan)) semaphore = asyncio.Semaphore(max_workers) logger.info(f"Using {max_workers} parallel workers for streaming scan") # Process files in batches and yield results successful_scans = 0 failed_scans = 0 for i in range(0, len(files_to_scan), batch_size): batch_files = files_to_scan[i : i + batch_size] logger.debug( f"Processing streaming batch: files {i+1}-{min(i+batch_size, len(files_to_scan))}" ) # Create tasks for this batch batch_tasks = [] for file_path in batch_files: # For streaming, we'll do individual file scans (simpler implementation) task = self.scan_file( file_path=file_path, use_llm=use_llm, use_semgrep=use_semgrep, use_validation=use_validation, severity_threshold=severity_threshold, ) batch_tasks.append(task) # Execute batch and yield results as they complete batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True) for j, result in enumerate(batch_results): if isinstance(result, Exception): logger.error(f"Failed to process file {batch_files[j]}: {result}") failed_scans += 1 # Create error result error_result = EnhancedScanResult( file_path=str(batch_files[j]), llm_threats=[], semgrep_threats=[], scan_metadata={ "file_path": str(batch_files[j]), "error": str(result), "streaming_scan": True, "batch_processing": True, }, llm_usage_stats=None, ) yield error_result else: successful_scans += 1 yield result # Log progress after each batch logger.debug( f"Streamed batch {i//batch_size + 1}/{(len(files_to_scan) + batch_size - 1)//batch_size}" ) logger.info( f"=== Streaming directory scan complete - Processed {successful_scans + failed_scans} files " f"(Success: {successful_scans}, Failed: {failed_scans}) ===" ) async def _get_cached_scan_result( self, source_code: str, file_path: str, use_llm: bool, use_semgrep: bool, use_validation: bool, severity_threshold: Severity | None, ) -> EnhancedScanResult | None: """Get cached scan result if available.""" if not self.cache_manager: return None try: config = self.credential_manager.load_config() if not config.enable_caching: return None hasher = self.cache_manager.get_hasher() # Create cache key based on content and scan parameters content_hash = hasher.hash_content(source_code) scan_context = { "file_path": file_path, "use_llm": use_llm, "use_semgrep": use_semgrep, "use_validation": use_validation, "severity_threshold": ( severity_threshold.value if severity_threshold else None ), "semgrep_config": config.semgrep_config, "semgrep_rules": config.semgrep_rules, "llm_model": config.llm_model, } metadata_hash = hasher.hash_metadata(scan_context) cache_key = CacheKey( cache_type=CacheType.FILE_ANALYSIS, content_hash=content_hash, metadata_hash=metadata_hash, ) cached_data = self.cache_manager.get(cache_key) if cached_data and isinstance(cached_data, dict): # Reconstruct EnhancedScanResult from cached data return self._deserialize_scan_result(cached_data) except Exception as e: logger.warning(f"Failed to retrieve cached scan result: {e}") return None async def _cache_scan_result( self, result: EnhancedScanResult, source_code: str, file_path: str, use_llm: bool, use_semgrep: bool, use_validation: bool, severity_threshold: Severity | None, ) -> None: """Cache scan result for future use.""" if not self.cache_manager: return try: config = self.credential_manager.load_config() if not config.enable_caching: return hasher = self.cache_manager.get_hasher() # Create same cache key as used for retrieval content_hash = hasher.hash_content(source_code) scan_context = { "file_path": file_path, "use_llm": use_llm, "use_semgrep": use_semgrep, "use_validation": use_validation, "severity_threshold": ( severity_threshold.value if severity_threshold else None ), "semgrep_config": config.semgrep_config, "semgrep_rules": config.semgrep_rules, "llm_model": config.llm_model, } metadata_hash = hasher.hash_metadata(scan_context) cache_key = CacheKey( cache_type=CacheType.FILE_ANALYSIS, content_hash=content_hash, metadata_hash=metadata_hash, ) # Serialize scan result for caching serialized_result = self._serialize_scan_result(result) # Cache for shorter duration than LLM responses (scan results can change with rule updates) cache_expiry_seconds = ( config.cache_max_age_hours * 1800 ) # Half the normal duration self.cache_manager.put(cache_key, serialized_result, cache_expiry_seconds) logger.debug(f"Cached scan result for {file_path}") except Exception as e: logger.warning(f"Failed to cache scan result: {e}") def _serialize_scan_result(self, result: EnhancedScanResult) -> dict: """Serialize EnhancedScanResult for caching.""" return { "file_path": result.file_path, "language": result.language, "llm_threats": [ SerializableThreatMatch.from_infrastructure_threat_match( threat ).to_dict() for threat in result.llm_threats ], "semgrep_threats": [ SerializableThreatMatch.from_infrastructure_threat_match( threat ).to_dict() for threat in result.semgrep_threats ], "scan_metadata": result.scan_metadata, "validation_results": result.validation_results, "stats": result.stats, "llm_usage_stats": result.llm_usage_stats, } def _deserialize_scan_result(self, cached_data: dict) -> EnhancedScanResult: """Deserialize cached data back to EnhancedScanResult.""" # Reconstruct threat matches as infrastructure objects llm_threats = [ SerializableThreatMatch.from_dict( threat_data ).to_infrastructure_threat_match() for threat_data in cached_data.get("llm_threats", []) ] semgrep_threats = [ SerializableThreatMatch.from_dict( threat_data ).to_infrastructure_threat_match() for threat_data in cached_data.get("semgrep_threats", []) ] # Create EnhancedScanResult result = EnhancedScanResult( file_path=cached_data["file_path"], llm_threats=llm_threats, semgrep_threats=semgrep_threats, scan_metadata=cached_data.get("scan_metadata", {}), validation_results=cached_data.get("validation_results", {}), llm_usage_stats=cached_data.get("llm_usage_stats"), ) # Manually set computed properties that might not recompute correctly if "stats" in cached_data: result.stats = cached_data["stats"] return result def clear_cache(self) -> None: """Clear all caches used by the scan engine.""" logger.info("Clearing scan engine caches...") # Clear main cache manager if self.cache_manager: self.cache_manager.clear() # Clear semgrep scanner cache if self.semgrep_scanner: self.semgrep_scanner.clear_cache() # Clear LLM analyzer cache if available if self.llm_analyzer and hasattr(self.llm_analyzer, "clear_cache"): self.llm_analyzer.clear_cache() # Clear token estimator cache if available if self.llm_analyzer and hasattr(self.llm_analyzer, "token_estimator"): if hasattr(self.llm_analyzer.token_estimator, "clear_cache"): self.llm_analyzer.token_estimator.clear_cache() logger.info("Scan engine caches cleared successfully")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server