fast_search

Explore and understand codebases using natural language queries to find relevant files and patterns before applying modifications.

Instructions

Run Agentic Codebase Search over the configured base_dir.

Use this tool to explore and understand the codebase. The search agent will examine files, search for patterns, and report back with relevant files and line ranges for the given query.

Queries can be natural language (e.g., "find where auth is handled") or precise patterns. The agent will autonomously use grep, ls, and file_view tools to investigate.

This is useful before using fast_apply to understand which files need to be modified and how they relate to each other.
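
For orientation, here is a minimal sketch of invoking the tool from a Python MCP client over stdio. The server launch command is a hypothetical placeholder (substitute however the relace-mcp server is started in your setup), and the query reuses the example above:

    import asyncio

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    async def main() -> None:
        # Hypothetical launch command; substitute the actual relace-mcp entry point.
        params = StdioServerParameters(command="relace-mcp", args=[])
        async with stdio_client(params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # "query" is the only required argument per the input schema below.
                result = await session.call_tool(
                    "fast_search",
                    {"query": "find where auth is handled"},
                )
                print(result.content)

    asyncio.run(main())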

Input Schema

| Name  | Required | Description | Default |
|-------|----------|-------------|---------|
| query | Yes      |             |         |

Implementation Reference

  • The primary handler for the 'fast_search' MCP tool. It is registered via the @mcp.tool decorator within the register_tools function; it resolves the base directory dynamically, detects LSP languages, and delegates to FastAgenticSearchHarness.run_async to execute the agentic search.
    @mcp.tool(
        annotations={
            "readOnlyHint": True,  # Does not modify environment
            "destructiveHint": False,  # Read-only = non-destructive
            "idempotentHint": True,  # Same query = same results
            "openWorldHint": False,  # Only local codebase
        }
    )
    async def fast_search(query: str, ctx: Context) -> dict[str, Any]:
        """Run Agentic Codebase Search over the configured base_dir.

        Use this tool to explore and understand the codebase. The search agent
        will examine files, search for patterns, and report back with relevant
        files and line ranges for the given query.

        Queries can be natural language (e.g., "find where auth is handled")
        or precise patterns. The agent will autonomously use grep, ls, and
        file_view tools to investigate.

        This is useful before using fast_apply to understand which files need
        to be modified and how they relate to each other.
        """
        # Resolve base_dir dynamically from MCP Roots if not configured
        base_dir, _ = await resolve_base_dir(config.base_dir, ctx)

        # Get cached LSP languages (auto-detects on first call per base_dir)
        from ..lsp.languages import get_lsp_languages

        lsp_languages = get_lsp_languages(Path(base_dir))

        effective_config = replace(config, base_dir=base_dir)
        # Avoid shared mutable state across concurrent calls.
        return await FastAgenticSearchHarness(
            effective_config, search_client, lsp_languages=lsp_languages
        ).run_async(query=query)
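
    For reference, the docstring in the harness below spells out the shape of this return value; a successful response might look roughly like the following, where the path and line ranges are hypothetical:

        {
            "query": "find where auth is handled",
            "explanation": "Authentication is handled in the middleware layer ...",
            "files": {
                "src/auth/middleware.py": [[10, 85]],  # hypothetical path and line range
            },
            "turns_used": 3,
        }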
  • Core implementation of the agentic search logic used by fast_search. This class orchestrates an LLM-powered agent loop over specialized search tools (grep, glob, view_file, lsp, etc.), tracks observed file ranges, manages context truncation, and, once the report_back tool is called, produces the final output: an explanation plus the relevant code locations.
    class FastAgenticSearchHarness(ObservedFilesMixin, MessageHistoryMixin, ToolCallsMixin):
        """Fast Agentic Search Agent Harness.

        Responsible for executing the relace-search model's agent loop,
        processing tool calls and terminating upon receiving report_back.
        """

        def __init__(
            self,
            config: RelaceConfig,
            client: SearchLLMClient,
            *,
            lsp_languages: frozenset[str] | None = None,
        ) -> None:
            self._config = config
            self._client = client
            self._observed_files: dict[str, list[list[int]]] = {}
            self._view_line_re = re.compile(r"^(\d+)\s")
            self._lsp_languages = lsp_languages if lsp_languages is not None else frozenset()

            # Select base prompts based on API compatibility mode
            if client.api_compat == RELACE_PROVIDER:
                base_prompt = SYSTEM_PROMPT
                self._user_prompt_template = USER_PROMPT_TEMPLATE
                self._turn_hint_template = TURN_HINT_TEMPLATE
                self._turn_instructions = TURN_INSTRUCTIONS
            else:
                base_prompt = SYSTEM_PROMPT_OPENAI
                self._user_prompt_template = USER_PROMPT_TEMPLATE_OPENAI
                self._turn_hint_template = TURN_HINT_TEMPLATE_OPENAI
                self._turn_instructions = TURN_INSTRUCTIONS_OPENAI

            # Build dynamic system prompt with LSP language info and enabled tools
            enabled_tools = self._enabled_tool_names()
            self._system_prompt = build_system_prompt(base_prompt, self._lsp_languages, enabled_tools)

        def _get_turn_hint(self, turn: int, max_turns: int, chars_used: int) -> str:
            """Generate turn status hint. Only shows urgency instruction on final turn.

            Args:
                turn: Current turn number (0-indexed internally, displayed as 1-indexed).
                max_turns: Maximum allowed turns.
                chars_used: Total characters used in context so far.
            """
            remaining = max_turns - turn
            mode = "final" if remaining == 1 else "normal"
            instruction = self._turn_instructions[mode]
            chars_pct = int((chars_used / MAX_CONTEXT_BUDGET_CHARS) * 100)
            return self._turn_hint_template.format(
                turn=turn + 1,
                max_turns=max_turns,
                chars_pct=chars_pct,
                instruction=instruction,
            )

        def run(self, query: str) -> dict[str, Any]:
            """Execute one Fast Agentic Search.

            Args:
                query: User query describing what to search/understand.

            Returns:
                Dict containing explanation and files:
                {
                    "query": str,
                    "explanation": str,
                    "files": {path: [[start, end], ...]},
                    "turns_used": int,
                    "partial": bool,  # optional, True when error or max turns exceeded
                    "error": str,     # optional, present when error occurred
                }

            Note:
                This method always returns a dict, never raises exceptions.
                When errors occur, returns a partial report with error field.
            """
            trace_id = str(uuid.uuid4())[:8]
            # Safe query truncation (avoid cutting in middle of multi-byte characters)
            logger.info("[%s] Starting Fast Agentic Search (query_len=%d)", trace_id, len(query))
            log_search_start(trace_id, query)
            start_time = time.perf_counter()

            # Reset observed_files (used to accumulate explored files)
            self._observed_files = {}

            try:
                result = self._run_search_loop(query, trace_id)
                total_ms = (time.perf_counter() - start_time) * 1000
                log_search_complete(
                    trace_id,
                    result.get("turns_used", 0),
                    len(result.get("files", {})),
                    result.get("partial", False),
                    total_ms,
                )
                return result
            except Exception as exc:
                logger.exception("[%s] Search failed with error", trace_id)
                log_search_error(trace_id, str(exc))
                merged_files = self._merge_observed_ranges()
                return {
                    "query": query,
                    "explanation": f"[ERROR] Search failed: {exc}",
                    "files": merged_files,
                    "turns_used": 0,
                    "partial": True,
                    "error": str(exc),
                }

        async def run_async(self, query: str) -> dict[str, Any]:
            """Execute one Fast Agentic Search asynchronously.

            Note:
                This method always returns a dict, never raises exceptions.
                When errors occur, returns a partial report with error field.
            """
            trace_id = str(uuid.uuid4())[:8]
            # Safe query truncation (avoid cutting in middle of multi-byte characters)
            query_preview = query[:100] if len(query) <= 100 else query[:97] + "..."
            # Sanitize preview for log injection safety (remove newlines and control chars)
            query_preview = query_preview.replace("\n", " ").replace("\r", " ")
            logger.info(
                "[%s] Starting Fast Agentic Search async (query_len=%d, preview=%s)",
                trace_id,
                len(query),
                query_preview,
            )
            log_search_start(trace_id, query)
            start_time = time.perf_counter()

            # Reset observed_files (used to accumulate explored files)
            self._observed_files = {}

            try:
                result = await self._run_search_loop_async(query, trace_id)
                total_ms = (time.perf_counter() - start_time) * 1000
                log_search_complete(
                    trace_id,
                    result.get("turns_used", 0),
                    len(result.get("files", {})),
                    result.get("partial", False),
                    total_ms,
                )
                return result
            except Exception as exc:
                logger.exception("[%s] Search failed with error", trace_id)
                log_search_error(trace_id, str(exc))
                merged_files = self._merge_observed_ranges()
                return {
                    "query": query,
                    "explanation": f"[ERROR] Search failed: {exc}",
                    "files": merged_files,
                    "turns_used": 0,
                    "partial": True,
                    "error": str(exc),
                }

        def _run_search_loop(self, query: str, trace_id: str) -> dict[str, Any]:
            """Internal method to execute the search loop."""
            messages: list[dict[str, Any]] = [
                {"role": "system", "content": self._system_prompt},
                {"role": "user", "content": self._user_prompt_template.format(query=query)},
            ]

            for turn in range(_harness_mod.SEARCH_MAX_TURNS):
                logger.debug(
                    "[%s] Turn %d/%d",
                    trace_id,
                    turn + 1,
                    _harness_mod.SEARCH_MAX_TURNS,
                )

                # Inject unified turn hint (from turn 2 onwards)
                if turn > 0:
                    chars_for_hint = estimate_context_size(messages)
                    turn_hint = self._get_turn_hint(turn, _harness_mod.SEARCH_MAX_TURNS, chars_for_hint)
                    messages.append({"role": "user", "content": turn_hint})
                    logger.debug(
                        "[%s] Injected turn hint at turn %d (chars: %d/%d)",
                        trace_id,
                        turn + 1,
                        chars_for_hint,
                        MAX_CONTEXT_BUDGET_CHARS,
                    )

                # Check context size AFTER all user messages are added
                ctx_size = estimate_context_size(messages)
                if ctx_size > MAX_TOTAL_CONTEXT_CHARS:
                    logger.warning(
                        "[%s] Context size %d exceeds limit %d, truncating old messages",
                        trace_id,
                        ctx_size,
                        MAX_TOTAL_CONTEXT_CHARS,
                    )
                    # Keep system + user + most recent 6 messages
                    messages = self._truncate_messages(messages)
                    # Ensure tool_calls and tool results are paired correctly
                    self._repair_tool_call_integrity(messages, trace_id)

                # Track LLM API latency
                llm_start = time.perf_counter()
                response = self._client.chat(
                    messages, tools=get_tool_schemas(self._lsp_languages), trace_id=trace_id
                )
                llm_latency_ms = (time.perf_counter() - llm_start) * 1000

                # Parse response
                choices = response.get("choices", [])
                if not choices:
                    name = self._client._provider_config.display_name
                    raise RuntimeError(f"{name} Search API returned empty choices")

                message = choices[0].get("message", {})
                # Defense: some providers/mocks may lack role, avoid breaking block/repair logic
                message.setdefault("role", "assistant")
                tool_calls = message.get("tool_calls") or []

                # Extract usage for token tracking
                usage = response.get("usage")

                # Log turn state after getting response (includes LLM latency and token usage)
                log_search_turn(
                    trace_id,
                    turn + 1,
                    _harness_mod.SEARCH_MAX_TURNS,
                    ctx_size,
                    len(tool_calls),
                    llm_latency_ms=llm_latency_ms,
                    usage=usage,
                )

                # If no tool_calls, check for content (model may respond directly)
                if not tool_calls:
                    content = message.get("content") or ""
                    logger.warning(
                        "[%s] No tool calls in turn %d (content_len=%d)",
                        trace_id,
                        turn + 1,
                        len(content),
                    )
                    # Add assistant message to context and continue
                    messages.append({"role": "assistant", "content": content})
                    continue

                # Add assistant message (with tool_calls) to messages
                messages.append(self._sanitize_assistant_message(message))

                # Execute tool calls in parallel and collect results
                tool_results, report_back_result = self._execute_tools_parallel(
                    tool_calls, trace_id, turn=turn + 1
                )

                # Add all tool results to messages (per OpenAI protocol)
                self._append_tool_results_to_messages(messages, tool_results)

                # After processing all tool calls, if report_back was called, return
                if report_back_result is not None:
                    logger.info(
                        "[%s] Search completed in %d turns, found %d files",
                        trace_id,
                        turn + 1,
                        len(report_back_result.get("files", {})),
                    )
                    return {
                        "query": query,
                        "explanation": report_back_result.get("explanation", ""),
                        "files": self._normalize_report_files(report_back_result.get("files", {})),
                        "turns_used": turn + 1,
                    }

            # Exceeded limit, return partial report (don't raise)
            logger.warning(
                "[%s] Search did not complete within %d turns, returning partial results",
                trace_id,
                _harness_mod.SEARCH_MAX_TURNS,
            )
            merged_files = self._merge_observed_ranges()
            return {
                "query": query,
                "explanation": (
                    f"[PARTIAL] Search did not complete within {_harness_mod.SEARCH_MAX_TURNS} turns. "
                    f"Returning {len(merged_files)} observed files based on exploration."
                ),
                "files": merged_files,
                "turns_used": _harness_mod.SEARCH_MAX_TURNS,
                "partial": True,
            }

        async def _run_search_loop_async(self, query: str, trace_id: str) -> dict[str, Any]:
            """Internal method to execute the search loop asynchronously."""
            messages: list[dict[str, Any]] = [
                {"role": "system", "content": self._system_prompt},
                {"role": "user", "content": self._user_prompt_template.format(query=query)},
            ]

            loop = asyncio.get_running_loop()
            # Use an explicit ThreadPoolExecutor for blocking tool execution.
            with ThreadPoolExecutor(max_workers=1) as executor:
                for turn in range(_harness_mod.SEARCH_MAX_TURNS):
                    logger.debug(
                        "[%s] Turn %d/%d",
                        trace_id,
                        turn + 1,
                        _harness_mod.SEARCH_MAX_TURNS,
                    )

                    # Inject unified turn hint (from turn 2 onwards)
                    if turn > 0:
                        chars_for_hint = estimate_context_size(messages)
                        turn_hint = self._get_turn_hint(
                            turn, _harness_mod.SEARCH_MAX_TURNS, chars_for_hint
                        )
                        messages.append({"role": "user", "content": turn_hint})
                        logger.debug(
                            "[%s] Injected turn hint at turn %d (chars: %d/%d)",
                            trace_id,
                            turn + 1,
                            chars_for_hint,
                            MAX_CONTEXT_BUDGET_CHARS,
                        )

                    # Check context size AFTER all user messages are added
                    ctx_size = estimate_context_size(messages)
                    if ctx_size > MAX_TOTAL_CONTEXT_CHARS:
                        logger.warning(
                            "[%s] Context size %d exceeds limit %d, truncating old messages",
                            trace_id,
                            ctx_size,
                            MAX_TOTAL_CONTEXT_CHARS,
                        )
                        # Keep system + user + most recent 6 messages
                        messages = self._truncate_messages(messages)
                        # Ensure tool_calls and tool results are paired correctly
                        self._repair_tool_call_integrity(messages, trace_id)

                    # Track LLM API latency
                    llm_start = time.perf_counter()
                    response = await self._client.chat_async(
                        messages, tools=get_tool_schemas(self._lsp_languages), trace_id=trace_id
                    )
                    llm_latency_ms = (time.perf_counter() - llm_start) * 1000

                    # Parse response
                    choices = response.get("choices", [])
                    if not choices:
                        name = self._client._provider_config.display_name
                        raise RuntimeError(f"{name} Search API returned empty choices")

                    message = choices[0].get("message", {})
                    # Defense: some providers/mocks may lack role, avoid breaking block/repair logic
                    message.setdefault("role", "assistant")
                    tool_calls = message.get("tool_calls") or []

                    # Extract usage for token tracking
                    usage = response.get("usage")

                    # Log turn state after getting response (includes LLM latency and token usage)
                    log_search_turn(
                        trace_id,
                        turn + 1,
                        _harness_mod.SEARCH_MAX_TURNS,
                        ctx_size,
                        len(tool_calls),
                        llm_latency_ms=llm_latency_ms,
                        usage=usage,
                    )

                    # If no tool_calls, check for content (model may respond directly)
                    if not tool_calls:
                        content = message.get("content") or ""
                        logger.warning(
                            "[%s] No tool calls in turn %d (content_len=%d)",
                            trace_id,
                            turn + 1,
                            len(content),
                        )
                        # Add assistant message to context and continue
                        messages.append({"role": "assistant", "content": content})
                        continue

                    # Add assistant message (with tool_calls) to messages
                    messages.append(self._sanitize_assistant_message(message))

                    # Execute tool calls off the event loop to avoid blocking.
                    tool_results, report_back_result = await loop.run_in_executor(
                        executor,
                        self._execute_tools_parallel,
                        tool_calls,
                        trace_id,
                        turn + 1,
                    )

                    # Add all tool results to messages (per OpenAI protocol)
                    self._append_tool_results_to_messages(messages, tool_results)

                    # After processing all tool calls, if report_back was called, return
                    if report_back_result is not None:
                        logger.info(
                            "[%s] Search completed in %d turns, found %d files",
                            trace_id,
                            turn + 1,
                            len(report_back_result.get("files", {})),
                        )
                        return {
                            "query": query,
                            "explanation": report_back_result.get("explanation", ""),
                            "files": self._normalize_report_files(report_back_result.get("files", {})),
                            "turns_used": turn + 1,
                        }

            # Exceeded limit, return partial report (don't raise)
            logger.warning(
                "[%s] Search did not complete within %d turns, returning partial results",
                trace_id,
                _harness_mod.SEARCH_MAX_TURNS,
            )
            merged_files = self._merge_observed_ranges()
            return {
                "query": query,
                "explanation": (
                    f"[PARTIAL] Search did not complete within {_harness_mod.SEARCH_MAX_TURNS} turns. "
                    f"Returning {len(merged_files)} observed files based on exploration."
                ),
                "files": merged_files,
                "turns_used": _harness_mod.SEARCH_MAX_TURNS,
                "partial": True,
            }
  • The fast_search tool is explicitly listed with its ID in the relace://tools_list MCP resource, confirming its registration and providing metadata.
    {
        "id": "fast_search",
        "name": "Fast Search",
        "description": "Agentic search over local codebase",
        "enabled": True,
    },
MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/possible055/relace-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.