Skip to main content
Glama
websocket_server.py4.48 kB
#!/usr/bin/env python # vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai """Standalone MCP WebSocket server wrapping Calibre tools.""" from __future__ import annotations import asyncio import json import logging from typing import Any, Dict, Optional import websockets from websockets.server import WebSocketServerProtocol from .config_loader import ServerConfig, load_config_from_env from .core.plugin_registry import PluginRegistry from .core.service import LibraryResearchService from .mcp_protocol import make_error_response, make_result_response from .tools.excerpt_tool import register_excerpt_tool from .tools.ft_search_tool import register_ft_search_tool log = logging.getLogger(__name__) class MCPWebSocketServer: """Very small MCP-inspired WebSocket facade for FastMCP tools.""" def __init__(self, config: ServerConfig): self.config = config self._server: Optional[asyncio.AbstractServer] = None self._mcp = self._build_fastmcp(config) def _build_fastmcp(self, cfg: ServerConfig): service = LibraryResearchService(calibre_root_path=cfg.calibre_library_path) registry = PluginRegistry(service) from fastmcp import FastMCP mcp = FastMCP("CalibreMCPServer") register_ft_search_tool(mcp, registry) register_excerpt_tool(mcp, registry) return mcp async def _handle_client(self, websocket: WebSocketServerProtocol) -> None: async for message in websocket: try: payload = json.loads(message) except json.JSONDecodeError: await websocket.send(json.dumps(make_error_response("-", "Invalid JSON"))) continue request_id = str(payload.get("id") or "-") method = payload.get("method") params = payload.get("params") or {} if method == "list_tools": await websocket.send(json.dumps(make_result_response(request_id, self._list_tools()))) elif method == "call_tool": await websocket.send(json.dumps(await self._call_tool(request_id, params))) else: await websocket.send(json.dumps(make_error_response(request_id, "Unknown method", code="unknown_method"))) def _list_tools(self) -> Dict[str, Any]: tools = [] for name, tool in self._mcp.tools.items(): schema = tool.input_model.model_json_schema() tools.append({ "name": name, "description": tool.func.__doc__ or "", "input_schema": schema, }) return {"tools": tools} async def _call_tool(self, request_id: str, params: Dict[str, Any]) -> Dict[str, Any]: name = params.get("name") arguments = params.get("arguments") or {} if not name: return make_error_response(request_id, "Missing tool name", code="bad_request") tool = self._mcp.tools.get(name) if not tool: return make_error_response(request_id, f"Unknown tool '{name}'", code="not_found") try: result_model = tool.func(tool.input_model(**arguments)) except Exception as exc: # noqa: BLE001 return make_error_response(request_id, f"Tool failed: {exc}") return make_result_response(request_id, json.loads(result_model.model_dump_json())) async def start(self): cfg = self.config log.info("Starting MCP WebSocket server on ws://%s:%s", cfg.server_host, cfg.server_port) try: self._server = await websockets.serve(self._handle_client, cfg.server_host, cfg.server_port) except OSError as exc: raise RuntimeError(f"Server konnte Port {cfg.server_port} nicht binden: {exc}") from exc async def stop(self): if self._server: self._server.close() await self._server.wait_closed() self._server = None async def run_async(config: Optional[ServerConfig] = None) -> None: cfg = config or load_config_from_env() server = MCPWebSocketServer(cfg) await server.start() try: await asyncio.Future() # run forever finally: await server.stop() def run_from_env() -> None: cfg = load_config_from_env() try: asyncio.run(run_async(cfg)) except RuntimeError as exc: print(str(exc)) raise except KeyboardInterrupt: pass if __name__ == "__main__": run_from_env()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Miguel0888/mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server