Smart Code Search MCP Server

clean_search_git.py (17.6 kB)
""" Clean SmartCodeSearch with Git History Integration Enhances search with temporal context, author attribution, and change patterns """ import sys import os import sqlite3 from pathlib import Path from typing import List, Tuple, Optional, Dict, Any from datetime import datetime, timedelta import json # Import base search functionality from src.core.clean_search import CleanSmartCodeSearch from src.core.git_analyzer import GitAnalyzer, FileEvolution, AuthorExpertise from src.core.db_wrapper import ThreadSafeDB class GitAwareCleanSmartCodeSearch(CleanSmartCodeSearch): """Enhanced search with Git history integration""" def __init__(self, project_root=".", quiet=True): """Initialize with Git analysis capabilities""" super().__init__(project_root, quiet) # Initialize Git analyzer self.git_analyzer = GitAnalyzer(self.root) self.git_available = self.git_analyzer.git_available # Extended database schema for git data self._init_git_tables() # Cache for git metrics self._evolution_cache: Dict[str, FileEvolution] = {} self._expertise_cache: Optional[Dict[str, List[AuthorExpertise]]] = None def _init_git_tables(self): """Initialize Git-related database tables""" with self.db.get_connection() as conn: # Git history table conn.execute(""" CREATE TABLE IF NOT EXISTS git_history ( file_path TEXT, line_number INTEGER, commit_hash TEXT, author_name TEXT, author_email TEXT, commit_date TEXT, commit_message TEXT, is_bug_fix BOOLEAN DEFAULT 0, PRIMARY KEY (file_path, line_number) ) """) # File evolution metrics conn.execute(""" CREATE TABLE IF NOT EXISTS file_evolution ( file_path TEXT PRIMARY KEY, total_commits INTEGER, unique_authors INTEGER, last_modified TEXT, first_commit TEXT, change_frequency REAL, bug_fix_count INTEGER, stability_score REAL, last_indexed TEXT ) """) # Author expertise conn.execute(""" CREATE TABLE IF NOT EXISTS author_expertise ( author_email TEXT, file_path TEXT, contribution_count INTEGER, last_contribution TEXT, expertise_score REAL, PRIMARY KEY (author_email, file_path) ) """) # Create indexes for better query performance conn.execute(""" CREATE INDEX IF NOT EXISTS idx_git_history_author ON git_history(author_email) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_evolution_stability ON file_evolution(stability_score DESC) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_evolution_frequency ON file_evolution(change_frequency DESC) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_expertise_score ON author_expertise(expertise_score DESC) """) conn.commit() def index_git_history(self, force: bool = False): """ Index Git history for all files in the project Args: force: Force re-indexing even if already indexed """ if not self.git_available: return print("Indexing Git history...") # Get all tracked files with self.db.get_connection() as conn: cursor = conn.execute("SELECT DISTINCT file_path FROM files") files = [row[0] for row in cursor.fetchall()] # Index evolution metrics for each file for file_path in files: path = Path(file_path) if not path.exists(): continue # Check if already indexed recently (within last day) if not force: with self.db.get_connection() as conn: cursor = conn.execute( "SELECT last_indexed FROM file_evolution WHERE file_path = ?", (file_path,) ) row = cursor.fetchone() if row and row[0]: last_indexed = datetime.fromisoformat(row[0]) if (datetime.now() - last_indexed).days < 1: continue # Get file evolution metrics evolution = self.git_analyzer.get_file_evolution(path) if evolution: self._store_file_evolution(evolution) # Index author expertise 
self._index_author_expertise() print(f"Git history indexed for {len(files)} files") def _store_file_evolution(self, evolution: FileEvolution): """Store file evolution metrics in database""" with self.db.get_connection() as conn: conn.execute(""" INSERT OR REPLACE INTO file_evolution (file_path, total_commits, unique_authors, last_modified, first_commit, change_frequency, bug_fix_count, stability_score, last_indexed) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( evolution.file_path, evolution.total_commits, evolution.unique_authors, evolution.last_modified.isoformat(), evolution.first_commit.isoformat(), evolution.change_frequency, evolution.bug_fix_count, evolution.stability_score, datetime.now().isoformat() )) conn.commit() # Update cache self._evolution_cache[evolution.file_path] = evolution def _index_author_expertise(self): """Index author expertise across the codebase""" expertise_map = self.git_analyzer.map_author_expertise(min_contributions=2) with self.db.get_connection() as conn: # Clear existing expertise data conn.execute("DELETE FROM author_expertise") # Insert new expertise data for author_email, expertise_areas in expertise_map.items(): for expertise in expertise_areas: conn.execute(""" INSERT INTO author_expertise (author_email, file_path, contribution_count, last_contribution, expertise_score) VALUES (?, ?, ?, ?, ?) """, ( expertise.author_email, expertise.file_path, expertise.contribution_count, expertise.last_contribution.isoformat(), expertise.expertise_score )) conn.commit() # Update cache self._expertise_cache = expertise_map def search_with_git_context(self, query: str, limit: int = 20, author: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, sort_by: str = "relevance", stability_filter: Optional[float] = None) -> List[Tuple[str, float, Dict]]: """ Enhanced search with Git context filtering and ranking Args: query: Search query limit: Maximum results author: Filter by author email or name since: Only include files modified after this date until: Only include files modified before this date sort_by: Sort order - "relevance", "recent", "stable", "frequent" stability_filter: Minimum stability score (0-1) Returns: List of (file_path, score, metadata) tuples """ # Get base search results results = self.search(query, limit=limit*2) # Get extra for filtering # Enhance with git metadata enhanced_results = [] for file_path, base_score in results: # Get git metadata metadata = self._get_file_git_metadata(file_path) # Apply filters if author and author not in metadata.get('authors', []): continue if since: last_modified = metadata.get('last_modified') if last_modified and datetime.fromisoformat(last_modified) < since: continue if until: last_modified = metadata.get('last_modified') if last_modified and datetime.fromisoformat(last_modified) > until: continue if stability_filter: stability = metadata.get('stability_score', 0) if stability < stability_filter: continue # Adjust score based on sort preference adjusted_score = self._adjust_score_for_git_context( base_score, metadata, sort_by ) enhanced_results.append((file_path, adjusted_score, metadata)) # Sort by adjusted score enhanced_results.sort(key=lambda x: x[1], reverse=True) return enhanced_results[:limit] def _get_file_git_metadata(self, file_path: str) -> Dict[str, Any]: """Get Git metadata for a file""" metadata = {} # Try cache first if file_path in self._evolution_cache: evolution = self._evolution_cache[file_path] else: # Query database with self.db.get_connection() as conn: cursor 
= conn.execute(""" SELECT total_commits, unique_authors, last_modified, first_commit, change_frequency, bug_fix_count, stability_score FROM file_evolution WHERE file_path = ? """, (file_path,)) row = cursor.fetchone() if row: metadata = { 'total_commits': row[0], 'unique_authors': row[1], 'last_modified': row[2], 'first_commit': row[3], 'change_frequency': row[4], 'bug_fix_count': row[5], 'stability_score': row[6] } # Get authors for this file with self.db.get_connection() as conn: cursor = conn.execute(""" SELECT DISTINCT author_email FROM author_expertise WHERE file_path = ? ORDER BY expertise_score DESC LIMIT 5 """, (file_path,)) metadata['authors'] = [row[0] for row in cursor.fetchall()] return metadata def _adjust_score_for_git_context(self, base_score: float, metadata: Dict[str, Any], sort_by: str) -> float: """Adjust search score based on Git context""" adjusted_score = base_score if sort_by == "recent": # Boost recently modified files if metadata.get('last_modified'): last_modified = datetime.fromisoformat(metadata['last_modified']) days_old = (datetime.now() - last_modified).days recency_boost = max(0, 1.0 - (days_old / 365)) adjusted_score *= (1 + recency_boost * 0.5) elif sort_by == "stable": # Boost stable files stability = metadata.get('stability_score', 0.5) adjusted_score *= (1 + stability * 0.3) elif sort_by == "frequent": # Boost frequently changed files frequency = metadata.get('change_frequency', 0) freq_boost = min(1.0, frequency / 10) adjusted_score *= (1 + freq_boost * 0.4) return adjusted_score def get_file_authors(self, file_path: str) -> List[Dict[str, Any]]: """ Get list of authors who have contributed to a file Args: file_path: Path to file Returns: List of author information with expertise scores """ with self.db.get_connection() as conn: cursor = conn.execute(""" SELECT author_email, contribution_count, last_contribution, expertise_score FROM author_expertise WHERE file_path = ? ORDER BY expertise_score DESC """, (file_path,)) authors = [] for row in cursor.fetchall(): authors.append({ 'email': row[0], 'contributions': row[1], 'last_contribution': row[2], 'expertise_score': row[3] }) return authors def find_expert_for_area(self, area_pattern: str) -> List[Dict[str, Any]]: """ Find experts for a specific area of the codebase Args: area_pattern: Pattern to match file paths (e.g., "src/core/%") Returns: List of experts with their expertise scores """ with self.db.get_connection() as conn: cursor = conn.execute(""" SELECT author_email, SUM(contribution_count) as total_contributions, MAX(last_contribution) as last_contrib, AVG(expertise_score) as avg_expertise FROM author_expertise WHERE file_path LIKE ? GROUP BY author_email ORDER BY avg_expertise DESC LIMIT 10 """, (area_pattern,)) experts = [] for row in cursor.fetchall(): experts.append({ 'email': row[0], 'total_contributions': row[1], 'last_contribution': row[2], 'average_expertise': row[3] }) return experts def get_bug_prone_files(self, min_bugs: int = 2) -> List[Dict[str, Any]]: """ Get files with high bug fix counts Args: min_bugs: Minimum bug fixes to be considered bug-prone Returns: List of bug-prone files with metrics """ with self.db.get_connection() as conn: cursor = conn.execute(""" SELECT file_path, bug_fix_count, total_commits, stability_score, change_frequency FROM file_evolution WHERE bug_fix_count >= ? 
ORDER BY bug_fix_count DESC """, (min_bugs,)) bug_prone = [] for row in cursor.fetchall(): bug_prone.append({ 'file_path': row[0], 'bug_fixes': row[1], 'total_commits': row[2], 'stability_score': row[3], 'change_frequency': row[4], 'bug_fix_ratio': row[1] / row[2] if row[2] > 0 else 0 }) return bug_prone def get_stable_files(self, min_stability: float = 0.8) -> List[Dict[str, Any]]: """ Get highly stable files in the codebase Args: min_stability: Minimum stability score (0-1) Returns: List of stable files with metrics """ with self.db.get_connection() as conn: cursor = conn.execute(""" SELECT file_path, stability_score, total_commits, bug_fix_count, last_modified FROM file_evolution WHERE stability_score >= ? ORDER BY stability_score DESC """, (min_stability,)) stable_files = [] for row in cursor.fetchall(): stable_files.append({ 'file_path': row[0], 'stability_score': row[1], 'total_commits': row[2], 'bug_fixes': row[3], 'last_modified': row[4] }) return stable_files
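A minimal usage sketch of the class above. The import path and the assumption that the base CleanSmartCodeSearch index already exists are hypothetical (the file's own imports suggest a src.core package, but the server does not document this entry point):

    # Hypothetical usage sketch; module path and prior indexing are assumptions.
    from datetime import datetime, timedelta
    from src.core.clean_search_git import GitAwareCleanSmartCodeSearch

    search = GitAwareCleanSmartCodeSearch(project_root=".", quiet=True)
    search.index_git_history()  # populates file_evolution and author_expertise tables

    # Rank matches by recency, restricted to files touched in the last 90 days
    results = search.search_with_git_context(
        "database connection",
        limit=10,
        since=datetime.now() - timedelta(days=90),
        sort_by="recent",
    )
    for file_path, score, meta in results:
        print(file_path, round(score, 3), meta.get("stability_score"))

    # Surface likely maintainers for an area of the codebase
    print(search.find_expert_for_area("src/core/%"))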

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stevenjjobson/scs-mcp'
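A small Python sketch of the same request, assuming only that the endpoint returns JSON (the payload structure is not shown here):

    import json
    from urllib.request import urlopen

    # Fetch the directory entry for this server and pretty-print it
    with urlopen("https://glama.ai/api/mcp/v1/servers/stevenjjobson/scs-mcp") as resp:
        server_info = json.load(resp)

    print(json.dumps(server_info, indent=2))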

If you have feedback or need assistance with the MCP directory API, please join our Discord server.