Skip to main content
Glama

Adversary MCP Server

by brettbergin
diff_scanner.py33.1 kB
"""Git diff scanner for analyzing security vulnerabilities in code changes.""" import asyncio import re import time from pathlib import Path from ..config_manager import get_config_manager from ..logger import get_logger from ..resilience import ErrorHandler, ResilienceConfig from .scan_engine import EnhancedScanResult, ScanEngine from .types import Severity logger = get_logger("diff_scanner") class GitDiffError(Exception): """Exception raised when git diff operations fail.""" pass class DiffChunk: """Represents a chunk of changes in a git diff.""" def __init__( self, file_path: str, old_start: int, old_count: int, new_start: int, new_count: int, ): self.file_path = file_path self.old_start = old_start self.old_count = old_count self.new_start = new_start self.new_count = new_count self.added_lines: list[tuple[int, str]] = [] # (line_number, content) self.removed_lines: list[tuple[int, str]] = [] # (line_number, content) self.context_lines: list[tuple[int, str]] = [] # (line_number, content) logger.debug( f"Created DiffChunk for {file_path}: " f"old ({old_start}+{old_count}), new ({new_start}+{new_count})" ) def add_line(self, line_type: str, line_number: int, content: str) -> None: """Add a line to the diff chunk.""" if line_type == "+": self.added_lines.append((line_number, content)) logger.debug(f"Added line (+) {line_number}: {content[:50]}...") elif line_type == "-": self.removed_lines.append((line_number, content)) logger.debug(f"Removed line (-) {line_number}: {content[:50]}...") else: self.context_lines.append((line_number, content)) def get_changed_code(self) -> str: """Get the changed code as a single string.""" logger.debug( f"Getting changed code for {self.file_path}: " f"{len(self.context_lines)} context + {len(self.added_lines)} added lines" ) lines = [] # Add context lines for better analysis for _, content in self.context_lines: lines.append(content) # Add added lines (new code to scan) for _, content in self.added_lines: lines.append(content) 
result = "\n".join(lines) logger.debug(f"Combined changed code: {len(result)} characters") return result def get_added_lines_with_minimal_context(self) -> str: """Get added lines with minimal context for better analysis. This includes only 1-2 context lines around changes, not all context, which is useful for LLM analysis while keeping the scope focused. """ file_path_abs = str(Path(self.file_path).resolve()) logger.debug(f"Getting added lines with minimal context for {file_path_abs}") lines = [] # Add minimal context (max 2 lines before changes) context_to_include = self.context_lines[:2] if self.context_lines else [] logger.debug(f"Including {len(context_to_include)} context lines") for _, content in context_to_include: lines.append(f"// CONTEXT: {content}") # Add all added lines (these are what we're actually analyzing) for _, content in self.added_lines: lines.append(content) result = "\n".join(lines) logger.debug(f"Added lines with context: {len(result)} characters") return result def get_added_lines_only(self) -> str: """Get only the added lines as a single string.""" logger.debug( f"Getting only added lines for {self.file_path}: {len(self.added_lines)} lines" ) result = "\n".join(content for _, content in self.added_lines) logger.debug(f"Added lines only: {len(result)} characters") return result class GitDiffParser: """Parser for git diff output.""" def __init__(self): logger.debug("Initializing GitDiffParser with regex patterns") self.diff_header_pattern = re.compile(r"^diff --git a/(.*) b/(.*)$") self.chunk_header_pattern = re.compile( r"^@@\s*-(\d+)(?:,(\d+))?\s*\+(\d+)(?:,(\d+))?\s*@@" ) self.file_header_pattern = re.compile(r"^(\+\+\+|---)\s+(.*)") logger.debug("GitDiffParser initialized successfully") def parse_diff(self, diff_output: str) -> dict[str, list[DiffChunk]]: """Parse git diff output into structured chunks. 
Args: diff_output: Raw git diff output Returns: Dictionary mapping file paths to lists of DiffChunk objects """ logger.info("=== Starting diff parsing ===") logger.debug(f"Parsing diff output: {len(diff_output)} characters") chunks_by_file: dict[str, list[DiffChunk]] = {} current_file = None current_chunk = None old_line_num = 0 new_line_num = 0 lines = diff_output.split("\n") logger.debug(f"Diff contains {len(lines)} lines to parse") for line_idx, line in enumerate(lines): # Check for file header diff_match = self.diff_header_pattern.match(line) if diff_match: current_file = diff_match.group(2) # Use the 'b/' path (destination) chunks_by_file[current_file] = [] logger.info(f"Found file header: {current_file}") continue # Check for chunk header chunk_match = self.chunk_header_pattern.match(line) if chunk_match and current_file: old_start = int(chunk_match.group(1)) old_count = int(chunk_match.group(2) or "1") new_start = int(chunk_match.group(3)) new_count = int(chunk_match.group(4) or "1") current_chunk = DiffChunk( current_file, old_start, old_count, new_start, new_count ) chunks_by_file[current_file].append(current_chunk) logger.debug( f"Created chunk for {current_file}: " f"old({old_start},{old_count}) new({new_start},{new_count})" ) old_line_num = old_start new_line_num = new_start continue # Check for content lines if current_chunk and line: if line.startswith("+") and not line.startswith("+++"): content = line[1:] # Remove the '+' prefix current_chunk.add_line("+", new_line_num, content) new_line_num += 1 elif line.startswith("-") and not line.startswith("---"): content = line[1:] # Remove the '-' prefix current_chunk.add_line("-", old_line_num, content) old_line_num += 1 elif line.startswith(" "): content = line[1:] # Remove the ' ' prefix current_chunk.add_line(" ", new_line_num, content) old_line_num += 1 new_line_num += 1 # Log parsing results total_chunks = sum(len(chunks) for chunks in chunks_by_file.values()) logger.info( f"=== Diff parsing complete - 
{len(chunks_by_file)} files, {total_chunks} chunks ===" ) for file_path, chunks in chunks_by_file.items(): total_added = sum(len(chunk.added_lines) for chunk in chunks) total_removed = sum(len(chunk.removed_lines) for chunk in chunks) logger.debug( f"File {file_path}: {len(chunks)} chunks, +{total_added} -{total_removed} lines" ) return chunks_by_file class GitDiffScanner: """Scanner for analyzing security vulnerabilities in git diffs.""" def __init__( self, scan_engine: ScanEngine | None = None, working_dir: Path | None = None, metrics_collector=None, ): """Initialize the git diff scanner. Args: scan_engine: Scan engine for vulnerability detection working_dir: Working directory for git operations (defaults to current directory) metrics_collector: Optional metrics collector for diff scanning analytics """ logger.info("=== Initializing GitDiffScanner ===") self.scan_engine = scan_engine or ScanEngine() self.working_dir = working_dir or Path.cwd() self.parser = GitDiffParser() self.metrics_collector = metrics_collector self.config_manager = get_config_manager() # Initialize ErrorHandler for git command resilience resilience_config = ResilienceConfig( enable_circuit_breaker=True, failure_threshold=self.config_manager.dynamic_limits.circuit_breaker_failure_threshold, recovery_timeout_seconds=self.config_manager.dynamic_limits.circuit_breaker_recovery_timeout, enable_retry=True, max_retry_attempts=self.config_manager.dynamic_limits.max_retry_attempts, base_delay_seconds=self.config_manager.dynamic_limits.retry_base_delay, enable_graceful_degradation=True, default_timeout_seconds=float( self.config_manager.dynamic_limits.scan_timeout_seconds ), ) self.error_handler = ErrorHandler(resilience_config, self.metrics_collector) logger.info(f"Working directory: {self.working_dir}") logger.debug("Scan engine, parser, and error handler initialized") logger.info("=== GitDiffScanner initialization complete ===") async def _run_git_command( self, args: list[str], working_dir: Path | 
None = None ) -> str: """Run a git command and return its output with resilience handling. Args: args: Git command arguments working_dir: Working directory for git operations (uses self.working_dir if not specified) Returns: Command output as string Raises: GitDiffError: If the git command fails """ target_dir = working_dir or self.working_dir cmd = ["git"] + args command_str = " ".join(cmd) logger.debug(f"Executing git command: {command_str} in {target_dir}") # Record git operation start git_start_time = time.time() if self.metrics_collector: self.metrics_collector.record_metric( "git_operations_total", 1, labels={"command": args[0] if args else "unknown", "status": "started"}, ) # Define the git operation for resilience handling async def git_operation(): proc = await asyncio.create_subprocess_exec( *cmd, cwd=target_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for( proc.communicate(), timeout=float( self.config_manager.dynamic_limits.scan_timeout_seconds ), ) if proc.returncode == 0: return stdout.decode("utf-8") else: error_msg = ( stderr.decode("utf-8").strip() if stderr else "Unknown error" ) raise GitDiffError(f"Git command failed: {error_msg}") finally: if proc.returncode is None: try: proc.terminate() await proc.wait() except ProcessLookupError: pass # Define fallback function for graceful degradation async def git_fallback(*args, **kwargs): logger.warning(f"Git service degraded for command: {' '.join(cmd)}") raise GitDiffError(f"Git command unavailable: {' '.join(cmd)}") try: # Execute with comprehensive error recovery recovery_result = await self.error_handler.execute_with_recovery( git_operation, operation_name=f"git_{'_'.join(args[:2])}", circuit_breaker_name="git_service", fallback_func=git_fallback, ) if recovery_result.success: result = recovery_result.result git_duration = time.time() - git_start_time # Record successful git operation if self.metrics_collector: 
self.metrics_collector.record_metric( "git_operations_total", 1, labels={ "command": args[0] if args else "unknown", "status": "success", }, ) self.metrics_collector.record_histogram( "git_operation_duration_seconds", git_duration, labels={ "command": args[0] if args else "unknown", "status": "success", }, ) self.metrics_collector.record_metric( "git_output_size_bytes", len(result) ) logger.debug(f"Git command successful: {len(result)} chars output") return result else: git_duration = time.time() - git_start_time error_msg = recovery_result.error_message or "Git service unavailable" # Record failed git operation if self.metrics_collector: self.metrics_collector.record_metric( "git_operations_total", 1, labels={ "command": args[0] if args else "unknown", "status": "failure", }, ) self.metrics_collector.record_histogram( "git_operation_duration_seconds", git_duration, labels={ "command": args[0] if args else "unknown", "status": "failure", }, ) raise GitDiffError(f"Git command failed with recovery: {error_msg}") except FileNotFoundError: git_duration = time.time() - git_start_time # Record git not found error if self.metrics_collector: self.metrics_collector.record_metric( "git_operations_total", 1, labels={ "command": args[0] if args else "unknown", "status": "not_found", }, ) self.metrics_collector.record_histogram( "git_operation_duration_seconds", git_duration, labels={ "command": args[0] if args else "unknown", "status": "not_found", }, ) logger.error("Git command not found in PATH") raise GitDiffError( "Git command not found. Please ensure git is installed and in PATH." ) async def _validate_branches( self, source_branch: str, target_branch: str, working_dir: Path | None = None ) -> None: """Validate that the specified branches exist. 
Args: source_branch: Source branch name target_branch: Target branch name working_dir: Working directory for git operations (uses self.working_dir if not specified) Raises: GitDiffError: If either branch doesn't exist """ logger.info(f"Validating branches: {source_branch} -> {target_branch}") try: # Check if source branch exists logger.debug(f"Validating source branch: {source_branch}") await self._run_git_command( ["rev-parse", "--verify", f"{source_branch}^{{commit}}"], working_dir ) logger.debug(f"Source branch {source_branch} exists") # Check if target branch exists logger.debug(f"Validating target branch: {target_branch}") await self._run_git_command( ["rev-parse", "--verify", f"{target_branch}^{{commit}}"], working_dir ) logger.debug(f"Target branch {target_branch} exists") logger.info("Branch validation successful") except GitDiffError as e: logger.error(f"Branch validation failed: {e}") raise GitDiffError(f"Branch validation failed: {e}") def _detect_language_from_path(self, file_path: str) -> str: """Detect programming language from file path using language mapping. Args: file_path: Path to the file Returns: Programming language name (e.g., 'python', 'javascript') or 'generic' for unknown """ from .language_mapping import LanguageMapper detected_language = LanguageMapper.detect_language_from_extension(file_path) logger.debug(f"Language detected for {file_path}: {detected_language}") return detected_language async def get_diff_changes( self, source_branch: str, target_branch: str, working_dir: Path | None = None ) -> dict[str, list[DiffChunk]]: """Get diff changes between two branches. 
Args: source_branch: Source branch (e.g., 'feature-branch') target_branch: Target branch (e.g., 'main') working_dir: Working directory for git operations (uses self.working_dir if not specified) Returns: Dictionary mapping file paths to lists of DiffChunk objects Raises: GitDiffError: If git operations fail """ diff_analysis_start_time = time.time() logger.info(f"=== Getting diff changes: {source_branch} -> {target_branch} ===") # Record diff analysis start if self.metrics_collector: self.metrics_collector.record_metric( "diff_analysis_operations_total", 1, labels={"source_branch": source_branch, "target_branch": target_branch}, ) # Validate branches exist await self._validate_branches(source_branch, target_branch, working_dir) # Get diff between branches diff_args = ["diff", f"{target_branch}...{source_branch}"] logger.debug(f"Getting diff with command: git {' '.join(diff_args)}") diff_output = await self._run_git_command(diff_args, working_dir) if not diff_output.strip(): logger.info( f"No differences found between {source_branch} and {target_branch}" ) # Record no changes metric if self.metrics_collector: diff_duration = time.time() - diff_analysis_start_time self.metrics_collector.record_histogram( "diff_analysis_duration_seconds", diff_duration, labels={"outcome": "no_changes"}, ) self.metrics_collector.record_metric("diff_analysis_files_changed", 0) return {} logger.info(f"Diff output received: {len(diff_output)} characters") # Parse the diff output logger.debug("Parsing diff output...") parse_start_time = time.time() changes = self.parser.parse_diff(diff_output) parse_duration = time.time() - parse_start_time # Record diff analysis metrics if self.metrics_collector: diff_duration = time.time() - diff_analysis_start_time total_chunks = sum(len(chunks) for chunks in changes.values()) total_added_lines = sum( len(chunk.added_lines) for chunks in changes.values() for chunk in chunks ) self.metrics_collector.record_histogram( "diff_analysis_duration_seconds", 
diff_duration, labels={"outcome": "changes_found"}, ) self.metrics_collector.record_histogram( "diff_parsing_duration_seconds", parse_duration ) self.metrics_collector.record_metric( "diff_analysis_files_changed", len(changes) ) self.metrics_collector.record_metric( "diff_analysis_chunks_total", total_chunks ) self.metrics_collector.record_metric( "diff_analysis_added_lines_total", total_added_lines ) self.metrics_collector.record_metric( "diff_output_size_bytes", len(diff_output) ) logger.info(f"=== Diff changes retrieved - {len(changes)} files changed ===") return changes async def scan_diff( self, source_branch: str, target_branch: str, working_dir: Path | None = None, use_llm: bool = False, use_semgrep: bool = True, use_validation: bool = True, use_rules: bool = True, severity_threshold: Severity | None = None, ) -> dict[str, list[EnhancedScanResult]]: """Scan security vulnerabilities in git diff changes. Args: source_branch: Source branch name target_branch: Target branch name working_dir: Working directory for git operations (uses self.working_dir if not specified) use_llm: Whether to use LLM analysis use_semgrep: Whether to use Semgrep analysis use_validation: Whether to use LLM validation to filter false positives use_rules: Whether to use rules-based scanner severity_threshold: Minimum severity threshold for filtering Returns: Dictionary mapping file paths to lists of scan results Raises: GitDiffError: If git operations fail """ diff_scan_start_time = time.time() logger.info(f"=== Starting diff scan: {source_branch} -> {target_branch} ===") logger.debug( f"Scan parameters - LLM: {use_llm}, Semgrep: {use_semgrep}, " f"Validation: {use_validation}, Rules: {use_rules}, Severity: {severity_threshold}" ) # Record diff scan start if self.metrics_collector: self.metrics_collector.record_metric( "diff_scan_operations_total", 1, labels={ "source_branch": source_branch, "target_branch": target_branch, "use_llm": str(use_llm), "use_semgrep": str(use_semgrep), 
"use_validation": str(use_validation), }, ) # Get diff changes logger.debug("Retrieving diff changes...") diff_changes = await self.get_diff_changes( source_branch, target_branch, working_dir ) if not diff_changes: logger.info("No diff changes found - returning empty results") # Record no changes scan if self.metrics_collector: diff_scan_duration = time.time() - diff_scan_start_time self.metrics_collector.record_histogram( "diff_scan_duration_seconds", diff_scan_duration, labels={"outcome": "no_changes"}, ) self.metrics_collector.record_metric("diff_scan_files_processed", 0) return {} logger.info(f"Processing {len(diff_changes)} changed files") scan_results: dict[str, list[EnhancedScanResult]] = {} files_processed = 0 files_skipped = 0 files_failed = 0 total_threats_found = 0 for file_path, chunks in diff_changes.items(): file_path_abs = str(Path(file_path).resolve()) logger.debug(f"Processing file: {file_path_abs}") # Get language (now always generic) language = self._detect_language_from_path(file_path) logger.info(f"Scanning {file_path_abs} as {language}") # Combine only the newly added lines from all chunks all_added_code = [] line_mapping = {} # Map from combined code lines to original diff lines total_added_lines = 0 combined_line_num = 1 for chunk_idx, chunk in enumerate(chunks): logger.debug( f"Processing chunk {chunk_idx + 1}/{len(chunks)} for {file_path}" ) # Only scan newly added lines, not context added_code = chunk.get_added_lines_only() if added_code.strip(): all_added_code.append(added_code) chunk_added_lines = len(chunk.added_lines) total_added_lines += chunk_added_lines logger.debug( f"Chunk {chunk_idx + 1}: {chunk_added_lines} added lines" ) # Map line numbers for accurate reporting (only for added lines) for i, (original_line_num, line_content) in enumerate( chunk.added_lines ): if line_content.strip(): # Skip empty lines line_mapping[combined_line_num] = original_line_num combined_line_num += 1 if not all_added_code: logger.debug(f"No added code 
to scan in {file_path_abs}") files_skipped += 1 continue logger.info(f"Scanning {total_added_lines} added lines in {file_path_abs}") # Scan the combined added code (only new lines) full_added_code = "\n".join(all_added_code) logger.debug(f"Combined added code: {len(full_added_code)} characters") try: logger.debug(f"Calling scan_engine.scan_code for {file_path_abs}...") scan_result = await self.scan_engine.scan_code( source_code=full_added_code, file_path=file_path, use_llm=use_llm, use_semgrep=use_semgrep, use_validation=use_validation, severity_threshold=severity_threshold, ) # Update line numbers to match original file original_threat_count = len(scan_result.all_threats) remapped_threats = 0 for threat in scan_result.all_threats: if threat.line_number in line_mapping: old_line = threat.line_number threat.line_number = line_mapping[threat.line_number] logger.debug( f"Remapped threat line number: {old_line} -> {threat.line_number}" ) remapped_threats += 1 scan_results[file_path] = [scan_result] threat_count = len(scan_result.all_threats) total_threats_found += threat_count files_processed += 1 logger.info( f"Scanned {file_path_abs}: {threat_count} threats found, " f"{remapped_threats} line numbers remapped" ) except Exception as e: logger.error(f"Failed to scan {file_path_abs}: {e}") logger.debug(f"Scan error details for {file_path_abs}", exc_info=True) files_failed += 1 continue # Record diff scan completion metrics if self.metrics_collector: diff_scan_duration = time.time() - diff_scan_start_time self.metrics_collector.record_histogram( "diff_scan_duration_seconds", diff_scan_duration, labels={"outcome": "completed"}, ) self.metrics_collector.record_metric( "diff_scan_files_processed", files_processed ) self.metrics_collector.record_metric( "diff_scan_files_skipped", files_skipped ) self.metrics_collector.record_metric("diff_scan_files_failed", files_failed) self.metrics_collector.record_metric( "diff_scan_threats_found_total", total_threats_found ) # Record scan 
performance metrics if diff_scan_duration > 0: files_per_second = len(diff_changes) / diff_scan_duration self.metrics_collector.record_histogram( "diff_scan_files_per_second", files_per_second ) logger.info( f"=== Diff scan complete - Processed: {files_processed}, " f"Skipped: {files_skipped}, Failed: {files_failed}, " f"Total threats: {total_threats_found} ===" ) return scan_results def scan_diff_sync( self, source_branch: str, target_branch: str, working_dir: Path | None = None, use_llm: bool = False, use_semgrep: bool = True, use_validation: bool = True, use_rules: bool = True, severity_threshold: Severity | None = None, ) -> dict[str, list[EnhancedScanResult]]: """Synchronous wrapper for scan_diff for testing and CLI usage.""" logger.debug( f"Synchronous diff scan wrapper called: {source_branch} -> {target_branch}" ) import asyncio return asyncio.run( self.scan_diff( source_branch=source_branch, target_branch=target_branch, working_dir=working_dir, use_llm=use_llm, use_semgrep=use_semgrep, use_validation=use_validation, use_rules=use_rules, severity_threshold=severity_threshold, ) ) async def get_diff_summary( self, source_branch: str, target_branch: str, working_dir: Path | None = None ) -> dict[str, any]: """Get a summary of the diff between two branches. 
Args: source_branch: Source branch name target_branch: Target branch name working_dir: Working directory for git operations (uses self.working_dir if not specified) Returns: Dictionary with diff summary information """ logger.info(f"Getting diff summary: {source_branch} -> {target_branch}") try: diff_changes = await self.get_diff_changes( source_branch, target_branch, working_dir ) total_files = len(diff_changes) total_chunks = sum(len(chunks) for chunks in diff_changes.values()) lines_added = 0 lines_removed = 0 supported_files = 0 scannable_files = [] logger.debug(f"Processing {total_files} changed files for summary") for file_path, chunks in diff_changes.items(): # All files are now considered supported supported_files += 1 scannable_files.append(file_path) for chunk in chunks: lines_added += len(chunk.added_lines) lines_removed += len(chunk.removed_lines) summary = { "source_branch": source_branch, "target_branch": target_branch, "total_files_changed": total_files, "supported_files": supported_files, "total_chunks": total_chunks, "lines_added": lines_added, "lines_removed": lines_removed, "scannable_files": scannable_files, } logger.info( f"Diff summary - Files: {total_files}, Supported: {supported_files}, " f"Lines: +{lines_added} -{lines_removed}" ) return summary except GitDiffError as e: logger.error(f"Failed to get diff summary: {e}") return { "source_branch": source_branch, "target_branch": target_branch, "error": "Failed to get diff summary", }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.