"""
SimpleSearcher - A straightforward, robust semantic search implementation.
Architecture:
1. Keyword Extraction: Extract meaningful terms from query (deterministic)
2. Multi-grep Search: Run ripgrep with each keyword, rank files by hit count
3. LLM Selection: Send top candidate files to LLM to extract relevant snippets
This approach is:
- Zero-setup: works immediately on any repo
- Stateless: no pre-indexing required
- Fast: a single LLM call for the final selection
- Robust: ripgrep runs in fixed-string mode (-F), so no fragile regex
  construction can fail on unusual queries
"""
import os
import re
import subprocess
import time
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from pydantic import BaseModel, Field
from .base import BaseSearcher, SearchResult, SearchItem
STOP_WORDS = {
"a", "an", "the", "is", "are", "was", "were", "be", "been", "being",
"have", "has", "had", "do", "does", "did", "will", "would", "could",
"should", "may", "might", "must", "shall", "can", "need", "dare",
"ought", "used", "to", "of", "in", "for", "on", "with", "at", "by",
"from", "as", "into", "through", "during", "before", "after", "above",
"below", "between", "under", "again", "further", "then", "once",
"here", "there", "when", "where", "why", "how", "all", "each", "few",
"more", "most", "other", "some", "such", "no", "nor", "not", "only",
"own", "same", "so", "than", "too", "very", "just", "and", "but",
"if", "or", "because", "until", "while", "this", "that", "these",
"those", "what", "which", "who", "whom", "whose", "it", "its",
"code", "file", "files", "function", "functions", "class", "classes",
"method", "methods", "implementation", "implemented", "implement",
"logic", "find", "search", "look", "looking", "show", "get", "set",
"where", "how", "does", "work", "works", "working", "used", "using",
"call", "called", "calling", "handle", "handled", "handling",
}
@dataclass
class FileScore:
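    """Per-file aggregation of ripgrep hits, used to rank candidate files."""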
path: str
keyword_hits: int
matched_keywords: Set[str]
total_matches: int
preview_lines: List[str]
class SnippetSelection(BaseModel):
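    """A single snippet chosen by the LLM: a file path plus a line range."""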
file_path: str = Field(description="Relative path to the file")
start_line: int = Field(description="Starting line number (1-indexed)")
end_line: int = Field(description="Ending line number (1-indexed)")
relevance: str = Field(description="Brief explanation of relevance")
class SelectionResult(BaseModel):
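    """Structured-output schema for the snippet-selection call."""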
snippets: List[SnippetSelection] = Field(default_factory=list)
SELECTION_PROMPT = """You are a code search assistant. Given a query and candidate files, select the most relevant code snippets.
QUERY: {query}
CANDIDATE FILES:
{candidates}
TASK:
1. Read the candidate files
2. Identify code sections that DIRECTLY answer the query
3. Return up to {max_snippets} most relevant snippets
4. For each snippet, specify file_path, start_line, end_line (include ~20 lines context)
RULES:
- Only select code that helps answer the query
- Prefer implementation over tests
- Include enough context to be understandable
- If a file is irrelevant, don't select from it
Return JSON with "snippets" array."""
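# The {query}, {candidates}, and {max_snippets} placeholders are filled in by
# ChatPromptTemplate inside _select_snippets; with_structured_output enforces
# the "snippets" JSON shape via the SelectionResult schema above.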
class SimpleSearcher(BaseSearcher):
"""Simple two-stage semantic search."""
@property
def name(self) -> str:
return "SimpleSearcher"
def __init__(
self,
model: str = "claude-3-5-haiku-latest",
max_keywords: int = 10,
max_candidate_files: int = 20,
max_results: int = 10,
verbose: bool = False,
):
self.model = model
self.max_keywords = max_keywords
self.max_candidate_files = max_candidate_files
self.max_results = max_results
self.verbose = verbose
if "gemini" in model.lower():
self.llm = ChatGoogleGenerativeAI(model=model, temperature=0)
else:
self.llm = ChatAnthropic(model=model, temperature=0, max_tokens=4096)
def _extract_keywords(self, query: str) -> List[str]:
"""Extract meaningful keywords from query."""
words = re.findall(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b', query.lower())
keywords = [w for w in words if w not in STOP_WORDS and len(w) >= 3]
camel_case = re.findall(r'\b[A-Z][a-z]+(?:[A-Z][a-z]+)+\b', query)
snake_case = re.findall(r'\b[a-z]+(?:_[a-z]+)+\b', query)
all_keywords = []
seen = set()
for kw in camel_case + snake_case:
kw_lower = kw.lower()
if kw_lower not in seen:
all_keywords.append(kw)
seen.add(kw_lower)
for kw in keywords:
if kw not in seen:
all_keywords.append(kw)
seen.add(kw)
if self.verbose:
print(f"[Keywords] {all_keywords[:self.max_keywords]}")
return all_keywords[:self.max_keywords]
def _run_ripgrep(
self,
keyword: str,
repo_path: str,
limit_path: Optional[str] = None,
) -> Dict[str, List[Tuple[int, str]]]:
"""Run ripgrep for a single keyword."""
search_path = repo_path
if limit_path:
candidate = str(Path(repo_path) / limit_path)
if Path(candidate).exists():
search_path = candidate
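        # -i: case-insensitive, -n: line numbers, -F: literal match (no regex);
        # the match-count and file-size caps bound the output volume.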
cmd = [
"rg", "-i", "-n", "-F",
"--max-count", "50",
"--max-filesize", "1M",
keyword, search_path,
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode not in (0, 1):
return {}
matches: Dict[str, List[Tuple[int, str]]] = defaultdict(list)
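            # Each stdout line has the form "<path>:<line_number>:<content>";
            # split on the first two colons only, since content may contain ":".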
for line in result.stdout.strip().split("\n"):
if not line:
continue
parts = line.split(":", 2)
if len(parts) >= 3:
file_path = parts[0]
try:
line_num = int(parts[1])
content = parts[2]
try:
rel_path = str(Path(file_path).relative_to(repo_path))
except ValueError:
rel_path = file_path
matches[rel_path].append((line_num, content))
except ValueError:
continue
return dict(matches)
except (subprocess.TimeoutExpired, FileNotFoundError):
return {}
def _score_files(
self,
keyword_matches: Dict[str, Dict[str, List[Tuple[int, str]]]],
) -> List[FileScore]:
"""Score files by keyword coverage."""
file_scores: Dict[str, FileScore] = {}
for keyword, matches in keyword_matches.items():
for file_path, lines in matches.items():
if file_path not in file_scores:
file_scores[file_path] = FileScore(
path=file_path,
keyword_hits=0,
matched_keywords=set(),
total_matches=0,
preview_lines=[],
)
score = file_scores[file_path]
score.keyword_hits += 1
score.matched_keywords.add(keyword)
score.total_matches += len(lines)
for ln, content in lines[:3]:
score.preview_lines.append(f"L{ln}: {content[:100]}")
return sorted(
file_scores.values(),
key=lambda s: (
0 if "test" in s.path.lower() else 1,
s.keyword_hits,
s.total_matches,
),
reverse=True,
)
def _read_file_content(self, repo_path: str, file_path: str) -> Optional[str]:
"""Read file content with line numbers."""
try:
with open(Path(repo_path) / file_path, "r", errors="ignore") as f:
lines = f.readlines()
return "\n".join(f"{i+1:4d}| {l.rstrip()}" for i, l in enumerate(lines))
except Exception:
return None
def _select_snippets(
self, query: str, candidates: List[Tuple[str, str]],
) -> List[SnippetSelection]:
"""Use LLM to select relevant snippets."""
candidates_text = ""
for file_path, content in candidates:
lines = content.split("\n")
if len(lines) > 500:
content = "\n".join(lines[:500]) + "\n... (truncated)"
candidates_text += f"\n\n=== FILE: {file_path} ===\n{content}"
if len(candidates_text) > 50000:
candidates_text = candidates_text[:50000] + "\n... (truncated)"
prompt = ChatPromptTemplate.from_template(SELECTION_PROMPT)
chain = prompt | self.llm.with_structured_output(SelectionResult)
try:
result = chain.invoke({
"query": query,
"candidates": candidates_text,
"max_snippets": self.max_results,
})
return result.snippets
except Exception as e:
if self.verbose:
print(f"[LLM ERROR] {e}")
return []
def _extract_snippet(
self, repo_path: str, selection: SnippetSelection,
) -> Optional[SearchItem]:
"""Extract actual snippet content from file."""
try:
with open(Path(repo_path) / selection.file_path, "r", errors="ignore") as f:
lines = f.readlines()
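            # Clamp the model-proposed range to the real file bounds; the LLM
            # may return line numbers past the end of the file.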
start = max(0, selection.start_line - 1)
end = min(len(lines), selection.end_line)
return SearchItem(
file_path=selection.file_path,
content="".join(lines[start:end]),
line_start=start + 1,
line_end=end,
)
except Exception:
return None
def search(
self,
query: str,
repo_path: str = ".",
path: Optional[str] = None,
) -> SearchResult:
"""Perform semantic search."""
start_time = time.time()
repo_path = os.path.abspath(repo_path)
# Step 1: Extract keywords
keywords = self._extract_keywords(query)
if not keywords:
return SearchResult(items=[], error="No keywords extracted")
if self.verbose:
print(f"[Step 1] Keywords: {keywords}")
# Step 2: Ripgrep each keyword
keyword_matches: Dict[str, Dict[str, List[Tuple[int, str]]]] = {}
for kw in keywords:
matches = self._run_ripgrep(kw, repo_path, path)
if matches:
keyword_matches[kw] = matches
if not keyword_matches:
return SearchResult(items=[], error="No matches found")
if self.verbose:
total = len(set(f for m in keyword_matches.values() for f in m))
print(f"[Step 2] Found matches in {total} files")
# Step 3: Score and rank files
ranked = self._score_files(keyword_matches)
top_files = ranked[:self.max_candidate_files]
if self.verbose:
print(f"[Step 3] Top candidates:")
for f in top_files[:5]:
print(f" {f.path}: {f.keyword_hits}kw, {f.total_matches}m")
# Step 4: Read files
candidates = []
for fs in top_files:
content = self._read_file_content(repo_path, fs.path)
if content:
candidates.append((fs.path, content))
if not candidates:
return SearchResult(items=[], error="Could not read files")
# Step 5: LLM selection
selections = self._select_snippets(query, candidates)
if self.verbose:
print(f"[Step 5] LLM selected {len(selections)} snippets")
# Step 6: Extract snippets
items = []
for sel in selections:
snippet = self._extract_snippet(repo_path, sel)
if snippet:
items.append(snippet)
# Fallback: return top files if LLM failed
if not items:
if self.verbose:
print("[Fallback] Returning top files")
for fp, content in candidates[:self.max_results]:
clean = "\n".join(
l.split("| ", 1)[1] if "| " in l else l
for l in content.split("\n")[:200]
)
items.append(SearchItem(file_path=fp, content=clean))
return SearchResult(
items=items[:self.max_results],
execution_time_ms=(time.time() - start_time) * 1000,
)
class SimpleSearcherVerbose(SimpleSearcher):
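    """SimpleSearcher with verbose logging enabled by default."""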
def __init__(self, **kwargs):
kwargs["verbose"] = True
super().__init__(**kwargs)
class SimpleSearcherNoLLM(BaseSearcher):
"""
Simplest searcher: keyword extraction + ripgrep ranking, NO LLM.
Returns top files based on keyword hit count.
Fast, deterministic, zero API calls.
"""
@property
def name(self) -> str:
return "SimpleSearcherNoLLM"
def __init__(
self,
max_keywords: int = 10,
max_results: int = 10,
max_lines_per_file: int = 500,
verbose: bool = False,
):
self.max_keywords = max_keywords
self.max_results = max_results
self.max_lines_per_file = max_lines_per_file
self.verbose = verbose
def _extract_keywords(self, query: str) -> List[str]:
"""Extract keywords from query."""
words = re.findall(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b', query.lower())
keywords = [w for w in words if w not in STOP_WORDS and len(w) >= 3]
camel_case = re.findall(r'\b[A-Z][a-z]+(?:[A-Z][a-z]+)+\b', query)
snake_case = re.findall(r'\b[a-z]+(?:_[a-z]+)+\b', query)
all_kw = []
seen = set()
for kw in camel_case + snake_case + keywords:
kw_l = kw.lower()
if kw_l not in seen:
all_kw.append(kw)
seen.add(kw_l)
return all_kw[:self.max_keywords]
def _run_ripgrep(self, keyword: str, repo_path: str, limit_path: Optional[str] = None) -> Dict[str, int]:
"""Run ripgrep, return file -> match count."""
search_path = repo_path
if limit_path:
candidate = str(Path(repo_path) / limit_path)
if Path(candidate).exists():
search_path = candidate
cmd = ["rg", "-i", "-c", "-F", "--max-filesize", "1M", keyword, search_path]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode not in (0, 1):
return {}
file_counts: Dict[str, int] = {}
            for line in result.stdout.strip().split("\n"):
                if ":" not in line:
                    continue
                fp, _, count = line.rpartition(":")
                try:
                    # Guard the count parse so a single malformed line does
                    # not trigger the outer except and discard all results.
                    num_matches = int(count)
                except ValueError:
                    continue
                try:
                    rel_path = str(Path(fp).relative_to(repo_path))
                except ValueError:
                    rel_path = fp
                file_counts[rel_path] = num_matches
return file_counts
except Exception:
return {}
def _read_file(self, repo_path: str, file_path: str) -> Optional[str]:
"""Read file content."""
try:
with open(Path(repo_path) / file_path, "r", errors="ignore") as f:
lines = f.readlines()
return "".join(lines[:self.max_lines_per_file])
except Exception:
return None
def search(
self,
query: str,
repo_path: str = ".",
path: Optional[str] = None,
) -> SearchResult:
        """Rank files by keyword hit counts and return top file contents."""
start_time = time.time()
repo_path = os.path.abspath(repo_path)
# Extract keywords
keywords = self._extract_keywords(query)
if not keywords:
return SearchResult(items=[], error="No keywords")
if self.verbose:
print(f"[Keywords] {keywords}")
# Score files by keyword hits
file_scores: Dict[str, Tuple[int, int]] = {} # file -> (keyword_count, total_matches)
for kw in keywords:
matches = self._run_ripgrep(kw, repo_path, path)
for fp, count in matches.items():
if fp not in file_scores:
file_scores[fp] = (0, 0)
kw_count, total = file_scores[fp]
file_scores[fp] = (kw_count + 1, total + count)
if not file_scores:
return SearchResult(items=[], error="No matches")
# Rank: keyword count desc, then total matches desc
# Penalize test files
ranked = sorted(
file_scores.items(),
key=lambda x: (
0 if "test" in x[0].lower() else 1,
x[1][0], # keyword count
x[1][1], # total matches
),
reverse=True,
)
if self.verbose:
print(f"[Top files]")
for fp, (kc, tc) in ranked[:5]:
print(f" {fp}: {kc}kw, {tc}m")
# Return top files
items = []
for fp, _ in ranked[:self.max_results]:
content = self._read_file(repo_path, fp)
if content:
items.append(SearchItem(file_path=fp, content=content))
return SearchResult(
items=items,
execution_time_ms=(time.time() - start_time) * 1000,
)
class SimpleSearcherNoLLMVerbose(SimpleSearcherNoLLM):
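    """SimpleSearcherNoLLM with verbose logging enabled by default."""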
def __init__(self, **kwargs):
kwargs["verbose"] = True
super().__init__(**kwargs)
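if __name__ == "__main__":
    # Ad-hoc smoke test (an illustrative sketch, not part of the public API).
    # It exercises the zero-API-call path, so only ripgrep is required; run it
    # as `python -m <package>.<module>` because of the relative import above.
    import sys

    demo_query = " ".join(sys.argv[1:]) or "keyword extraction"
    demo_searcher = SimpleSearcherNoLLM(verbose=True)
    demo_result = demo_searcher.search(demo_query, repo_path=".")
    if demo_result.error:
        print(f"error: {demo_result.error}")
    for demo_item in demo_result.items[:3]:
        print(f"=== {demo_item.file_path} ===")
        print(demo_item.content[:400])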