Skip to main content
Glama
websocket_manager.py1.72 kB
"""WebSocket connection manager for streaming subscriptions.""" from typing import Dict, List, Any from starlette.websockets import WebSocket class StreamingManager: """Manages WebSocket connections and TWS subscriptions.""" def __init__(self): self.connections: Dict[str, List[WebSocket]] = { "market_data": [], "portfolio": [], "news": [] } self.subscriptions: Dict[WebSocket, Dict[str, Any]] = {} async def connect(self, websocket: WebSocket, stream_type: str): """Register a new WebSocket connection.""" await websocket.accept() if stream_type in self.connections: self.connections[stream_type].append(websocket) self.subscriptions[websocket] = {"type": stream_type, "active": []} async def disconnect(self, websocket: WebSocket, stream_type: str): """Unregister a WebSocket connection and cleanup subscriptions.""" if stream_type in self.connections and websocket in self.connections[stream_type]: self.connections[stream_type].remove(websocket) if websocket in self.subscriptions: del self.subscriptions[websocket] async def broadcast(self, stream_type: str, message: dict): """Broadcast a message to all connected clients of a stream type.""" disconnected = [] for websocket in self.connections.get(stream_type, []): try: await websocket.send_json(message) except Exception: disconnected.append(websocket) # Cleanup disconnected clients for ws in disconnected: await self.disconnect(ws, stream_type)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/haymant/tws-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server