
adv_diff_scan

Scans the git diff between two branches for security vulnerabilities, optionally includes exploit examples, and filters false positives using an LLM. Results are saved in JSON or Markdown format.

Instructions

Scans for security vulnerabilities in the git diff between two branches. Results are saved in the repository root.

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| include_exploits | No | Whether to include exploit examples | |
| output_format | No | Output format for results (json or markdown) | json |
| path | No | Path to git repository (must contain .git directory) | . |
| severity_threshold | No | Minimum severity threshold (low, medium, high, critical) | medium |
| source_branch | Yes | Source branch name (e.g., 'feature-branch') | |
| target_branch | Yes | Target branch name (e.g., 'main') | |
| use_llm | No | Whether to include LLM analysis prompts (for use with your client's LLM) | |
| use_semgrep | No | Whether to include Semgrep analysis | |
| use_validation | No | Whether to use LLM validation to filter false positives | |
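
For example, a client might call the tool with arguments such as the following. This is an illustrative sketch: the branch names, repository path, and option values are placeholders, not output from a real run.

```python
# Hypothetical arguments for an adv_diff_scan tool call.
# Field names follow the schema above; all values are illustrative.
arguments = {
    "source_branch": "feature-branch",
    "target_branch": "main",
    "path": "/path/to/repo",        # must contain a .git directory
    "severity_threshold": "high",   # low | medium | high | critical
    "output_format": "markdown",    # json | markdown
    "use_semgrep": True,
    "use_validation": True,
    "include_exploits": False,
}
```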

Implementation Reference

  • Core handler function that performs the diff scan: it fetches the diff between the two branches, parses the changes, scans only the added code for vulnerabilities using LLM/Semgrep analysis, and returns results per file.
```python
async def scan_diff(
    self,
    source_branch: str,
    target_branch: str,
    working_dir: Path | None = None,
    use_llm: bool = False,
    use_semgrep: bool = True,
    use_validation: bool = True,
    use_rules: bool = True,
    severity_threshold: Severity | None = None,
) -> dict[str, list[EnhancedScanResult]]:
    """Scan security vulnerabilities in git diff changes.

    Args:
        source_branch: Source branch name
        target_branch: Target branch name
        working_dir: Working directory for git operations (uses self.working_dir if not specified)
        use_llm: Whether to use LLM analysis
        use_semgrep: Whether to use Semgrep analysis
        use_validation: Whether to use LLM validation to filter false positives
        use_rules: Whether to use rules-based scanner
        severity_threshold: Minimum severity threshold for filtering

    Returns:
        Dictionary mapping file paths to lists of scan results

    Raises:
        GitDiffError: If git operations fail
    """
    diff_scan_start_time = time.time()
    logger.info(f"=== Starting diff scan: {source_branch} -> {target_branch} ===")
    logger.debug(
        f"Scan parameters - LLM: {use_llm}, Semgrep: {use_semgrep}, "
        f"Validation: {use_validation}, Rules: {use_rules}, Severity: {severity_threshold}"
    )

    # Record diff scan start
    if self.metrics_collector:
        self.metrics_collector.record_metric(
            "diff_scan_operations_total",
            1,
            labels={
                "source_branch": source_branch,
                "target_branch": target_branch,
                "use_llm": str(use_llm),
                "use_semgrep": str(use_semgrep),
                "use_validation": str(use_validation),
            },
        )

    # Get diff changes
    logger.debug("Retrieving diff changes...")
    diff_changes = await self.get_diff_changes(
        source_branch, target_branch, working_dir
    )

    if not diff_changes:
        logger.info("No diff changes found - returning empty results")
        # Record no changes scan
        if self.metrics_collector:
            diff_scan_duration = time.time() - diff_scan_start_time
            self.metrics_collector.record_histogram(
                "diff_scan_duration_seconds",
                diff_scan_duration,
                labels={"outcome": "no_changes"},
            )
            self.metrics_collector.record_metric("diff_scan_files_processed", 0)
        return {}

    logger.info(f"Processing {len(diff_changes)} changed files")
    scan_results: dict[str, list[EnhancedScanResult]] = {}
    files_processed = 0
    files_skipped = 0
    files_failed = 0
    total_threats_found = 0

    for file_path, chunks in diff_changes.items():
        file_path_abs = str(Path(file_path).resolve())
        logger.debug(f"Processing file: {file_path_abs}")

        # Get language (now always generic)
        language = self._detect_language_from_path(file_path)
        logger.info(f"Scanning {file_path_abs} as {language}")

        # Combine only the newly added lines from all chunks
        all_added_code = []
        line_mapping = {}  # Map from combined code lines to original diff lines
        total_added_lines = 0
        combined_line_num = 1

        for chunk_idx, chunk in enumerate(chunks):
            logger.debug(
                f"Processing chunk {chunk_idx + 1}/{len(chunks)} for {file_path}"
            )
            # Only scan newly added lines, not context
            added_code = chunk.get_added_lines_only()
            if added_code.strip():
                all_added_code.append(added_code)
                chunk_added_lines = len(chunk.added_lines)
                total_added_lines += chunk_added_lines
                logger.debug(
                    f"Chunk {chunk_idx + 1}: {chunk_added_lines} added lines"
                )

                # Map line numbers for accurate reporting (only for added lines)
                for i, (original_line_num, line_content) in enumerate(
                    chunk.added_lines
                ):
                    if line_content.strip():  # Skip empty lines
                        line_mapping[combined_line_num] = original_line_num
                        combined_line_num += 1

        if not all_added_code:
            logger.debug(f"No added code to scan in {file_path_abs}")
            files_skipped += 1
            continue

        logger.info(f"Scanning {total_added_lines} added lines in {file_path_abs}")

        # Scan the combined added code (only new lines)
        full_added_code = "\n".join(all_added_code)
        logger.debug(f"Combined added code: {len(full_added_code)} characters")

        try:
            logger.debug(f"Calling scan_engine.scan_code for {file_path_abs}...")
            scan_result = await self.scan_engine.scan_code(
                source_code=full_added_code,
                file_path=file_path,
                use_llm=use_llm,
                use_semgrep=use_semgrep,
                use_validation=use_validation,
                severity_threshold=severity_threshold,
            )

            # Update line numbers to match original file
            original_threat_count = len(scan_result.all_threats)
            remapped_threats = 0
            for threat in scan_result.all_threats:
                if threat.line_number in line_mapping:
                    old_line = threat.line_number
                    threat.line_number = line_mapping[threat.line_number]
                    logger.debug(
                        f"Remapped threat line number: {old_line} -> {threat.line_number}"
                    )
                    remapped_threats += 1

            scan_results[file_path] = [scan_result]
            threat_count = len(scan_result.all_threats)
            total_threats_found += threat_count
            files_processed += 1
            logger.info(
                f"Scanned {file_path_abs}: {threat_count} threats found, "
                f"{remapped_threats} line numbers remapped"
            )
        except Exception as e:
            logger.error(f"Failed to scan {file_path_abs}: {e}")
            logger.debug(f"Scan error details for {file_path_abs}", exc_info=True)
            files_failed += 1
            continue

    # Record diff scan completion metrics
    if self.metrics_collector:
        diff_scan_duration = time.time() - diff_scan_start_time
        self.metrics_collector.record_histogram(
            "diff_scan_duration_seconds",
            diff_scan_duration,
            labels={"outcome": "completed"},
        )
        self.metrics_collector.record_metric(
            "diff_scan_files_processed", files_processed
        )
        self.metrics_collector.record_metric(
            "diff_scan_files_skipped", files_skipped
        )
        self.metrics_collector.record_metric("diff_scan_files_failed", files_failed)
        self.metrics_collector.record_metric(
            "diff_scan_threats_found_total", total_threats_found
        )

        # Record scan performance metrics
        if diff_scan_duration > 0:
            files_per_second = len(diff_changes) / diff_scan_duration
            self.metrics_collector.record_histogram(
                "diff_scan_files_per_second", files_per_second
            )

    logger.info(
        f"=== Diff scan complete - Processed: {files_processed}, "
        f"Skipped: {files_skipped}, Failed: {files_failed}, "
        f"Total threats: {total_threats_found} ==="
    )
    return scan_results
```
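
A minimal usage sketch for the handler, assuming `scanner` is an already-initialized instance of the class that defines `scan_diff` (its construction is not shown on this page):

```python
import asyncio

async def main() -> None:
    # `scanner` is assumed to exist; this page does not show how the
    # scanner class is constructed or configured.
    results = await scanner.scan_diff(
        source_branch="feature-branch",
        target_branch="main",
        use_llm=False,
        use_semgrep=True,
        use_validation=True,
    )
    for file_path, file_results in results.items():
        for result in file_results:
            print(f"{file_path}: {len(result.all_threats)} threat(s)")

asyncio.run(main())
```

The keys of the returned dictionary are the file paths from the diff; each value is a single-element list holding the `EnhancedScanResult` for that file's added code.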
  • Pydantic model defining the input schema/parameters for the adv_diff_scan MCP tool.
```python
class DiffScanRequest(BaseModel):
    """Request for diff scanning using Clean Architecture."""

    source_branch: str
    target_branch: str
    path: str = "."
    use_semgrep: bool = True
    use_llm: bool = False
    use_validation: bool = False
    severity_threshold: str = "medium"
```
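
Since the request is a Pydantic model, only the two required fields need to be supplied; the rest fall back to the declared defaults. A small illustrative example:

```python
# Illustrative: construct a request with just the required fields.
request = DiffScanRequest(source_branch="feature-branch", target_branch="main")

print(request.path)                # "."
print(request.use_semgrep)         # True
print(request.severity_threshold)  # "medium"
```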
  • Utility class that parses raw git diff output into structured DiffChunk objects for analysis.
```python
class GitDiffParser:
    """Parser for git diff output."""

    def __init__(self):
        logger.debug("Initializing GitDiffParser with regex patterns")
        self.diff_header_pattern = re.compile(r"^diff --git a/(.*) b/(.*)$")
        self.chunk_header_pattern = re.compile(
            r"^@@\s*-(\d+)(?:,(\d+))?\s*\+(\d+)(?:,(\d+))?\s*@@"
        )
        self.file_header_pattern = re.compile(r"^(\+\+\+|---)\s+(.*)")
        logger.debug("GitDiffParser initialized successfully")

    def parse_diff(self, diff_output: str) -> dict[str, list[DiffChunk]]:
        """Parse git diff output into structured chunks.

        Args:
            diff_output: Raw git diff output

        Returns:
            Dictionary mapping file paths to lists of DiffChunk objects
        """
        logger.info("=== Starting diff parsing ===")
        logger.debug(f"Parsing diff output: {len(diff_output)} characters")

        chunks_by_file: dict[str, list[DiffChunk]] = {}
        current_file = None
        current_chunk = None
        old_line_num = 0
        new_line_num = 0

        lines = diff_output.split("\n")
        logger.debug(f"Diff contains {len(lines)} lines to parse")

        for line_idx, line in enumerate(lines):
            # Check for file header
            diff_match = self.diff_header_pattern.match(line)
            if diff_match:
                current_file = diff_match.group(2)  # Use the 'b/' path (destination)
                chunks_by_file[current_file] = []
                logger.info(f"Found file header: {current_file}")
                continue

            # Check for chunk header
            chunk_match = self.chunk_header_pattern.match(line)
            if chunk_match and current_file:
                old_start = int(chunk_match.group(1))
                old_count = int(chunk_match.group(2) or "1")
                new_start = int(chunk_match.group(3))
                new_count = int(chunk_match.group(4) or "1")

                current_chunk = DiffChunk(
                    current_file, old_start, old_count, new_start, new_count
                )
                chunks_by_file[current_file].append(current_chunk)
                logger.debug(
                    f"Created chunk for {current_file}: "
                    f"old({old_start},{old_count}) new({new_start},{new_count})"
                )

                old_line_num = old_start
                new_line_num = new_start
                continue

            # Check for content lines
            if current_chunk and line:
                if line.startswith("+") and not line.startswith("+++"):
                    content = line[1:]  # Remove the '+' prefix
                    current_chunk.add_line("+", new_line_num, content)
                    new_line_num += 1
                elif line.startswith("-") and not line.startswith("---"):
                    content = line[1:]  # Remove the '-' prefix
                    current_chunk.add_line("-", old_line_num, content)
                    old_line_num += 1
                elif line.startswith(" "):
                    content = line[1:]  # Remove the ' ' prefix
                    current_chunk.add_line(" ", new_line_num, content)
                    old_line_num += 1
                    new_line_num += 1

        # Log parsing results
        total_chunks = sum(len(chunks) for chunks in chunks_by_file.values())
        logger.info(
            f"=== Diff parsing complete - {len(chunks_by_file)} files, {total_chunks} chunks ==="
        )
        for file_path, chunks in chunks_by_file.items():
            total_added = sum(len(chunk.added_lines) for chunk in chunks)
            total_removed = sum(len(chunk.removed_lines) for chunk in chunks)
            logger.debug(
                f"File {file_path}: {len(chunks)} chunks, +{total_added} -{total_removed} lines"
            )

        return chunks_by_file
```
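
A short illustrative run of the parser on a hand-written diff (the imports for `GitDiffParser` and `DiffChunk` are assumed; their module path is not shown on this page):

```python
sample_diff = """\
diff --git a/app.py b/app.py
--- a/app.py
+++ b/app.py
@@ -10,2 +10,3 @@
 def handler(req):
+    query = "SELECT * FROM users WHERE id = " + req.args["id"]
     return run_query(query)
"""

parser = GitDiffParser()
chunks_by_file = parser.parse_diff(sample_diff)
for path, chunks in chunks_by_file.items():
    for chunk in chunks:
        # Only the newly added line is handed to the scanners.
        print(path, "->", chunk.get_added_lines_only())
```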
  • Data structure representing a hunk/chunk from git diff, with methods to extract added/removed/context lines for scanning.
```python
class DiffChunk:
    """Represents a chunk of changes in a git diff."""

    def __init__(
        self,
        file_path: str,
        old_start: int,
        old_count: int,
        new_start: int,
        new_count: int,
    ):
        self.file_path = file_path
        self.old_start = old_start
        self.old_count = old_count
        self.new_start = new_start
        self.new_count = new_count
        self.added_lines: list[tuple[int, str]] = []  # (line_number, content)
        self.removed_lines: list[tuple[int, str]] = []  # (line_number, content)
        self.context_lines: list[tuple[int, str]] = []  # (line_number, content)
        logger.debug(
            f"Created DiffChunk for {file_path}: "
            f"old ({old_start}+{old_count}), new ({new_start}+{new_count})"
        )

    def add_line(self, line_type: str, line_number: int, content: str) -> None:
        """Add a line to the diff chunk."""
        if line_type == "+":
            self.added_lines.append((line_number, content))
            logger.debug(f"Added line (+) {line_number}: {content[:50]}...")
        elif line_type == "-":
            self.removed_lines.append((line_number, content))
            logger.debug(f"Removed line (-) {line_number}: {content[:50]}...")
        else:
            self.context_lines.append((line_number, content))

    def get_changed_code(self) -> str:
        """Get the changed code as a single string."""
        logger.debug(
            f"Getting changed code for {self.file_path}: "
            f"{len(self.context_lines)} context + {len(self.added_lines)} added lines"
        )
        lines = []

        # Add context lines for better analysis
        for _, content in self.context_lines:
            lines.append(content)

        # Add added lines (new code to scan)
        for _, content in self.added_lines:
            lines.append(content)

        result = "\n".join(lines)
        logger.debug(f"Combined changed code: {len(result)} characters")
        return result

    def get_added_lines_with_minimal_context(self) -> str:
        """Get added lines with minimal context for better analysis.

        This includes only 1-2 context lines around changes, not all context,
        which is useful for LLM analysis while keeping the scope focused.
        """
        file_path_abs = str(Path(self.file_path).resolve())
        logger.debug(f"Getting added lines with minimal context for {file_path_abs}")
        lines = []

        # Add minimal context (max 2 lines before changes)
        context_to_include = self.context_lines[:2] if self.context_lines else []
        logger.debug(f"Including {len(context_to_include)} context lines")
        for _, content in context_to_include:
            lines.append(f"// CONTEXT: {content}")

        # Add all added lines (these are what we're actually analyzing)
        for _, content in self.added_lines:
            lines.append(content)

        result = "\n".join(lines)
        logger.debug(f"Added lines with context: {len(result)} characters")
        return result

    def get_added_lines_only(self) -> str:
        """Get only the added lines as a single string."""
        logger.debug(
            f"Getting only added lines for {self.file_path}: {len(self.added_lines)} lines"
        )
        result = "\n".join(content for _, content in self.added_lines)
        logger.debug(f"Added lines only: {len(result)} characters")
        return result
```
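
The same structure can be exercised directly; a small sketch with made-up line numbers:

```python
# Illustrative: build a chunk by hand and compare the extraction methods.
chunk = DiffChunk("app.py", old_start=10, old_count=2, new_start=10, new_count=3)
chunk.add_line(" ", 10, "def handler(req):")
chunk.add_line("+", 11, "    query = build_query(req)")

print(chunk.get_added_lines_only())                  # only the '+' line
print(chunk.get_changed_code())                      # context + added lines
print(chunk.get_added_lines_with_minimal_context())  # '// CONTEXT: ...' + added
```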
  • Explicit recognition of 'adv_diff_scan' as a known tool name during input validation, confirming its existence in the MCP toolset.
```python
if tool_name in ("adv_scan_folder", "adv_diff_scan"):
```


MCP directory API

We provide all the information about MCP servers via our MCP API.

```bash
curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'
```
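
The same lookup from Python; the response schema is not documented here, so this sketch simply pretty-prints whatever JSON the endpoint returns:

```python
import json
import urllib.request

url = "https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server"
with urllib.request.urlopen(url) as response:
    server_info = json.load(response)

print(json.dumps(server_info, indent=2))
```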

If you have feedback or need assistance with the MCP directory API, please join our Discord server.