from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any
from .config import BridgeConfig
from .lsp_client import LSPClient, LSPError
from .workspace import detect_workspace_root, path_to_uri
class LSPManager:
"""Routes files to the right language server and manages LSPClient instances.
A single repo/workspace may have multiple language servers running in parallel.
We scope servers by (language_key, workspace_root) to avoid cross-project index contamination.
"""
def __init__(self, config: BridgeConfig):
self.config = config
self.base_dir = Path(config.default_root).expanduser().resolve() if config.default_root else Path.cwd().resolve()
self._clients: dict[tuple[str, str], LSPClient] = {} # (server_key, root) -> client
self._lock = asyncio.Lock()
def resolve_path(self, file_path: str | Path, base_dir: Path | None = None) -> Path:
p = Path(file_path).expanduser()
if p.is_absolute():
return p.resolve()
base = base_dir or self.base_dir
return (base / p).resolve()
def _pick_server_key(self, file_path: Path) -> str | None:
ext = file_path.suffix.lower().lstrip(".")
return self.config.server_for_extension(ext)
def _language_id(self, server_key: str, file_path: Path) -> str:
ext = file_path.suffix.lower().lstrip(".")
cfg = self.config.servers[server_key]
if cfg.language_id_map and ext in cfg.language_id_map:
return cfg.language_id_map[ext]
if cfg.language_id:
return cfg.language_id
# Reasonable fallback: use server key
return server_key
async def get_client_for_file(
self, file_path: str | Path, workspace_root: str | Path | None = None
) -> tuple[LSPClient, str, str]:
"""Return (client, uri, language_id) for a file."""
root_override = Path(workspace_root).expanduser().resolve() if workspace_root else None
base_dir = root_override or self.base_dir
path = self.resolve_path(file_path, base_dir=base_dir)
if not path.exists():
raise FileNotFoundError(str(path))
server_key = self._pick_server_key(path)
if not server_key:
raise LSPError(f"No language server configured for *.{path.suffix.lstrip('.')} (path={path})")
language_id = self._language_id(server_key, path)
if root_override:
if not root_override.exists():
raise FileNotFoundError(str(root_override))
root = root_override
else:
root = detect_workspace_root(path, default_root=str(self.base_dir))
cache_key = (server_key, str(root))
async with self._lock:
client = self._clients.get(cache_key)
if client is None:
server_cfg = self.config.servers[server_key]
client = LSPClient(
name=f"{server_key}@{root.name}",
command=server_cfg.command,
args=server_cfg.args,
root_path=root,
initialization_options=server_cfg.initialization_options,
settings=server_cfg.settings,
env=server_cfg.env,
)
self._clients[cache_key] = client
await client.start()
uri = path_to_uri(path)
return client, uri, language_id
async def sync_file(
self, file_path: str | Path, workspace_root: str | Path | None = None
) -> tuple[LSPClient, str, str]:
client, uri, language_id = await self.get_client_for_file(
file_path, workspace_root=workspace_root
)
base_dir = Path(workspace_root).expanduser().resolve() if workspace_root else None
path = self.resolve_path(file_path, base_dir=base_dir)
text = path.read_text(encoding="utf-8", errors="replace")
await client.sync_document(uri=uri, language_id=language_id, text=text)
return client, uri, language_id
async def shutdown(self) -> None:
async with self._lock:
clients = list(self._clients.values())
self._clients.clear()
await asyncio.gather(*(c.stop() for c in clients), return_exceptions=True)
def status(self) -> dict[str, Any]:
out: dict[str, Any] = {"servers": []}
for (server_key, root), client in self._clients.items():
out["servers"].append(
{
"language": server_key,
"workspace_root": root,
"running": client._proc is not None and client._proc.returncode is None,
}
)
return out