"""
Hybrid searcher combining multiple search strategies:
1. File name search - find files with names matching query keywords
2. Directory structure search - find files in relevant directories
3. Content search - search within found files using LLM-generated patterns
4. Fallback broad search - if nothing found, do broad content search
This approach is especially effective for "grep-resistant" queries
that describe behavior rather than code patterns.
"""
from __future__ import annotations

import json
import logging
import os
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from dotenv import load_dotenv
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

from .base import BaseSearcher, SearchItem, SearchResult

load_dotenv()
logger = logging.getLogger(__name__)
# =============================================================================
# LLM Pattern Generation
# =============================================================================
class SearchStrategy(BaseModel):
"""Search strategy generated by LLM."""
file_name_patterns: List[str] = Field(
default_factory=list,
description="Patterns to match file names (e.g., 'handler', 'router', 'context')"
)
directory_patterns: List[str] = Field(
default_factory=list,
description="Directory names to prioritize (e.g., 'core', 'handlers', 'middleware')"
)
content_patterns: List[str] = Field(
default_factory=list,
description="Patterns to search in file content"
)
class_patterns: List[str] = Field(
default_factory=list,
description="Class/type names to search for"
)
function_patterns: List[str] = Field(
default_factory=list,
description="Function/method names to search for"
)
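# Illustrative output (not a recorded LLM response): for a query like
# "How does Django invoke its middleware chain?" the model might return
#   SearchStrategy(
#       file_name_patterns=["handler", "base", "wsgi"],
#       directory_patterns=["core", "handlers", "middleware"],
#       content_patterns=["get_response", "load_middleware"],
#       class_patterns=["BaseHandler", "WSGIHandler"],
#       function_patterns=["load_middleware", "get_response"],
#   )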
STRATEGY_PROMPT = """You are CodeSearchStrategyGenerator.
Analyze this query about a codebase and generate a comprehensive search strategy:
QUERY: "{query}"
REPOSITORY INFO:
- Name: {repo_name}
- Main languages: {languages}
- Key directories: {directories}
Generate a multi-pronged search strategy as JSON:
{{
  "file_name_patterns": ["handler", "router", "context"],
  "directory_patterns": ["core", "handlers", "api"],
  "content_patterns": ["get_response", "middleware_chain"],
  "class_patterns": ["BaseHandler", "RouterGroup"],
  "function_patterns": ["process_request", "handle"]
}}
RULES:
1. file_name_patterns: Words likely in file names (singular, lowercase)
2. directory_patterns: Directory names where implementation lives
3. content_patterns: Plain text to grep for in file contents
4. class_patterns: PascalCase class/struct names
5. function_patterns: Function/method names
Think about:
- What FILES would contain this code? (file name patterns)
- What DIRECTORIES would this be in? (directory patterns)
- What would the CLASS be called? (class patterns)
- What FUNCTIONS would implement this? (function patterns)
For Django middleware: files="handler,base,wsgi", dirs="handlers,middleware", classes="BaseHandler,WSGIHandler"
For Go routing: files="tree,router,route", dirs="", classes="node,RouterGroup", funcs="getValue,addRoute"
NO regex syntax. Use plain lowercase words. 5-8 patterns per category."""
# =============================================================================
# Hybrid Searcher
# =============================================================================
@dataclass
class RawSnippet:
"""A raw code snippet."""
file_path: str
lines: List[Tuple[int, str]]
matched_patterns: List[str]
    match_type: str = "content"  # one of: "file_name", "directory", "content"
@property
def line_start(self) -> int:
return self.lines[0][0] if self.lines else 0
@property
def line_end(self) -> int:
return self.lines[-1][0] if self.lines else 0
@property
def content(self) -> str:
return "\n".join(line for _, line in self.lines)
class HybridSearcher(BaseSearcher):
"""
Hybrid searcher combining file name, directory, and content search.
"""
def __init__(
self,
model: str = "claude-sonnet-4-20250514",
context_lines: int = 50, # Large context to capture more related code
max_results: int = 10,
verbose: bool = False,
):
self.model = model
self.context_lines = context_lines
self.max_results = max_results
self.verbose = verbose
api_key = os.getenv("CLAUDE_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("CLAUDE_API_KEY or ANTHROPIC_API_KEY required")
self.llm = ChatAnthropic(
model=model,
api_key=api_key,
max_tokens=1024,
)
@property
def name(self) -> str:
return f"HybridSearcher ({self.model})"
def _log(self, msg: str) -> None:
if self.verbose:
print(f"[Hybrid] {msg}")
logger.debug(msg)
def _get_repo_info(self, repo_path: str) -> Tuple[str, List[str], List[str]]:
"""Get basic repo info for LLM context."""
repo_path = Path(repo_path)
name = repo_path.name
# Detect languages
languages = []
        for ext, lang in [('.py', 'Python'), ('.go', 'Go'), ('.ts', 'TypeScript'),
                          ('.js', 'JavaScript'), ('.cpp', 'C++'), ('.qll', 'CodeQL')]:
            try:
                result = subprocess.run(
                    ['find', str(repo_path), '-name', f'*{ext}', '-type', 'f'],
                    capture_output=True, text=True, timeout=5
                )
            except (subprocess.TimeoutExpired, OSError):
                continue
            if result.stdout.strip():
                languages.append(lang)
# Find key directories
directories = []
try:
for item in repo_path.iterdir():
if item.is_dir() and not item.name.startswith('.'):
if item.name in ('src', 'lib', 'core', 'pkg', 'internal', 'app'):
directories.append(item.name)
        except OSError:
pass
return name, languages[:3], directories
def _generate_strategy(self, query: str, repo_path: str) -> SearchStrategy:
"""Use LLM to generate search strategy."""
name, languages, directories = self._get_repo_info(repo_path)
prompt = ChatPromptTemplate.from_messages([
("system", STRATEGY_PROMPT),
("human", "Generate search strategy for the query."),
])
chain = prompt | self.llm.with_structured_output(SearchStrategy)
result = chain.invoke({
"query": query,
"repo_name": name,
"languages": ", ".join(languages) if languages else "unknown",
"directories": ", ".join(directories) if directories else "none found",
})
return result
def _find_files_by_name(
self,
repo_path: str,
patterns: List[str],
) -> List[str]:
"""Find files whose names match patterns."""
found_files = set()
for pattern in patterns:
if len(pattern) < 3:
continue
try:
result = subprocess.run(
['find', repo_path, '-type', 'f', '-iname', f'*{pattern}*',
'!', '-path', '*/test*', '!', '-path', '*/.git/*',
'!', '-path', '*/node_modules/*'],
capture_output=True, text=True, timeout=10
)
                for f in result.stdout.strip().split('\n'):
                    if f and '_test.' not in f.lower():
                        found_files.add(f)
            except (subprocess.TimeoutExpired, OSError):
                pass
return list(found_files)[:30]
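    # Equivalent shell invocation for one pattern, e.g. "handler":
    #   find <repo> -type f -iname '*handler*' \
    #       ! -path '*/test*' ! -path '*/.git/*' ! -path '*/node_modules/*'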
def _find_files_in_directories(
self,
repo_path: str,
dir_patterns: List[str],
) -> List[str]:
"""Find code files in directories matching patterns."""
found_files = set()
for pattern in dir_patterns:
if len(pattern) < 3:
continue
try:
# Find directories matching pattern
result = subprocess.run(
['find', repo_path, '-type', 'd', '-iname', f'*{pattern}*',
'!', '-path', '*/.git/*', '!', '-path', '*/test*'],
capture_output=True, text=True, timeout=10
)
for dir_path in result.stdout.strip().split('\n'):
if not dir_path:
continue
# Find code files in this directory
files_result = subprocess.run(
['find', dir_path, '-maxdepth', '2', '-type', 'f',
'(', '-name', '*.py', '-o', '-name', '*.go',
'-o', '-name', '*.ts', '-o', '-name', '*.js',
'-o', '-name', '*.cpp', '-o', '-name', '*.h',
'-o', '-name', '*.qll', ')'],
capture_output=True, text=True, timeout=10
)
                    for f in files_result.stdout.strip().split('\n'):
                        if f and '_test.' not in f.lower():
                            found_files.add(f)
            except (subprocess.TimeoutExpired, OSError):
                pass
return list(found_files)[:30]
def _search_content(
self,
repo_path: str,
patterns: List[str],
target_files: Optional[List[str]] = None,
) -> List[RawSnippet]:
"""Search file content for patterns."""
snippets = []
for pattern in patterns:
if len(pattern) < 4:
continue
            # Skip patterns containing regex metacharacters or backslashes
            if any(c in pattern for c in '*+?[]()|\\^$'):
continue
# Determine search paths
if target_files:
                # Prioritize likely-core files (core/, src/, files named *handler*)
                prioritized = sorted(target_files, key=lambda f: (
                    0 if '/core/' in f.lower() else 1,
                    0 if '/src/' in f.lower() else 1,
                    0 if 'handler' in f.lower() else 1,
                    f
                ))
                search_paths = prioritized[:40]  # cap per-pattern file searches
else:
search_paths = [repo_path]
for search_path in search_paths:
if not Path(search_path).exists():
continue
cmd = [
'rg', '-F', '-i', '-n',
'-C', str(self.context_lines),
'--json', '-m', '20',
pattern, search_path
]
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
parsed = self._parse_rg_output(result.stdout, repo_path, pattern)
snippets.extend(parsed)
                except (subprocess.TimeoutExpired, OSError):
                    pass
return snippets
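    # With the defaults above, one ripgrep invocation looks like
    # (target path shown here is hypothetical):
    #   rg -F -i -n -C 50 --json -m 20 'get_response' /path/to/target/file.py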
def _parse_rg_output(
self,
output: str,
repo_path: str,
pattern: str,
) -> List[RawSnippet]:
"""Parse ripgrep JSON output."""
file_lines: Dict[str, List[Tuple[int, str]]] = {}
for line in output.split('\n'):
if not line:
continue
try:
data = json.loads(line)
            except json.JSONDecodeError:
                continue
if data.get('type') not in ('match', 'context'):
continue
msg = data.get('data', {})
file_path = msg.get('path', {}).get('text', '')
line_text = msg.get('lines', {}).get('text', '').rstrip('\n')
line_num = msg.get('line_number', 0)
if not file_path or not line_num:
continue
            try:
                rel_path = str(Path(file_path).relative_to(repo_path))
            except ValueError:
                rel_path = file_path
file_lines.setdefault(rel_path, []).append((line_num, line_text))
# Convert to snippets
snippets = []
for file_path, lines in file_lines.items():
lines.sort(key=lambda x: x[0])
current = []
for line_num, content in lines:
if not current:
current.append((line_num, content))
                elif line_num <= current[-1][0] + 50:  # merge lines within 50 of each other
current.append((line_num, content))
else:
snippets.append(RawSnippet(file_path, current, [pattern]))
current = [(line_num, content)]
if current:
snippets.append(RawSnippet(file_path, current, [pattern]))
return snippets
def _read_file_snippet(
self,
file_path: str,
repo_path: str,
        max_lines: int = 250,  # read up to the first 250 lines of the file
) -> Optional[RawSnippet]:
"""Read a snippet from a file."""
try:
with open(file_path, 'r', errors='ignore') as f:
lines = f.readlines()[:max_lines]
numbered_lines = [(i + 1, line.rstrip('\n')) for i, line in enumerate(lines)]
try:
rel_path = str(Path(file_path).relative_to(repo_path))
            except ValueError:
rel_path = file_path
return RawSnippet(rel_path, numbered_lines, ["file_match"], "file_name")
        except OSError:
return None
def _merge_snippets(self, snippets: List[RawSnippet]) -> List[RawSnippet]:
"""Merge overlapping snippets from same file."""
if not snippets:
return []
by_file: Dict[str, List[RawSnippet]] = {}
for s in snippets:
by_file.setdefault(s.file_path, []).append(s)
merged = []
for file_path, file_snippets in by_file.items():
file_snippets.sort(key=lambda s: s.line_start)
current = None
for snippet in file_snippets:
if current is None:
current = snippet
            elif snippet.line_start <= current.line_end + 100:  # merge snippets within 100 lines
all_lines = dict(current.lines)
all_lines.update(dict(snippet.lines))
current = RawSnippet(
file_path,
sorted(all_lines.items()),
list(set(current.matched_patterns + snippet.matched_patterns)),
current.match_type,
)
else:
merged.append(current)
current = snippet
if current:
merged.append(current)
return merged
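    # e.g. snippets at lines 10-40 and 90-120 of one file merge into a single
    # snippet spanning 10-120 (gap of 50 <= 100); one at 300-320 stays separate.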
def _rank_snippets(self, snippets: List[RawSnippet]) -> List[RawSnippet]:
"""Rank snippets by relevance."""
def score(s: RawSnippet) -> float:
# Pattern match count
pattern_score = len(s.matched_patterns) * 4
# File name match bonus
if s.match_type == "file_name":
type_score = 3.0
elif s.match_type == "directory":
type_score = 2.0
else:
type_score = 1.0
# Length preference
length = len(s.lines)
if 20 <= length <= 80:
length_score = 2.0
elif length < 20:
length_score = length / 10
else:
length_score = max(0.5, 2.0 - (length - 80) / 50)
# File type priority
ext = Path(s.file_path).suffix.lower()
if ext in ('.py', '.go', '.ts', '.cpp', '.qll'):
ext_score = 2.0
else:
ext_score = 0.5
# Penalize tests
path_lower = s.file_path.lower()
if '/test' in path_lower or '_test.' in path_lower:
test_penalty = -8.0
else:
test_penalty = 0.0
# Core directory bonus
if '/core/' in path_lower or '/src/' in path_lower or '/lib/' in path_lower:
core_bonus = 3.0
else:
core_bonus = 0.0
return pattern_score + type_score + length_score + ext_score + test_penalty + core_bonus
return sorted(snippets, key=score, reverse=True)
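    # Worked example: a 50-line snippet in django/core/handlers/base.py found
    # via file-name search and matching two patterns scores
    #   2*4 (patterns) + 3.0 (file_name) + 2.0 (length) + 2.0 (.py) + 3.0 (/core/) = 18.0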
def search(
self,
query: str,
repo_path: str,
path: Optional[str] = None,
) -> SearchResult:
"""Perform hybrid search."""
start_time = time.time()
tool_time = 0.0
try:
repo_path = os.path.abspath(repo_path)
if path:
repo_path = os.path.join(repo_path, path)
self._log(f"Searching: {repo_path}")
self._log(f"Query: {query[:100]}...")
# Step 1: Generate search strategy
strategy = self._generate_strategy(query, repo_path)
self._log(f"File patterns: {strategy.file_name_patterns}")
self._log(f"Dir patterns: {strategy.directory_patterns}")
self._log(f"Content patterns: {strategy.content_patterns}")
self._log(f"Class patterns: {strategy.class_patterns}")
self._log(f"Function patterns: {strategy.function_patterns}")
all_snippets = []
# Step 2: Find files by name
t0 = time.time()
found_by_name = self._find_files_by_name(repo_path, strategy.file_name_patterns)
tool_time += (time.time() - t0) * 1000
self._log(f"Files by name: {len(found_by_name)}")
# Step 3: Find files in directories
t0 = time.time()
found_by_dir = self._find_files_in_directories(repo_path, strategy.directory_patterns)
tool_time += (time.time() - t0) * 1000
self._log(f"Files by dir: {len(found_by_dir)}")
# Combine target files
target_files = list(set(found_by_name + found_by_dir))[:40]
self._log(f"Target files: {len(target_files)}")
# Step 4: Search content in target files
content_patterns = (
strategy.content_patterns +
strategy.class_patterns +
strategy.function_patterns
)
t0 = time.time()
if target_files:
# Search in found files first
content_snippets = self._search_content(repo_path, content_patterns, target_files)
self._log(f"Snippets from target files: {len(content_snippets)}")
all_snippets.extend(content_snippets)
# If not enough, read whole files
if len(all_snippets) < 5:
for f in target_files[:10]:
snippet = self._read_file_snippet(f, repo_path)
if snippet:
all_snippets.append(snippet)
# Step 5: Fallback - broad content search
if len(all_snippets) < 5:
self._log("Fallback: broad content search")
broad_snippets = self._search_content(repo_path, content_patterns[:10])
all_snippets.extend(broad_snippets)
tool_time += (time.time() - t0) * 1000
self._log(f"Total snippets: {len(all_snippets)}")
# Step 6: Merge and rank
merged = self._merge_snippets(all_snippets)
self._log(f"After merge: {len(merged)}")
ranked = self._rank_snippets(merged)
final = ranked[:self.max_results]
# Convert to SearchResult
items = [
SearchItem(
file_path=s.file_path,
content=s.content,
line_start=s.line_start,
line_end=s.line_end,
match_context=", ".join(s.matched_patterns),
)
for s in final
]
total_time = (time.time() - start_time) * 1000
return SearchResult(
items=items,
patterns_used=content_patterns[:10],
                execution_time_ms=total_time - tool_time,  # non-tool time (mostly LLM calls)
total_time_ms=total_time,
tool_time_ms=tool_time,
)
except Exception as e:
total_time = (time.time() - start_time) * 1000
self._log(f"Error: {e}")
import traceback
self._log(traceback.format_exc())
return SearchResult(
items=[],
execution_time_ms=total_time,
total_time_ms=total_time,
tool_time_ms=0,
error=str(e),
)
class HybridSearcherVerbose(HybridSearcher):
"""Hybrid searcher with verbose logging."""
def __init__(self, **kwargs):
kwargs["verbose"] = True
super().__init__(**kwargs)