"""
UltimateSearcher - The best of all approaches combined.
COMBINES:
1. From TreeSearcher: Multiple hypotheses + beam search
2. From IterativeSearcher: Multi-hop search + pattern mining + deep search
3. From ReasoningSearcher: Chain-of-thought + function expansion
4. Cross-hypothesis learning: patterns discovered in one branch help others
ALGORITHM:
┌─────────────────────────────────────────────────────────────────────┐
│ QUERY: "context data corrupted in goroutine" │
└───────────────────────┬─────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ PHASE 1: Generate 3-4 diverse hypotheses │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Object │ │ Race │ │ Context │ │ Shallow │ │
│ │ Pooling │ │ Condition│ │ Lifecycle│ │ Copy │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
└───────┼────────────┼────────────┼────────────┼──────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ PHASE 2: Parallel initial search for each hypothesis │
│ Patterns: pool,reset mutex,lock Done,cancel Copy,Clone │
│ Results merged into shared pattern pool │
└───────────────────────┬─────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ PHASE 3: Pattern mining from ALL results (cross-hypothesis) │
│ - Extract classes, methods, imports │
│ - Generate usage patterns: sync.Pool → pool.Get/Put │
│ - Rank hypotheses by score, keep top N (beam search) │
└───────────────────────┬─────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ PHASE 4: Deep search in top files │
│ - For each priority file, search mined patterns │
│ - Expand snippets to full functions │
└───────────────────────┬─────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ PHASE 5: Final merge & rank │
│ - Merge overlapping snippets │
│ - Score by: pattern count, file type, location, hypothesis score │
│ - Return top K │
└─────────────────────────────────────────────────────────────────────┘
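
USAGE (minimal sketch; the import path below is an assumption about this package's layout,
and an Anthropic API key must be available via CLAUDE_API_KEY or ANTHROPIC_API_KEY):

    from searchers.ultimate import UltimateSearcher  # hypothetical module path

    searcher = UltimateSearcher(verbose=True)
    result = searcher.search(
        query="context data corrupted in goroutine",
        repo_path="/path/to/repo",
    )
    for item in result.items:
        print(f"{item.file_path}:{item.line_start}-{item.line_end}")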
"""
from __future__ import annotations
from dotenv import load_dotenv
load_dotenv()
import json
import logging
import os
import re
import subprocess
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from .base import BaseSearcher, SearchItem, SearchResult
logger = logging.getLogger(__name__)
# =============================================================================
# Data Models
# =============================================================================
class Hypothesis(BaseModel):
"""A hypothesis about the root cause."""
name: str = Field(description="Short name")
explanation: str = Field(description="Why this could be the cause")
confidence: float = Field(default=0.5, description="0.0-1.0 likelihood")
patterns: List[str] = Field(default_factory=list, description="Search patterns")
file_hints: List[str] = Field(default_factory=list, description="Expected file/dir names")
class HypothesisSet(BaseModel):
"""Multiple hypotheses from analysis."""
problem_category: str = Field(description="Category: pooling, lifecycle, concurrency, etc.")
hypotheses: List[Hypothesis] = Field(description="3-4 diverse hypotheses")
@dataclass
class Snippet:
"""A code snippet with metadata."""
file_path: str
content: str
line_start: int
line_end: int
patterns: Set[str] = field(default_factory=set)
hypotheses: Set[str] = field(default_factory=set)
score: float = 0.0
@dataclass
class SearchState:
"""Shared state across all search phases."""
searched_patterns: Set[str] = field(default_factory=set)
found_files: Dict[str, int] = field(default_factory=dict) # file -> snippet count
snippets: List[Snippet] = field(default_factory=list)
mined_patterns: Set[str] = field(default_factory=set)
hypothesis_scores: Dict[str, float] = field(default_factory=dict)
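# Illustrative shared-state shape (assumed example values, not captured from a real run;
# shows why a single SearchState enables cross-hypothesis learning):
#
#     state = SearchState()
#     state.found_files = {"context.go": 6, "routergroup.go": 2}  # file -> snippet count
#     state.hypothesis_scores = {"Object Pooling": 14.0, "Race Condition": 3.5}
#     # every hypothesis appends into state.snippets, so Phase 3 mines patterns from code
#     # found by all branches, not just its own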
# =============================================================================
# Prompts
# =============================================================================
ANALYSIS_PROMPT = """Analyze this developer problem and generate DIVERSE hypotheses.
PROBLEM:
"{query}"
Generate 3-4 DIFFERENT hypotheses about the root cause.
Each should explore a DIFFERENT technical angle:
- Architecture issues (pooling, caching, lifecycle)
- Concurrency issues (races, locks, channels)
- Data flow issues (copying, references, scope)
- Framework patterns (middleware, handlers, pipelines)
RULES for PATTERNS:
1. Use PLAIN TEXT only (no regex)
2. SHORT patterns (5-15 chars) that appear in actual code
3. Mix of:
- Method calls: ".Get()", ".Put()", ".Copy()", ".reset()"
- Type patterns: "struct", "class", "type"
- Lifecycle: "ServeHTTP", "Handle", "Process"
- Internal: patterns starting with "_" like "_pool", "_chain"
GOOD patterns (universal, work in any codebase):
- "pool.Get", "pool.Put" (pooling lifecycle)
- ".Copy()", ".Clone()" (object duplication)
- ".reset()", ".Reset()" (object reuse)
- "ServeHTTP", "HandleFunc" (HTTP handlers)
- "_middleware", "_handler" (internal state)
BAD patterns (too generic, return too many results):
- Single words: "pool", "handler", "response", "request"
- Very short: "get", "set", "new"
Think about WHAT CODE IMPLEMENTS the behavior, not what USES it."""
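# Illustrative structured output for the docstring's example query (assumed values, not an
# actual model response; it only shows the shape with_structured_output(HypothesisSet) returns):
#
#     HypothesisSet(
#         problem_category="pooling",
#         hypotheses=[
#             Hypothesis(
#                 name="Context reused via sync.Pool",
#                 explanation="Pooled context objects are recycled before the goroutine finishes",
#                 confidence=0.7,
#                 patterns=["pool.Get", "pool.Put", ".reset()"],
#                 file_hints=["context", "pool"],
#             ),
#             # ... 2-3 more hypotheses covering concurrency / lifecycle / copying angles
#         ],
#     )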
# =============================================================================
# Pattern Mining
# =============================================================================
def extract_patterns_from_code(content: str) -> Dict[str, Set[str]]:
"""Extract structural patterns from code."""
patterns = {
"classes": set(),
"functions": set(),
"methods": set(),
"imports": set(),
"calls": set(),
"usage": set(),
}
lines = content.split('\n')
for line in lines:
line_stripped = line.strip()
# Skip comments
if line_stripped.startswith(('//', '#', '*', '/*')):
continue
# Classes/structs/types
for match in re.finditer(r'(?:class|struct|type)\s+(\w+)', line):
name = match.group(1)
if len(name) > 3:
patterns["classes"].add(name)
# Functions/methods
for match in re.finditer(r'(?:func|def|function)\s+(\w+)', line):
name = match.group(1)
if len(name) > 3 and not name.startswith('_'):
patterns["functions"].add(name)
# Go methods: func (x *Type) Method
match = re.search(r'func\s+\(\w+\s+\*?(\w+)\)\s+(\w+)', line)
if match:
patterns["classes"].add(match.group(1))
patterns["methods"].add(match.group(2))
# Method calls: .Method(
for match in re.finditer(r'\.(\w{4,})\s*\(', line):
patterns["calls"].add(match.group(1))
# Imports
for match in re.finditer(r'(?:from|import)\s+["\']?([.\w]+)', line):
parts = match.group(1).split('/')[-1].split('.')
for part in parts:
if len(part) > 3:
patterns["imports"].add(part)
# Pool usage patterns
if 'sync.Pool' in line or re.search(r'\bpool\s*[:=]', line, re.I):
patterns["usage"].add("pool.Get")
patterns["usage"].add("pool.Put")
# Context patterns
if re.search(r'type\s+\w*Context\w*\s+struct', line):
patterns["usage"].add(".Copy()")
patterns["usage"].add(".reset()")
# Middleware patterns
if 'middleware' in line.lower():
patterns["usage"].add("process_request")
patterns["usage"].add("process_response")
patterns["usage"].add("get_response")
return patterns
def generate_follow_up_patterns(content: str, searched: Set[str]) -> List[str]:
"""Generate new search patterns from found code."""
extracted = extract_patterns_from_code(content)
follow_up = []
# Priority order: usage > methods > classes > functions > calls
for category in ["usage", "methods", "classes", "functions", "calls"]:
for pattern in extracted[category]:
if pattern not in searched and len(pattern) >= 4:
follow_up.append(pattern)
return follow_up[:15]
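# Worked example (illustrative, assumed input): for Go source containing
#
#     type Context struct { ... }
#     func (c *Context) Copy() *Context { ... }
#
# extract_patterns_from_code() yields classes={"Context"}, methods={"Copy"} and, via the
# Context-struct rule, usage={".Copy()", ".reset()"}; generate_follow_up_patterns() then
# returns the entries not yet searched, usage patterns first (ordering inside a category
# follows set iteration and is not deterministic).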
# =============================================================================
# Function Expansion
# =============================================================================
def expand_to_function(content: str, match_line: int) -> Tuple[int, int]:
"""Find function boundaries around a match line."""
lines = content.split('\n')
total_lines = len(lines)
    # Go backwards to find the start of the enclosing function/class
    start = match_line
    for i in range(match_line - 1, max(-1, match_line - 101), -1):
        line = lines[i] if i < total_lines else ""
        # Definition line; stripping also covers indented defs and Go methods ("func (x *T) M")
        if re.match(r'^(func|def|function|class)\s', line.strip()):
            start = i + 1
            break
# Go forwards to find function end
end = match_line
brace_count = 0
in_func = False
for i in range(start - 1, min(total_lines, match_line + 100)):
line = lines[i]
if '{' in line:
in_func = True
brace_count += line.count('{') - line.count('}')
if in_func and brace_count <= 0:
end = i + 1
break
else:
end = min(total_lines, match_line + 50)
# Limit size - CRITICAL to avoid huge snippets
if end - start > 80:
center = match_line
start = max(1, center - 40)
end = min(total_lines, center + 40)
return start, end
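# Worked example (illustrative, assumed 1-based line numbers): for a file where
# "func (c *Context) reset() {" sits on line 112, its matching "}" on line 131, and the
# pattern match landed on line 120,
#
#     start, end = expand_to_function(content, 120)   # -> (112, 131)
#
# the backward scan stops at the "func" definition and the forward brace count stops at
# the closing brace; _expand_snippets() later widens this span with a small buffer.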
# =============================================================================
# Main Searcher
# =============================================================================
class UltimateSearcher(BaseSearcher):
"""
Ultimate searcher combining all best techniques:
- Multiple hypotheses (diversity)
- Multi-hop iterative search
- Pattern mining with usage expansion
- Deep search in priority files
- Function context expansion
- Cross-hypothesis learning
"""
def __init__(
self,
model: str = "claude-sonnet-4-20250514",
num_hypotheses: int = 4,
search_iterations: int = 2,
patterns_per_hypothesis: int = 8,
beam_width: int = 3, # Keep top N hypotheses
max_results: int = 10,
context_lines: int = 30,
expand_functions: bool = True,
parallel_workers: int = 4,
verbose: bool = False,
):
self.model = model
self.num_hypotheses = num_hypotheses
self.search_iterations = search_iterations
self.patterns_per_hypothesis = patterns_per_hypothesis
self.beam_width = beam_width
self.max_results = max_results
self.context_lines = context_lines
self.expand_functions = expand_functions
self.parallel_workers = parallel_workers
self.verbose = verbose
api_key = os.getenv("CLAUDE_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("CLAUDE_API_KEY or ANTHROPIC_API_KEY required")
self.llm = ChatAnthropic(model=model, api_key=api_key, max_tokens=4096)
@property
def name(self) -> str:
return f"UltimateSearcher ({self.model})"
def _log(self, msg: str, indent: int = 0) -> None:
if self.verbose:
print(f"[Ultimate] {' ' * indent}{msg}")
logger.debug(msg)
# =========================================================================
# Phase 1: Generate Hypotheses
# =========================================================================
def _generate_hypotheses(self, query: str) -> Tuple[str, List[Hypothesis]]:
"""Generate diverse hypotheses about the problem."""
prompt = ChatPromptTemplate.from_messages([
("system", ANALYSIS_PROMPT),
("human", "Generate diverse hypotheses for this problem."),
])
chain = prompt | self.llm.with_structured_output(HypothesisSet)
result = chain.invoke({"query": query})
# Sort by confidence
hypotheses = sorted(result.hypotheses, key=lambda h: -h.confidence)
self._log(f"Category: {result.problem_category}")
self._log(f"Generated {len(hypotheses)} hypotheses:")
for h in hypotheses[:self.num_hypotheses]:
self._log(f" [{h.confidence:.1f}] {h.name}", 1)
return result.problem_category, hypotheses[:self.num_hypotheses]
# =========================================================================
# Phase 2: Search Execution
# =========================================================================
def _search_pattern(
self,
pattern: str,
repo_path: str,
state: SearchState,
) -> List[Snippet]:
"""Search for a single pattern."""
if pattern in state.searched_patterns or len(pattern) < 4:
return []
state.searched_patterns.add(pattern)
        cmd = [
            "rg", "-F", "-i", "-n",
            "-C", str(self.context_lines),
            "--max-count", "25",  # -m is an alias for --max-count, so only one value is passed
            "--json", pattern, repo_path,
        ]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode not in (0, 1):
return []
return self._parse_output(result.stdout, repo_path, pattern)
        except Exception:
return []
def _parse_output(
self,
output: str,
repo_path: str,
pattern: str,
) -> List[Snippet]:
"""Parse ripgrep JSON output."""
file_lines: Dict[str, List[Tuple[int, str]]] = {}
for line in output.split('\n'):
if not line:
continue
try:
data = json.loads(line)
            except json.JSONDecodeError:
continue
if data.get('type') not in ('match', 'context'):
continue
msg = data.get('data', {})
fp = msg.get('path', {}).get('text', '')
text = msg.get('lines', {}).get('text', '').rstrip('\n')
num = msg.get('line_number', 0)
if not fp or not num:
continue
# Skip non-essential files (but keep core handlers)
fp_lower = fp.lower()
is_test = '_test.' in fp_lower or '/tests/' in fp_lower
is_core = any(d in fp_lower for d in ['/core/', '/handler', '/middleware', '/utils/'])
# Skip tests unless they're in core directories
if is_test and not is_core:
continue
if any(skip in fp_lower for skip in ['/vendor/', '/node_modules/', '/migrations/']):
continue
try:
rel = str(Path(fp).relative_to(repo_path))
            except ValueError:
rel = fp
if rel not in file_lines:
file_lines[rel] = []
file_lines[rel].append((num, text))
# Convert to snippets
snippets = []
for fp, lines in file_lines.items():
lines.sort(key=lambda x: x[0])
# Group nearby lines
groups = [[]]
for num, text in lines:
if not groups[-1]:
groups[-1].append((num, text))
elif num <= groups[-1][-1][0] + 5:
groups[-1].append((num, text))
else:
groups.append([(num, text)])
for group in groups:
if not group:
continue
content = '\n'.join(t for _, t in group)
snippets.append(Snippet(
file_path=fp,
content=content,
line_start=group[0][0],
line_end=group[-1][0],
patterns={pattern},
))
return snippets
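    # Illustrative rg --json event consumed by _parse_output (abbreviated; path and values
    # are assumptions, the field names follow ripgrep's JSON Lines output format):
    #   {"type": "match", "data": {"path": {"text": "/repo/context.go"}, "line_number": 87,
    #    "lines": {"text": "func (c *Context) Copy() *Context {\n"}, ...}}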
def _search_hypothesis(
self,
hypothesis: Hypothesis,
repo_path: str,
state: SearchState,
) -> float:
"""Search for patterns from a hypothesis. Returns score."""
patterns = hypothesis.patterns[:self.patterns_per_hypothesis]
total_snippets = 0
files_found = set()
# Parallel search
with ThreadPoolExecutor(max_workers=self.parallel_workers) as executor:
futures = {
executor.submit(self._search_pattern, p, repo_path, state): p
for p in patterns
}
for future in as_completed(futures):
pattern = futures[future]
try:
snippets = future.result()
for s in snippets:
s.hypotheses.add(hypothesis.name)
state.snippets.append(s)
files_found.add(s.file_path)
state.found_files[s.file_path] = state.found_files.get(s.file_path, 0) + 1
if snippets:
self._log(f"'{pattern}': {len(snippets)}", 2)
total_snippets += len(snippets)
except Exception as e:
self._log(f"Error: {e}", 2)
# Calculate score
score = len(files_found) * 2 + total_snippets * 0.5
score *= hypothesis.confidence
# Bonus for matching file hints
for fp in files_found:
for hint in hypothesis.file_hints:
if hint.lower() in fp.lower():
score += 2
state.hypothesis_scores[hypothesis.name] = score
return score
# =========================================================================
# Phase 3: Pattern Mining & Cross-Hypothesis Learning
# =========================================================================
def _mine_and_expand_patterns(
self,
state: SearchState,
repo_path: str,
category: str = "",
) -> List[str]:
"""Mine patterns from found code and expand with usage patterns."""
all_content = "\n".join(s.content for s in state.snippets)
# Extract patterns
follow_up = generate_follow_up_patterns(all_content, state.searched_patterns)
# GENERIC pattern expansions based on what we FOUND (not hardcoded names)
# These are universal patterns that work across ANY codebase
# If we found pooling-related code, search for pool lifecycle methods
if "pool" in all_content.lower() or "Pool" in all_content:
generic_pool = [".Get()", ".Put(", ".Reset()", ".reset()"]
for p in generic_pool:
if p not in state.searched_patterns:
follow_up.append(p)
# If we found context/handler code, search for lifecycle methods
if "context" in all_content.lower() or "handler" in all_content.lower():
generic_lifecycle = [".Copy()", ".Clone()", ".Close()", "ServeHTTP"]
for p in generic_lifecycle:
if p not in state.searched_patterns:
follow_up.append(p)
self._log(f"Mined {len(follow_up)} new patterns")
return follow_up[:20]
# =========================================================================
# Phase 4: Deep Search in Priority Files
# =========================================================================
def _deep_search(
self,
state: SearchState,
repo_path: str,
patterns: List[str],
) -> None:
"""Search specific patterns in priority files."""
# Rank files by how many times they appeared
priority_files = sorted(
[f for f in state.found_files if f.endswith(('.go', '.py', '.ts', '.js', '.cpp', '.java', '.qll'))],
key=lambda f: -state.found_files[f]
)[:10]
self._log(f"Deep search in {len(priority_files)} files with {len(patterns)} patterns")
for fp in priority_files:
full_path = os.path.join(repo_path, fp)
if not os.path.exists(full_path):
continue
for pattern in patterns[:10]:
if len(pattern) < 4:
continue
search_key = f"deep:{fp}:{pattern}"
if search_key in state.searched_patterns:
continue
state.searched_patterns.add(search_key)
cmd = ["rg", "-F", "-n", "-C", str(self.context_lines), pattern, full_path]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0 and result.stdout.strip():
lines = []
for line in result.stdout.split('\n'):
match = re.match(r'^(\d+)[:-](.*)$', line)
if match:
lines.append((int(match.group(1)), match.group(2)))
if lines:
content = '\n'.join(t for _, t in lines)
snippet = Snippet(
file_path=fp,
content=content,
line_start=lines[0][0],
line_end=lines[-1][0],
patterns={pattern},
hypotheses=set(state.hypothesis_scores.keys()),
)
state.snippets.append(snippet)
self._log(f" Deep: '{pattern}' in {fp}:{lines[0][0]}", 2)
                except Exception:
pass
# =========================================================================
# Phase 5: Expand to Functions
# =========================================================================
def _expand_snippets(
self,
snippets: List[Snippet],
repo_path: str,
) -> List[Snippet]:
"""Expand snippets to include full functions."""
expanded = []
for snippet in snippets:
try:
full_path = os.path.join(repo_path, snippet.file_path)
with open(full_path, 'r', errors='ignore') as f:
content = f.read()
lines = content.split('\n')
match_line = (snippet.line_start + snippet.line_end) // 2
start, end = expand_to_function(content, match_line)
# Ensure we include original match
start = min(start, snippet.line_start)
end = max(end, snippet.line_end)
# Add buffer
start = max(1, start - 5)
end = min(len(lines), end + 5)
# Safety limit on content size
if end - start > 150:
center = (snippet.line_start + snippet.line_end) // 2
start = max(1, center - 50)
end = min(len(lines), center + 50)
new_content = '\n'.join(lines[start-1:end])
expanded.append(Snippet(
file_path=snippet.file_path,
content=new_content,
line_start=start,
line_end=end,
patterns=snippet.patterns,
hypotheses=snippet.hypotheses,
score=snippet.score,
))
            except Exception:
expanded.append(snippet)
return expanded
# =========================================================================
# Phase 6: Merge & Rank
# =========================================================================
def _merge_and_rank(
self,
state: SearchState,
) -> List[Snippet]:
"""Merge overlapping snippets and rank by relevance."""
# Group by file
by_file: Dict[str, List[Snippet]] = {}
for s in state.snippets:
if s.file_path not in by_file:
by_file[s.file_path] = []
by_file[s.file_path].append(s)
merged = []
for fp, snippets in by_file.items():
snippets.sort(key=lambda s: s.line_start)
current = snippets[0]
current_patterns = set(current.patterns)
current_hypotheses = set(current.hypotheses)
for next_s in snippets[1:]:
if next_s.line_start <= current.line_end + 15:
# Merge
current = Snippet(
file_path=fp,
content=current.content + "\n...\n" + next_s.content,
line_start=current.line_start,
line_end=max(current.line_end, next_s.line_end),
patterns=current_patterns | next_s.patterns,
hypotheses=current_hypotheses | next_s.hypotheses,
)
current_patterns = current.patterns
current_hypotheses = current.hypotheses
else:
merged.append(current)
current = next_s
current_patterns = set(current.patterns)
current_hypotheses = set(current.hypotheses)
merged.append(current)
# Score snippets
for s in merged:
score = 0.0
# Patterns matched (most important)
score += len(s.patterns) * 5
# Hypotheses supported
score += len(s.hypotheses) * 3
# File type preference
fp = s.file_path.lower()
if fp.endswith(('.go', '.py', '.ts', '.js', '.cpp', '.java', '.qll', '.rs')):
score += 10
# Directory preference
if any(d in fp for d in ['/core/', '/src/', '/lib/', '/internal/', '/pkg/']):
score += 5
if any(d in fp for d in ['/handler', '/middleware', '/util']):
score += 3
# Penalties
if '/doc' in fp or '/release' in fp:
score -= 20
if fp.endswith(('.md', '.txt', '.rst')):
score -= 15
if '_test.' in fp or '/tests/' in fp or 'test_' in fp:
score -= 25 # Strong penalty for test files
# Size preference (not too small, not too large)
lines = s.content.count('\n')
if 20 <= lines <= 80:
score += 3
elif lines > 150:
score -= 3
s.score = score
# Sort and return top
merged.sort(key=lambda s: -s.score)
return merged[:self.max_results]
# =========================================================================
# Main Search
# =========================================================================
def search(
self,
query: str,
repo_path: str,
path: Optional[str] = None,
) -> SearchResult:
"""Perform ultimate search combining all techniques."""
start_time = time.time()
try:
repo_path = os.path.abspath(repo_path)
if path:
repo_path = os.path.join(repo_path, path)
state = SearchState()
self._log(f"Query: {query[:80]}...")
self._log(f"Repo: {repo_path}")
# Phase 1: Generate hypotheses
self._log("\n=== Phase 1: Generating hypotheses ===")
category, hypotheses = self._generate_hypotheses(query)
# Phase 2: Initial parallel search
self._log("\n=== Phase 2: Initial search ===")
for h in hypotheses:
self._log(f"Searching: {h.name}", 1)
score = self._search_hypothesis(h, repo_path, state)
self._log(f"Score: {score:.1f}", 1)
# Beam search - keep top hypotheses
top_hypotheses = sorted(
hypotheses,
key=lambda h: -state.hypothesis_scores.get(h.name, 0)
)[:self.beam_width]
self._log(f"\nTop {self.beam_width} hypotheses:")
for h in top_hypotheses:
self._log(f" [{state.hypothesis_scores.get(h.name, 0):.1f}] {h.name}", 1)
# Phase 3: Pattern mining
self._log("\n=== Phase 3: Pattern mining ===")
new_patterns = self._mine_and_expand_patterns(state, repo_path, category)
# Phase 4: Deep search with mined patterns
if new_patterns:
self._log("\n=== Phase 4: Deep search ===")
self._deep_search(state, repo_path, new_patterns)
# Second iteration with mined patterns
if self.search_iterations > 1:
self._log("\n=== Iteration 2: Refined search ===")
for pattern in new_patterns[:10]:
snippets = self._search_pattern(pattern, repo_path, state)
for s in snippets:
s.hypotheses = set(h.name for h in top_hypotheses)
state.snippets.append(s)
if snippets:
self._log(f"'{pattern}': {len(snippets)}", 1)
# Phase 5: Expand to functions
if self.expand_functions:
self._log("\n=== Phase 5: Function expansion ===")
state.snippets = self._expand_snippets(state.snippets, repo_path)
# Phase 6: Merge & rank
self._log("\n=== Phase 6: Merge & rank ===")
final = self._merge_and_rank(state)
self._log(f"\nTotal snippets: {len(state.snippets)}")
self._log(f"Final results: {len(final)}")
for s in final[:5]:
self._log(f" {s.file_path}:{s.line_start}-{s.line_end} (score={s.score:.1f})", 1)
# Convert to result
items = [
SearchItem(
file_path=s.file_path,
content=s.content,
line_start=s.line_start,
line_end=s.line_end,
match_context=f"Hypotheses: {', '.join(list(s.hypotheses)[:2])}",
)
for s in final
]
total_time = (time.time() - start_time) * 1000
return SearchResult(
items=items,
patterns_used=list(state.searched_patterns)[:20],
execution_time_ms=total_time,
total_time_ms=total_time,
)
except Exception as e:
total_time = (time.time() - start_time) * 1000
self._log(f"Error: {e}")
import traceback
self._log(traceback.format_exc())
return SearchResult(
items=[],
execution_time_ms=total_time,
total_time_ms=total_time,
error=str(e),
)
class UltimateSearcherVerbose(UltimateSearcher):
"""UltimateSearcher with verbose logging."""
def __init__(self, **kwargs):
kwargs["verbose"] = True
super().__init__(**kwargs)