# git_analyzer.py (22.8 kB)
"""
Git History and Evolution Analyzer for Smart Code Search
Provides blame information, change frequency, author attribution, and bug detection
"""
import subprocess
import json
import re
from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple, NamedTuple, Any
from dataclasses import dataclass, asdict
from collections import defaultdict
import hashlib
@dataclass
class BlameInfo:
    """Blame record for a single line of a file.

    Ties one line of text to the commit and author that git blame reports
    for it.
    """

    # Where the line lives
    file_path: str
    line_number: int

    # Commit that git blame attributes the line to
    commit_hash: str
    author_name: str
    author_email: str
    commit_date: datetime

    # Text of the line itself
    line_content: str

    # Commit message, when the producer of the record supplies one
    commit_message: Optional[str] = None
@dataclass
class CommitInfo:
    """Summary of one commit: who made it, when, and what it touched."""

    # Identity and authorship
    commit_hash: str
    author_name: str
    author_email: str
    commit_date: datetime
    commit_message: str

    # Size of the change
    files_changed: List[str]
    additions: int
    deletions: int

    # True when the commit has been classified as a bug fix
    is_bug_fix: bool = False
@dataclass
class FileEvolution:
    """Aggregate history metrics describing how one file has changed."""

    file_path: str

    # Volume of history
    total_commits: int
    unique_authors: int

    # Time range covered by that history
    last_modified: datetime
    first_commit: datetime

    # Commits per month
    change_frequency: float

    bug_fix_count: int

    # In the range 0-1; a higher value means a more stable file
    stability_score: float
@dataclass
class AuthorExpertise:
    """How well one author knows one file (or area), as a scored record."""

    # Who
    author_email: str
    author_name: str

    # Which file or area the score applies to
    file_path: str

    # Evidence behind the score
    contribution_count: int
    last_contribution: datetime

    # In the range 0-1; derived from contribution frequency and recency
    expertise_score: float
