Skip to main content
Glama
taplo_server.py13 kB
""" Provides TOML specific instantiation of the LanguageServer class using Taplo. Contains various configurations and settings specific to TOML files. """ import gzip import hashlib import logging import os import platform import shutil import socket import stat import threading import urllib.request from typing import Any # Download timeout in seconds (prevents indefinite hangs) DOWNLOAD_TIMEOUT_SECONDS = 120 from solidlsp.ls import SolidLanguageServer from solidlsp.ls_config import LanguageServerConfig from solidlsp.ls_utils import PathUtils from solidlsp.lsp_protocol_handler.lsp_types import InitializeParams from solidlsp.lsp_protocol_handler.server import ProcessLaunchInfo from solidlsp.settings import SolidLSPSettings log = logging.getLogger(__name__) # Taplo release version and download URLs TAPLO_VERSION = "0.10.0" TAPLO_DOWNLOAD_BASE = f"https://github.com/tamasfe/taplo/releases/download/{TAPLO_VERSION}" # SHA256 checksums for Taplo releases (verified from official GitHub releases) # Source: https://github.com/tamasfe/taplo/releases/tag/0.10.0 # To update: download each release file and run: sha256sum <filename> TAPLO_SHA256_CHECKSUMS: dict[str, str] = { "taplo-windows-x86_64.zip": "1615eed140039bd58e7089109883b1c434de5d6de8f64a993e6e8c80ca57bdf9", "taplo-windows-x86.zip": "b825701daab10dcfc0251e6d668cd1a9c0e351e7f6762dd20844c3f3f3553aa0", "taplo-darwin-x86_64.gz": "898122cde3a0b1cd1cbc2d52d3624f23338218c91b5ddb71518236a4c2c10ef2", "taplo-darwin-aarch64.gz": "713734314c3e71894b9e77513c5349835eefbd52908445a0d73b0c7dc469347d", "taplo-linux-x86_64.gz": "8fe196b894ccf9072f98d4e1013a180306e17d244830b03986ee5e8eabeb6156", "taplo-linux-aarch64.gz": "033681d01eec8376c3fd38fa3703c79316f5e14bb013d859943b60a07bccdcc3", "taplo-linux-armv7.gz": "6b728896afe2573522f38b8e668b1ff40eb5928fd9d6d0c253ecae508274d417", } def _verify_sha256(file_path: str, expected_hash: str) -> bool: """Verify SHA256 checksum of a downloaded file.""" sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): sha256_hash.update(chunk) actual_hash = sha256_hash.hexdigest() return actual_hash.lower() == expected_hash.lower() def _get_taplo_download_url() -> tuple[str, str]: """ Get the appropriate Taplo download URL for the current platform. Returns: Tuple of (download_url, executable_name) """ system = platform.system().lower() machine = platform.machine().lower() # Map machine architecture to Taplo naming convention arch_map = { "x86_64": "x86_64", "amd64": "x86_64", "x86": "x86", "i386": "x86", "i686": "x86", "aarch64": "aarch64", "arm64": "aarch64", "armv7l": "armv7", } arch = arch_map.get(machine, "x86_64") # Default to x86_64 if system == "windows": filename = f"taplo-windows-{arch}.zip" executable = "taplo.exe" elif system == "darwin": filename = f"taplo-darwin-{arch}.gz" executable = "taplo" else: # Linux and others filename = f"taplo-linux-{arch}.gz" executable = "taplo" return f"{TAPLO_DOWNLOAD_BASE}/{filename}", executable class TaploServer(SolidLanguageServer): """ Provides TOML specific instantiation of the LanguageServer class using Taplo. Taplo is a TOML toolkit with LSP support for validation, formatting, and schema support. """ @staticmethod def _determine_log_level(line: str) -> int: """Classify Taplo stderr output to avoid false-positive errors.""" line_lower = line.lower() # Known informational messages from Taplo if any( [ "schema" in line_lower and "not found" in line_lower, "warning" in line_lower, ] ): return logging.DEBUG return SolidLanguageServer._determine_log_level(line) def __init__(self, config: LanguageServerConfig, repository_root_path: str, solidlsp_settings: SolidLSPSettings): """ Creates a TaploServer instance. This class is not meant to be instantiated directly. Use LanguageServer.create() instead. """ taplo_executable_path = self._setup_runtime_dependencies(solidlsp_settings) super().__init__( config, repository_root_path, ProcessLaunchInfo(cmd=f"{taplo_executable_path} lsp stdio", cwd=repository_root_path), "toml", solidlsp_settings, ) self.server_ready = threading.Event() @classmethod def _setup_runtime_dependencies(cls, solidlsp_settings: SolidLSPSettings) -> str: """ Setup runtime dependencies for Taplo and return the command to start the server. """ # First check if taplo is already installed system-wide system_taplo = shutil.which("taplo") if system_taplo: log.info(f"Using system-installed Taplo at: {system_taplo}") return system_taplo # Setup local installation directory taplo_dir = os.path.join(cls.ls_resources_dir(solidlsp_settings), "taplo") os.makedirs(taplo_dir, exist_ok=True) _, executable_name = _get_taplo_download_url() taplo_executable = os.path.join(taplo_dir, executable_name) if os.path.exists(taplo_executable) and os.access(taplo_executable, os.X_OK): log.info(f"Using cached Taplo at: {taplo_executable}") return taplo_executable # Download and install Taplo log.info(f"Taplo not found. Downloading version {TAPLO_VERSION}...") cls._download_taplo(taplo_dir, taplo_executable) if not os.path.exists(taplo_executable): raise FileNotFoundError( f"Taplo executable not found at {taplo_executable}. " "Installation may have failed. Try installing manually: cargo install taplo-cli --locked" ) return taplo_executable @classmethod def _download_taplo(cls, install_dir: str, executable_path: str) -> None: """Download and extract Taplo binary with SHA256 verification.""" # TODO: consider using existing download utilities in SolidLSP instead of the custom logic here download_url, _ = _get_taplo_download_url() archive_filename = os.path.basename(download_url) try: log.info(f"Downloading Taplo from: {download_url}") archive_path = os.path.join(install_dir, archive_filename) # Download the archive with timeout to prevent indefinite hangs old_timeout = socket.getdefaulttimeout() try: socket.setdefaulttimeout(DOWNLOAD_TIMEOUT_SECONDS) urllib.request.urlretrieve(download_url, archive_path) finally: socket.setdefaulttimeout(old_timeout) # Verify SHA256 checksum expected_hash = TAPLO_SHA256_CHECKSUMS.get(archive_filename) if expected_hash: if not _verify_sha256(archive_path, expected_hash): os.remove(archive_path) raise RuntimeError( f"SHA256 checksum verification failed for {archive_filename}. " "The downloaded file may be corrupted or tampered with. " "Try installing manually: cargo install taplo-cli --locked" ) log.info(f"SHA256 checksum verified for {archive_filename}") else: log.warning( f"No SHA256 checksum available for {archive_filename}. " "Skipping verification - consider installing manually: cargo install taplo-cli --locked" ) # Extract based on format if archive_path.endswith(".gz") and not archive_path.endswith(".tar.gz"): # Single file gzip with gzip.open(archive_path, "rb") as f_in: with open(executable_path, "wb") as f_out: f_out.write(f_in.read()) elif archive_path.endswith(".zip"): import zipfile with zipfile.ZipFile(archive_path, "r") as zip_ref: # Security: Validate paths to prevent zip slip vulnerability for member in zip_ref.namelist(): member_path = os.path.normpath(os.path.join(install_dir, member)) if not member_path.startswith(os.path.normpath(install_dir)): raise RuntimeError(f"Zip slip detected: {member} attempts to escape install directory") zip_ref.extractall(install_dir) # Make executable on Unix systems if os.name != "nt": os.chmod(executable_path, os.stat(executable_path).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) # Clean up archive os.remove(archive_path) log.info(f"Taplo installed successfully at: {executable_path}") except Exception as e: log.error(f"Failed to download Taplo: {e}") raise RuntimeError( f"Failed to download Taplo from {download_url}. Try installing manually: cargo install taplo-cli --locked" ) from e @staticmethod def _get_initialize_params(repository_absolute_path: str) -> InitializeParams: """ Returns the initialize params for the Taplo Language Server. """ root_uri = PathUtils.path_to_uri(repository_absolute_path) initialize_params = { "locale": "en", "capabilities": { "textDocument": { "synchronization": {"didSave": True, "dynamicRegistration": True}, "completion": {"dynamicRegistration": True, "completionItem": {"snippetSupport": True}}, "definition": {"dynamicRegistration": True}, "references": {"dynamicRegistration": True}, "documentSymbol": { "dynamicRegistration": True, "hierarchicalDocumentSymbolSupport": True, "symbolKind": {"valueSet": list(range(1, 27))}, }, "hover": {"dynamicRegistration": True, "contentFormat": ["markdown", "plaintext"]}, "codeAction": {"dynamicRegistration": True}, }, "workspace": { "workspaceFolders": True, "didChangeConfiguration": {"dynamicRegistration": True}, "symbol": {"dynamicRegistration": True}, }, }, "processId": os.getpid(), "rootPath": repository_absolute_path, "rootUri": root_uri, "workspaceFolders": [ { "uri": root_uri, "name": os.path.basename(repository_absolute_path), } ], } return initialize_params # type: ignore def _start_server(self) -> None: """ Starts the Taplo Language Server and initializes it. """ def register_capability_handler(params: Any) -> None: return def do_nothing(params: Any) -> None: return def window_log_message(msg: dict) -> None: log.info(f"LSP: window/logMessage: {msg}") self.server.on_request("client/registerCapability", register_capability_handler) self.server.on_notification("window/logMessage", window_log_message) self.server.on_notification("$/progress", do_nothing) self.server.on_notification("textDocument/publishDiagnostics", do_nothing) log.info("Starting Taplo server process") self.server.start() initialize_params = self._get_initialize_params(self.repository_root_path) log.info("Sending initialize request to Taplo server") init_response = self.server.send.initialize(initialize_params) log.debug(f"Received initialize response from Taplo: {init_response}") # Verify document symbol support capabilities = init_response.get("capabilities", {}) if capabilities.get("documentSymbolProvider"): log.info("Taplo server supports document symbols") else: log.warning("Taplo server may have limited document symbol support") self.server.notify.initialized({}) log.info("Taplo server initialization complete") self.server_ready.set() self.completions_available.set() def is_ignored_dirname(self, dirname: str) -> bool: """Define TOML-specific directories to ignore.""" return super().is_ignored_dirname(dirname) or dirname in ["target", ".cargo", "node_modules"]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/oraios/serena'

If you have feedback or need assistance with the MCP directory API, please join our Discord server