# pyright: reportUnusedFunction=false
# Decorator-registered functions (@mcp.tool, @mcp.resource) are accessed by the framework
import asyncio
import inspect
from contextlib import suppress
from dataclasses import replace
from pathlib import Path
from typing import Annotated, Any
from fastmcp import FastMCP
from fastmcp.server.context import Context
from pydantic import Field
from ..clients import RelaceRepoClient, SearchLLMClient
from ..clients.apply import ApplyLLMClient
from ..config import RelaceConfig, resolve_base_dir
from ..config.settings import AGENTIC_RETRIEVAL_ENABLED, RELACE_CLOUD_TOOLS, RETRIEVAL_BACKEND
from ..repo import (
cloud_info_logic,
cloud_list_logic,
cloud_search_logic,
cloud_sync_logic,
load_sync_state,
)
from .apply import apply_file_logic
from .retrieval import agentic_retrieval_logic
from .search import FastAgenticSearchHarness
__all__ = ["register_tools"]
def register_tools(mcp: FastMCP, config: RelaceConfig) -> None:
    """Register Relace tools to the FastMCP instance."""
    # Backend client used by the fast_apply tool below.
    apply_backend = ApplyLLMClient(config)

    async def _progress_heartbeat(ctx: Context, *, message: str) -> None:
        """Emit a zero-progress report every 5 seconds until cancelled.

        Used as a keep-alive for long-running tool calls; callers create it
        as a task and cancel it in a ``finally`` block. Stops silently as
        soon as a progress report raises (e.g. client gone).
        """
        while True:
            try:
                # report_progress may be sync or async depending on the
                # framework version — await only if it returned an awaitable.
                maybe = ctx.report_progress(progress=0, total=1.0, message=message)
                if inspect.isawaitable(maybe):
                    await maybe
            except Exception:
                # Best-effort heartbeat: any failure ends the loop quietly.
                return
            await asyncio.sleep(5)

    # Agentic Search client (used by both agentic_search and agentic_retrieval)
    search_client = SearchLLMClient(config)
