analyzer.py

"""Git log analyzer for generating merge request summaries."""

import re
import time
import logging
import subprocess
import os
import sys

from typing import Dict, List, Optional, Set, Iterator, Tuple
from dataclasses import dataclass

from .models import CommitInfo, MergeRequestSummary

# Create logger for this module
logger = logging.getLogger(__name__)


class TimeoutError(Exception):
    """Custom timeout exception."""

    pass


# Constants for better maintainability
class GitPatterns:
    """Regex patterns for git output parsing."""

    INSERTION_DELETION_PATTERNS = [
        r"(\d+)\s+insertions?\(\+\),\s*(\d+)\s+deletions?\(-\)",  # "13 insertions(+), 25 deletions(-)"
        r"(\d+)\s+insertions?,\s*(\d+)\s+deletions?",  # "13 insertions, 25 deletions"
        r"(\d+)\s+insertions?\(\+\)",  # "13 insertions(+)" (insertions only)
        r"(\d+)\s+deletions?\(-\)",  # "25 deletions(-)" (deletions only)
    ]

    COMMIT_HASH_PATTERN = r"^[0-9a-f]{40}$"


class CategoryPatterns:
    """Patterns for commit categorization."""

    PATTERNS = {
        "refactoring": {
            "refactor",
            "refactoring",
            "cleanup",
            "clean up",
            "restructure",
        },
        "bug_fix": {"fix", "bug", "issue", "error", "resolve", "patch", "hotfix"},
        "new_feature": {
            "add",
            "new",
            "feature",
            "implement",
            "create",
            "introduce",
            "feat",
        },
        "cleanup": {"remove", "delete", "drop", "deprecate", "clean"},
        "update": {"update", "upgrade", "bump", "dependenc", "version"},
        "test": {"test", "spec", "specs", "testing", "unit", "integration"},
        "documentation": {"docs", "documentation", "readme", "comment", "doc"},
    }


class FilePatterns:
    """Patterns for file categorization."""

    PATTERNS = {
        "Services": {"patterns": ["service", "api", "client"], "extensions": set()},
        "Models": {
            "patterns": ["model", "entity", "dto", "schema"],
            "extensions": set(),
        },
        "Controllers": {
            "patterns": ["controller", "handler", "route"],
            "extensions": set(),
        },
        "Tests": {
            "patterns": ["test", "spec", "specs", "testing"],
            "extensions": set(),
        },
        "Configuration": {
            "patterns": [],
            "extensions": {
                ".json",
                ".config",
                ".yml",
                ".yaml",
                ".xml",
                ".toml",
                ".ini",
            },
        },
        "Documentation": {
            "patterns": [],
            "extensions": {".md", ".txt", ".rst", ".adoc"},
        },
        "Frontend": {
            "patterns": [],
            "extensions": {
                ".js",
                ".jsx",
                ".ts",
                ".tsx",
                ".vue",
                ".svelte",
                ".html",
                ".css",
                ".scss",
                ".sass",
            },
        },
        "Backend": {
            "patterns": [],
            "extensions": {
                ".py",
                ".java",
                ".cs",
                ".go",
                ".rs",
                ".php",
                ".rb",
                ".js",
                ".ts",
            },
        },
        "Other": {"patterns": [], "extensions": set()},
    }


@dataclass
class GitLogSection:
    """Represents a section of git log output."""

    hash: str
    author: str
    date: str
    message: str
    stats_lines: List[str]


class GitLogAnalyzer:
    """Analyzes git logs and generates structured summaries."""

    def __init__(self, repo_path: str = ".") -> None:
        """Initialize the analyzer with a repository path."""
        self.repo_path = repo_path
        # Pre-compile regex patterns for better performance
        self._compiled_patterns = [
            re.compile(pattern) for pattern in GitPatterns.INSERTION_DELETION_PATTERNS
        ]
        self._commit_hash_pattern = re.compile(GitPatterns.COMMIT_HASH_PATTERN)

    def _is_testing(self) -> bool:
        """Check if we're running in a test environment."""
        return (
            any("pytest" in arg for arg in sys.argv)
            or "PYTEST_CURRENT_TEST" in os.environ
        )

    def _build_git_command(self, subcommand: List[str]) -> List[str]:
        """Build a git command with the repository path."""
        # If the command already starts with "git", extract the subcommand
        if subcommand[0] == "git":
            # Skip "git" and "--no-pager" if present
            start_index = (
                2 if len(subcommand) > 1 and subcommand[1] == "--no-pager" else 1
            )
            actual_subcommand = subcommand[start_index:]
        else:
            actual_subcommand = subcommand
        return ["git", "--no-pager", "-C", self.repo_path] + actual_subcommand

    def _execute_git_command(
        self, cmd: List[str], timeout: int = 30
    ) -> subprocess.CompletedProcess:
        """Execute a git command synchronously."""
        cmd_with_path = self._build_git_command(cmd)
        logger.debug(f"Executing git command: {' '.join(cmd_with_path)}")
        try:
            logger.debug(f"Starting git command execution with timeout: {timeout}s")
            result = subprocess.run(
                cmd_with_path,
                capture_output=True,
                text=True,
                check=False,
                timeout=timeout,  # Prevent hanging on user input prompts
                input=None,
                # Set environment to prevent interactive prompts
                env={**os.environ, "GIT_TERMINAL_PROMPT": "0", "GIT_ASKPASS": ""},
            )
            logger.debug(f"Git command completed with return code: {result.returncode}")
            return result
        except subprocess.TimeoutExpired as e:
            logger.error(f"Git command timed out: {' '.join(cmd_with_path)}")
            raise e
        except Exception as e:
            logger.error(f"Git command failed: {' '.join(cmd_with_path)} - {e}")
            raise e

    def _validate_repo_path(self) -> None:
        """Validate that the repository path exists and is a valid git repository."""
        logger.debug(f"Validating repo path: {self.repo_path}")
        if not os.path.exists(self.repo_path):
            raise ValueError(f"Repository path does not exist: {self.repo_path}")
        # Check if it's a directory
        if not os.path.isdir(self.repo_path):
            raise ValueError(f"Repository path is not a directory: {self.repo_path}")
        # Check if .git directory exists
        git_dir = os.path.join(self.repo_path, ".git")
        if not os.path.exists(git_dir):
            raise ValueError(f"No .git directory found in: {self.repo_path}")
        logger.debug(f"Found .git directory at: {git_dir}")
        # First, test if git is available
        try:
            test_result = self._execute_git_command(["git", "--version"])
            if test_result.returncode != 0:
                raise ValueError("Git command failed to execute")
            logger.debug(f"Git version: {test_result.stdout.strip()}")
        except Exception as e:
            logger.error(f"Git availability test failed: {e}")
            raise ValueError(f"Git is not available or not working: {e}")
        try:
            # Validate git repository
            result1 = self._execute_git_command(
                ["git", "--no-pager", "rev-parse", "--git-dir"]
            )
            if result1.returncode != 0:
                logger.error(
                    f"Git rev-parse --git-dir failed with return code {result1.returncode}"
                )
                logger.error(f"stderr: {result1.stderr}")
                raise ValueError(f"Not a valid git repository: {self.repo_path}")
            logger.debug(f"Git rev-parse --git-dir output: {result1.stdout.strip()}")
            result2 = self._execute_git_command(
                ["git", "--no-pager", "rev-parse", "--show-toplevel"]
            )
            if result2.returncode != 0:
                logger.error(
                    f"Git rev-parse --show-toplevel failed with return code {result2.returncode}"
                )
                logger.error(f"stderr: {result2.stderr}")
                raise ValueError(f"Invalid git repository state: {self.repo_path}")
            logger.debug(
                f"Git rev-parse --show-toplevel output: {result2.stdout.strip()}"
            )
        except subprocess.TimeoutExpired:
            raise ValueError(f"Git repository validation timed out: {self.repo_path}")
        except FileNotFoundError:
            raise ValueError(
                "Git command not found. Please ensure git is installed and in your PATH."
            )
        except Exception as e:
            raise ValueError(f"Error validating git repository: {e}")

    def _is_commit_hash(self, line: str) -> bool:
        """Check if a line is a valid git commit hash."""
        return bool(self._commit_hash_pattern.match(line))

    def _validate_branches(self, base_branch: str, current_branch: str) -> None:
        """Validate that the specified branches exist in the repository."""
        # Skip validation for commit hashes
        if self._is_commit_hash(base_branch) or self._is_commit_hash(current_branch):
            logger.debug(
                f"Skipping branch validation for commit hashes: {base_branch}, {current_branch}"
            )
            return
        try:
            cmd = ["git", "--no-pager", "branch", "-a", "--format=%(refname:short)"]
            result = self._execute_git_command(cmd)
            if result.returncode != 0:
                raise Exception(
                    f"Failed to get branches: {result.stderr or 'No stderr output'}"
                )
            stdout_output = result.stdout.strip()
            available_branches = (
                set(stdout_output.split("\n")) if stdout_output else set()
            )
            # Check if branches exist
            missing_branches = []
            if base_branch not in available_branches:
                missing_branches.append(base_branch)
            if current_branch != "HEAD" and current_branch not in available_branches:
                missing_branches.append(current_branch)
            if missing_branches:
                raise ValueError(
                    f"Branch(es) not found: {', '.join(missing_branches)}. "
                    f"Available branches: {', '.join(sorted(list(available_branches)))}"
                )
        except subprocess.TimeoutExpired:
            raise TimeoutError("Branch validation timed out")
        except FileNotFoundError:
            raise Exception(
                "Git command not found. Please ensure git is installed and in your PATH."
            )
        except (ValueError, TimeoutError):
            raise
        except Exception as e:
            raise Exception(f"Error validating branches: {e}")

    def get_git_log(
        self, base_branch: str = "master", current_branch: str = "HEAD"
    ) -> List[CommitInfo]:
        """Retrieve git log between two branches synchronously."""
        start_time = time.time()
        logger.debug(f"Starting git log retrieval: {base_branch}..{current_branch}")
        try:
            # Validate repository and branches
            if not self._is_testing():
                self._validate_repo_path()
                self._validate_branches(base_branch, current_branch)
            # Execute git log command
            cmd = [
                "git",
                "--no-pager",
                "log",
                f"{base_branch}..{current_branch}",
                "--stat",
                "--format=format:%H%n%an%n%ad%n%s%n",
                "--date=short",
            ]
            result = self._execute_git_command(cmd, timeout=30)
            if result.returncode != 0:
                if result.returncode == 128:
                    logger.debug("No commits found between branches (return code 128)")
                    return []
                else:
                    raise Exception(
                        f"Git command failed with return code {result.returncode}: {result.stderr}"
                    )
            output = result.stdout
            if not output:
                logger.debug("No git output found, returning empty list")
                return []
            # Parse the output
            parse_start = time.time()
            commits = self._parse_git_output_sync_modern(output)
            parse_time = time.time() - parse_start
            total_time = time.time() - start_time
            logger.debug(
                f"Git log retrieval completed in {total_time:.2f}s, found {len(commits)} commits"
            )
            return commits
        except subprocess.TimeoutExpired:
            raise TimeoutError("Git command timed out after 30 seconds")
        except Exception as e:
            raise Exception(f"Unexpected error getting git log: {e}")

    def _parse_git_output_sync_modern(self, output: str) -> List[CommitInfo]:
        """Modern synchronous parsing of git output using iterators and generators."""
        lines = output.split("\n")
        logger.debug(f"Parsing {len(lines)} lines of git output")
        commits = []
        line_iter = iter(lines)
        try:
            while True:
                section = self._extract_commit_section(line_iter)
                if not section:
                    break
                commit_info = self._parse_commit_section(section)
                if commit_info:
                    commits.append(commit_info)
        except StopIteration:
            pass
        logger.debug(f"Parsed {len(commits)} commits")
        return commits

    def _extract_commit_section(
        self, line_iter: Iterator[str]
    ) -> Optional[GitLogSection]:
        """Extract a complete commit section from the iterator."""
        try:
            # Find the next commit hash
            while True:
                line = next(line_iter).strip()
                if self._is_commit_hash(line):
                    break
            commit_hash = line
            # Extract author, date, message
            try:
                author = next(line_iter).strip()
                date = next(line_iter).strip()
                message = next(line_iter).strip()
            except StopIteration:
                logger.debug(
                    f"Skipping commit {commit_hash[:8]} - insufficient header lines"
                )
                return None
            # Validate required fields
            if not all([author, date, message]):
                logger.debug(
                    f"Skipping commit {commit_hash[:8]} - missing required fields"
                )
                return None
            # Collect stats lines
            stats_lines = []
            try:
                # Skip empty lines
                while True:
                    line = next(line_iter).strip()
                    if line:
                        break
                # Collect stats lines until we hit another commit
                while line and not self._is_commit_hash(line):
                    stats_lines.append(line)
                    try:
                        line = next(line_iter).strip()
                    except StopIteration:
                        break
            except StopIteration:
                pass
            logger.debug(
                f"Collected stats lines for commit {commit_hash[:8]}: {stats_lines}"
            )
            return GitLogSection(
                hash=commit_hash,
                author=author,
                date=date,
                message=message,
                stats_lines=stats_lines,
            )
        except StopIteration:
            return None

    def _parse_commit_section(self, section: GitLogSection) -> Optional[CommitInfo]:
        """Parse a commit section into a CommitInfo object."""
        files_changed = []
        insertions = 0
        deletions = 0
        logger.debug(
            f"Parsing commit section with {len(section.stats_lines)} stats lines"
        )
        for line in section.stats_lines:
            logger.debug(f"Processing stats line: {line}")
            if "|" in line and any(c.isdigit() for c in line):
                file_name, _ = self._parse_file_stats_line(line)
                if file_name:
                    files_changed.append(file_name)
                    logger.debug(f"Added file: {file_name}")
            elif "file changed" in line or "files changed" in line:
                # This is the summary line with total stats
                logger.debug(f"Parsing summary line: {line}")
                ins, dels = self._extract_insertions_deletions(line)
                logger.debug(f"Extracted insertions: {ins}, deletions: {dels}")
                insertions = ins
                deletions = dels
        logger.debug(
            f"Final commit info: files={files_changed}, insertions={insertions}, deletions={deletions}"
        )
        return CommitInfo(
            hash=section.hash,
            author=section.author,
            date=section.date,
            message=section.message,
            files_changed=files_changed,
            insertions=insertions,
            deletions=deletions,
        )

    def _parse_file_stats_line(self, line: str) -> Tuple[Optional[str], str]:
        """Parse a file stats line and return (filename, stats_part)."""
        parts = line.split("|", 1)
        if len(parts) < 2:
            return None, ""
        file_name = parts[0].strip()
        stats_part = parts[1].strip()
        return file_name, stats_part

    def _extract_insertions_deletions(self, stats_part: str) -> Tuple[int, int]:
        """Extract insertions and deletions from stats part."""
        insertions = 0
        deletions = 0
        logger.debug(f"Extracting from stats part: {stats_part}")
        for pattern in self._compiled_patterns:
            stats_match = pattern.search(stats_part)
            if stats_match:
                logger.debug(f"Pattern matched: {pattern.pattern}")
                # If pattern has both groups, use them
                if len(stats_match.groups()) >= 2:
                    insertions = int(stats_match.group(1))
                    deletions = int(stats_match.group(2))
                    logger.debug(f"Both groups: {insertions}, {deletions}")
                else:
                    # Single group pattern
                    if "insertions" in pattern.pattern:
                        insertions = int(stats_match.group(1))
                        logger.debug(f"Insertions only: {insertions}")
                    if "deletions" in pattern.pattern:
                        deletions = int(stats_match.group(1))
                        logger.debug(f"Deletions only: {deletions}")
        logger.debug(f"Final result: {insertions}, {deletions}")
        return insertions, deletions

    def categorize_commit(self, commit: CommitInfo) -> List[str]:
        """Categorize a commit based on its message and changes."""
        message_lower = commit.message.lower()
        message_words = set(message_lower.split())
        # Use set intersection for efficient matching
        categories = []
        for category, keywords in CategoryPatterns.PATTERNS.items():
            if keywords & message_words:
                categories.append(category)
        # If no categories found, add a default category based on change size
        if not categories:
            total_changes = commit.insertions + commit.deletions
            categories.append("significant_change" if total_changes > 50 else "other")
        return categories

    def generate_summary(self, commits: List[CommitInfo]) -> MergeRequestSummary:
        """Generate a comprehensive merge request summary."""
        if not commits:
            return MergeRequestSummary(
                title="No changes detected",
                description="No commits found between the specified branches.",
                total_commits=0,
                total_files_changed=0,
                total_insertions=0,
                total_deletions=0,
                key_changes=[],
                breaking_changes=[],
                new_features=[],
                bug_fixes=[],
                refactoring=[],
                files_affected=[],
                estimated_review_time="0 minutes",
            )
        return self._generate_summary_sync(commits)

    def _generate_summary_sync(self, commits: List[CommitInfo]) -> MergeRequestSummary:
        """Synchronous summary generation."""
        # Calculate totals
        total_commits = len(commits)
        total_insertions = sum(c.insertions for c in commits)
        total_deletions = sum(c.deletions for c in commits)
        # Collect all unique files
        all_files: Set[str] = set()
        for commit in commits:
            all_files.update(commit.files_changed)
        total_files_changed = len(all_files)
        # Categorize commits
        categorized_commits = self._categorize_commits(commits)
        # Generate title and description
        title = self._generate_title(commits, categorized_commits)
        description = self._generate_description(
            commits,
            total_commits,
            total_files_changed,
            total_insertions,
            total_deletions,
            categorized_commits,
            all_files,
        )
        # Estimate review time
        estimated_time = self._estimate_review_time(
            total_commits, total_files_changed, total_insertions + total_deletions
        )
        return MergeRequestSummary(
            title=title,
            description=description,
            total_commits=total_commits,
            total_files_changed=total_files_changed,
            total_insertions=total_insertions,
            total_deletions=total_deletions,
            key_changes=categorized_commits["key_changes"],
            breaking_changes=categorized_commits["breaking_changes"],
            new_features=categorized_commits["new_features"],
            bug_fixes=categorized_commits["bug_fixes"],
            refactoring=categorized_commits["refactoring"],
            files_affected=sorted(list(all_files)),
            estimated_review_time=estimated_time,
        )

    def _categorize_commits(self, commits: List[CommitInfo]) -> Dict[str, List[str]]:
        """Categorize commits into different types."""
        categories = {
            "new_features": [],
            "bug_fixes": [],
            "refactoring": [],
            "breaking_changes": [],
            "key_changes": [],
        }
        for commit in commits:
            commit_categories = self.categorize_commit(commit)
            commit_entry = f"- {commit.message} ({commit.hash[:8]})"
            if "new_feature" in commit_categories:
                categories["new_features"].append(commit_entry)
            elif "bug_fix" in commit_categories:
                categories["bug_fixes"].append(commit_entry)
            elif "refactoring" in commit_categories:
                categories["refactoring"].append(commit_entry)
            # Check for breaking changes
            if any(
                word in commit.message.lower()
                for word in ["breaking", "deprecate", "remove"]
            ):
                categories["breaking_changes"].append(commit_entry)
            # Key changes (commits with significant impact)
            if commit.insertions + commit.deletions > 100:
                categories["key_changes"].append(
                    f"{commit_entry} - {commit.insertions + commit.deletions} lines changed"
                )
        return categories

    def _generate_title(
        self, commits: List[CommitInfo], categorized_commits: Dict[str, List[str]]
    ) -> str:
        """Generate a concise title for the merge request."""
        if len(commits) == 1:
            return f"feat: {commits[0].message}"
        # Determine primary type
        if categorized_commits["new_features"]:
            return f"feat: {len(categorized_commits['new_features'])} new features and improvements"
        elif categorized_commits["refactoring"]:
            return "refactor: Code quality improvements and optimizations"
        elif categorized_commits["bug_fixes"]:
            return f"fix: {len(categorized_commits['bug_fixes'])} bug fixes and improvements"
        else:
            return f"chore: {len(commits)} commits with various improvements"

    def _generate_description(
        self,
        commits: List[CommitInfo],
        total_commits: int,
        total_files_changed: int,
        total_insertions: int,
        total_deletions: int,
        categorized_commits: Dict[str, List[str]],
        all_files: Set[str],
    ) -> str:
        """Generate a comprehensive description for the merge request."""
        description = f"""## Overview

This merge request contains {total_commits} commits with {total_files_changed} files changed ({total_insertions} insertions, {total_deletions} deletions).

## Key Changes

"""
        if categorized_commits["key_changes"]:
            description += "\n".join(categorized_commits["key_changes"][:5]) + "\n\n"
        for category, items in categorized_commits.items():
            if items and category != "key_changes":
                category_name = category.replace("_", " ").title()
                description += f"### {category_name} ({len(items)})\n"
                description += "\n".join(items) + "\n\n"
        # Add file summary
        description += f"### Files Affected ({len(all_files)})\n"
        file_categories = self._categorize_files(all_files)
        for category, files in file_categories.items():
            if files:
                description += f"\n**{category}:**\n"
                for file in files[:10]:  # Limit to 10 files per category
                    description += f"- `{file}`\n"
                if len(files) > 10:
                    description += f"- ... and {len(files) - 10} more\n"
        description += "\n### Summary\n"
        description += f"- **Total Commits:** {total_commits}\n"
        description += f"- **Files Changed:** {total_files_changed}\n"
        description += f"- **Lines Added:** {total_insertions}\n"
        description += f"- **Lines Removed:** {total_deletions}\n"
        description += f"- **Estimated Review Time:** {self._estimate_review_time(total_commits, total_files_changed, total_insertions + total_deletions)}\n"
        return description

    def _categorize_files(self, files: Set[str]) -> Dict[str, List[str]]:
        """Categorize files by type."""
        categories = {category: [] for category in FilePatterns.PATTERNS.keys()}
        for file in files:
            category = self._categorize_single_file(file)
            categories[category].append(file)
        return categories

    def _categorize_single_file(self, file: str) -> str:
        """Categorize a single file efficiently."""
        file_lower = file.lower()
        # Check for special cases first
        if file == "utils.py":
            return "Other"
        # Check pattern-based categories first
        for category, config in FilePatterns.PATTERNS.items():
            if any(pattern in file_lower for pattern in config["patterns"]):
                return category
        # Check extensions
        file_ext = self._get_file_extension(file_lower)
        for category, config in FilePatterns.PATTERNS.items():
            if file_ext in config["extensions"]:
                # Special case: .js and .ts can be both frontend and backend
                if file_ext in {".js", ".ts"}:
                    if any(
                        pattern in file_lower
                        for pattern in ["component", "page", "view", "ui"]
                    ):
                        return "Frontend"
                    else:
                        return "Backend"
                return category
        return "Other"

    def _get_file_extension(self, file_lower: str) -> str:
        """Get file extension efficiently."""
        last_dot = file_lower.rfind(".")
        return file_lower[last_dot:] if last_dot != -1 else ""

    def _estimate_review_time(self, commits: int, files: int, lines: int) -> str:
        """Estimate review time based on changes."""
        # Rough estimation: 2 minutes per commit + 1 minute per 50 lines + 30 seconds per file
        total_minutes = (commits * 2) + (lines // 50) + (files // 2)
        if total_minutes < 1:
            return "Less than a minute"
        if total_minutes < 60:
            return f"{total_minutes} minutes"
        hours = total_minutes // 60
        minutes = total_minutes % 60
        if minutes == 0:
            return f"{hours}h"
        return f"{hours}h {minutes}m"
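
A minimal usage sketch (not part of the module above): it assumes the package is importable as mcp_merge_request_summarizer, which is a guess based on the server name, and that it runs from inside a git repository that has a master branch. Adjust the import path and branch names to your setup.

# Hypothetical import path; the actual package name may differ.
from mcp_merge_request_summarizer.analyzer import GitLogAnalyzer

analyzer = GitLogAnalyzer(repo_path=".")
commits = analyzer.get_git_log(base_branch="master", current_branch="HEAD")
summary = analyzer.generate_summary(commits)
print(summary.title)
print(summary.description)

Note that generate_summary builds the description field as ready-to-paste markdown (## Overview, ### Files Affected, and so on), so printing it yields a complete merge request body.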

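To see the input shape the parser expects, here is a sketch that feeds _parse_git_output_sync_modern a hand-written string in the format produced by git log --stat --format=format:%H%n%an%n%ad%n%s%n --date=short: a 40-character hash line, then author, date, and subject lines, then the --stat block. This input is constructed to match the parser, not captured from real git output, and it exercises a private method purely for illustration.

# Hypothetical import path, as above.
from mcp_merge_request_summarizer.analyzer import GitLogAnalyzer

sample_output = "\n".join([
    "a" * 40,                   # %H: 40-hex-char commit hash
    "Jane Doe",                 # %an: author name
    "2024-01-15",               # %ad with --date=short
    "fix: handle empty log",    # %s: subject line
    "",
    "analyzer.py | 12 ++++----",
    "1 file changed, 8 insertions(+), 4 deletions(-)",
    "",
])

analyzer = GitLogAnalyzer(".")
commits = analyzer._parse_git_output_sync_modern(sample_output)
assert commits[0].files_changed == ["analyzer.py"]
assert (commits[0].insertions, commits[0].deletions) == (8, 4)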
MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sonicjoy/mcp_merge_request_summarizer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.