ripgrep_integration.py (25 kB)
""" Ripgrep Integration Layer Ultra-fast pattern discovery using ripgrep for large codebases. Provides foundation for multi-stage search pipeline (ripgrep → AST → semantic). """ import asyncio import json import re import subprocess import time from dataclasses import asdict, dataclass from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional class SearchType(Enum): """Types of search operations supported by ripgrep.""" PATTERN = "pattern" LITERAL = "literal" WORD = "word" REGEX = "regex" class OutputFormat(Enum): """Output formats for ripgrep results.""" JSON = "json" TEXT = "text" PATHS_ONLY = "paths" @dataclass class SearchOptions: """Configuration options for ripgrep searches.""" search_type: SearchType = SearchType.PATTERN case_sensitive: bool = True include_patterns: Optional[List[str]] = None exclude_patterns: Optional[List[str]] = None max_results: Optional[int] = None context_lines: int = 0 file_types: Optional[List[str]] = None max_filesize: Optional[str] = None # e.g., "10M", "1G" max_depth: Optional[int] = None follow_symlinks: bool = False output_format: OutputFormat = OutputFormat.JSON def __post_init__(self): if self.include_patterns is None: self.include_patterns = [] if self.exclude_patterns is None: self.exclude_patterns = [] @dataclass class SearchResult: """Individual search result from ripgrep.""" file_path: str line_number: int line_text: str byte_offset: int context_before: Optional[List[str]] = None context_after: Optional[List[str]] = None submatches: Optional[List[Dict[str, Any]]] = None def __post_init__(self): if self.context_before is None: self.context_before = [] if self.context_after is None: self.context_after = [] if self.submatches is None: self.submatches = [] @dataclass class SearchResults: """Collection of search results with metadata.""" results: List[SearchResult] total_matches: int files_searched: int search_time: float pattern: str search_options: SearchOptions def to_dict(self) -> Dict[str, Any]: """Convert results to dictionary for JSON serialization.""" return { "results": [asdict(result) for result in self.results], "total_matches": self.total_matches, "files_searched": self.files_searched, "search_time": self.search_time, "pattern": self.pattern, "search_options": asdict(self.search_options), } @dataclass class FileMetrics: """Metrics about file analysis.""" file_path: str size_bytes: int line_count: int language: str encoding: str last_modified: float def to_dict(self) -> Dict[str, Any]: return asdict(self) @dataclass class SymbolCandidate: """Potential symbol found by ripgrep pattern matching.""" symbol_name: str symbol_type: str # function, class, variable, etc. file_path: str line_number: int confidence_score: float # 0.0 to 1.0 context_text: str def to_dict(self) -> Dict[str, Any]: return asdict(self) class RipgrepIntegration: """ High-performance ripgrep integration for pattern discovery. Provides ultra-fast search capabilities with intelligent result processing and integration with the multi-stage search pipeline. """ def __init__(self, ripgrep_path: str = "rg"): """ Initialize ripgrep integration. 
class RipgrepIntegration:
    """
    High-performance ripgrep integration for pattern discovery.

    Provides ultra-fast search capabilities with intelligent result
    processing and integration with the multi-stage search pipeline.
    """

    def __init__(self, ripgrep_path: str = "rg"):
        """
        Initialize ripgrep integration.

        Args:
            ripgrep_path: Path to the ripgrep executable (defaults to 'rg')
        """
        self.ripgrep_path = ripgrep_path
        self._verify_ripgrep_installation()

        # Common programming language file extensions
        self.language_patterns = {
            "python": [".py"],
            "javascript": [".js", ".jsx", ".mjs"],
            "typescript": [".ts", ".tsx"],
            "java": [".java"],
            "c": [".c", ".h"],
            "cpp": [".cpp", ".hpp", ".cc", ".cxx"],
            "go": [".go"],
            "rust": [".rs"],
            "ruby": [".rb"],
            "php": [".php"],
            "swift": [".swift"],
            "kotlin": [".kt"],
            "scala": [".scala"],
        }

        # Common symbol patterns, keyed by symbol type first, then language
        self.symbol_patterns = {
            "function": {
                "python": [r"def\s+(\w+)\s*\(", r"async\s+def\s+(\w+)\s*\("],
                "javascript": [r"function\s+(\w+)\s*\(", r"const\s+(\w+)\s*=\s*(\([^)]*\)\s*=>|\w+\s*\([^)]*\)\s*\{)"],
                "typescript": [r"function\s+(\w+)\s*\(", r"const\s+(\w+)\s*=\s*(\([^)]*\)\s*=>|\w+\s*\([^)]*\)\s*\{)"],
                "java": [r"(public|private|protected)?\s*(static)?\s+\w+\s+(\w+)\s*\("],
                "c": [r"\w+\s+(\w+)\s*\("],
                "cpp": [r"\w+\s+(\w+)\s*\("],
                "go": [r"func\s+(\w+)\s*\("],
                "rust": [r"fn\s+(\w+)\s*\("],
            },
            "class": {
                "python": [r"class\s+(\w+)"],
                "javascript": [r"class\s+(\w+)"],
                "typescript": [r"class\s+(\w+)"],
                "java": [r"(public|private|protected)?\s*class\s+(\w+)"],
                "c": [r"struct\s+(\w+)"],
                "cpp": [r"class\s+(\w+)"],
                "go": [r"type\s+(\w+)\s+struct"],
                "rust": [r"struct\s+(\w+)"],
            },
            "variable": {
                "python": [r"(\w+)\s*=", r"(\w+)\s*:"],
                "javascript": [r"(const|let|var)\s+(\w+)\s*="],
                "typescript": [r"(const|let|var)\s+(\w+)\s*=", r"(\w+):\s*\w+"],
                "java": [r"\w+\s+(\w+)\s*="],
                "c": [r"\w+\s+(\w+)\s*="],
                "cpp": [r"\w+\s+(\w+)\s*="],
                "go": [r"var\s+(\w+)\s*", r"(\w+)\s*:="],
                "rust": [r"let\s+(mut\s+)?(\w+)\s*="],
            },
        }

    def _verify_ripgrep_installation(self) -> None:
        """Verify that ripgrep is installed and accessible."""
        try:
            result = subprocess.run(
                [self.ripgrep_path, "--version"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode != 0:
                raise RuntimeError(f"ripgrep not found at {self.ripgrep_path}")
        except (subprocess.TimeoutExpired, FileNotFoundError) as e:
            raise RuntimeError(f"ripgrep installation verification failed: {e}")
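
    # Usage sketch (illustrative, not part of the original source; assumes
    # ripgrep is installed):
    #
    #     rg = RipgrepIntegration()                        # "rg" from PATH
    #     rg = RipgrepIntegration("/usr/local/bin/rg")     # or explicit path
    #
    # Construction raises RuntimeError immediately if the binary is missing,
    # so callers fail fast before issuing any searches.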
    def search_files(self, pattern: str, path: str, options: Optional[SearchOptions] = None) -> SearchResults:
        """
        Search for files using ripgrep with the specified pattern.

        Args:
            pattern: Pattern to search for
            path: Directory or file path to search in
            options: Search configuration options

        Returns:
            SearchResults containing matched files and metadata
        """
        if options is None:
            options = SearchOptions()

        start_time = time.time()

        # Build ripgrep command
        cmd = self._build_ripgrep_command(pattern, path, options)

        # Execute search
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30,  # 30 second timeout for searches
            )

            if result.returncode != 0:
                if result.returncode == 1:
                    # Exit code 1 means no matches were found
                    return SearchResults([], 0, 0, time.time() - start_time, pattern, options)
                else:
                    raise RuntimeError(f"ripgrep search failed: {result.stderr}")

            # Parse results
            search_results = self._parse_ripgrep_output(result.stdout, options.output_format)

            # Estimate files searched (ripgrep doesn't report this directly)
            files_searched = self._estimate_files_searched(path, options)

            search_time = time.time() - start_time

            return SearchResults(
                results=search_results,
                total_matches=len(search_results),
                files_searched=files_searched,
                search_time=search_time,
                pattern=pattern,
                search_options=options,
            )

        except subprocess.TimeoutExpired:
            raise RuntimeError("ripgrep search timed out after 30 seconds")

    def search_code_patterns(
        self, pattern: str, language: str, path: str, options: Optional[SearchOptions] = None
    ) -> SearchResults:
        """
        Search for code patterns specific to a programming language.

        Args:
            pattern: Code pattern to search for
            language: Programming language (python, javascript, etc.)
            path: Directory or file path to search in
            options: Search configuration options

        Returns:
            SearchResults with language-specific optimizations
        """
        if options is None:
            options = SearchOptions()

        # Add language-specific file extensions
        if language in self.language_patterns:
            lang_extensions = self.language_patterns[language]
            file_type_patterns = [f"*{ext}" for ext in lang_extensions]

            # Merge with existing include patterns
            if options.include_patterns:
                options.include_patterns.extend(file_type_patterns)
            else:
                options.include_patterns = file_type_patterns

        # Apply language-specific optimizations. self.symbol_patterns is
        # keyed by symbol type first, then language.
        for symbol_type, lang_patterns in self.symbol_patterns.items():
            patterns = lang_patterns.get(language, [])
            if any(pattern in p for p in patterns):
                # This looks like a symbol search, so enhance the pattern
                enhanced_pattern = self._enhance_symbol_pattern(pattern, language, symbol_type)
                return self.search_files(enhanced_pattern, path, options)

        return self.search_files(pattern, path, options)

    def find_symbol_candidates(
        self, symbol_name: str, path: str, symbol_type: Optional[str] = None, language: Optional[str] = None
    ) -> List[SymbolCandidate]:
        """
        Find potential symbol candidates using ripgrep pattern matching.

        Args:
            symbol_name: Name of the symbol to find
            path: Directory or file path to search in
            symbol_type: Type of symbol (function, class, variable)
            language: Programming language for context

        Returns:
            List of SymbolCandidate objects with confidence scores
        """
        candidates = []

        # Determine search patterns based on symbol type and language
        search_patterns = self._get_symbol_search_patterns(symbol_name, symbol_type, language)

        for pattern, candidate_type, conf_modifier in search_patterns:
            options = SearchOptions(
                search_type=SearchType.REGEX,
                context_lines=2,  # Get context for confidence scoring
                output_format=OutputFormat.JSON,
            )

            try:
                results = self.search_files(pattern, path, options)

                for result in results.results:
                    # Calculate confidence score based on various factors
                    confidence = (
                        self._calculate_symbol_confidence(result, symbol_name, candidate_type, language)
                        * conf_modifier
                    )

                    candidate = SymbolCandidate(
                        symbol_name=symbol_name,
                        symbol_type=candidate_type,
                        file_path=result.file_path,
                        line_number=result.line_number,
                        confidence_score=min(confidence, 1.0),
                        context_text=result.line_text,
                    )
                    candidates.append(candidate)

            except Exception:
                # Swallow the error and continue with the other patterns
                continue

        # Sort by confidence score and remove duplicates
        candidates.sort(key=lambda x: x.confidence_score, reverse=True)
        unique_candidates = self._remove_duplicate_candidates(candidates)

        return unique_candidates
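
    # Usage sketch (illustrative): locating likely definitions of a function
    # named "parse_config" in a Python codebase. The path is a placeholder.
    #
    #     rg = RipgrepIntegration()
    #     for c in rg.find_symbol_candidates("parse_config", "./src",
    #                                        symbol_type="function",
    #                                        language="python"):
    #         print(f"{c.confidence_score:.2f}  {c.file_path}:{c.line_number}")
    #
    # Candidates come back sorted by confidence and deduplicated by
    # (file, line, symbol type), so the first entry is the best guess.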
    def analyze_file_metrics(self, file_path: str) -> FileMetrics:
        """
        Analyze file metrics using ripgrep and system tools.

        Args:
            file_path: Path to the file to analyze

        Returns:
            FileMetrics object with file statistics
        """
        path_obj = Path(file_path)
        if not path_obj.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Basic file information
        stat = path_obj.stat()

        # Count lines using ripgrep (very fast). "^" matches once at the start
        # of every line, so --count yields the line count; --count-matches with
        # "." would count characters instead.
        line_count = int(
            subprocess.run(
                [self.ripgrep_path, "--count", "^", file_path],
                capture_output=True,
                text=True,
            ).stdout.strip()
            or "0"
        )

        # Detect language based on file extension
        language = self._detect_file_language(file_path)

        # Detect encoding (basic detection)
        encoding = self._detect_file_encoding(file_path)

        return FileMetrics(
            file_path=file_path,
            size_bytes=stat.st_size,
            line_count=line_count,
            language=language,
            encoding=encoding,
            last_modified=stat.st_mtime,
        )

    def _build_ripgrep_command(self, pattern: str, path: str, options: SearchOptions) -> List[str]:
        """Build ripgrep command line arguments."""
        cmd = [self.ripgrep_path]

        # Search type options
        if options.search_type == SearchType.LITERAL:
            cmd.append("--fixed-strings")
        elif options.search_type == SearchType.WORD:
            cmd.append("--word-regexp")

        # Case sensitivity
        if not options.case_sensitive:
            cmd.append("--ignore-case")

        # Context lines
        if options.context_lines > 0:
            cmd.extend(["--context", str(options.context_lines)])

        # File type filters: --type-add takes one glob per flag
        if options.file_types:
            type_extensions = []
            for file_type in options.file_types:
                if file_type in self.language_patterns:
                    type_extensions.extend(self.language_patterns[file_type])
            if type_extensions:
                for ext in type_extensions:
                    cmd.extend(["--type-add", f"custom:*{ext}"])
                cmd.extend(["--type", "custom"])

        # Include patterns (a distinct loop variable so the search pattern
        # argument appended below is not shadowed)
        for glob_pattern in options.include_patterns or []:
            cmd.extend(["--glob", glob_pattern])

        # Exclude patterns
        for glob_pattern in options.exclude_patterns or []:
            cmd.extend(["--glob", f"!{glob_pattern}"])

        # Max file size
        if options.max_filesize:
            cmd.extend(["--max-filesize", options.max_filesize])

        # Max depth
        if options.max_depth:
            cmd.extend(["--max-depth", str(options.max_depth)])

        # Follow symlinks
        if options.follow_symlinks:
            cmd.append("--follow")

        # Output format
        if options.output_format == OutputFormat.JSON:
            cmd.append("--json")
        elif options.output_format == OutputFormat.PATHS_ONLY:
            cmd.append("--files-with-matches")

        # Max results (note: --max-count limits matches per file, not in total)
        if options.max_results:
            cmd.extend(["--max-count", str(options.max_results)])

        # Add pattern and path; -e keeps patterns that begin with "-" from
        # being parsed as flags
        cmd.extend(["-e", pattern, path])

        return cmd
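
    # Illustrative example of the argv assembled above (assumed output, not
    # captured from a real run): for
    #     SearchOptions(search_type=SearchType.LITERAL, case_sensitive=False,
    #                   context_lines=2, max_filesize="10M")
    # the resulting command resembles:
    #     rg --fixed-strings --ignore-case --context 2 --max-filesize 10M \
    #        --json -e PATTERN PATH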
    def _parse_ripgrep_output(self, output: str, format_type: OutputFormat) -> List[SearchResult]:
        """Parse ripgrep output into SearchResult objects."""
        results = []

        if format_type == OutputFormat.JSON:
            for line in output.strip().split("\n"):
                if line:
                    try:
                        data = json.loads(line)
                        # Only process match entries; "context" and summary
                        # entries are currently skipped
                        if data.get("type") == "match":
                            result = SearchResult(
                                file_path=data.get("data", {}).get("path", {}).get("text", ""),
                                line_number=data.get("data", {}).get("line_number", 0),
                                line_text=data.get("data", {}).get("lines", {}).get("text", ""),
                                byte_offset=data.get("data", {}).get("absolute_offset", 0),
                                context_before=[],
                                context_after=[],
                                submatches=data.get("data", {}).get("submatches", []),
                            )
                            results.append(result)
                    except json.JSONDecodeError:
                        continue
        else:
            # Simple text parsing
            for line_num, line in enumerate(output.strip().split("\n"), 1):
                if line.strip():
                    result = SearchResult(
                        file_path="unknown",  # Would need more context for the path
                        line_number=line_num,
                        line_text=line,
                        byte_offset=0,
                        context_before=[],
                        context_after=[],
                    )
                    results.append(result)

        return results

    def _estimate_files_searched(self, path: str, options: SearchOptions) -> int:
        """Estimate the number of files searched (ripgrep doesn't report this directly)."""
        try:
            # Use find to count files that would be searched
            cmd = ["find", path, "-type", "f"]

            # Apply filtering similar to ripgrep's. Include patterns must be
            # OR-ed together; chaining bare -name tests would AND them.
            if options.include_patterns:
                cmd.append("(")
                for i, glob_pattern in enumerate(options.include_patterns):
                    if i > 0:
                        cmd.append("-o")
                    cmd.extend(["-name", glob_pattern])
                cmd.append(")")

            if options.exclude_patterns:
                for glob_pattern in options.exclude_patterns:
                    cmd.extend(["!", "-name", glob_pattern])

            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                return len(result.stdout.strip().split("\n")) if result.stdout.strip() else 0
        except Exception:
            pass

        # Fallback: a conservative estimate
        return 100

    def _enhance_symbol_pattern(self, pattern: str, language: str, symbol_type: str) -> str:
        """Enhance a search pattern for better symbol matching."""
        # Add language-specific enhancements
        if language == "python" and symbol_type == "function":
            return f"(def\\s+|async\\s+def\\s+){pattern}\\s*\\("
        elif language in ["javascript", "typescript"] and symbol_type == "function":
            return f"(function\\s+{pattern}\\s*\\(|const\\s+{pattern}\\s*=\\s*\\()"
        elif language == "java" and symbol_type == "class":
            return f"class\\s+{pattern}\\s*\\{{"

        return pattern

    def _get_symbol_search_patterns(
        self, symbol_name: str, symbol_type: Optional[str], language: Optional[str]
    ) -> List[tuple]:
        """Get search patterns for symbol detection."""
        patterns = []

        # Escape special regex characters in the symbol name
        escaped_name = re.escape(symbol_name)

        # self.symbol_patterns is keyed by symbol type first, then language
        language_specific = []
        if language:
            for sym_type, lang_patterns in self.symbol_patterns.items():
                if symbol_type and sym_type != symbol_type:
                    continue
                # Specific symbol type requested: full confidence; otherwise
                # try all symbol types at reduced confidence
                confidence = 1.0 if symbol_type else 0.8
                for lang_pattern in lang_patterns.get(language, []):
                    language_specific.append(
                        (lang_pattern.replace(r"(\w+)", escaped_name), sym_type, confidence)
                    )

        if language_specific:
            patterns.extend(language_specific)
        else:
            # Generic patterns that work across languages
            generic_patterns = [
                (f"\\b{escaped_name}\\b", "variable", 0.6),
                (f"def\\s+{escaped_name}\\s*\\(", "function", 0.7),
                (f"class\\s+{escaped_name}\\b", "class", 0.7),
                (f"function\\s+{escaped_name}\\s*\\(", "function", 0.7),
            ]
            patterns.extend(generic_patterns)

        return patterns
"variable", 0.6), (f"def\\s+{escaped_name}\\s*\\(", "function", 0.7), (f"class\\s+{escaped_name}\\b", "class", 0.7), (f"function\\s+{escaped_name}\\s*\\(", "function", 0.7), ] patterns.extend(generic_patterns) return patterns def _calculate_symbol_confidence(self, result: SearchResult, symbol_name: str, symbol_type: str, language: Optional[str]) -> float: """Calculate confidence score for symbol candidate.""" confidence = 0.5 # Base confidence line_text = result.line_text.lower() # Boost confidence for exact matches if symbol_name.lower() in line_text: confidence += 0.2 # Boost confidence for symbol-specific patterns if symbol_type == "function": if any(keyword in line_text for keyword in ["def ", "function ", "fn ", "func "]): confidence += 0.2 elif symbol_type == "class": if "class " in line_text: confidence += 0.2 elif symbol_type == "variable": if any(op in line_text for op in [" = ", ":=", "const ", "let ", "var "]): confidence += 0.2 # Boost confidence for language-specific patterns if language: if language == "python" and "def " in line_text: confidence += 0.1 elif language in ["javascript", "typescript"] and "function " in line_text: confidence += 0.1 elif language == "java" and ("class " in line_text or "public " in line_text): confidence += 0.1 return confidence def _remove_duplicate_candidates(self, candidates: List[SymbolCandidate]) -> List[SymbolCandidate]: """Remove duplicate candidates (same file and line number).""" seen = set() unique_candidates = [] for candidate in candidates: key = (candidate.file_path, candidate.line_number, candidate.symbol_type) if key not in seen: seen.add(key) unique_candidates.append(candidate) return unique_candidates def _detect_file_language(self, file_path: str) -> str: """Detect programming language based on file extension.""" path_obj = Path(file_path) suffix = path_obj.suffix.lower() for language, extensions in self.language_patterns.items(): if suffix in extensions: return language return "unknown" def _detect_file_encoding(self, file_path: str) -> str: """Basic file encoding detection.""" try: with open(file_path, "rb") as f: # Read first few bytes to check for BOM bom = f.read(4) if bom.startswith(b"\xef\xbb\xbf"): return "utf-8-sig" elif bom.startswith(b"\xff\xfe"): return "utf-16-le" elif bom.startswith(b"\xfe\xff"): return "utf-16-be" else: # Default to utf-8 return "utf-8" except Exception: return "unknown" def batch_search( self, patterns: List[str], path: str, options: Optional[SearchOptions] = None, max_concurrent: int = 4 ) -> Dict[str, SearchResults]: """ Perform multiple searches concurrently. 
    def batch_search(
        self, patterns: List[str], path: str, options: Optional[SearchOptions] = None, max_concurrent: int = 4
    ) -> Dict[str, SearchResults]:
        """
        Perform multiple searches concurrently.

        Args:
            patterns: List of patterns to search for
            path: Directory or file path to search in
            options: Search configuration options
            max_concurrent: Maximum concurrent searches

        Returns:
            Dictionary mapping patterns to SearchResults
        """

        async def async_search(pattern: str) -> tuple:
            try:
                # search_files blocks on subprocess.run, so run it on a worker
                # thread; calling it directly would serialize the whole batch
                result = await asyncio.to_thread(self.search_files, pattern, path, options)
                return (pattern, result)
            except Exception:
                return (pattern, SearchResults([], 0, 0, 0, pattern, options or SearchOptions()))

        async def run_batch_search():
            # Bound concurrency with a semaphore
            semaphore = asyncio.Semaphore(max_concurrent)

            async def search_with_semaphore(pattern: str):
                async with semaphore:
                    return await async_search(pattern)

            tasks = [search_with_semaphore(pattern) for pattern in patterns]
            return await asyncio.gather(*tasks)

        # Run the async batch from this synchronous method
        results = asyncio.run(run_batch_search())
        return dict(results)
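
# --- Usage sketch (illustrative, not part of the original module) ----------
# A minimal end-to-end demonstration, assuming ripgrep is installed and the
# current directory contains source files. Patterns and paths are
# placeholders, not values from the original source.
if __name__ == "__main__":
    rg = RipgrepIntegration()

    # Single search with custom options
    opts = SearchOptions(search_type=SearchType.REGEX, context_lines=1, max_results=50)
    hits = rg.search_files(r"TODO|FIXME", ".", opts)
    print(f"{hits.total_matches} matches in {hits.search_time:.2f}s")

    # Concurrent batch search over several patterns
    batch = rg.batch_search(["import ", "class ", "def "], ".", max_concurrent=2)
    for pat, res in batch.items():
        print(f"{pat!r}: {res.total_matches} matches")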
