Skip to main content
Glama

Smart Code Search MCP Server

git_analyzer.py (22.8 kB)
""" Git History and Evolution Analyzer for Smart Code Search Provides blame information, change frequency, author attribution, and bug detection """ import subprocess import json import re from pathlib import Path from datetime import datetime, timedelta from typing import List, Dict, Optional, Tuple, NamedTuple, Any from dataclasses import dataclass, asdict from collections import defaultdict import hashlib @dataclass class BlameInfo: """Information from git blame for a specific line""" file_path: str line_number: int commit_hash: str author_name: str author_email: str commit_date: datetime line_content: str commit_message: Optional[str] = None @dataclass class CommitInfo: """Information about a single commit""" commit_hash: str author_name: str author_email: str commit_date: datetime commit_message: str files_changed: List[str] additions: int deletions: int is_bug_fix: bool = False @dataclass class FileEvolution: """Evolution metrics for a file""" file_path: str total_commits: int unique_authors: int last_modified: datetime first_commit: datetime change_frequency: float # commits per month bug_fix_count: int stability_score: float # 0-1, higher is more stable @dataclass class AuthorExpertise: """Author expertise for a specific file or area""" author_email: str author_name: str file_path: str contribution_count: int last_contribution: datetime expertise_score: float # 0-1, based on contribution frequency and recency class GitAnalyzer: """Analyzes git history for code understanding and search enhancement""" # Bug fix detection patterns BUG_FIX_PATTERNS = [ r'\bfix\b', r'\bfixed\b', r'\bfixes\b', r'\bbug\b', r'\bbugs\b', r'\bpatch\b', r'\bpatched\b', r'\bresolve\b', r'\bresolved\b', r'\bcorrect\b', r'\bcorrected\b', r'\brepair\b', r'\brepaired\b', r'\bissue\s*#?\d+', r'\bhotfix\b', r'\bbugfix\b', r'\bdefect\b' ] def __init__(self, repo_path: Path, cache_dir: Optional[Path] = None): """ Initialize GitAnalyzer Args: repo_path: Path to git repository cache_dir: 
Optional directory for caching git data """ self.repo_path = Path(repo_path) self.cache_dir = cache_dir or (self.repo_path / ".claude-symbols" / "git_cache") self.cache_dir.mkdir(parents=True, exist_ok=True) # Check if git is available and this is a git repo self.git_available = self._check_git() self._blame_cache: Dict[str, List[BlameInfo]] = {} self._commit_cache: Dict[str, CommitInfo] = {} def _check_git(self) -> bool: """Check if git is available and we're in a git repository""" try: result = subprocess.run( ["git", "rev-parse", "--git-dir"], cwd=self.repo_path, capture_output=True, text=True, check=False ) return result.returncode == 0 except (subprocess.SubprocessError, FileNotFoundError): return False def _run_git_command(self, args: List[str]) -> Optional[str]: """Run a git command and return output""" if not self.git_available: return None try: result = subprocess.run( ["git"] + args, cwd=self.repo_path, capture_output=True, text=True, check=True ) return result.stdout except subprocess.CalledProcessError: return None def get_blame(self, file_path: Path, use_cache: bool = True) -> List[BlameInfo]: """ Get blame information for a file Args: file_path: Path to file relative to repo root use_cache: Whether to use cached blame data Returns: List of BlameInfo for each line in the file """ str_path = str(file_path) # Check cache if use_cache and str_path in self._blame_cache: return self._blame_cache[str_path] blame_info = [] # Run git blame with porcelain format for easier parsing output = self._run_git_command([ "blame", "--porcelain", "--line-porcelain", str(file_path) ]) if not output: return blame_info # Parse porcelain output lines = output.strip().split('\n') i = 0 while i < len(lines): if not lines[i]: i += 1 continue # First line has commit hash and line numbers parts = lines[i].split() if len(parts) < 3: i += 1 continue # Check if first part is a valid commit hash (40 hex chars) if not all(c in '0123456789abcdef' for c in parts[0]): i += 1 continue 
commit_hash = parts[0] try: original_line = int(parts[1]) final_line = int(parts[2]) except (ValueError, IndexError): i += 1 continue # Parse metadata author_name = "" author_email = "" commit_time = 0 line_content = "" i += 1 while i < len(lines): line = lines[i] if line.startswith('author '): author_name = line[7:] elif line.startswith('author-mail '): author_email = line[12:].strip('<>') elif line.startswith('author-time '): commit_time = int(line[12:]) elif line.startswith('\t'): # This is the actual line content line_content = line[1:] break i += 1 # Create BlameInfo if commit_time > 0: blame_info.append(BlameInfo( file_path=str_path, line_number=final_line, commit_hash=commit_hash, author_name=author_name, author_email=author_email, commit_date=datetime.fromtimestamp(commit_time), line_content=line_content )) i += 1 # Cache the result if use_cache: self._blame_cache[str_path] = blame_info return blame_info def get_file_history(self, file_path: Path, limit: int = 100) -> List[CommitInfo]: """ Get commit history for a specific file Args: file_path: Path to file relative to repo root limit: Maximum number of commits to return Returns: List of CommitInfo for the file """ commits = [] # Get commit history with stats output = self._run_git_command([ "log", f"--max-count={limit}", "--pretty=format:%H|%an|%ae|%at|%s", "--numstat", "--", str(file_path) ]) if not output: return commits lines = output.strip().split('\n') i = 0 while i < len(lines): if not lines[i] or not '|' in lines[i]: i += 1 continue # Parse commit line parts = lines[i].split('|') if len(parts) != 5: i += 1 continue commit_hash = parts[0] author_name = parts[1] author_email = parts[2] commit_time = int(parts[3]) commit_message = parts[4] # Check for bug fix is_bug_fix = self._is_bug_fix_commit(commit_message) # Parse numstat (next line) additions = 0 deletions = 0 files_changed = [] i += 1 if i < len(lines) and lines[i] and '\t' in lines[i]: stat_parts = lines[i].split('\t') if len(stat_parts) >= 3: 
try: additions = int(stat_parts[0]) if stat_parts[0] != '-' else 0 deletions = int(stat_parts[1]) if stat_parts[1] != '-' else 0 files_changed = [stat_parts[2]] except ValueError: pass commits.append(CommitInfo( commit_hash=commit_hash, author_name=author_name, author_email=author_email, commit_date=datetime.fromtimestamp(commit_time), commit_message=commit_message, files_changed=files_changed, additions=additions, deletions=deletions, is_bug_fix=is_bug_fix )) i += 1 return commits def analyze_change_frequency(self, since: Optional[datetime] = None) -> Dict[str, float]: """ Calculate change frequency for all files in the repository Args: since: Only consider commits after this date Returns: Dictionary mapping file paths to commits per month """ frequency_map = {} # Get all files with their commit counts since_arg = [] if since: since_arg = [f"--since={since.isoformat()}"] output = self._run_git_command([ "log", "--pretty=format:", "--name-only" ] + since_arg) if not output: return frequency_map # Count commits per file file_commits = defaultdict(int) for line in output.strip().split('\n'): if line: file_commits[line] += 1 # Calculate time range first_commit_output = self._run_git_command([ "log", "--reverse", "--pretty=format:%at", "-1" ]) if first_commit_output: first_commit_time = int(first_commit_output.strip()) first_commit_date = datetime.fromtimestamp(first_commit_time) if since and since > first_commit_date: first_commit_date = since months_elapsed = max(1, (datetime.now() - first_commit_date).days / 30) # Calculate frequency for file_path, commit_count in file_commits.items(): frequency_map[file_path] = commit_count / months_elapsed return frequency_map def detect_bug_fixes(self, limit: int = 1000) -> List[CommitInfo]: """ Identify commits that are likely bug fixes Args: limit: Maximum number of commits to analyze Returns: List of CommitInfo for bug fix commits """ bug_fixes = [] # Get recent commits output = self._run_git_command([ "log", 
f"--max-count={limit}", "--pretty=format:%H|%an|%ae|%at|%s", "--numstat" ]) if not output: return bug_fixes lines = output.strip().split('\n') i = 0 while i < len(lines): if not lines[i] or not '|' in lines[i]: i += 1 continue parts = lines[i].split('|') if len(parts) != 5: i += 1 continue commit_message = parts[4] # Check if it's a bug fix if self._is_bug_fix_commit(commit_message): commit_hash = parts[0] # Get full commit info if commit_hash not in self._commit_cache: commit_info = CommitInfo( commit_hash=commit_hash, author_name=parts[1], author_email=parts[2], commit_date=datetime.fromtimestamp(int(parts[3])), commit_message=commit_message, files_changed=[], additions=0, deletions=0, is_bug_fix=True ) # Parse file changes i += 1 while i < len(lines) and lines[i] and '\t' in lines[i]: stat_parts = lines[i].split('\t') if len(stat_parts) >= 3: commit_info.files_changed.append(stat_parts[2]) try: if stat_parts[0] != '-': commit_info.additions += int(stat_parts[0]) if stat_parts[1] != '-': commit_info.deletions += int(stat_parts[1]) except ValueError: pass i += 1 self._commit_cache[commit_hash] = commit_info bug_fixes.append(commit_info) else: bug_fixes.append(self._commit_cache[commit_hash]) else: i += 1 return bug_fixes def map_author_expertise(self, min_contributions: int = 3) -> Dict[str, List[AuthorExpertise]]: """ Map authors to their areas of expertise based on contribution patterns Args: min_contributions: Minimum contributions to consider expertise Returns: Dictionary mapping author emails to their expertise areas """ expertise_map = defaultdict(list) author_file_stats = defaultdict(lambda: defaultdict(dict)) # Get contribution statistics output = self._run_git_command([ "log", "--pretty=format:%ae|%an|%at", "--name-only" ]) if not output: return expertise_map lines = output.strip().split('\n') i = 0 while i < len(lines): if not lines[i] or not '|' in lines[i]: i += 1 continue parts = lines[i].split('|') if len(parts) != 3: i += 1 continue author_email = 
parts[0] author_name = parts[1] commit_time = int(parts[2]) # Get files changed in this commit i += 1 while i < len(lines) and lines[i] and '|' not in lines[i]: file_path = lines[i] # Update author statistics for this file if 'count' not in author_file_stats[author_email][file_path]: author_file_stats[author_email][file_path] = { 'count': 0, 'name': author_name, 'last_time': 0, 'first_time': commit_time } stats = author_file_stats[author_email][file_path] stats['count'] += 1 stats['last_time'] = max(stats['last_time'], commit_time) i += 1 # Calculate expertise scores now = datetime.now() for author_email, file_stats in author_file_stats.items(): for file_path, stats in file_stats.items(): if stats['count'] >= min_contributions: # Calculate expertise score based on: # - Number of contributions (40%) # - Recency of contributions (40%) # - Consistency over time (20%) contribution_score = min(1.0, stats['count'] / 20) # Cap at 20 contributions last_contribution = datetime.fromtimestamp(stats['last_time']) days_since_last = (now - last_contribution).days recency_score = max(0, 1.0 - (days_since_last / 365)) # Decay over a year first_contribution = datetime.fromtimestamp(stats['first_time']) time_span_days = (last_contribution - first_contribution).days + 1 consistency_score = min(1.0, time_span_days / 180) # Cap at 6 months expertise_score = ( contribution_score * 0.4 + recency_score * 0.4 + consistency_score * 0.2 ) expertise = AuthorExpertise( author_email=author_email, author_name=stats['name'], file_path=file_path, contribution_count=stats['count'], last_contribution=last_contribution, expertise_score=expertise_score ) expertise_map[author_email].append(expertise) # Sort expertise areas by score for author_email in expertise_map: expertise_map[author_email].sort(key=lambda x: x.expertise_score, reverse=True) return dict(expertise_map) def get_file_evolution(self, file_path: Path) -> Optional[FileEvolution]: """ Get evolution metrics for a specific file Args: 
file_path: Path to file relative to repo root Returns: FileEvolution metrics or None if file not in git """ # Get file history history = self.get_file_history(file_path, limit=1000) if not history: return None # Calculate metrics unique_authors = len(set(c.author_email for c in history)) bug_fix_count = sum(1 for c in history if c.is_bug_fix) # Get first and last commits first_commit = history[-1] if history else None last_commit = history[0] if history else None if not first_commit or not last_commit: return None # Calculate change frequency time_span_months = max(1, (last_commit.commit_date - first_commit.commit_date).days / 30) change_frequency = len(history) / time_span_months # Calculate stability score (inverse of bug fix ratio and change frequency) bug_fix_ratio = bug_fix_count / len(history) if history else 0 normalized_frequency = min(1.0, change_frequency / 10) # Normalize to 0-1 stability_score = max(0, 1.0 - (bug_fix_ratio * 0.6 + normalized_frequency * 0.4)) return FileEvolution( file_path=str(file_path), total_commits=len(history), unique_authors=unique_authors, last_modified=last_commit.commit_date, first_commit=first_commit.commit_date, change_frequency=change_frequency, bug_fix_count=bug_fix_count, stability_score=stability_score ) def _is_bug_fix_commit(self, commit_message: str) -> bool: """Check if a commit message indicates a bug fix""" message_lower = commit_message.lower() return any(re.search(pattern, message_lower) for pattern in self.BUG_FIX_PATTERNS) def clear_cache(self): """Clear all cached git data""" self._blame_cache.clear() self._commit_cache.clear() def get_recent_changes(self, days: int = 7, author: Optional[str] = None) -> List[CommitInfo]: """ Get recent changes in the repository Args: days: Number of days to look back author: Filter by author email or name Returns: List of recent commits """ since_date = datetime.now() - timedelta(days=days) args = [ "log", f"--since={since_date.isoformat()}", 
"--pretty=format:%H|%an|%ae|%at|%s", "--numstat" ] if author: args.append(f"--author={author}") output = self._run_git_command(args) if not output: return [] # Parse output similar to get_file_history commits = [] lines = output.strip().split('\n') i = 0 while i < len(lines): if not lines[i] or not '|' in lines[i]: i += 1 continue parts = lines[i].split('|') if len(parts) != 5: i += 1 continue commit_info = CommitInfo( commit_hash=parts[0], author_name=parts[1], author_email=parts[2], commit_date=datetime.fromtimestamp(int(parts[3])), commit_message=parts[4], files_changed=[], additions=0, deletions=0, is_bug_fix=self._is_bug_fix_commit(parts[4]) ) # Parse file changes i += 1 while i < len(lines) and lines[i] and '\t' in lines[i]: stat_parts = lines[i].split('\t') if len(stat_parts) >= 3: commit_info.files_changed.append(stat_parts[2]) try: if stat_parts[0] != '-': commit_info.additions += int(stat_parts[0]) if stat_parts[1] != '-': commit_info.deletions += int(stat_parts[1]) except ValueError: pass i += 1 commits.append(commit_info) return commits

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stevenjjobson/scs-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.