"""
Clean SmartCodeSearch with Git History Integration
Enhances search with temporal context, author attribution, and change patterns
"""
from pathlib import Path
from typing import List, Tuple, Optional, Dict, Any
from datetime import datetime
# Import base search functionality
from src.core.clean_search import CleanSmartCodeSearch
from src.core.git_analyzer import GitAnalyzer, FileEvolution, AuthorExpertise
from src.core.db_wrapper import ThreadSafeDB
class GitAwareCleanSmartCodeSearch(CleanSmartCodeSearch):
"""Enhanced search with Git history integration"""
def __init__(self, project_root=".", quiet=True):
"""Initialize with Git analysis capabilities"""
super().__init__(project_root, quiet)
# Initialize Git analyzer
self.git_analyzer = GitAnalyzer(self.root)
self.git_available = self.git_analyzer.git_available
# Extended database schema for git data
self._init_git_tables()
# Cache for git metrics
self._evolution_cache: Dict[str, FileEvolution] = {}
self._expertise_cache: Optional[Dict[str, List[AuthorExpertise]]] = None
def _init_git_tables(self):
"""Initialize Git-related database tables"""
with self.db.get_connection() as conn:
# Git history table
conn.execute("""
CREATE TABLE IF NOT EXISTS git_history (
file_path TEXT,
line_number INTEGER,
commit_hash TEXT,
author_name TEXT,
author_email TEXT,
commit_date TEXT,
commit_message TEXT,
is_bug_fix BOOLEAN DEFAULT 0,
PRIMARY KEY (file_path, line_number)
)
""")
# File evolution metrics
conn.execute("""
CREATE TABLE IF NOT EXISTS file_evolution (
file_path TEXT PRIMARY KEY,
total_commits INTEGER,
unique_authors INTEGER,
last_modified TEXT,
first_commit TEXT,
change_frequency REAL,
bug_fix_count INTEGER,
stability_score REAL,
last_indexed TEXT
)
""")
# Author expertise
conn.execute("""
CREATE TABLE IF NOT EXISTS author_expertise (
author_email TEXT,
file_path TEXT,
contribution_count INTEGER,
last_contribution TEXT,
expertise_score REAL,
PRIMARY KEY (author_email, file_path)
)
""")
# Create indexes for better query performance
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_git_history_author
ON git_history(author_email)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_evolution_stability
ON file_evolution(stability_score DESC)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_evolution_frequency
ON file_evolution(change_frequency DESC)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_expertise_score
ON author_expertise(expertise_score DESC)
""")
conn.commit()
def index_git_history(self, force: bool = False):
"""
Index Git history for all files in the project
Args:
force: Force re-indexing even if already indexed
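        Example (illustrative usage; assumes the project is a Git repository
        and the base file index has already been built):
            >>> search = GitAwareCleanSmartCodeSearch(".")
            >>> search.index_git_history()            # incremental pass
            >>> search.index_git_history(force=True)  # full re-index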
"""
if not self.git_available:
return
print("Indexing Git history...")
# Get all tracked files
with self.db.get_connection() as conn:
cursor = conn.execute("SELECT DISTINCT file_path FROM files")
files = [row[0] for row in cursor.fetchall()]
        # Index evolution metrics for each file, skipping entries indexed recently
        indexed = 0
        for file_path in files:
path = Path(file_path)
if not path.exists():
continue
# Check if already indexed recently (within last day)
if not force:
with self.db.get_connection() as conn:
cursor = conn.execute(
"SELECT last_indexed FROM file_evolution WHERE file_path = ?",
(file_path,)
)
row = cursor.fetchone()
if row and row[0]:
last_indexed = datetime.fromisoformat(row[0])
if (datetime.now() - last_indexed).days < 1:
continue
# Get file evolution metrics
evolution = self.git_analyzer.get_file_evolution(path)
            if evolution:
                self._store_file_evolution(evolution)
                indexed += 1
# Index author expertise
self._index_author_expertise()
print(f"Git history indexed for {len(files)} files")
def _store_file_evolution(self, evolution: FileEvolution):
"""Store file evolution metrics in database"""
with self.db.get_connection() as conn:
conn.execute("""
INSERT OR REPLACE INTO file_evolution
(file_path, total_commits, unique_authors, last_modified,
first_commit, change_frequency, bug_fix_count,
stability_score, last_indexed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
evolution.file_path,
evolution.total_commits,
evolution.unique_authors,
evolution.last_modified.isoformat(),
evolution.first_commit.isoformat(),
evolution.change_frequency,
evolution.bug_fix_count,
evolution.stability_score,
datetime.now().isoformat()
))
conn.commit()
# Update cache
self._evolution_cache[evolution.file_path] = evolution
def _index_author_expertise(self):
"""Index author expertise across the codebase"""
expertise_map = self.git_analyzer.map_author_expertise(min_contributions=2)
with self.db.get_connection() as conn:
# Clear existing expertise data
conn.execute("DELETE FROM author_expertise")
# Insert new expertise data
for author_email, expertise_areas in expertise_map.items():
for expertise in expertise_areas:
conn.execute("""
INSERT INTO author_expertise
(author_email, file_path, contribution_count,
last_contribution, expertise_score)
VALUES (?, ?, ?, ?, ?)
""", (
expertise.author_email,
expertise.file_path,
expertise.contribution_count,
expertise.last_contribution.isoformat(),
expertise.expertise_score
))
conn.commit()
# Update cache
self._expertise_cache = expertise_map
def search_with_git_context(self,
query: str,
limit: int = 20,
author: Optional[str] = None,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
sort_by: str = "relevance",
stability_filter: Optional[float] = None) -> List[Tuple[str, float, Dict]]:
"""
Enhanced search with Git context filtering and ranking
Args:
query: Search query
limit: Maximum results
            author: Filter by author email (substring match)
since: Only include files modified after this date
until: Only include files modified before this date
sort_by: Sort order - "relevance", "recent", "stable", "frequent"
stability_filter: Minimum stability score (0-1)
Returns:
List of (file_path, score, metadata) tuples
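        Example (illustrative; the author email is a hypothetical placeholder):
            >>> hits = search.search_with_git_context(
            ...     "database connection",
            ...     author="alice@example.com",
            ...     sort_by="recent",
            ...     stability_filter=0.5,
            ... )
            >>> path, score, meta = hits[0]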
"""
        # Get base search results (over-fetch so filtering can still fill `limit`)
        results = self.search(query, limit=limit * 2)
# Enhance with git metadata
enhanced_results = []
for file_path, base_score in results:
# Get git metadata
metadata = self._get_file_git_metadata(file_path)
# Apply filters
            # Substring match so a partial email also matches
            if author and not any(author in a for a in metadata.get('authors', [])):
                continue
if since:
last_modified = metadata.get('last_modified')
if last_modified and datetime.fromisoformat(last_modified) < since:
continue
if until:
last_modified = metadata.get('last_modified')
if last_modified and datetime.fromisoformat(last_modified) > until:
continue
            if stability_filter is not None:  # 0.0 is a valid threshold
stability = metadata.get('stability_score', 0)
if stability < stability_filter:
continue
# Adjust score based on sort preference
adjusted_score = self._adjust_score_for_git_context(
base_score, metadata, sort_by
)
enhanced_results.append((file_path, adjusted_score, metadata))
# Sort by adjusted score
enhanced_results.sort(key=lambda x: x[1], reverse=True)
return enhanced_results[:limit]
def _get_file_git_metadata(self, file_path: str) -> Dict[str, Any]:
"""Get Git metadata for a file"""
metadata = {}
        # Try the in-memory cache first; fall back to the database on a miss
        if file_path in self._evolution_cache:
            # Cache hit: build metadata directly from the cached FileEvolution
            evolution = self._evolution_cache[file_path]
            metadata = {
                'total_commits': evolution.total_commits,
                'unique_authors': evolution.unique_authors,
                'last_modified': evolution.last_modified.isoformat(),
                'first_commit': evolution.first_commit.isoformat(),
                'change_frequency': evolution.change_frequency,
                'bug_fix_count': evolution.bug_fix_count,
                'stability_score': evolution.stability_score
            }
else:
# Query database
with self.db.get_connection() as conn:
cursor = conn.execute("""
SELECT total_commits, unique_authors, last_modified,
first_commit, change_frequency, bug_fix_count,
stability_score
FROM file_evolution
WHERE file_path = ?
""", (file_path,))
row = cursor.fetchone()
if row:
metadata = {
'total_commits': row[0],
'unique_authors': row[1],
'last_modified': row[2],
'first_commit': row[3],
'change_frequency': row[4],
'bug_fix_count': row[5],
'stability_score': row[6]
}
# Get authors for this file
with self.db.get_connection() as conn:
cursor = conn.execute("""
SELECT DISTINCT author_email
FROM author_expertise
WHERE file_path = ?
ORDER BY expertise_score DESC
LIMIT 5
""", (file_path,))
metadata['authors'] = [row[0] for row in cursor.fetchall()]
return metadata
def _adjust_score_for_git_context(self,
base_score: float,
metadata: Dict[str, Any],
sort_by: str) -> float:
"""Adjust search score based on Git context"""
adjusted_score = base_score
if sort_by == "recent":
# Boost recently modified files
if metadata.get('last_modified'):
last_modified = datetime.fromisoformat(metadata['last_modified'])
days_old = (datetime.now() - last_modified).days
recency_boost = max(0, 1.0 - (days_old / 365))
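                # Worked example: days_old = 90 gives
                # recency_boost = 1 - 90/365 ≈ 0.75, so the base score is
                # multiplied by 1 + 0.75 * 0.5 ≈ 1.38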
adjusted_score *= (1 + recency_boost * 0.5)
elif sort_by == "stable":
# Boost stable files
stability = metadata.get('stability_score', 0.5)
adjusted_score *= (1 + stability * 0.3)
elif sort_by == "frequent":
# Boost frequently changed files
frequency = metadata.get('change_frequency', 0)
freq_boost = min(1.0, frequency / 10)
adjusted_score *= (1 + freq_boost * 0.4)
return adjusted_score
def get_file_authors(self, file_path: str) -> List[Dict[str, Any]]:
"""
Get list of authors who have contributed to a file
Args:
file_path: Path to file
Returns:
List of author information with expertise scores
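        Example (illustrative output shape; the path is hypothetical):
            >>> search.get_file_authors("src/core/clean_search.py")
            [{'email': ..., 'contributions': ..., 'last_contribution': ...,
              'expertise_score': ...}]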
"""
with self.db.get_connection() as conn:
cursor = conn.execute("""
SELECT author_email, contribution_count,
last_contribution, expertise_score
FROM author_expertise
WHERE file_path = ?
ORDER BY expertise_score DESC
""", (file_path,))
authors = []
for row in cursor.fetchall():
authors.append({
'email': row[0],
'contributions': row[1],
'last_contribution': row[2],
'expertise_score': row[3]
})
return authors
def find_expert_for_area(self, area_pattern: str) -> List[Dict[str, Any]]:
"""
Find experts for a specific area of the codebase
Args:
area_pattern: Pattern to match file paths (e.g., "src/core/%")
Returns:
List of experts with their expertise scores
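        Example (illustrative; "%" is the SQL LIKE wildcard):
            >>> search.find_expert_for_area("src/core/%")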
"""
with self.db.get_connection() as conn:
cursor = conn.execute("""
SELECT author_email, SUM(contribution_count) as total_contributions,
MAX(last_contribution) as last_contrib,
AVG(expertise_score) as avg_expertise
FROM author_expertise
WHERE file_path LIKE ?
GROUP BY author_email
ORDER BY avg_expertise DESC
LIMIT 10
""", (area_pattern,))
experts = []
for row in cursor.fetchall():
experts.append({
'email': row[0],
'total_contributions': row[1],
'last_contribution': row[2],
'average_expertise': row[3]
})
return experts
def get_bug_prone_files(self, min_bugs: int = 2) -> List[Dict[str, Any]]:
"""
Get files with high bug fix counts
Args:
min_bugs: Minimum bug fixes to be considered bug-prone
Returns:
List of bug-prone files with metrics
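        Example (illustrative):
            >>> hotspots = search.get_bug_prone_files(min_bugs=3)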
"""
with self.db.get_connection() as conn:
cursor = conn.execute("""
SELECT file_path, bug_fix_count, total_commits,
stability_score, change_frequency
FROM file_evolution
WHERE bug_fix_count >= ?
ORDER BY bug_fix_count DESC
""", (min_bugs,))
bug_prone = []
for row in cursor.fetchall():
bug_prone.append({
'file_path': row[0],
'bug_fixes': row[1],
'total_commits': row[2],
'stability_score': row[3],
'change_frequency': row[4],
'bug_fix_ratio': row[1] / row[2] if row[2] > 0 else 0
})
return bug_prone
def get_stable_files(self, min_stability: float = 0.8) -> List[Dict[str, Any]]:
"""
Get highly stable files in the codebase
Args:
min_stability: Minimum stability score (0-1)
Returns:
List of stable files with metrics
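        Example (illustrative):
            >>> anchors = search.get_stable_files(min_stability=0.9)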
"""
with self.db.get_connection() as conn:
cursor = conn.execute("""
SELECT file_path, stability_score, total_commits,
bug_fix_count, last_modified
FROM file_evolution
WHERE stability_score >= ?
ORDER BY stability_score DESC
""", (min_stability,))
stable_files = []
for row in cursor.fetchall():
stable_files.append({
'file_path': row[0],
'stability_score': row[1],
'total_commits': row[2],
'bug_fixes': row[3],
'last_modified': row[4]
})
return stable_files
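

if __name__ == "__main__":
    # Minimal usage sketch, not part of the library API. Assumes the current
    # directory is a Git repository already indexed by the base
    # CleanSmartCodeSearch; the query string is illustrative.
    search = GitAwareCleanSmartCodeSearch(".")
    search.index_git_history()
    for path, score, meta in search.search_with_git_context("search", sort_by="stable", limit=5):
        print(f"{score:6.2f}  {path}  stability={meta.get('stability_score')}")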