
Adversary MCP Server

by brettbergin
llm_adapter.py (25.4 kB)
"""Adapter for LLMScanner to implement domain IScanStrategy interface.""" import asyncio import time from pathlib import Path from typing import Any from adversary_mcp_server.application.adapters.input_models import ( LLMScanResultInput, safe_convert_to_input_model, ) from adversary_mcp_server.domain.entities.scan_request import ScanRequest from adversary_mcp_server.domain.entities.scan_result import ScanResult from adversary_mcp_server.domain.entities.threat_match import ThreatMatch from adversary_mcp_server.domain.interfaces import IScanStrategy, ScanError from adversary_mcp_server.domain.value_objects.confidence_score import ConfidenceScore from adversary_mcp_server.domain.value_objects.file_path import FilePath from adversary_mcp_server.domain.value_objects.scan_context import ScanContext from adversary_mcp_server.domain.value_objects.severity_level import SeverityLevel from adversary_mcp_server.logger import get_logger from adversary_mcp_server.scanner.language_mapping import LanguageMapper from adversary_mcp_server.scanner.llm_scanner import LLMScanner logger = get_logger("llm_adapter") class LLMScanStrategy(IScanStrategy): """ Adapter that wraps LLMScanner to implement the domain IScanStrategy interface. This adapter enables the domain layer to use LLM-powered security analysis while maintaining clean separation between domain logic and infrastructure concerns. Features session-aware analysis when available for enhanced context and cross-file vulnerability detection. """ def __init__( self, llm_scanner: LLMScanner | None = None, enable_sessions: bool = True ): """Initialize the adapter with an optional LLMScanner instance. Args: llm_scanner: Pre-configured LLMScanner instance (optional) enable_sessions: Enable session-aware analysis capabilities """ self.enable_sessions = enable_sessions if llm_scanner: self._scanner = llm_scanner else: # Try to initialize with default dependencies try: from adversary_mcp_server.credentials import get_credential_manager credential_manager = get_credential_manager() self._scanner = LLMScanner( credential_manager, enable_sessions=enable_sessions ) except Exception as e: logger.error( f"Failed to initialize LLMScanner for strategy: {e}", extra={ "error_type": type(e).__name__, "enable_sessions": enable_sessions, "component": "llm_scan_strategy", "impact": "LLM analysis will be unavailable", }, ) self._scanner = None def get_strategy_name(self) -> str: """Get the name of this scan strategy.""" base_name = "llm_ai_analysis" if self._scanner and self._scanner.is_session_aware_available(): return f"{base_name}_session_aware" return base_name def can_scan(self, context: ScanContext) -> bool: """ Check if this strategy can scan the given context. LLM scanner can analyze files, directories, and code snippets when enabled. """ # Check if scanner is available if self._scanner is None: return False # LLM can handle most scan types, but may have size limitations if context.metadata.scan_type in [ "file", "directory", "code", "diff", "incremental", ]: # Check content size constraints for code scans if context.content and len(context.content) > 50000: # 50KB limit return False return True return False def get_supported_languages(self) -> list[str]: """Get list of programming languages supported by LLM analysis.""" return LanguageMapper.get_supported_languages() async def execute_scan(self, request: ScanRequest) -> ScanResult: """ Execute LLM scan using the domain request and return domain result. This method coordinates between domain and infrastructure layers: 1. 
Converts domain request to LLM analysis parameters 2. Executes LLM analysis 3. Converts LLM results to domain objects """ if self._scanner is None: # Return empty result if scanner not available return ScanResult.create_empty(request) scan_start_time = time.time() try: context = request.context scan_type = context.metadata.scan_type logger.info( f"Starting LLM scan - Type: {scan_type}, Target: {context.target_path}" ) # Convert domain objects to infrastructure parameters llm_results = [] # Use session-aware analysis when available and project context is provided session_available = ( self._scanner.is_session_aware_available() if self._scanner else False ) has_project_context = self._has_project_context(context) logger.debug( f"Session analysis check - Available: {session_available}, " f"Scan type: {scan_type}, Has context: {has_project_context}" ) use_session_analysis = ( session_available and scan_type in ["file", "directory"] and has_project_context ) if use_session_analysis: logger.debug(f"Using session-aware analysis for {scan_type} scan") try: llm_results = await self._analyze_with_session(context) logger.debug( f"Session analysis completed with {len(llm_results)} results" ) except Exception as e: logger.error( f"Session analysis failed, falling back to legacy: {e}" ) use_session_analysis = False if not use_session_analysis or not llm_results: # Fall back to legacy analysis methods logger.info(f"Using legacy analysis for {scan_type} scan") if scan_type == "file": # File analysis - use content from context if available, otherwise read file if context.content: llm_results = await self._analyze_code( context.content, context.language ) else: file_path = str(context.target_path) llm_results = await self._analyze_file( file_path, context.language ) elif scan_type == "directory": # Directory analysis dir_path = str(context.target_path) llm_results = await self._analyze_directory(dir_path) elif scan_type == "code": # Code snippet analysis code_content = context.content or "" llm_results = await self._analyze_code( code_content, context.language ) elif scan_type == "diff": # Diff analysis - analyze the target file with diff context file_path = str(context.target_path) llm_results = await self._analyze_file(file_path, context.language) logger.info( f"Legacy analysis completed with {len(llm_results)} results" ) # Convert infrastructure results to domain objects domain_threats = self._convert_to_domain_threats(llm_results, request) # Apply severity filtering filtered_threats = self._apply_severity_filter( domain_threats, request.severity_threshold ) # Apply confidence filtering confidence_threshold = ConfidenceScore( 0.5 ) # Default LLM confidence threshold high_confidence_threats = [ threat for threat in filtered_threats if threat.confidence.meets_threshold(confidence_threshold) ] # Create domain scan result with performance metrics analysis_type = "ai_powered" if self._scanner and self._scanner.is_session_aware_available(): analysis_type = "session_aware_ai" scan_duration = time.time() - scan_start_time logger.info( f"LLM scan completed - Duration: {scan_duration:.2f}s, " f"Total findings: {len(llm_results)}, " f"High-confidence findings: {len(high_confidence_threats)}, " f"Analysis type: {analysis_type}" ) return ScanResult.create_from_threats( request=request, threats=high_confidence_threats, scan_metadata={ "scanner": self.get_strategy_name(), "analysis_type": analysis_type, "total_findings": len(llm_results), "filtered_findings": len(high_confidence_threats), "confidence_threshold": 
confidence_threshold.get_percentage(), "session_aware": ( self._scanner.is_session_aware_available() if self._scanner else False ), "scan_duration_seconds": round(scan_duration, 2), "findings_per_second": round( len(llm_results) / max(scan_duration, 0.01), 2 ), }, ) except Exception as e: # Log performance metrics even on failure scan_duration = time.time() - scan_start_time logger.error( f"LLM scan failed after {scan_duration:.2f}s - Error: {str(e)}" ) # Convert infrastructure exceptions to domain exceptions raise ScanError(f"LLM scan failed: {str(e)}") from e async def _analyze_file( self, file_path: str, language: str | None = None ) -> list[dict[str, Any]]: """Execute LLM file analysis.""" # LLMScanner.analyze_file is async, so call it directly findings = await self._scanner.analyze_file(file_path, language or "") return [self._finding_to_dict(finding) for finding in findings] async def _analyze_directory(self, dir_path: str) -> list[dict[str, Any]]: """Execute LLM directory analysis.""" # LLMScanner.analyze_directory is async, so call it directly findings = await self._scanner.analyze_directory(dir_path) return [self._finding_to_dict(finding) for finding in findings] async def _analyze_code( self, code_content: str, language: str | None = None ) -> list[dict[str, Any]]: """Execute LLM code analysis.""" # LLMScanner.analyze_code is sync, so use executor loop = asyncio.get_event_loop() findings = await loop.run_in_executor( None, lambda: self._scanner.analyze_code(code_content, "<code>", language or ""), ) return [self._finding_to_dict(finding) for finding in findings] async def _analyze_with_session(self, context: ScanContext) -> list[dict[str, Any]]: """Execute session-aware analysis when available.""" scan_type = context.metadata.scan_type if scan_type == "file": # Use session-aware file analysis file_path = Path(context.target_path) scan_scope = file_path.parent context_hint = getattr(context.metadata, "analysis_focus", None) findings = await self._scanner.analyze_file_with_context( file_path=file_path, project_root=scan_scope, context_hint=context_hint, max_findings=None, ) elif scan_type == "directory": # Use session-aware directory analysis scan_scope = Path(context.target_path) analysis_focus = getattr( context.metadata, "analysis_focus", "comprehensive security analysis" ) findings = await self._scanner.analyze_project_with_session( project_root=scan_scope, analysis_focus=analysis_focus, max_findings=None, ) elif scan_type == "incremental": # Use incremental analysis for changed files if hasattr(context.metadata, "changed_files") and hasattr( context.metadata, "session_id" ): changed_files = [Path(f) for f in context.metadata.changed_files] commit_hash = getattr(context.metadata, "commit_hash", None) change_context = getattr(context.metadata, "change_context", None) if self._scanner.session_manager: findings = await self._scanner.session_manager.analyze_changes_incrementally( session_id=context.metadata.session_id, changed_files=changed_files, commit_hash=commit_hash, change_context=change_context, ) else: logger.warning( "Session manager not available for incremental analysis" ) return [] else: logger.warning( "Incremental analysis requires changed_files and session_id metadata" ) return [] else: # Fallback to legacy analysis logger.warning( f"Session-aware analysis not supported for scan type: {scan_type}" ) return [] return [self._finding_to_dict(finding) for finding in findings] def _has_project_context(self, context: ScanContext) -> bool: """Check if the context has 
sufficient information for session-aware analysis.""" # Check if target path exists and looks like it has project structure target_path = Path(context.target_path) if not target_path.exists(): return False # Session-aware analysis is always preferred when available return True def _finding_to_dict(self, finding) -> dict[str, Any]: """Convert LLMSecurityFinding to dictionary format expected by _convert_to_domain_threats.""" # Handle both LLMSecurityFinding and other finding types if hasattr(finding, "finding_type"): # LLMSecurityFinding format finding_id = finding.finding_type title = finding.finding_type.replace("_", " ").title() logger.debug(f"Converting LLMSecurityFinding to dict: {finding_id}") else: # Legacy or other finding format finding_id = getattr(finding, "rule_id", "unknown_finding") title = getattr(finding, "rule_name", "Security Finding") logger.debug(f"Converting legacy finding to dict: {finding_id}") return { # Map LLMSecurityFinding fields to the format expected by _convert_to_domain_threats "finding_id": finding_id, "title": title, "description": finding.description, "severity": str( finding.severity ), # LLMSecurityFinding.severity is already a string "line_number": finding.line_number, "code_snippet": finding.code_snippet, "file_path": finding.file_path, "confidence": finding.confidence, "explanation": finding.explanation, "recommendation": finding.recommendation, "column_number": 1, # Default column number "cwe_id": getattr(finding, "cwe_id", None), "owasp_category": getattr(finding, "owasp_category", None), } def _convert_to_domain_threats( self, llm_results: list[dict[str, Any]], request: ScanRequest ) -> list[ThreatMatch]: """Convert LLM analysis results to domain ThreatMatch objects.""" threats = [] for result in llm_results: try: # Extract data from LLM result format rule_id = result.get("finding_id", f"llm_{len(threats) + 1}") rule_name = result.get("title", "AI-detected Security Issue") description = result.get( "description", "Security vulnerability detected by AI analysis" ) # Map LLM severity to domain severity llm_severity = result.get("severity", "medium") severity = self._map_severity(llm_severity) # Extract location information line_number = result.get("line_number", 1) column_number = result.get("column_number", 1) # Extract file path - use unique placeholder for missing paths file_path_str = result.get("file_path") if not file_path_str: # For directory scans, generate unique placeholder instead of using directory path if request.context.metadata.scan_type == "directory": import uuid placeholder_name = f"<unknown-file-{uuid.uuid4().hex[:8]}>" logger.warning( f"LLM finding missing file_path, using placeholder: {placeholder_name}" ) file_path_str = placeholder_name else: # For file/code scans, fall back to target path file_path_str = str(request.context.target_path) file_path = FilePath.from_string(file_path_str) # Validate that file_path is not a directory (unless it's a placeholder) if ( not file_path_str.startswith("<unknown-file-") and file_path.exists() and file_path.is_directory() ): logger.warning( f"Skipping LLM finding with directory path: {file_path_str}" ) continue # Extract code snippet code_snippet = result.get("code_snippet", "") # Determine category from LLM analysis category = self._determine_category(result) # Extract confidence from LLM analysis confidence_score = result.get("confidence", 0.7) confidence = ConfidenceScore(min(1.0, max(0.0, confidence_score))) # Create domain threat threat = ThreatMatch( rule_id=rule_id, 
rule_name=rule_name, description=description, category=category, severity=severity, file_path=file_path, line_number=line_number, column_number=column_number, code_snippet=code_snippet, confidence=confidence, source_scanner="llm", metadata={ "llm_analysis": True, "reasoning": result.get("reasoning", ""), "remediation": result.get("remediation", ""), "original_result": result, }, ) # Add remediation advice if available if "remediation" in result: threat = threat.add_remediation_advice(result["remediation"]) threats.append(threat) except Exception as e: # Log conversion error but continue processing other results logger.warning(f"Failed to convert LLM result to domain threat: {e}") continue return threats def _convert_legacy_threat_to_domain(self, legacy_threat) -> ThreatMatch: """Convert a single legacy ThreatMatch to domain ThreatMatch using type-safe input models.""" # Convert to type-safe input model to avoid getattr/hasattr calls input_threat = safe_convert_to_input_model(legacy_threat, LLMScanResultInput) # Map scanner severity to domain severity severity = SeverityLevel.from_string(input_threat.severity) # Convert file path file_path = FilePath.from_string(str(input_threat.file_path)) logger.debug( f"[THREAT_MATCH_DEBUG] Creating ThreatMatch: rule_id='{input_threat.rule_id}', line_number={input_threat.line_number}" ) # Create domain threat with type-safe access threat_match = ThreatMatch( rule_id=input_threat.rule_id, rule_name=input_threat.rule_name, description=input_threat.description, category=input_threat.category, severity=severity, file_path=file_path, line_number=input_threat.line_number, column_number=input_threat.column_number, code_snippet=input_threat.code_snippet, function_name=input_threat.function_name, exploit_examples=input_threat.exploit_examples, remediation=input_threat.remediation, references=input_threat.references, cwe_id=input_threat.cwe_id, owasp_category=input_threat.owasp_category, confidence=ConfidenceScore(input_threat.confidence), source_scanner="llm", is_false_positive=input_threat.is_false_positive, ) logger.debug( f"[THREAT_MATCH_DEBUG] Created ThreatMatch with fingerprint: {threat_match.get_fingerprint()}" ) return threat_match def _map_severity(self, llm_severity: str) -> SeverityLevel: """Map LLM severity to domain SeverityLevel.""" severity_mapping = { "critical": "critical", "high": "high", "medium": "medium", "low": "low", "info": "low", "warning": "medium", "error": "high", } domain_severity = severity_mapping.get(llm_severity.lower(), "medium") return SeverityLevel.from_string(domain_severity) def _determine_category(self, result: dict[str, Any]) -> str: """Determine threat category from LLM analysis result.""" # Extract category from LLM analysis if "category" in result: return result["category"] # Infer category from description/title description = ( result.get("description", "") + " " + result.get("title", "") ).lower() if any(keyword in description for keyword in ["injection", "sql", "command"]): return "injection" elif any(keyword in description for keyword in ["xss", "cross-site"]): return "xss" elif any( keyword in description for keyword in ["crypto", "encryption", "hash"] ): return "cryptography" elif any(keyword in description for keyword in ["auth", "session", "token"]): return "authentication" elif any( keyword in description for keyword in ["path", "traversal", "directory"] ): return "path_traversal" elif any( keyword in description for keyword in ["disclosure", "leak", "expose"] ): return "information_disclosure" elif any( keyword 
in description for keyword in ["buffer", "overflow", "memory"] ): return "memory_safety" elif any(keyword in description for keyword in ["dos", "denial", "resource"]): return "denial_of_service" else: return "security" def _apply_severity_filter( self, threats: list[ThreatMatch], threshold: SeverityLevel | None ) -> list[ThreatMatch]: """Filter threats based on severity threshold.""" if threshold is None: return threats filtered = [] for threat in threats: try: if threat.severity.meets_threshold(threshold): filtered.append(threat) except Exception as e: logger.error( f"Severity comparison failed: threat.severity={threat.severity} " f"(type={type(threat.severity)}, id={id(threat.severity)}) vs " f"threshold={threshold} (type={type(threshold)}, id={id(threshold)})" ) logger.error(f"Threat rule_id: {threat.rule_id}") raise e return filtered
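Usage sketch (not part of llm_adapter.py): the adapter is meant to be driven through the domain IScanStrategy interface rather than called into directly. The snippet below shows roughly how a caller might wire it up. The import path for LLMScanStrategy is assumed from the sibling input_models module path, and build_request is a hypothetical placeholder, since the real ScanRequest/ScanContext factories live in the domain layer and are not shown in this file.

import asyncio

# Import path assumed from the package layout implied by
# adversary_mcp_server.application.adapters.input_models in the file above.
from adversary_mcp_server.application.adapters.llm_adapter import LLMScanStrategy
from adversary_mcp_server.domain.entities.scan_request import ScanRequest


def build_request(target: str) -> ScanRequest:
    # Hypothetical helper: construct a ScanRequest/ScanContext for `target`
    # using the domain-layer factories (not defined in llm_adapter.py).
    raise NotImplementedError


async def main() -> None:
    # With no scanner injected, the adapter tries to build an LLMScanner from
    # the default credential manager; if that fails it degrades gracefully and
    # can_scan() returns False.
    strategy = LLMScanStrategy(enable_sessions=True)
    request = build_request("src/app.py")

    if strategy.can_scan(request.context):
        # execute_scan() returns a domain ScanResult whose threats have already
        # been severity- and confidence-filtered (>= 0.5 confidence by default).
        result = await strategy.execute_scan(request)
        print(f"{strategy.get_strategy_name()}: scan completed")


if __name__ == "__main__":
    asyncio.run(main())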

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.