from __future__ import annotations
import argparse
import difflib
import os
import re
from pathlib import Path
from typing import Any
from fastmcp import FastMCP
from .config import load_config
from .lsp_manager import LSPManager
from .workspace import uri_to_path
mcp = FastMCP(
"codex-lsp-bridge",
instructions=(
"Expose Language Server Protocol (LSP) capabilities (definition, typeDefinition, references, hover, "
"signatureHelp, symbols, codeAction, formatting, callHierarchy, rename) to Codex CLI via MCP. "
"Use these tools for semantic navigation and refactoring instead of text search."
),
)
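# A single LSPManager is shared by all tool calls; per the workspace_symbols note
# below, it lazily starts language servers per language/workspace as files are synced.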
_manager: LSPManager | None = None
def get_manager() -> LSPManager:
global _manager
if _manager is not None:
return _manager
config_path = os.getenv("CODEX_LSP_BRIDGE_CONFIG")
cfg = load_config(config_path)
_manager = LSPManager(cfg)
return _manager
def _as_path(uri: str) -> str:
try:
return str(uri_to_path(uri))
except Exception:
return ""
def _normalize_range(r: Any) -> dict[str, Any] | None:
if not isinstance(r, dict):
return None
s = r.get("start")
e = r.get("end")
if not isinstance(s, dict) or not isinstance(e, dict):
return None
return {
"start": {"line": s.get("line"), "character": s.get("character")},
"end": {"line": e.get("line"), "character": e.get("character")},
}
def _normalize_location(obj: Any) -> dict[str, Any] | None:
"""Normalize Location or LocationLink to a common shape."""
if not isinstance(obj, dict):
return None
# Location
if "uri" in obj and "range" in obj:
uri = obj.get("uri")
if not isinstance(uri, str):
return None
return {
"uri": uri,
"path": _as_path(uri),
"range": _normalize_range(obj.get("range")),
}
# LocationLink
if "targetUri" in obj:
uri = obj.get("targetUri")
if not isinstance(uri, str):
return None
return {
"uri": uri,
"path": _as_path(uri),
"range": _normalize_range(obj.get("targetRange")),
"selectionRange": _normalize_range(obj.get("targetSelectionRange")),
"originSelectionRange": _normalize_range(obj.get("originSelectionRange")),
}
return None
def _normalize_locations(result: Any) -> list[dict[str, Any]]:
if result is None:
return []
if isinstance(result, list):
out: list[dict[str, Any]] = []
for item in result:
loc = _normalize_location(item)
if loc:
out.append(loc)
return out
loc = _normalize_location(result)
return [loc] if loc else []
def _extract_hover_markdown(hover_obj: Any) -> str:
if not isinstance(hover_obj, dict):
return ""
contents = hover_obj.get("contents")
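    # LSP allows contents to be MarkupContent, a single MarkedString, or a list of
    # MarkedStrings; each case is reduced to plain markdown text below.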
# MarkupContent
if isinstance(contents, dict) and "value" in contents:
return str(contents.get("value", ""))
# MarkedString[]
if isinstance(contents, list):
parts = []
for item in contents:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict) and "value" in item:
parts.append(str(item.get("value", "")))
return "\n\n".join(parts)
# string
if isinstance(contents, str):
return contents
return ""
_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
def _read_file_text(mgr: LSPManager, file_path: str, workspace_root: str | None) -> str | None:
base_dir = Path(workspace_root).expanduser().resolve() if workspace_root else None
try:
path = mgr.resolve_path(file_path, base_dir=base_dir)
return path.read_text(encoding="utf-8", errors="replace")
except Exception:
return None
def _fuzzy_positions(
text: str, line: int, column: int, line_radius: int = 1, max_positions: int = 5
) -> list[tuple[int, int]]:
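    """Return candidate (line, column) positions near the requested point.

    Identifier starts within line_radius lines are scored by distance from the
    requested position (line distance dominates over column distance), and up to
    max_positions candidates are returned with the original position first.
    """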
lines = text.splitlines()
if not lines:
return [(line, column)]
target_line = min(max(line, 0), len(lines) - 1)
start = max(0, target_line - line_radius)
end = min(len(lines), target_line + line_radius + 1)
scored: list[tuple[int, int, int]] = []
for i in range(start, end):
line_text = lines[i]
for match in _IDENTIFIER_RE.finditer(line_text):
start_col, end_col = match.span()
if i == target_line and start_col <= column <= end_col:
col_dist = 0
else:
col_center = (start_col + end_col) // 2
col_dist = abs(column - col_center)
line_dist = abs(i - target_line)
score = line_dist * 100 + col_dist
scored.append((score, i, start_col))
scored.sort()
positions = [(line, column)]
for _, i, col in scored:
pos = (i, col)
if pos not in positions:
positions.append(pos)
if len(positions) >= max_positions:
break
return positions
async def _try_positions(
positions: list[tuple[int, int]],
call,
is_empty,
) -> tuple[Any, tuple[int, int] | None]:
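    """Try call(line, column) for each candidate position.

    Returns (result, position) for the first non-empty result. If every attempt was
    empty, the last error (if any) is re-raised; otherwise (None, None) is returned.
    """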
last_error: Exception | None = None
for line, column in positions:
try:
result = await call(line, column)
except Exception as exc:
last_error = exc
continue
if not is_empty(result):
return result, (line, column)
if last_error:
raise last_error
return None, None
def _symbol_kind_map() -> dict[str, int]:
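    """Map lowercase snake_case kind names to LSP SymbolKind values."""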
return {
"file": 1,
"module": 2,
"namespace": 3,
"package": 4,
"class": 5,
"method": 6,
"property": 7,
"field": 8,
"constructor": 9,
"enum": 10,
"interface": 11,
"function": 12,
"variable": 13,
"constant": 14,
"string": 15,
"number": 16,
"boolean": 17,
"array": 18,
"object": 19,
"key": 20,
"null": 21,
"enum_member": 22,
"struct": 23,
"event": 24,
"operator": 25,
"type_parameter": 26,
}
def _score_symbol_name(query: str, candidate: str) -> int:
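    """Score how well candidate matches query on a 0-100 scale.

    Exact and case-insensitive matches rank highest, then prefix, substring, and
    token-containment matches; anything else falls back to a difflib ratio.
    """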
q = query.strip()
c = candidate.strip()
if not q or not c:
return 0
if q == c:
return 100
if q.lower() == c.lower():
return 95
if c.startswith(q):
return 90
if c.lower().startswith(q.lower()):
return 85
if q.lower() in c.lower():
return 75
# Token-aware fuzzy boost (handles Foo#bar style queries).
tokens = [t for t in re.split(r"[^A-Za-z0-9_]+", q.lower()) if t]
if tokens and all(t in c.lower() for t in tokens):
return 70
ratio = difflib.SequenceMatcher(None, q.lower(), c.lower()).ratio()
return int(ratio * 60)
def _flatten_document_symbols(symbols: Any, *, uri: str) -> list[dict[str, Any]]:
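    """Flatten DocumentSymbol trees or SymbolInformation lists into flat entries.

    SymbolInformation items already carry a location; DocumentSymbol nodes carry a
    range and children and inherit the document uri.
    """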
out: list[dict[str, Any]] = []
def walk(items: list[Any]) -> None:
for item in items:
if not isinstance(item, dict):
continue
name = item.get("name")
kind = item.get("kind")
if isinstance(item.get("location"), dict):
loc = _normalize_location(item.get("location"))
else:
loc = {
"uri": uri,
"path": _as_path(uri),
"range": _normalize_range(item.get("range")),
}
out.append(
{
"name": name,
"kind": kind,
"location": loc,
"containerName": item.get("containerName"),
}
)
children = item.get("children")
if isinstance(children, list):
walk(children)
if isinstance(symbols, list):
walk(symbols)
return out
@mcp.tool()
async def go_to_definition(
file_path: str,
line: int,
column: int,
workspace_root: str | None = None,
fuzzy: bool = False,
fuzzy_radius: int = 1,
) -> dict[str, Any]:
"""Jump to the definition of the symbol at (line, column) in file_path.
line/column are 0-indexed.
Returns:
{"locations": [{"path": "/abs/file", "range": {...}, ...}, ...]}
"""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
positions = [(line, column)]
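    # With fuzzy enabled, nearby identifier starts are also tried until the server
    # returns a non-empty result (the other position-based tools follow the same pattern).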
if fuzzy:
text = _read_file_text(mgr, file_path, workspace_root)
if text:
positions = _fuzzy_positions(text, line, column, line_radius=fuzzy_radius)
result, used = await _try_positions(
positions,
lambda l, c: client.definition(uri=uri, line=l, character=c),
lambda r: len(_normalize_locations(r)) == 0,
)
payload: dict[str, Any] = {
"language_id": language_id,
"locations": _normalize_locations(result),
}
if fuzzy:
payload["fuzzy_attempted_positions"] = [
{"line": l, "column": c} for (l, c) in positions
]
if used:
payload["fuzzy_selected_position"] = {"line": used[0], "column": used[1]}
return payload
@mcp.tool()
async def type_definition(
file_path: str,
line: int,
column: int,
workspace_root: str | None = None,
fuzzy: bool = False,
fuzzy_radius: int = 1,
) -> dict[str, Any]:
"""Jump to the type definition of the symbol at (line, column) in file_path.
line/column are 0-indexed.
"""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
positions = [(line, column)]
if fuzzy:
text = _read_file_text(mgr, file_path, workspace_root)
if text:
positions = _fuzzy_positions(text, line, column, line_radius=fuzzy_radius)
result, used = await _try_positions(
positions,
lambda l, c: client.type_definition(uri=uri, line=l, character=c),
lambda r: len(_normalize_locations(r)) == 0,
)
payload: dict[str, Any] = {
"language_id": language_id,
"locations": _normalize_locations(result),
}
if fuzzy:
payload["fuzzy_attempted_positions"] = [
{"line": l, "column": c} for (l, c) in positions
]
if used:
payload["fuzzy_selected_position"] = {"line": used[0], "column": used[1]}
return payload
@mcp.tool()
async def find_references(
file_path: str,
line: int,
column: int,
include_declaration: bool = True,
workspace_root: str | None = None,
fuzzy: bool = False,
fuzzy_radius: int = 1,
) -> dict[str, Any]:
"""Find references to the symbol at (line, column) in file_path.
Returns:
{"references": [{"path": "/abs/file", "range": {...}}, ...]}
"""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
positions = [(line, column)]
if fuzzy:
text = _read_file_text(mgr, file_path, workspace_root)
if text:
positions = _fuzzy_positions(text, line, column, line_radius=fuzzy_radius)
result, used = await _try_positions(
positions,
lambda l, c: client.references(
uri=uri,
line=l,
character=c,
include_declaration=include_declaration,
),
lambda r: len(_normalize_locations(r)) == 0,
)
payload: dict[str, Any] = {
"language_id": language_id,
"references": _normalize_locations(result),
}
if fuzzy:
payload["fuzzy_attempted_positions"] = [
{"line": l, "column": c} for (l, c) in positions
]
if used:
payload["fuzzy_selected_position"] = {"line": used[0], "column": used[1]}
return payload
@mcp.tool()
async def hover(
file_path: str,
line: int,
column: int,
workspace_root: str | None = None,
fuzzy: bool = False,
fuzzy_radius: int = 1,
) -> dict[str, Any]:
"""Get hover/type information for the symbol at (line, column)."""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
positions = [(line, column)]
if fuzzy:
text = _read_file_text(mgr, file_path, workspace_root)
if text:
positions = _fuzzy_positions(text, line, column, line_radius=fuzzy_radius)
def is_empty(result: Any) -> bool:
if not isinstance(result, dict):
return True
if _extract_hover_markdown(result):
return False
return result.get("range") is None
result, used = await _try_positions(
positions,
lambda l, c: client.hover(uri=uri, line=l, character=c),
is_empty,
)
payload: dict[str, Any] = {
"language_id": language_id,
"markdown": _extract_hover_markdown(result),
"range": _normalize_range(result.get("range")) if isinstance(result, dict) else None,
}
if fuzzy:
payload["fuzzy_attempted_positions"] = [
{"line": l, "column": c} for (l, c) in positions
]
if used:
payload["fuzzy_selected_position"] = {"line": used[0], "column": used[1]}
return payload
@mcp.tool()
async def signature_help(
file_path: str,
line: int,
column: int,
workspace_root: str | None = None,
fuzzy: bool = False,
fuzzy_radius: int = 1,
) -> dict[str, Any]:
"""Get signature help for the function call at (line, column)."""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
positions = [(line, column)]
if fuzzy:
text = _read_file_text(mgr, file_path, workspace_root)
if text:
positions = _fuzzy_positions(text, line, column, line_radius=fuzzy_radius)
def is_empty(result: Any) -> bool:
if not isinstance(result, dict):
return True
sigs = result.get("signatures")
return not sigs
result, used = await _try_positions(
positions,
lambda l, c: client.signature_help(uri=uri, line=l, character=c),
is_empty,
)
payload: dict[str, Any] = {"language_id": language_id, "signatureHelp": result}
if fuzzy:
payload["fuzzy_attempted_positions"] = [
{"line": l, "column": c} for (l, c) in positions
]
if used:
payload["fuzzy_selected_position"] = {"line": used[0], "column": used[1]}
return payload
@mcp.tool()
async def format_document(
file_path: str,
tab_size: int = 2,
insert_spaces: bool = True,
workspace_root: str | None = None,
) -> dict[str, Any]:
"""Format a document and return LSP TextEdits."""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
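    # These are LSP FormattingOptions; servers that do not understand the trim/insert
    # flags may simply ignore them.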
options = {
"tabSize": tab_size,
"insertSpaces": insert_spaces,
"trimTrailingWhitespace": True,
"insertFinalNewline": True,
"trimFinalNewlines": True,
}
result = await client.format_document(uri=uri, options=options)
return {"language_id": language_id, "edits": result or []}
@mcp.tool()
async def code_action(
file_path: str,
line: int,
column: int,
end_line: int | None = None,
end_column: int | None = None,
only: str | None = None,
workspace_root: str | None = None,
) -> dict[str, Any]:
"""Get LSP code actions for a range (defaults to a single position)."""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
if end_line is None:
end_line = line
if end_column is None:
end_column = column
only_list = [only] if only else None
result = await client.code_action(
uri=uri,
start_line=line,
start_character=column,
end_line=end_line,
end_character=end_column,
only=only_list,
)
return {"language_id": language_id, "actions": result or []}
@mcp.tool()
async def call_hierarchy(
file_path: str,
line: int,
column: int,
direction: str = "both",
workspace_root: str | None = None,
fuzzy: bool = False,
fuzzy_radius: int = 1,
) -> dict[str, Any]:
"""Return incoming/outgoing call hierarchy for the symbol at (line, column)."""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
dir_norm = direction.strip().lower()
if dir_norm not in {"incoming", "outgoing", "both"}:
raise ValueError("direction must be one of: incoming, outgoing, both")
positions = [(line, column)]
if fuzzy:
text = _read_file_text(mgr, file_path, workspace_root)
if text:
positions = _fuzzy_positions(text, line, column, line_radius=fuzzy_radius)
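    # Call hierarchy is a two-step LSP exchange: prepare items at the position, then
    # query incoming/outgoing calls for each prepared item.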
items, used = await _try_positions(
positions,
lambda l, c: client.prepare_call_hierarchy(uri=uri, line=l, character=c),
lambda r: not r,
)
if not items:
payload = {"language_id": language_id, "items": []}
if fuzzy:
payload["fuzzy_attempted_positions"] = [
{"line": l, "column": c} for (l, c) in positions
]
return payload
results: list[dict[str, Any]] = []
for item in items:
if not isinstance(item, dict):
continue
entry: dict[str, Any] = {"item": item}
if dir_norm in {"incoming", "both"}:
entry["incoming"] = await client.call_hierarchy_incoming(item=item)
if dir_norm in {"outgoing", "both"}:
entry["outgoing"] = await client.call_hierarchy_outgoing(item=item)
results.append(entry)
payload: dict[str, Any] = {"language_id": language_id, "items": results}
if fuzzy:
payload["fuzzy_attempted_positions"] = [
{"line": l, "column": c} for (l, c) in positions
]
if used:
payload["fuzzy_selected_position"] = {"line": used[0], "column": used[1]}
return payload
@mcp.tool()
async def document_symbols(file_path: str, workspace_root: str | None = None) -> dict[str, Any]:
"""List symbols (classes/functions/variables) in a file."""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
result = await client.document_symbols(uri=uri)
return {"language_id": language_id, "symbols": result or []}
@mcp.tool()
async def workspace_symbols(
query: str, max_results: int = 200, workspace_root: str | None = None
) -> dict[str, Any]:
"""Search symbols across all active workspaces.
Note: This searches only across LSP clients that have been started (e.g., by opening/syncing
at least one file for that language/workspace). This keeps startup cheap.
"""
mgr = get_manager()
results: list[Any] = []
root_filter = None
if workspace_root:
root_filter = str(Path(workspace_root).expanduser().resolve())
# Query all active clients (optionally filtered by workspace_root).
for (_, root), client in list(mgr._clients.items()):
if root_filter and root != root_filter:
continue
try:
partial = await client.workspace_symbols(query=query)
if isinstance(partial, list):
results.extend(partial)
except Exception:
continue
if max_results and len(results) > max_results:
results = results[:max_results]
return {"query": query, "results": results}
@mcp.tool()
async def resolve_symbol(
symbol: str,
file_path: str | None = None,
max_results: int = 20,
kind: str | None = None,
workspace_root: str | None = None,
) -> dict[str, Any]:
"""Find symbols by name with fuzzy matching.
If file_path is provided, searches document symbols. Otherwise searches active workspaces.
"""
mgr = get_manager()
kind_filter = None
if kind:
kind_filter = _symbol_kind_map().get(kind.strip().lower())
candidates: list[dict[str, Any]] = []
if file_path:
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
symbols = await client.document_symbols(uri=uri)
candidates = _flatten_document_symbols(symbols or [], uri=uri)
if kind_filter:
candidates = [c for c in candidates if c.get("kind") == kind_filter]
        results: list[dict[str, Any]] = []
for item in candidates:
name = str(item.get("name") or "")
score = _score_symbol_name(symbol, name)
if score <= 0:
continue
results.append({**item, "score": score})
results.sort(key=lambda r: r.get("score", 0), reverse=True)
return {
"scope": "document",
"language_id": language_id,
"symbol": symbol,
"results": results[:max_results],
"best": results[0] if results else None,
}
# Workspace search across active clients.
root_filter = None
if workspace_root:
root_filter = str(Path(workspace_root).expanduser().resolve())
for (_, root), client in list(mgr._clients.items()):
if root_filter and root != root_filter:
continue
try:
partial = await client.workspace_symbols(query=symbol)
if isinstance(partial, list):
for item in partial:
if not isinstance(item, dict):
continue
name = str(item.get("name") or "")
score = _score_symbol_name(symbol, name)
if score <= 0:
continue
loc = _normalize_location(item.get("location"))
entry = {
"name": name,
"kind": item.get("kind"),
"location": loc,
"containerName": item.get("containerName"),
"score": score,
}
if kind_filter and entry.get("kind") != kind_filter:
continue
candidates.append(entry)
except Exception:
continue
candidates.sort(key=lambda r: r.get("score", 0), reverse=True)
return {
"scope": "workspace",
"symbol": symbol,
"results": candidates[:max_results],
"best": candidates[0] if candidates else None,
}
@mcp.tool()
async def diagnostics(file_path: str, workspace_root: str | None = None) -> dict[str, Any]:
"""Return the latest diagnostics the language server published for this file."""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
return {
"language_id": language_id,
"uri": uri,
"diagnostics": client.latest_diagnostics(uri=uri),
}
@mcp.tool()
async def wait_for_diagnostics(
file_path: str,
timeout_sec: float = 5.0,
workspace_root: str | None = None,
) -> dict[str, Any]:
"""Wait for the next diagnostics publish for this file (or until timeout)."""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
diagnostics, updated = await client.await_diagnostics(uri=uri, timeout_sec=timeout_sec)
return {
"language_id": language_id,
"uri": uri,
"diagnostics": diagnostics,
"updated": updated,
"timed_out": not updated,
}
@mcp.tool()
async def rename_symbol(
file_path: str,
line: int,
column: int,
new_name: str,
workspace_root: str | None = None,
fuzzy: bool = False,
fuzzy_radius: int = 1,
) -> dict[str, Any]:
"""Request an LSP rename for the symbol at (line, column).
Returns an LSP WorkspaceEdit (or null) that Codex can apply.
"""
mgr = get_manager()
client, uri, language_id = await mgr.sync_file(file_path, workspace_root=workspace_root)
positions = [(line, column)]
if fuzzy:
text = _read_file_text(mgr, file_path, workspace_root)
if text:
positions = _fuzzy_positions(text, line, column, line_radius=fuzzy_radius)
def is_empty(result: Any) -> bool:
return not isinstance(result, dict)
result, used = await _try_positions(
positions,
lambda l, c: client.rename(uri=uri, line=l, character=c, new_name=new_name),
is_empty,
)
payload: dict[str, Any] = {"language_id": language_id, "workspaceEdit": result}
if fuzzy:
payload["fuzzy_attempted_positions"] = [
{"line": l, "column": c} for (l, c) in positions
]
if used:
payload["fuzzy_selected_position"] = {"line": used[0], "column": used[1]}
return payload
@mcp.tool()
async def lsp_bridge_status() -> dict[str, Any]:
"""Return basic status: which language servers are currently running."""
mgr = get_manager()
return mgr.status()
def cli() -> None:
parser = argparse.ArgumentParser(prog="codex-lsp-bridge")
sub = parser.add_subparsers(dest="cmd", required=False)
serve = sub.add_parser("serve", help="Run the MCP server")
serve.add_argument("--config", default=os.getenv("CODEX_LSP_BRIDGE_CONFIG"), help="Path to TOML config")
serve.add_argument("--host", default=os.getenv("CODEX_LSP_BRIDGE_HOST", "127.0.0.1"))
serve.add_argument("--port", type=int, default=int(os.getenv("CODEX_LSP_BRIDGE_PORT", "8000")))
serve.add_argument(
"--transport",
default=os.getenv("CODEX_LSP_BRIDGE_TRANSPORT", "http"),
choices=["http", "stdio"],
help="MCP transport: http (Streamable HTTP) or stdio",
)
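    # Example invocations (values shown are the defaults above):
    #   codex-lsp-bridge serve --transport stdio
    #   codex-lsp-bridge serve --transport http --host 127.0.0.1 --port 8000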
    args = parser.parse_args()
    if args.cmd is None:
        # A bare `codex-lsp-bridge` defaults to serving; re-parse so the serve
        # arguments (config/host/port/transport) receive their defaults.
        args = parser.parse_args(["serve"])
    if args.cmd == "serve":
if args.config:
os.environ["CODEX_LSP_BRIDGE_CONFIG"] = str(Path(args.config).expanduser())
# Instantiate manager eagerly to catch config errors early.
get_manager()
if args.transport == "http":
            # FastMCP docs: `mcp.run(transport="http")` exposes /mcp as the endpoint.
mcp.run(transport="http", host=args.host, port=args.port)
else:
mcp.run(transport="stdio")
if __name__ == "__main__":
cli()