enhanced_search.py • 24.9 kB
""" Enhanced Search Infrastructure - Intelligent Search Pipeline Implements a multi-stage search system that combines ripgrep speed with intelligent result ranking, filtering, and semantic understanding for ultra-fast code discovery. """ import concurrent.futures import json import logging import re import time from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from .ast_rule_intelligence import LLMAstReasoningEngine from .ripgrep_integration import RipgrepIntegration # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SearchStrategy(Enum): """Search strategies for different use cases.""" EXACT = "exact" # Exact pattern matching FUZZY = "fuzzy" # Fuzzy pattern matching SEMANTIC = "semantic" # Semantic similarity search HYBRID = "hybrid" # Combination of multiple strategies class ResultRanking(Enum): """Result ranking strategies.""" RELEVANCE = "relevance" # By pattern relevance FREQUENCY = "frequency" # By occurrence frequency RECENCY = "recency" # By file modification time CONFIDENCE = "confidence" # By confidence score COMBINED = "combined" # Combined ranking @dataclass class SearchContext: """Context for search operations.""" query: str path: str = "." file_types: List[str] = field(default_factory=list) exclude_patterns: List[str] = field(default_factory=list) include_patterns: List[str] = field(default_factory=list) max_results: int = 100 strategy: SearchStrategy = SearchStrategy.HYBRID ranking: ResultRanking = ResultRanking.COMBINED timeout: float = 30.0 case_sensitive: bool = False context_lines: int = 3 @dataclass class EnhancedSearchResult: """Enhanced search result with metadata and ranking.""" file_path: str line_number: int line_content: str context_before: List[str] = field(default_factory=list) context_after: List[str] = field(default_factory=list) match_type: str = "exact" # exact, fuzzy, semantic confidence_score: float = 0.0 relevance_score: float = 0.0 frequency_score: float = 0.0 recency_score: float = 0.0 combined_score: float = 0.0 language: str = "" symbol_type: str = "" metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class SearchMetrics: """Search performance metrics.""" total_files_searched: int = 0 total_matches_found: int = 0 search_duration: float = 0.0 ranking_duration: float = 0.0 cache_hits: int = 0 cache_misses: int = 0 strategy_breakdown: Dict[str, float] = field(default_factory=dict) class SearchCache: """Intelligent caching system for search results.""" def __init__(self, max_entries: int = 1000): self.cache: Dict[str, Tuple[List[EnhancedSearchResult], float]] = {} self.max_entries = max_entries self.access_times: Dict[str, float] = {} def _generate_key(self, context: SearchContext) -> str: """Generate cache key from search context.""" key_data = { "query": context.query, "path": context.path, "file_types": sorted(context.file_types), "exclude_patterns": sorted(context.exclude_patterns), "include_patterns": sorted(context.include_patterns), "strategy": context.strategy.value, "case_sensitive": context.case_sensitive, } return json.dumps(key_data, sort_keys=True) def get(self, context: SearchContext) -> Optional[List[EnhancedSearchResult]]: """Get cached results.""" key = self._generate_key(context) if key in self.cache: results, timestamp = self.cache[key] self.access_times[key] = time.time() return results return None def set(self, context: SearchContext, results: List[EnhancedSearchResult]) -> None: 
"""Cache search results.""" key = self._generate_key(context) current_time = time.time() # Evict oldest entries if cache is full if len(self.cache) >= self.max_entries: oldest_key = min(self.access_times.keys()) del self.cache[oldest_key] del self.access_times[oldest_key] self.cache[key] = (results, current_time) self.access_times[key] = current_time def clear(self) -> None: """Clear cache.""" self.cache.clear() self.access_times.clear() class ResultRanker: """Intelligent result ranking system.""" def __init__(self): self.language_weights = { "python": 1.2, "javascript": 1.1, "typescript": 1.15, "java": 1.0, "cpp": 0.9, "c": 0.85, "go": 1.05, "rust": 1.1, "php": 0.8, "ruby": 0.85 } def calculate_relevance_score(self, result: EnhancedSearchResult, query: str) -> float: """Calculate relevance score based on pattern matching.""" content = result.line_content.lower() query_lower = query.lower() # Exact match bonus if query_lower in content: score = 1.0 else: # Fuzzy match score score = self._fuzzy_match_score(content, query_lower) # Position bonus (matches at beginning of line are more relevant) if content.startswith(query_lower): score *= 1.2 # Symbol type bonus if result.symbol_type in ["function", "class", "method"]: score *= 1.3 return min(score, 1.0) def calculate_frequency_score(self, result: EnhancedSearchResult, all_results: List[EnhancedSearchResult]) -> float: """Calculate frequency score based on occurrence patterns.""" # Count occurrences in same file file_matches = [r for r in all_results if r.file_path == result.file_path] # Fewer matches per file = higher score (more specific) if len(file_matches) == 1: return 1.0 elif len(file_matches) <= 3: return 0.8 elif len(file_matches) <= 10: return 0.6 else: return 0.4 def calculate_recency_score(self, result: EnhancedSearchResult) -> float: """Calculate recency score based on file modification time.""" try: file_path = Path(result.file_path) if file_path.exists(): mod_time = file_path.stat().st_mtime current_time = time.time() age_days = (current_time - mod_time) / (24 * 3600) # Recent files get higher scores if age_days < 1: return 1.0 elif age_days < 7: return 0.9 elif age_days < 30: return 0.8 elif age_days < 90: return 0.7 else: return 0.6 except (OSError, IOError): pass return 0.5 def _fuzzy_match_score(self, text: str, pattern: str) -> float: """Calculate fuzzy match score.""" if not pattern: return 0.0 pattern_len = len(pattern) text_len = len(text) if pattern_len == 0: return 0.0 if text_len == 0: return 0.0 # Simple character-based fuzzy matching pattern_chars = set(pattern.lower()) text_chars = set(text.lower()) intersection = pattern_chars.intersection(text_chars) union = pattern_chars.union(text_chars) return len(intersection) / len(union) def rank_results(self, results: List[EnhancedSearchResult], query: str, ranking: ResultRanking) -> List[EnhancedSearchResult]: """Rank search results using specified strategy.""" if not results: return results # Calculate individual scores for result in results: result.relevance_score = self.calculate_relevance_score(result, query) result.frequency_score = self.calculate_frequency_score(result, results) result.recency_score = self.calculate_recency_score(result) # Language bonus lang_weight = self.language_weights.get(result.language, 1.0) result.relevance_score *= lang_weight # Calculate combined scores for result in results: if ranking == ResultRanking.COMBINED: result.combined_score = ( result.relevance_score * 0.4 + result.confidence_score * 0.3 + result.frequency_score * 0.2 + 


class EnhancedSearchInfrastructure:
    """Enhanced search infrastructure with intelligent pipeline."""

    def __init__(self, ripgrep_path: str = "rg"):
        self.ripgrep = RipgrepIntegration(ripgrep_path)
        self.ast_intelligence = LLMAstReasoningEngine()
        self.cache = SearchCache()
        self.ranker = ResultRanker()
        self.metrics = SearchMetrics()

        # Language detection patterns (file-extension pattern first,
        # then telltale content patterns)
        self.language_patterns = {
            "python": [r"\.py$", r"import\s+", r"from\s+\w+\s+import"],
            "javascript": [r"\.js$", r"\.jsx$", r"const\s+\w+\s*=", r"function\s+\w+"],
            "typescript": [r"\.ts$", r"\.tsx$", r"interface\s+\w+", r"type\s+\w+"],
            "java": [r"\.java$", r"public\s+class", r"private\s+\w+"],
            "cpp": [r"\.(cpp|cc|cxx|h|hpp)$", r"#include\s*<", r"namespace\s+\w+"],
            "go": [r"\.go$", r"package\s+\w+", r"func\s+\w+"],
            "rust": [r"\.rs$", r"fn\s+\w+", r"let\s+mut\s+"],
            "php": [r"\.php$", r"<\?php", r"function\s+\w+"],
            "ruby": [r"\.rb$", r"def\s+\w+", r"module\s+\w+"],
        }

    def search(self, context: SearchContext) -> Tuple[List[EnhancedSearchResult], SearchMetrics]:
        """Execute enhanced search with the intelligent pipeline."""
        start_time = time.time()

        # Check cache first
        cached_results = self.cache.get(context)
        if cached_results:
            self.metrics.cache_hits += 1
            return cached_results, self.metrics

        self.metrics.cache_misses += 1

        try:
            # Execute search based on strategy
            if context.strategy == SearchStrategy.EXACT:
                results = self._exact_search(context)
            elif context.strategy == SearchStrategy.FUZZY:
                results = self._fuzzy_search(context)
            elif context.strategy == SearchStrategy.SEMANTIC:
                results = self._semantic_search(context)
            elif context.strategy == SearchStrategy.HYBRID:
                results = self._hybrid_search(context)
            else:
                results = self._exact_search(context)

            # Rank results
            ranking_start = time.time()
            results = self.ranker.rank_results(results, context.query, context.ranking)
            self.metrics.ranking_duration = time.time() - ranking_start

            # Update metrics
            self.metrics.total_matches_found = len(results)
            self.metrics.search_duration = time.time() - start_time

            # Cache results
            self.cache.set(context, results)

            return results, self.metrics

        except Exception as e:
            logger.error(f"Search failed: {e}")
            return [], self.metrics

    def _exact_search(self, context: SearchContext) -> List[EnhancedSearchResult]:
        """Execute exact pattern search using ripgrep."""
        results = []

        # Build search options for ripgrep
        from .ripgrep_integration import SearchOptions, SearchType

        options = SearchOptions(
            search_type=SearchType.PATTERN,
            case_sensitive=context.case_sensitive,
            include_patterns=context.include_patterns,
            exclude_patterns=context.exclude_patterns,
            max_results=context.max_results,
            context_lines=context.context_lines,
            file_types=[ft.lower() for ft in context.file_types] if context.file_types else None,
        )

        # Use ripgrep for fast exact matching
        ripgrep_results = self.ripgrep.search_files(
            pattern=context.query,
            path=context.path or ".",
            options=options,
        )

        # Convert to enhanced results
        for rg_result in ripgrep_results.results:
            enhanced_result = EnhancedSearchResult(
                file_path=rg_result.file_path,
                line_number=rg_result.line_number,
                line_content=rg_result.line_text,
                context_before=rg_result.context_before or [],
                context_after=rg_result.context_after or [],
                match_type="exact",
                confidence_score=1.0,
                language=self._detect_language(rg_result.file_path),
                metadata={"submatches": rg_result.submatches},
            )
            results.append(enhanced_result)

        self.metrics.total_files_searched = ripgrep_results.files_searched
        return results
    def _fuzzy_search(self, context: SearchContext) -> List[EnhancedSearchResult]:
        """Execute fuzzy pattern search."""
        results = []

        # Generate fuzzy patterns
        fuzzy_patterns = self._generate_fuzzy_patterns(context.query)

        for pattern in fuzzy_patterns:
            # Create a new context with the fuzzy pattern
            fuzzy_context = SearchContext(
                query=pattern,
                file_types=context.file_types,
                exclude_patterns=context.exclude_patterns,
                include_patterns=context.include_patterns,
                max_results=context.max_results,
                case_sensitive=context.case_sensitive,
                context_lines=context.context_lines,
                path=context.path,
            )

            pattern_results = self._exact_search(fuzzy_context)
            for result in pattern_results:
                result.match_type = "fuzzy"
                result.confidence_score = self._calculate_fuzzy_confidence(
                    result.line_content, context.query
                )
                results.append(result)

        # Remove duplicates and limit results
        results = self._deduplicate_results(results)
        return results[:context.max_results]

    def _semantic_search(self, context: SearchContext) -> List[EnhancedSearchResult]:
        """Execute semantic similarity search."""
        results = []

        # Use AST intelligence for semantic understanding if available
        try:
            # For now, fall back to enhanced exact search for semantic results
            # TODO: Integrate with actual semantic analysis when available
            logger.info("Semantic search using enhanced pattern matching")

            # Generate semantic-like patterns from the query
            semantic_patterns = self._generate_semantic_patterns(context.query)

            for pattern in semantic_patterns:
                pattern_context = SearchContext(
                    query=pattern,
                    file_types=context.file_types,
                    exclude_patterns=context.exclude_patterns,
                    include_patterns=context.include_patterns,
                    max_results=context.max_results // len(semantic_patterns),
                    strategy=SearchStrategy.EXACT,
                    ranking=ResultRanking.RELEVANCE,
                    timeout=context.timeout / len(semantic_patterns),
                    case_sensitive=context.case_sensitive,
                    context_lines=context.context_lines,
                )

                pattern_results = self._exact_search(pattern_context)
                for result in pattern_results:
                    result.match_type = "semantic"
                    result.confidence_score *= 0.8  # Lower confidence for semantic inference
                    results.append(result)

        except Exception as e:
            logger.warning(f"Semantic search failed: {e}")

        return results

    def _generate_semantic_patterns(self, query: str) -> List[str]:
        """Generate semantic search patterns from the query."""
        patterns = [query]  # Include original query
        query_lower = query.lower()

        # Add conceptual variations based on common programming patterns
        if "function" in query_lower or "def" in query_lower:
            patterns.extend(["def ", "function ", "=> ", "lambda "])
        if "class" in query_lower:
            patterns.extend(["class ", "struct ", "interface "])
        if "import" in query_lower:
            patterns.extend(["import ", "from ", "require ", "#include "])
        if "variable" in query_lower or "var" in query_lower:
            patterns.extend(["var ", "let ", "const ", "="])

        return list(set(patterns))

    def _hybrid_search(self, context: SearchContext) -> List[EnhancedSearchResult]:
        """Execute hybrid search combining multiple strategies."""
        all_results = []

        # Execute the different search strategies in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            # Submit search tasks
            exact_future = executor.submit(self._exact_search, context)
            fuzzy_future = executor.submit(self._fuzzy_search, context)
            semantic_future = executor.submit(self._semantic_search, context)

            # Collect results
            exact_results = exact_future.result(timeout=context.timeout)
            fuzzy_results = fuzzy_future.result(timeout=context.timeout)
            semantic_results = semantic_future.result(timeout=context.timeout)

            all_results.extend(exact_results)
            all_results.extend(fuzzy_results)
            all_results.extend(semantic_results)

        # Remove duplicates and limit results
        results = self._deduplicate_results(all_results)
        return results[:context.max_results]

    def _generate_fuzzy_patterns(self, query: str) -> List[str]:
        """Generate fuzzy search patterns."""
        patterns = [query]  # Original pattern

        # Add character variations
        if len(query) > 3:
            # Remove one character at each position
            for i in range(len(query)):
                patterns.append(query[:i] + query[i + 1:])

        # Add common typos
        common_subs = {
            'a': ['s', 'e'], 'e': ['a', 'i'], 'i': ['e', 'o'],
            'o': ['i', 'u'], 's': ['z'], 'z': ['s'],
        }
        for old_char, new_chars in common_subs.items():
            if old_char in query:
                for new_char in new_chars:
                    patterns.append(query.replace(old_char, new_char))

        return list(set(patterns))
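
    # Illustrative output of _generate_fuzzy_patterns (order varies because a
    # set is used; each substitution replaces every occurrence of the character):
    #
    #     _generate_fuzzy_patterns("main")
    #     -> {"main", "ain", "min", "man", "mai", "msin", "mein", "maen", "maon"}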
    def _calculate_fuzzy_confidence(self, text: str, pattern: str) -> float:
        """Calculate fuzzy match confidence score."""
        text_lower = text.lower()
        pattern_lower = pattern.lower()

        # Direct substring match
        if pattern_lower in text_lower:
            return 0.9

        # Character overlap
        pattern_chars = set(pattern_lower)
        text_chars = set(text_lower)
        overlap = len(pattern_chars.intersection(text_chars))
        overlap_ratio = overlap / len(pattern_chars) if pattern_chars else 0
        return min(overlap_ratio, 0.8)

    def _detect_language(self, file_path: str) -> str:
        """Detect programming language from file path and content."""
        # First check the file extension
        for language, patterns in self.language_patterns.items():
            if re.search(patterns[0], file_path, re.IGNORECASE):
                return language

        # Try to detect from content if the file exists
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read(1000)  # Read the first 1 KB

            for language, patterns in self.language_patterns.items():
                for pattern in patterns[1:]:  # Skip the extension pattern
                    if re.search(pattern, content, re.IGNORECASE):
                        return language
        except (OSError, IOError):
            pass

        return "unknown"

    def _deduplicate_results(self, results: List[EnhancedSearchResult]) -> List[EnhancedSearchResult]:
        """Remove duplicate search results."""
        seen = set()
        unique_results = []

        for result in results:
            key = (result.file_path, result.line_number, result.line_content)
            if key not in seen:
                seen.add(key)
                unique_results.append(result)

        return unique_results

    def get_search_statistics(self) -> Dict[str, Any]:
        """Get search performance statistics."""
        return {
            "cache_size": len(self.cache.cache),
            "cache_hit_rate": (
                self.metrics.cache_hits / (self.metrics.cache_hits + self.metrics.cache_misses)
                if (self.metrics.cache_hits + self.metrics.cache_misses) > 0 else 0
            ),
            "average_search_time": (
                self.metrics.search_duration / max(self.metrics.cache_misses, 1)
            ),
            "total_searches": self.metrics.cache_hits + self.metrics.cache_misses,
            "strategy_breakdown": self.metrics.strategy_breakdown,
        }

    def clear_cache(self) -> None:
        """Clear search cache."""
        self.cache.clear()
        logger.info("Search cache cleared")

    def optimize_for_patterns(self, common_patterns: List[str]) -> None:
        """Optimize search infrastructure for common patterns."""
        # Pre-warm the cache with common patterns
        for pattern in common_patterns:
            context = SearchContext(query=pattern, max_results=10)
            self.search(context)

        logger.info(f"Optimized for {len(common_patterns)} common patterns")
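
    # Illustrative shape of a get_search_statistics() snapshot (hypothetical
    # numbers, shown only to document the dictionary layout):
    #
    #     {"cache_size": 12, "cache_hit_rate": 0.25, "average_search_time": 0.041,
    #      "total_searches": 16, "strategy_breakdown": {}}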
    def _detect_optimal_strategy(self, query: str, context: str, language: str) -> SearchStrategy:
        """Detect the optimal search strategy based on query and context."""
        query_lower = query.lower()

        # Detect intent from query keywords
        if any(keyword in query_lower for keyword in ["find", "search", "locate"]):
            if any(keyword in query_lower for keyword in ["exact", "precise", "match"]):
                return SearchStrategy.EXACT
            elif any(keyword in query_lower for keyword in ["similar", "like", "related"]):
                return SearchStrategy.FUZZY
            elif any(keyword in query_lower for keyword in ["understand", "analyze", "meaning"]):
                return SearchStrategy.SEMANTIC
        elif any(keyword in query_lower for keyword in ["understand", "analyze", "meaning"]):
            return SearchStrategy.SEMANTIC

        # Use language-specific optimizations
        if language and language.lower() in ["python", "javascript", "typescript"]:
            return SearchStrategy.HYBRID

        # Default to hybrid for comprehensive results
        return SearchStrategy.HYBRID

    def get_context(self, context_id: str) -> SearchContext:
        """Get or create a search context for context_id."""
        # For now, create a new context each time.
        # In a real implementation, this would maintain context state.
        return SearchContext(
            query="",
            max_results=100,
            strategy=SearchStrategy.HYBRID,
            ranking=ResultRanking.COMBINED,
        )

    def refine_search_results(self, query: str, previous_results: List[Any],
                              context: SearchContext) -> List[EnhancedSearchResult]:
        """Refine existing search results based on a new query."""
        # For now, re-execute the search with the refined context
        context.query = query
        results, _ = self.search(context)
        return results
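

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative; not part of the original module). It
# assumes ripgrep ("rg") is on PATH and that the module is run as part of its
# package so the relative imports above resolve.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    infra = EnhancedSearchInfrastructure()
    ctx = SearchContext(
        query="def search",
        path=".",
        file_types=["py"],
        strategy=SearchStrategy.HYBRID,
        ranking=ResultRanking.COMBINED,
        max_results=20,
    )
    results, metrics = infra.search(ctx)
    print(f"{metrics.total_matches_found} matches in {metrics.search_duration:.3f}s")
    for r in results[:5]:
        print(f"{r.combined_score:.2f}  {r.file_path}:{r.line_number}  {r.line_content.strip()}")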
