Skip to main content
Glama
handler.py2.44 kB
import asyncio from tornado.log import gen_log as logger from json import loads from typing import Dict from tornado.websocket import WebSocketHandler DISCOVERY_EVENT = "discovery-event" METRICS_EVENT = "instance-level-metrics" CONFIGURATION_LOAD_EVENT = "configuration-load-event" EVENT_KEY = "event" DATA_KEY = "data" CONNECTION_READY_MESSAGE = "ready" class WebSocketHubHandler(WebSocketHandler): def initialize(self, cache: Dict[str, Dict[str, str]], lock: asyncio.Lock): self.cache = cache self.lock = lock async def on_message(self, message: str | bytes): logger.info(f"Received message: {message}") await self.handle_message(message) async def handle_message(self, message: str | bytes): try: json_data = loads(message) logger.info(f"Parsed message: {json_data}") event_name = json_data.get(EVENT_KEY) logger.info(f"Event: {event_name}") handler = self.get_event_handler(event_name) if handler is None: print(f"Error handling event message of type: {event_name}") else: await handler(json_data) except Exception as e: print(f"Error handling message: {e}") def get_event_handler(self, event: str): logger.info(f"Event: {event}") return { DISCOVERY_EVENT: self._discovery_event, CONFIGURATION_LOAD_EVENT: self._configuration_load_event, METRICS_EVENT: self._metrics_event, }.get(event) async def _discovery_event(self, event_data: Dict[str, str]): async with self.lock: logger.info(f"Discovery Event: {event_data}") self.cache[DISCOVERY_EVENT] = event_data async def _configuration_load_event(self, event_data: Dict[str, str]): async with self.lock: logger.info(f"Configuration Load Event: {event_data}") self.cache[CONFIGURATION_LOAD_EVENT] = event_data async def _metrics_event(self, event_data: Dict[str, str]): async with self.lock: logger.info(f"Metrics Event: {event_data}") self.cache[METRICS_EVENT] = event_data # Custom ping handler def on_ping(self, data: bytes): logger.info(f"Received ping with data: {data}") # Send a custom readiness message, "ready", as expected by protocol self.write_message(CONNECTION_READY_MESSAGE)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server