class GitAnalyzer:
    """Analyzes git history for code understanding and search enhancement.

    Every query goes through ``_run_git_command``. When git is not installed,
    ``repo_path`` is not inside a repository, or a git command fails, the
    query methods return an empty result (``[]``, ``{}`` or ``None``) rather
    than raising.
    """

    # Bug fix detection patterns (searched in the lower-cased commit subject)
    BUG_FIX_PATTERNS = [
        r'\bfix\b', r'\bfixed\b', r'\bfixes\b',
        r'\bbug\b', r'\bbugs\b',
        r'\bpatch\b', r'\bpatched\b',
        r'\bresolve\b', r'\bresolved\b',
        r'\bcorrect\b', r'\bcorrected\b',
        r'\brepair\b', r'\brepaired\b',
        r'\bissue\s*#?\d+',
        r'\bhotfix\b',
        r'\bbugfix\b',
        r'\bdefect\b'
    ]

    # `git log` header lines are framed with ASCII control characters instead
    # of '|': a subject, author name or file path may legitimately contain
    # '|', which used to make the parser drop or misread those commits.
    _RECORD_MARK = "\x1e"   # first character of every commit header line
    _FIELD_SEP = "\x1f"     # separates the fields inside a header line
    _COMMIT_FORMAT = "--pretty=format:%x1e%H%x1f%an%x1f%ae%x1f%at%x1f%s"
    _AUTHOR_FORMAT = "--pretty=format:%x1e%ae%x1f%an%x1f%at"

    # Full object name: 40 hex digits (SHA-1) or 64 hex digits (SHA-256)
    _HASH_RE = re.compile(r'[0-9a-f]{40}(?:[0-9a-f]{24})?')

    def __init__(self, repo_path: Path, cache_dir: Optional[Path] = None):
        """
        Initialize GitAnalyzer.

        Args:
            repo_path: Path to git repository
            cache_dir: Optional directory for caching git data. Defaults to
                ``<repo_path>/.claude-symbols/git_cache``. The directory is
                created if it does not exist yet.
        """
        self.repo_path = Path(repo_path)
        # Coerce to Path so that a plain string argument works with mkdir().
        self.cache_dir = (
            Path(cache_dir) if cache_dir
            else self.repo_path / ".claude-symbols" / "git_cache"
        )
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        # Check if git is available and this is a git repo
        self.git_available = self._check_git()
        self._blame_cache: Dict[str, List[BlameInfo]] = {}
        self._commit_cache: Dict[str, CommitInfo] = {}

    def _check_git(self) -> bool:
        """Check if git is available and we're in a git repository."""
        try:
            result = subprocess.run(
                ["git", "rev-parse", "--git-dir"],
                cwd=self.repo_path,
                capture_output=True,
                text=True,
                check=False
            )
        except (subprocess.SubprocessError, OSError):
            # OSError covers a missing git executable (FileNotFoundError) as
            # well as a repo_path that is missing or not a directory.
            return False
        return result.returncode == 0

    def _run_git_command(self, args: List[str]) -> Optional[str]:
        """
        Run a git command inside the repository and return its stdout.

        Args:
            args: Arguments to pass after ``git``

        Returns:
            The command's standard output, or None when git is unavailable,
            the command exits with a non-zero status, or it cannot be started.
        """
        if not self.git_available:
            return None
        try:
            result = subprocess.run(
                ["git"] + list(args),
                cwd=self.repo_path,
                capture_output=True,
                text=True,
                # Decode explicitly: the locale codec can raise
                # UnicodeDecodeError on file content that is not valid text.
                encoding="utf-8",
                errors="replace",
                check=True
            )
        except (subprocess.SubprocessError, OSError):
            return None
        return result.stdout

    @staticmethod
    def _format_since(moment: datetime) -> str:
        """Render a datetime for git's ``--since`` option.

        Commit timestamps are whole seconds, so the sub-second part is
        dropped to keep the date string simple for git's date parser.
        """
        return moment.isoformat(timespec="seconds")

    @staticmethod
    def _parse_numstat_line(line: str):
        """Split one ``--numstat`` line into ``(additions, deletions, path)``.

        Returns None when the line does not have the three tab-separated
        fields of a numstat entry. A count that is not a plain integer (git
        prints ``-`` for binary files) is reported as 0.
        """
        fields = line.split('\t', 2)
        if len(fields) != 3:
            return None
        counts = []
        for raw in fields[:2]:
            try:
                counts.append(int(raw))
            except ValueError:
                counts.append(0)
        return counts[0], counts[1], fields[2]

    def _parse_commit_log(self, output: str) -> "List[CommitInfo]":
        """Parse ``git log <_COMMIT_FORMAT> --numstat`` output into commits.

        A line starting with ``_RECORD_MARK`` opens a new commit; every
        numstat line up to the next header is added to that commit. Blank
        lines are ignored, so a commit without any numstat lines neither
        swallows nor skips the header that follows it.
        """
        commits = []
        current = None
        for line in output.split('\n'):
            if line.startswith(self._RECORD_MARK):
                current = None
                fields = line[1:].split(self._FIELD_SEP, 4)
                if len(fields) != 5:
                    continue
                commit_hash, author_name, author_email, raw_time, subject = fields
                try:
                    commit_date = datetime.fromtimestamp(int(raw_time))
                except (ValueError, OverflowError, OSError):
                    continue
                current = CommitInfo(
                    commit_hash=commit_hash,
                    author_name=author_name,
                    author_email=author_email,
                    commit_date=commit_date,
                    commit_message=subject,
                    files_changed=[],
                    additions=0,
                    deletions=0,
                    is_bug_fix=self._is_bug_fix_commit(subject)
                )
                commits.append(current)
            elif line and current is not None:
                stat = self._parse_numstat_line(line)
                if stat is None:
                    continue
                added, deleted, path = stat
                current.files_changed.append(path)
                current.additions += added
                current.deletions += deleted
        return commits

    def get_blame(self, file_path: Path, use_cache: bool = True) -> "List[BlameInfo]":
        """
        Get blame information for a file.

        Args:
            file_path: Path to file relative to repo root
            use_cache: Whether to use cached blame data

        Returns:
            List of BlameInfo for each line in the file; empty when git
            produced no output. ``commit_message`` carries the commit's
            summary line when git reports one.
        """
        str_path = str(file_path)
        # Check cache
        if use_cache and str_path in self._blame_cache:
            return self._blame_cache[str_path]

        # --line-porcelain repeats the commit header for every line, which
        # keeps parsing stateless. "--" stops a path that starts with '-'
        # from being read as an option.
        output = self._run_git_command(
            ["blame", "--line-porcelain", "--", str_path]
        )
        if not output:
            return []

        blame_info = []
        # Split on '\n' only and do not strip: content lines may be empty or
        # end in whitespace, and both must survive.
        lines = iter(output.split('\n'))
        for header in lines:
            # Header: <hash> <original line> <final line> [<group size>]
            parts = header.split()
            if len(parts) < 3 or not self._HASH_RE.fullmatch(parts[0]):
                continue
            try:
                int(parts[1])  # original line number: validated, not used
                final_line = int(parts[2])
            except ValueError:
                continue

            author_name = ""
            author_email = ""
            commit_time = 0
            commit_message = None
            line_content = ""
            # Metadata lines follow until the TAB-prefixed content line.
            for line in lines:
                if line.startswith('\t'):
                    line_content = line[1:]
                    break
                key, _, value = line.partition(' ')
                if key == 'author':
                    author_name = value
                elif key == 'author-mail':
                    author_email = value.strip('<>')
                elif key == 'author-time':
                    try:
                        commit_time = int(value)
                    except ValueError:
                        commit_time = 0
                elif key == 'summary':
                    commit_message = value

            if commit_time > 0:
                blame_info.append(BlameInfo(
                    file_path=str_path,
                    line_number=final_line,
                    commit_hash=parts[0],
                    author_name=author_name,
                    author_email=author_email,
                    commit_date=datetime.fromtimestamp(commit_time),
                    line_content=line_content,
                    commit_message=commit_message
                ))

        # Cache the result
        if use_cache:
            self._blame_cache[str_path] = blame_info
        return blame_info

    def get_file_history(self, file_path: Path, limit: int = 100) -> "List[CommitInfo]":
        """
        Get commit history for a specific file.

        Args:
            file_path: Path to file relative to repo root
            limit: Maximum number of commits to return

        Returns:
            List of CommitInfo for the file, in the order git log emits them
        """
        output = self._run_git_command([
            "log", f"--max-count={limit}", self._COMMIT_FORMAT,
            "--numstat", "--", str(file_path)
        ])
        if not output:
            return []
        return self._parse_commit_log(output)

    def analyze_change_frequency(self,
                                 since: Optional[datetime] = None) -> Dict[str, float]:
        """
        Calculate change frequency for all files in the repository.

        Args:
            since: Only consider commits after this date

        Returns:
            Dictionary mapping file paths to commits per month
        """
        log_args = ["log", "--pretty=format:", "--name-only"]
        if since:
            log_args.append(f"--since={self._format_since(since)}")
        output = self._run_git_command(log_args)
        if not output:
            return {}

        # Count commits per file
        file_commits = defaultdict(int)
        for line in output.split('\n'):
            if line:
                file_commits[line] += 1

        # Find the start of history. `git log --reverse -1` cannot be used:
        # the -1 limit is applied before the reversal, so it yields the
        # newest commit. Ask for the parentless (root) commits instead and
        # take the earliest one.
        root_output = self._run_git_command(
            ["log", "--max-parents=0", "--pretty=format:%at"]
        )
        root_times = []
        for token in (root_output or "").split():
            try:
                root_times.append(int(token))
            except ValueError:
                continue
        if not root_times:
            return {}
        range_start = datetime.fromtimestamp(min(root_times))

        if since:
            # range_start is naive local time; bring an aware `since` into
            # the same form so the comparison cannot raise TypeError.
            if since.tzinfo is not None:
                since = since.astimezone().replace(tzinfo=None)
            if since > range_start:
                range_start = since

        months_elapsed = max(1, (datetime.now() - range_start).days / 30)
        return {
            path: count / months_elapsed
            for path, count in file_commits.items()
        }

    def detect_bug_fixes(self, limit: int = 1000) -> "List[CommitInfo]":
        """
        Identify commits that are likely bug fixes.

        A commit seen on an earlier call is returned from the commit cache,
        so repeated calls hand back the same CommitInfo objects.

        Args:
            limit: Maximum number of commits to analyze

        Returns:
            List of CommitInfo for bug fix commits
        """
        output = self._run_git_command([
            "log", f"--max-count={limit}", self._COMMIT_FORMAT, "--numstat"
        ])
        if not output:
            return []
        return [
            self._commit_cache.setdefault(commit.commit_hash, commit)
            for commit in self._parse_commit_log(output)
            if commit.is_bug_fix
        ]

    def map_author_expertise(self,
                             min_contributions: int = 3) -> "Dict[str, List[AuthorExpertise]]":
        """
        Map authors to their areas of expertise based on contribution patterns.

        Args:
            min_contributions: Minimum contributions to consider expertise

        Returns:
            Dictionary mapping author emails to their expertise areas, each
            list sorted by descending expertise score
        """
        output = self._run_git_command(
            ["log", self._AUTHOR_FORMAT, "--name-only"]
        )
        if not output:
            return {}

        # author email -> file path -> running statistics
        author_file_stats = defaultdict(dict)
        current = None  # (email, name, commit time) of the commit being read
        for line in output.split('\n'):
            if line.startswith(self._RECORD_MARK):
                current = None
                fields = line[1:].split(self._FIELD_SEP)
                if len(fields) != 3:
                    continue
                try:
                    current = (fields[0], fields[1], int(fields[2]))
                except ValueError:
                    continue
            elif line and current is not None:
                author_email, author_name, commit_time = current
                per_file = author_file_stats[author_email]
                stats = per_file.get(line)
                if stats is None:
                    per_file[line] = {
                        'count': 1,
                        'name': author_name,
                        'last_time': commit_time,
                        'first_time': commit_time
                    }
                else:
                    stats['count'] += 1
                    stats['last_time'] = max(stats['last_time'], commit_time)
                    # git log lists newest commits first, so the earliest
                    # contribution has to be tracked as a running minimum.
                    stats['first_time'] = min(stats['first_time'], commit_time)

        # Calculate expertise scores
        expertise_map = defaultdict(list)
        now = datetime.now()
        for author_email, file_stats in author_file_stats.items():
            for file_path, stats in file_stats.items():
                if stats['count'] < min_contributions:
                    continue
                # Score is a weighted sum of:
                # - Number of contributions (40%), capped at 20
                # - Recency of contributions (40%), decaying over a year
                # - Consistency over time (20%), capped at 6 months
                contribution_score = min(1.0, stats['count'] / 20)
                last_contribution = datetime.fromtimestamp(stats['last_time'])
                days_since_last = (now - last_contribution).days
                # Clamp to [0, 1]: a commit dated in the future would
                # otherwise push the score above 1.
                recency_score = min(1.0, max(0.0, 1.0 - days_since_last / 365))
                first_contribution = datetime.fromtimestamp(stats['first_time'])
                time_span_days = (last_contribution - first_contribution).days + 1
                consistency_score = min(1.0, time_span_days / 180)
                expertise_score = (
                    contribution_score * 0.4 +
                    recency_score * 0.4 +
                    consistency_score * 0.2
                )
                expertise_map[author_email].append(AuthorExpertise(
                    author_email=author_email,
                    author_name=stats['name'],
                    file_path=file_path,
                    contribution_count=stats['count'],
                    last_contribution=last_contribution,
                    expertise_score=expertise_score
                ))

        # Sort expertise areas by score
        for areas in expertise_map.values():
            areas.sort(key=lambda x: x.expertise_score, reverse=True)
        return dict(expertise_map)

    def get_file_evolution(self, file_path: Path) -> "Optional[FileEvolution]":
        """
        Get evolution metrics for a specific file.

        Metrics are computed from at most the 1000 most recent commits that
        ``get_file_history`` returns for the file.

        Args:
            file_path: Path to file relative to repo root

        Returns:
            FileEvolution metrics or None if file not in git
        """
        history = self.get_file_history(file_path, limit=1000)
        if not history:
            return None

        total_commits = len(history)
        unique_authors = len({c.author_email for c in history})
        bug_fix_count = sum(1 for c in history if c.is_bug_fix)

        # Author dates are not guaranteed to follow log order (for example
        # after a rebase), so take the extremes rather than the list ends.
        first_date = min(c.commit_date for c in history)
        last_date = max(c.commit_date for c in history)

        # Calculate change frequency
        time_span_months = max(1, (last_date - first_date).days / 30)
        change_frequency = total_commits / time_span_months

        # Stability is the inverse of bug fix ratio and change frequency
        bug_fix_ratio = bug_fix_count / total_commits
        normalized_frequency = min(1.0, change_frequency / 10)  # Normalize to 0-1
        stability_score = max(
            0.0, 1.0 - (bug_fix_ratio * 0.6 + normalized_frequency * 0.4)
        )

        return FileEvolution(
            file_path=str(file_path),
            total_commits=total_commits,
            unique_authors=unique_authors,
            last_modified=last_date,
            first_commit=first_date,
            change_frequency=change_frequency,
            bug_fix_count=bug_fix_count,
            stability_score=stability_score
        )

    def _is_bug_fix_commit(self, commit_message: str) -> bool:
        """Check if a commit message indicates a bug fix."""
        message_lower = commit_message.lower()
        return any(re.search(pattern, message_lower) for pattern in self.BUG_FIX_PATTERNS)

    def clear_cache(self):
        """Clear all cached git data."""
        self._blame_cache.clear()
        self._commit_cache.clear()

    def get_recent_changes(self,
                           days: int = 7,
                           author: Optional[str] = None) -> "List[CommitInfo]":
        """
        Get recent changes in the repository.

        Args:
            days: Number of days to look back
            author: Filter by author email or name

        Returns:
            List of recent commits
        """
        since_date = datetime.now() - timedelta(days=days)
        args = [
            "log",
            f"--since={self._format_since(since_date)}",
            self._COMMIT_FORMAT,
            "--numstat"
        ]
        if author:
            args.append(f"--author={author}")
        output = self._run_git_command(args)
        if not output:
            return []
        return self._parse_commit_log(output)