@mcp.tool(
    annotations={
        "readOnlyHint": False,  # Modifies files
        "destructiveHint": True,  # Can overwrite content
        "idempotentHint": False,  # Same edit twice = different results
        "openWorldHint": False,  # Only local filesystem
    }
)
async def fast_apply(
    path: Annotated[
        str,
        Field(
            description="File path (absolute or relative to MCP_BASE_DIR; if MCP_BASE_DIR is unset, "
            "relative paths resolve against MCP Roots)."
        ),
    ],
    edit_snippet: Annotated[
        str,
        Field(
            description="New content. Use placeholders for unchanged parts: "
            "`// ... existing code ...` (C/JS/TS), `# ... existing code ...` (Python/shell)."
        ),
    ],
    instruction: Annotated[
        str,
        Field(description="Optional hint when edit is ambiguous (e.g., 'add after imports')."),
    ] = "",
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Edit or create a file using intelligent merging.

    For new files: writes content directly.
    For existing files: merges edit_snippet with current content using anchor lines.

    If anchors cannot be located, returns NEEDS_MORE_CONTEXT error—provide complete
    file content to fully overwrite, or add context lines to help locate the edit point.
    """
    # Keep the client fed with progress pings while the merge runs; the
    # heartbeat task is always torn down in the finally block.
    progress_task = None
    if ctx is not None:
        progress_task = asyncio.create_task(
            _progress_heartbeat(ctx, message="fast_apply in progress")
        )
    try:
        # Resolve base_dir dynamically (aligns with other tools).
        # This allows relative paths when MCP_BASE_DIR is not set but MCP Roots are available,
        # and provides a consistent security boundary for absolute paths.
        base_dir, _ = await resolve_base_dir(config.base_dir, ctx)
        if ctx is not None:
            await ctx.info(f"Applying edit to {path}")
        result = await apply_file_logic(
            backend=apply_backend,
            file_path=path,
            edit_snippet=edit_snippet,
            instruction=instruction or None,  # Convert empty string to None internally
            base_dir=base_dir,
            extra_paths=config.extra_paths,
        )
        # Single success check (the original evaluated the same condition
        # twice): emit the debug preview and schedule background re-indexing
        # only when the edit was actually applied.
        if result and result.get("status") == "ok":
            if ctx is not None:
                diff_preview = (result.get("diff") or "")[:200]
                await ctx.debug(f"Edit applied: {diff_preview}...")
            import shutil as _shutil

            from ..repo.backends import (
                is_backend_disabled,
                schedule_bg_chunkhound_index,
                schedule_bg_codanna_index,
            )

            # Re-index only with backends that are installed and not disabled.
            if _shutil.which("chunkhound") and not is_backend_disabled("chunkhound"):
                schedule_bg_chunkhound_index(base_dir)
            if _shutil.which("codanna") and not is_backend_disabled("codanna"):
                schedule_bg_codanna_index(result.get("path", path), base_dir)
        return result
    finally:
        if progress_task is not None:
            progress_task.cancel()
            with suppress(asyncio.CancelledError):
                await progress_task
# Register agentic_search (always enabled)
@mcp.tool(
    annotations={
        "readOnlyHint": True,  # Does not modify environment
        "destructiveHint": False,  # Read-only = non-destructive
        "idempotentHint": True,  # Same query = same results
        "openWorldHint": False,  # Only local codebase
    }
)
async def agentic_search(
    query: Annotated[
        str,
        Field(
            description="What to find. Natural language (e.g., 'where is auth handled') "
            "or specific patterns (e.g., 'UserService class')."
        ),
    ],
    ctx: Context,
) -> dict[str, Any]:
    """Search codebase for code locations matching a query.

    Finds functions, classes, modules, and traces how components connect.
    Accepts natural language or specific patterns like class/function names.
    Returns file paths with line ranges and an explanation of findings.
    """
    await ctx.info(f"Searching: {query[:100]}")
    heartbeat = asyncio.create_task(
        _progress_heartbeat(ctx, message="agentic_search in progress")
    )
    try:
        # Resolve base_dir dynamically from MCP Roots if not configured
        base_dir, _ = await resolve_base_dir(config.base_dir, ctx)

        # Get cached LSP languages (auto-detects on first call per base_dir)
        from ..lsp.languages import get_lsp_languages

        detected_languages = get_lsp_languages(Path(base_dir))

        # Run the harness against a config pinned to the resolved base_dir.
        harness = FastAgenticSearchHarness(
            replace(config, base_dir=base_dir),
            search_client,
            lsp_languages=detected_languages,
        )
        result = await harness.run_async(query=query)

        files_found = len(result.get("files", {}))
        await ctx.debug(f"Search found {files_found} files")
        return result
    finally:
        heartbeat.cancel()
        with suppress(asyncio.CancelledError):
            await heartbeat
# Cloud client handle; remains None when cloud tools are disabled so the
# retrieval path below can still reference it safely.
repo_client: RelaceRepoClient | None = None

# Cloud Repos (Semantic Search & Sync) - only register if enabled
if RELACE_CLOUD_TOOLS:
    repo_client = RelaceRepoClient(config)
@mcp.tool(
    annotations={
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    }
)
async def cloud_sync(
    force: Annotated[
        bool, Field(description="Ignore cache, upload all files (default: false).")
    ] = False,
    mirror: Annotated[
        bool,
        Field(
            description="With force=True, delete cloud files not in local (default: false)."
        ),
    ] = False,
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Upload codebase to Relace Cloud for semantic search.

    Syncs git-tracked files to enable cloud_search. Incremental by default—only
    uploads changed files. Run once per session before using cloud_search.

    Fails if not in a git repository or RELACE_API_KEY is not set.
    """
    # Base directory comes from config, falling back to MCP Roots.
    target_dir, _ = await resolve_base_dir(config.base_dir, ctx)
    outcome = cloud_sync_logic(repo_client, target_dir, force=force, mirror=mirror)
    return outcome
@mcp.tool(
    annotations={
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    }
)
async def cloud_search(
    query: Annotated[str, Field(description="Natural language search query.")],
    branch: Annotated[
        str, Field(description="Branch to search (empty = default branch).")
    ] = "",
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Semantic code search using AI embeddings. Requires cloud_sync first.

    Finds code by meaning, not just keywords. Returns ranked results with relevance scores.
    """
    # Resolve base_dir dynamically from MCP Roots if not configured
    target_dir, _ = await resolve_base_dir(config.base_dir, ctx)
    # score_threshold / token_limit are fixed internal parameters
    # (not exposed to LLM).
    return cloud_search_logic(
        repo_client,
        target_dir,
        query,
        branch=branch,
        score_threshold=0.3,
        token_limit=30000,
    )
@mcp.tool(
    annotations={
        "readOnlyHint": False,
        "destructiveHint": True,
        "idempotentHint": False,
        "openWorldHint": True,
    }
)
async def cloud_clear(
    confirm: Annotated[bool, Field(description="Must be True to proceed.")] = False,
    repo_id: Annotated[
        str | None,
        Field(
            description="Optional repo ID to delete directly (use cloud_list to find). "
            "If not provided, deletes the repo associated with current directory."
        ),
    ] = None,
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Delete cloud repository and local sync state. IRREVERSIBLE.

    Removes all indexed data from Relace Cloud. Use cloud_list to find repo IDs.
    """
    # Lazy import: cloud_clear_logic is only needed by this tool.
    from ..repo import cloud_clear_logic

    target_dir, _ = await resolve_base_dir(config.base_dir, ctx)
    return cloud_clear_logic(repo_client, target_dir, confirm=confirm, repo_id=repo_id)
@mcp.tool(
    annotations={
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    }
)
def cloud_list(
    reason: Annotated[
        str, Field(description="Why you need this list (helps with debugging).")
    ] = "",
) -> dict[str, Any]:
    """List all repositories in your Relace Cloud account.

    Returns repository IDs, names, and indexing status. Use to find repo_id for cloud_clear.
    """
    # 'reason' exists only so the model can explain itself; it is unused.
    del reason
    return cloud_list_logic(repo_client)
@mcp.tool(
    annotations={
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    }
)
async def cloud_info(
    reason: Annotated[
        str, Field(description="Why you need sync status (helps with debugging).")
    ] = "",
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Check sync status before running cloud_sync.

    Shows local git state, last sync info, and whether re-sync is needed.
    Helps decide if cloud_sync should be called.
    """
    # 'reason' exists only so the model can explain itself; it is unused.
    del reason
    target_dir, _ = await resolve_base_dir(config.base_dir, ctx)
    return cloud_info_logic(repo_client, target_dir)
if AGENTIC_RETRIEVAL_ENABLED and RETRIEVAL_BACKEND != "none":

    @mcp.tool(
        annotations={
            "readOnlyHint": True,
            "destructiveHint": False,
            "idempotentHint": True,
            # Only the Relace backend reaches outside the local machine.
            "openWorldHint": RETRIEVAL_BACKEND == "relace",
        }
    )
    async def agentic_retrieval(
        query: Annotated[
            str,
            Field(
                description="Be SPECIFIC. Examples: "
                "❌ 'auth logic' "
                "✅ 'function that validates JWT tokens and extracts user ID' "
                "❌ 'error handling' "
                "✅ 'where HTTP 4xx errors are caught and transformed to user messages'"
            ),
        ],
        ctx: Context | None = None,
    ) -> dict[str, Any]:
        """Find code by semantic similarity. Best for conceptual queries.

        When you know what behavior you're looking for but not the exact names or keywords.
        Returns file paths with line ranges and relevance-ranked results.
        """
        heartbeat = None
        if ctx is not None:
            await ctx.info(f"Retrieval: {query[:100]}")
            heartbeat = asyncio.create_task(
                _progress_heartbeat(ctx, message="agentic_retrieval in progress")
            )
        try:
            target_dir, _ = await resolve_base_dir(config.base_dir, ctx)
            result = await agentic_retrieval_logic(
                repo_client,
                search_client,
                config,
                target_dir,
                query,
            )
            if ctx is not None:
                files_found = len(result.get("files", {}))
                await ctx.debug(f"Retrieval found {files_found} files")
            return result
        finally:
            # Stop the heartbeat (if one was started) no matter how we exit.
            if heartbeat is not None:
                heartbeat.cancel()
                with suppress(asyncio.CancelledError):
                    await heartbeat
# === MCP Resources ===
@mcp.resource("relace://tools_list", mime_type="application/json")
def tools_list() -> list[dict[str, Any]]:
    """List all registered Relace MCP tools with their enabled status.

    Returns: [{id, name, description, enabled}, ...] for each tool.
    Use this to discover available capabilities before calling tools.
    """

    def entry(tool_id: str, name: str, description: str) -> dict[str, Any]:
        # Every listed tool is enabled; disabled ones are simply absent.
        return {
            "id": tool_id,
            "name": name,
            "description": description,
            "enabled": True,
        }

    # fast_apply and agentic_search are always registered.
    tools = [
        entry("fast_apply", "Fast Apply", "Edit or create files using fuzzy matching"),
        entry("agentic_search", "Agentic Search", "Agentic search over local codebase"),
    ]
    if RELACE_CLOUD_TOOLS:
        tools += [
            entry("cloud_sync", "Cloud Sync", "Upload codebase for semantic indexing"),
            entry("cloud_search", "Cloud Search", "Semantic code search using AI embeddings"),
            entry("cloud_clear", "Cloud Clear", "Delete cloud repository and sync state"),
            entry("cloud_list", "Cloud List", "List all repositories in Relace Cloud"),
            entry("cloud_info", "Cloud Info", "Get sync status for current repository"),
        ]
    if AGENTIC_RETRIEVAL_ENABLED and RETRIEVAL_BACKEND != "none":
        tools.append(
            entry(
                "agentic_retrieval",
                "Agentic Retrieval",
                "Two-stage semantic + agentic code retrieval",
            )
        )
    return tools
if RELACE_CLOUD_TOOLS:

    @mcp.resource("relace://cloud/status", mime_type="application/json")
    async def cloud_status(ctx: Context | None = None) -> dict[str, Any]:
        """Current cloud sync status - lightweight read from local state file.

        Returns sync state without making API calls. Use this to quickly check
        if cloud_sync has been run and what the current sync status is.
        """
        try:
            base_dir, _ = await resolve_base_dir(config.base_dir, ctx)
        except RuntimeError:
            # Neither MCP_BASE_DIR nor MCP Roots yielded a usable directory.
            return {
                "synced": False,
                "error": "base_dir not configured",
                "message": "Set MCP_BASE_DIR or use MCP Roots to enable cloud status",
            }

        from ..repo import get_repo_identity

        local_name, cloud_name, _fingerprint = get_repo_identity(base_dir)
        if not (local_name and cloud_name):
            return {
                "synced": False,
                "error": "invalid base_dir",
                "message": "Cannot derive repository identity from base_dir; ensure MCP_BASE_DIR or MCP Roots points to a project directory.",
            }

        state = load_sync_state(base_dir)
        if state is None:
            # No local state file — cloud_sync has never completed here.
            return {
                "synced": False,
                "repo_name": local_name,
                "cloud_repo_name": cloud_name,
                "message": "No sync state found. Run cloud_sync to upload codebase.",
            }

        # Human-readable "branch@shortsha" marker for the last synced commit.
        if state.git_branch and state.git_head_sha:
            git_ref = f"{state.git_branch}@{state.git_head_sha[:8]}"
        elif state.git_head_sha:
            git_ref = state.git_head_sha[:8]
        else:
            git_ref = ""

        return {
            "synced": True,
            "repo_id": state.repo_id,
            "repo_name": state.repo_name or local_name,
            "cloud_repo_name": state.cloud_repo_name or cloud_name,
            "git_ref": git_ref,
            "files_count": len(state.files),
            "skipped_files_count": len(state.skipped_files),
            "files_found": state.files_found,
            "files_selected": state.files_selected,
            "file_limit": state.file_limit,
            "files_truncated": state.files_truncated,
            "last_sync": state.last_sync,
        }