"""
Semantic searcher using Claude to generate ripgrep patterns.
Flow:
1. User query → Claude (structured output) → list of ripgrep patterns
2. Execute ripgrep for each pattern
3. Aggregate and deduplicate results
4. Return code snippets
"""
from __future__ import annotations
# Load environment variables BEFORE importing langchain (for tracing setup)
from dotenv import load_dotenv
load_dotenv()
import json
import os
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from .base import BaseSearcher, SearchItem, SearchResult
class RipgrepPattern(BaseModel):
"""A single ripgrep search pattern."""
pattern: str = Field(
description="Regex pattern for ripgrep. Use PCRE2 syntax."
)
is_fixed_string: bool = Field(
default=False,
description="If true, pattern is treated as literal string (ripgrep -F flag)"
)
case_insensitive: bool = Field(
default=False,
description="If true, search is case-insensitive (ripgrep -i flag)"
)
file_glob: Optional[str] = Field(
default=None,
description="Optional glob pattern to filter files (e.g., '*.py', '*.ts')"
)
explanation: str = Field(
description="Brief explanation of what this pattern is looking for"
)
class SearchPatterns(BaseModel):
"""Structured output from Claude with search patterns."""
patterns: List[RipgrepPattern] = Field(
description="List of ripgrep patterns to execute. Order by expected relevance."
)
reasoning: str = Field(
description="Brief explanation of the search strategy"
)
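# Illustrative structured output for a query like "why does context corruption
# happen?" (values are hypothetical, not a recorded model response):
#
#     SearchPatterns(
#         patterns=[
#             RipgrepPattern(
#                 pattern=r"sync\.Pool",
#                 file_glob="*.go",
#                 explanation="Object pooling that commonly causes reuse bugs",
#             ),
#         ],
#         reasoning="Search both the symptom and the pooling code behind it.",
#     )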
# System prompt for pattern generation
SYSTEM_PROMPT = """You are an expert code search assistant. Your task is to convert natural language queries into effective ripgrep search patterns.
Given a user's question about a codebase, generate a list of ripgrep regex patterns that will find the relevant code.
CRITICAL: Think about BOTH:
1. The symptom/problem described (what the user sees)
2. The underlying CAUSE/IMPLEMENTATION (what code creates this behavior)
For example, if user asks about "context corruption" - also search for:
- Pool/reuse patterns that could cause this (sync.Pool, object pooling)
- Reset/cleanup methods that prepare objects for reuse
- Copy methods that create safe copies
Guidelines:
1. Generate 5-10 patterns, ordered by expected relevance
2. Include patterns for BOTH symptoms AND root causes
3. Use regex features wisely: word boundaries (\\b), character classes, groups
4. Consider common naming conventions (camelCase, snake_case, PascalCase)
5. For implementation questions, look for: class definitions, function definitions, method calls
6. For debugging questions, think about what code CAUSES the described behavior
7. Use file_glob to narrow down by file type when the language is clear
Pattern Tips:
- Use \\b for word boundaries to avoid partial matches
- Use (?i) prefix or case_insensitive=true for case-insensitive search
- Escape special regex chars: . * + ? [ ] ( ) {{ }} | \\ ^ $
- For function definitions: "def\\s+function_name\\b" or "function\\s+functionName\\b"
- For class definitions: "class\\s+ClassName\\b"
- For method calls: "\\.methodName\\s*\\("
Examples of good patterns:
- "class\\s+BaseHandler\\b" - finds class definition
- "def\\s+(get_response|_get_response)\\b" - finds specific methods
- "sync\\.Pool" - finds Go object pooling (common cause of object reuse bugs)
- "func.*reset\\(" - finds reset methods used in pooling
- "func.*Copy\\(" - finds safe copy methods"""
USER_PROMPT_TEMPLATE = """Find code that answers this question:
{query}
The search will be performed in a code repository. Generate ripgrep patterns to find the relevant code."""
@dataclass
class RipgrepMatch:
"""A single match from ripgrep output."""
file_path: str
line_number: int
line_content: str
class RipgrepClaudeSearcher(BaseSearcher):
"""
Semantic searcher that uses Claude to generate ripgrep patterns.
This is the primary implementation following the architecture:
"LLM → patterns → ripgrep → raw matches → snippet filtering"
"""
def __init__(
self,
model: str = "claude-sonnet-4-20250514",
max_patterns: int = 7,
context_lines: int = 15,
max_matches_per_pattern: int = 30,
max_total_items: int = 10,
):
"""
Initialize the searcher.
Args:
model: Claude model to use
max_patterns: Maximum number of patterns to generate
context_lines: Lines of context around each match (-C flag)
            max_matches_per_pattern: Maximum matches per file for each pattern (ripgrep -m flag)
max_total_items: Maximum items to return in final result
"""
self.model = model
self.max_patterns = max_patterns
self.context_lines = context_lines
self.max_matches_per_pattern = max_matches_per_pattern
self.max_total_items = max_total_items
# Initialize LLM
api_key = os.getenv("CLAUDE_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError(
"CLAUDE_API_KEY or ANTHROPIC_API_KEY must be set in environment"
)
self.llm = ChatAnthropic(
model=model,
api_key=api_key,
max_tokens=1024,
)
# Create prompt template
self.prompt = ChatPromptTemplate.from_messages([
("system", SYSTEM_PROMPT),
("human", USER_PROMPT_TEMPLATE),
])
@property
def name(self) -> str:
return f"RipgrepClaude ({self.model})"
def _generate_patterns(self, query: str) -> SearchPatterns:
"""Use Claude to generate search patterns from natural language query."""
chain = self.prompt | self.llm.with_structured_output(SearchPatterns)
result = chain.invoke({"query": query})
return result
def _run_ripgrep(
self,
pattern: RipgrepPattern,
repo_path: str,
subpath: Optional[str] = None,
) -> List[RipgrepMatch]:
"""Execute ripgrep with the given pattern and return matches."""
# Build command with JSON output for reliable parsing
cmd = ["rg", "--json"]
# Add context lines
cmd.extend(["-C", str(self.context_lines)])
# Add flags based on pattern settings
if pattern.is_fixed_string:
cmd.append("-F")
if pattern.case_insensitive:
cmd.append("-i")
if pattern.file_glob:
cmd.extend(["-g", pattern.file_glob])
        # Cap matches per file (ripgrep's -m limits matches per file, not in total)
        cmd.extend(["-m", str(self.max_matches_per_pattern)])
# Add pattern
cmd.append(pattern.pattern)
# Add search path
search_path = Path(repo_path)
if subpath:
search_path = search_path / subpath
cmd.append(str(search_path))
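        # Example assembled command (illustrative values):
        #   rg --json -C 15 -i -g '*.py' -m 30 'def\s+get_response\b' /path/to/repo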
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30, # 30 second timeout per pattern
)
            # Exit codes: 0 = matches found, 1 = no matches, 2+ = error
            if result.returncode not in (0, 1):
                return []
return self._parse_ripgrep_json(result.stdout, repo_path)
except subprocess.TimeoutExpired:
return []
except FileNotFoundError:
# ripgrep not installed
raise RuntimeError(
"ripgrep (rg) is not installed. Please install it: "
"https://github.com/BurntSushi/ripgrep#installation"
)
def _parse_ripgrep_json(
self,
output: str,
repo_path: str,
) -> List[RipgrepMatch]:
"""Parse ripgrep JSON output into structured matches."""
matches = []
for line in output.split("\n"):
if not line:
continue
try:
data = json.loads(line)
except json.JSONDecodeError:
continue
# We care about "match" and "context" types
if data.get("type") not in ("match", "context"):
continue
msg = data.get("data", {})
path_data = msg.get("path", {})
file_path = path_data.get("text", "")
# Get line info
lines = msg.get("lines", {})
line_text = lines.get("text", "").rstrip("\n")
line_number = msg.get("line_number", 0)
if not file_path or not line_number:
continue
# Make path relative to repo
try:
rel_path = str(Path(file_path).relative_to(repo_path))
except ValueError:
rel_path = file_path
matches.append(RipgrepMatch(
file_path=rel_path,
line_number=line_number,
line_content=line_text,
))
return matches
def _aggregate_matches(
self,
all_matches: List[RipgrepMatch],
) -> List[SearchItem]:
"""
Aggregate matches into coherent code snippets.
Groups consecutive lines from the same file into single snippets.
"""
if not all_matches:
return []
# Group by file
        # Drop exact duplicates first (the same line can match several patterns)
        unique = {(m.file_path, m.line_number): m for m in all_matches}
        by_file: dict[str, List[RipgrepMatch]] = {}
        for match in unique.values():
            by_file.setdefault(match.file_path, []).append(match)
items = []
for file_path, file_matches in by_file.items():
# Sort by line number
file_matches.sort(key=lambda m: m.line_number)
# Merge consecutive or nearby lines into snippets
snippets = []
current_snippet_lines = []
current_start = None
current_end = None
for match in file_matches:
if current_start is None:
# First line in snippet
current_start = match.line_number
current_end = match.line_number
current_snippet_lines.append(match.line_content)
                elif match.line_number <= current_end + 3:
                    # Close enough to merge (within 3 lines of the snippet end);
                    # mark any skipped lines with a single "..." placeholder
                    if match.line_number > current_end + 1:
                        current_snippet_lines.append("...")
current_snippet_lines.append(match.line_content)
current_end = match.line_number
else:
# Start new snippet
snippets.append({
"start": current_start,
"end": current_end,
"lines": current_snippet_lines,
})
current_start = match.line_number
current_end = match.line_number
current_snippet_lines = [match.line_content]
# Don't forget last snippet
if current_snippet_lines:
snippets.append({
"start": current_start,
"end": current_end,
"lines": current_snippet_lines,
})
# Create SearchItems from snippets
for snippet in snippets:
content = "\n".join(snippet["lines"])
items.append(SearchItem(
file_path=file_path,
content=content,
line_start=snippet["start"],
line_end=snippet["end"],
))
return items
def _deduplicate_items(self, items: List[SearchItem]) -> List[SearchItem]:
"""Remove duplicate or overlapping snippets."""
if not items:
return []
        # Sort by file and start line without mutating the caller's list
        items = sorted(items, key=lambda x: (x.file_path, x.line_start or 0))
result = []
seen_ranges: dict[str, List[tuple[int, int]]] = {}
for item in items:
file_path = item.file_path
start = item.line_start or 0
end = item.line_end or start
if file_path not in seen_ranges:
seen_ranges[file_path] = []
# Check if this range overlaps with any existing
overlaps = False
for existing_start, existing_end in seen_ranges[file_path]:
if start <= existing_end and end >= existing_start:
overlaps = True
break
if not overlaps:
result.append(item)
seen_ranges[file_path].append((start, end))
return result
def search(
self,
query: str,
repo_path: str,
path: Optional[str] = None,
) -> SearchResult:
"""
Perform semantic search using Claude-generated ripgrep patterns.
Args:
query: Natural language description of what to find
repo_path: Path to the repository root
path: Optional subdirectory to limit search scope
Returns:
SearchResult with found code snippets
"""
start_time = time.time()
ripgrep_time = 0.0 # Track ripgrep execution time (excluded per task spec)
try:
# Generate patterns using Claude (LLM time - counted)
patterns_result = self._generate_patterns(query)
patterns = patterns_result.patterns[:self.max_patterns]
# Execute ripgrep for each pattern (tool time - NOT counted)
all_matches: List[RipgrepMatch] = []
patterns_used = []
for pattern in patterns:
rg_start = time.time()
matches = self._run_ripgrep(pattern, repo_path, path)
ripgrep_time += (time.time() - rg_start) * 1000
all_matches.extend(matches)
if matches:
patterns_used.append(pattern.pattern)
# Aggregate matches into snippets (verification - counted)
items = self._aggregate_matches(all_matches)
# Deduplicate (verification - counted)
items = self._deduplicate_items(items)
# Limit results
items = items[:self.max_total_items]
total_time = (time.time() - start_time) * 1000
llm_time = total_time - ripgrep_time # LLM + verification (excludes ripgrep)
return SearchResult(
items=items,
patterns_used=patterns_used,
execution_time_ms=llm_time, # Per task spec: excludes grep time
total_time_ms=total_time,
tool_time_ms=ripgrep_time,
)
except Exception as e:
total_time = (time.time() - start_time) * 1000
llm_time = total_time - ripgrep_time
return SearchResult(
items=[],
execution_time_ms=llm_time,
total_time_ms=total_time,
tool_time_ms=ripgrep_time,
error=str(e),
)