# cmcp/utils/diff.py
# container-mcp © 2025 by Martin Bukowski is licensed under Apache 2.0

"""Core diff utilities for text processing."""

import re
import difflib
from typing import List, Dict, Any, Optional, Tuple, NamedTuple
from dataclasses import dataclass
from enum import Enum

from cmcp.utils.logging import get_logger

logger = get_logger(__name__)


class DiffFormat(Enum):
    """Supported diff formats."""
    UNIFIED = "unified"
    CONTEXT = "context"
    NDIFF = "ndiff"


@dataclass
class DiffStats:
    """Statistics about a diff."""
    lines_added: int = 0
    lines_removed: int = 0
    lines_modified: int = 0
    hunks: int = 0

    @property
    def net_change(self) -> int:
        """Calculate net line change."""
        return self.lines_added - self.lines_removed


class Hunk(NamedTuple):
    """Represents a single diff hunk."""
    old_start: int
    old_count: int
    new_start: int
    new_count: int
    lines: List[str]


def generate_diff(
    original_content: str,
    new_content: str,
    diff_format: DiffFormat = DiffFormat.UNIFIED,
    context_lines: int = 3,
    from_file: str = "a/file",
    to_file: str = "b/file"
) -> Tuple[str, DiffStats]:
    """Generate a diff between two text contents.

    Args:
        original_content: Original text content
        new_content: New text content
        diff_format: Format of the diff to generate
        context_lines: Number of context lines for unified/context diffs
        from_file: Label for the original file
        to_file: Label for the new file

    Returns:
        Tuple of (diff_content, diff_stats)
    """
    # Split content into lines
    original_lines = original_content.splitlines(keepends=True)
    new_lines = new_content.splitlines(keepends=True)

    # Generate diff based on format
    if diff_format == DiffFormat.UNIFIED:
        diff_lines = list(difflib.unified_diff(
            original_lines, new_lines,
            fromfile=from_file, tofile=to_file,
            n=context_lines
        ))
    elif diff_format == DiffFormat.CONTEXT:
        diff_lines = list(difflib.context_diff(
            original_lines, new_lines,
            fromfile=from_file, tofile=to_file,
            n=context_lines
        ))
    elif diff_format == DiffFormat.NDIFF:
        diff_lines = list(difflib.ndiff(original_lines, new_lines))
    else:
        raise ValueError(f"Unsupported diff format: {diff_format}")

    diff_content = "".join(diff_lines)
    stats = _calculate_stats(diff_lines, diff_format)

    return diff_content, stats


def apply_unified_diff(original_content: str, diff_content: str) -> Tuple[str, int]:
    """Apply a unified diff to content.

    Args:
        original_content: Original content to modify
        diff_content: Unified diff to apply

    Returns:
        Tuple of (new_content, lines_applied)

    Raises:
        ValueError: If diff format is invalid or cannot be applied
    """
    # Parse the diff
    hunks = _parse_unified_diff(diff_content)
    if not hunks:
        logger.warning("No hunks found in diff")
        return original_content, 0

    # Apply each hunk
    lines = original_content.splitlines()
    lines_applied = 0

    # Sort hunks by line number (reverse order for proper application)
    sorted_hunks = sorted(hunks, key=lambda h: h.old_start, reverse=True)

    for hunk in sorted_hunks:
        try:
            applied = _apply_hunk(lines, hunk)
            lines_applied += applied
        except Exception as e:
            logger.error(f"Failed to apply hunk at line {hunk.old_start}: {e}")
            raise ValueError(f"Failed to apply hunk: {e}")

    return "\n".join(lines), lines_applied


def split_patch_into_files(patch_content: str) -> Dict[str, str]:
    """Split a patch file into individual file diffs.

    Args:
        patch_content: Content of the patch file

    Returns:
        Dictionary mapping file paths to their diff content
    """
    file_diffs = {}
    lines = patch_content.splitlines()

    current_file = None
    current_diff_lines = []

    for line in lines:
        if line.startswith('--- '):
            # Start of a new file diff
            if current_file and current_diff_lines:
                file_diffs[current_file] = "\n".join(current_diff_lines)

            # Extract filename (remove a/ prefix if present)
            filename = line[4:].strip()
            if filename.startswith('a/'):
                filename = filename[2:]

            current_file = filename
            current_diff_lines = [line]
        elif current_file:
            current_diff_lines.append(line)

    # Add the last file
    if current_file and current_diff_lines:
        file_diffs[current_file] = "\n".join(current_diff_lines)

    return file_diffs


def analyze_diff(diff_content: str) -> Dict[str, Any]:
    """Analyze a diff for insights and patterns.

    Args:
        diff_content: The diff content to analyze

    Returns:
        Dictionary with analysis results
    """
    lines = diff_content.splitlines()
    stats = DiffStats()

    patterns = {
        "imports_changed": False,
        "comments_added": 0,
        "comments_removed": 0,
        "potential_issues": []
    }

    for line in lines:
        if line.startswith('@@'):
            stats.hunks += 1
        elif line.startswith('+') and not line.startswith('+++'):
            stats.lines_added += 1
            _analyze_line_patterns(line, patterns, True)
        elif line.startswith('-') and not line.startswith('---'):
            stats.lines_removed += 1
            _analyze_line_patterns(line, patterns, False)

    stats.lines_modified = min(stats.lines_added, stats.lines_removed)

    # Calculate complexity
    complexity_score = stats.lines_added * 0.5 + stats.lines_removed * 0.3 + stats.hunks * 2

    if complexity_score < 10:
        complexity_level = "low"
    elif complexity_score < 30:
        complexity_level = "medium"
    else:
        complexity_level = "high"

    return {
        "stats": {
            "lines_added": stats.lines_added,
            "lines_removed": stats.lines_removed,
            "lines_modified": stats.lines_modified,
            "net_change": stats.net_change,
            "hunks": stats.hunks
        },
        "patterns": patterns,
        "complexity": {
            "score": round(complexity_score, 1),
            "level": complexity_level
        }
    }


# Private helper functions

def _calculate_stats(diff_lines: List[str], diff_format: DiffFormat) -> DiffStats:
    """Calculate statistics from diff lines."""
    stats = DiffStats()

    if diff_format == DiffFormat.UNIFIED:
        for line in diff_lines:
            if line.startswith('@@'):
                stats.hunks += 1
            elif line.startswith('+') and not line.startswith('+++'):
                stats.lines_added += 1
            elif line.startswith('-') and not line.startswith('---'):
                stats.lines_removed += 1
    elif diff_format == DiffFormat.CONTEXT:
        in_old_section = False
        in_new_section = False
        for line in diff_lines:
            if line.startswith('***************'):
                stats.hunks += 1
            elif line.startswith('*** '):
                in_old_section = True
                in_new_section = False
            elif line.startswith('--- '):
                in_old_section = False
                in_new_section = True
            elif line.startswith('! '):
                # Modified line
                if in_old_section:
                    stats.lines_removed += 1
                elif in_new_section:
                    stats.lines_added += 1
            elif line.startswith('+ '):
                # Added line (in new section)
                if in_new_section:
                    stats.lines_added += 1
            elif line.startswith('- '):
                # Removed line (in old section)
                if in_old_section:
                    stats.lines_removed += 1
    elif diff_format == DiffFormat.NDIFF:
        for line in diff_lines:
            if line.startswith('+ '):
                stats.lines_added += 1
            elif line.startswith('- '):
                stats.lines_removed += 1

    stats.lines_modified = min(stats.lines_added, stats.lines_removed)
    return stats


def _parse_unified_diff(diff_content: str) -> List[Hunk]:
    """Parse a unified diff into hunks."""
    lines = diff_content.splitlines()
    hunks = []

    i = 0
    while i < len(lines):
        line = lines[i]

        if line.startswith('@@'):
            # Parse hunk header: @@ -start,count +start,count @@
            match = re.match(r'@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@', line)
            if not match:
                logger.warning(f"Invalid hunk header: {line}")
                i += 1
                continue

            old_start = int(match.group(1))
            old_count = int(match.group(2)) if match.group(2) else 1
            new_start = int(match.group(3))
            new_count = int(match.group(4)) if match.group(4) else 1

            # Collect hunk lines
            hunk_lines = []
            j = i + 1
            while j < len(lines):
                if lines[j].startswith('@@'):
                    break
                if lines[j].startswith('\\'):
                    # "\ No newline at end of file"
                    j += 1
                    continue
                hunk_lines.append(lines[j])
                j += 1

            hunks.append(Hunk(
                old_start=old_start,
                old_count=old_count,
                new_start=new_start,
                new_count=new_count,
                lines=hunk_lines
            ))

            i = j
        else:
            i += 1

    return hunks


def _apply_hunk(lines: List[str], hunk: Hunk) -> int:
    """Apply a single hunk to the lines."""
    lines_applied = 0
    start_line = hunk.old_start - 1  # Convert to 0-based indexing
    current_line = start_line

    # Track changes to apply
    deletions = []
    additions = []

    # Process hunk lines
    for hunk_line in hunk.lines:
        if hunk_line.startswith(' '):
            # Context line - verify it matches
            expected = hunk_line[1:]
            if current_line >= len(lines):
                raise ValueError(f"Context line {current_line + 1} beyond file end")
            if lines[current_line] != expected:
                raise ValueError(f"Context mismatch at line {current_line + 1}")
            current_line += 1
        elif hunk_line.startswith('-'):
            # Deletion
            expected = hunk_line[1:]
            if current_line >= len(lines):
                raise ValueError(f"Deletion line {current_line + 1} beyond file end")
            if lines[current_line] != expected:
                raise ValueError(f"Deletion mismatch at line {current_line + 1}")
            deletions.append(current_line)
            current_line += 1
        elif hunk_line.startswith('+'):
            # Addition
            content = hunk_line[1:]
            additions.append((current_line, content))

    # Apply deletions in reverse order to maintain line numbers
    for line_num in reversed(deletions):
        if line_num < len(lines):
            del lines[line_num]
            lines_applied += 1

    # Apply additions. Each target index is adjusted for the deletions that
    # occurred *before* it and for additions already inserted; subtracting
    # the total deletion count would misplace insertions that precede a
    # deletion and reverse runs of consecutive added lines.
    inserted = 0
    for line_num, content in additions:
        removed_before = sum(1 for d in deletions if d < line_num)
        adjusted_line = line_num - removed_before + inserted
        if adjusted_line < 0:
            adjusted_line = 0
        elif adjusted_line > len(lines):
            adjusted_line = len(lines)
        lines.insert(adjusted_line, content)
        lines_applied += 1
        inserted += 1

    return lines_applied


def _analyze_line_patterns(line: str, patterns: Dict[str, Any], is_addition: bool):
    """Analyze patterns in a diff line."""
    content = line[1:].strip()

    # Import changes
    if any(keyword in content for keyword in ['import ', 'from ', '#include', 'require']):
        patterns["imports_changed"] = True

    # Comment changes
    if any(content.startswith(marker) for marker in ['#', '//', '/*', '*', '<!--']):
        if is_addition:
            patterns["comments_added"] += 1
        else:
            patterns["comments_removed"] += 1

    # Potential issues
    issue_keywords = ['todo', 'fixme', 'hack', 'xxx', 'bug', 'temporary']
    if any(keyword in content.lower() for keyword in issue_keywords):
        patterns["potential_issues"].append(line.strip())
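
A minimal usage sketch of the generate/analyze/apply round trip (not part of the module; the import path follows the file's header comment, and the sample strings are illustrative):

# round_trip_example.py -- a hedged sketch, assuming cmcp.utils.diff is importable
from cmcp.utils.diff import DiffFormat, generate_diff, apply_unified_diff, analyze_diff

original = "alpha\nbeta\ngamma\n"
updated = "alpha\nbeta\ndelta\ngamma\n"

# Generate a unified diff plus summary statistics.
diff_text, stats = generate_diff(original, updated, diff_format=DiffFormat.UNIFIED)
print(stats.lines_added, stats.lines_removed, stats.hunks)  # 1 0 1

# Inspect the diff before applying it.
report = analyze_diff(diff_text)
print(report["complexity"]["level"])  # low

# Apply the diff back onto the original. Note the result is joined with
# "\n", so it carries no trailing newline.
patched, applied = apply_unified_diff(original, diff_text)
assert patched == "alpha\nbeta\ndelta\ngamma"
assert applied == 1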
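
Similarly, a short sketch of split_patch_into_files on a hypothetical two-file patch (file names and contents are invented for illustration), applying one per-file diff independently:

# split_patch_example.py -- a hedged sketch with an invented two-file patch
from cmcp.utils.diff import split_patch_into_files, apply_unified_diff

patch = (
    "--- a/foo.py\n"
    "+++ b/foo.py\n"
    "@@ -1 +1 @@\n"
    "-x = 1\n"
    "+x = 2\n"
    "--- a/bar.py\n"
    "+++ b/bar.py\n"
    "@@ -1 +1,2 @@\n"
    " y = 3\n"
    "+z = 4\n"
)

per_file = split_patch_into_files(patch)
print(sorted(per_file))  # ['bar.py', 'foo.py']

# Each value is a self-contained unified diff for that file.
new_foo, n = apply_unified_diff("x = 1\n", per_file["foo.py"])
assert new_foo == "x = 2" and n == 2  # one deletion plus one addition applied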
