"""Deribit WebSocket client for real-time market data."""
import asyncio
import json
import logging
from typing import Any, Callable, Dict, Optional
import websockets
from websockets.client import WebSocketClientProtocol
from .config import settings
logger = logging.getLogger(__name__)
class DeribitWebSocketClient:
"""WebSocket client for Deribit exchange."""
def __init__(self):
self.ws: Optional[WebSocketClientProtocol] = None
self.url = settings.deribit_ws_url
self.subscriptions: Dict[str, list[Callable]] = {}
self._running = False
self._message_id = 0
self._response_handlers: Dict[int, asyncio.Future] = {}
def _get_next_id(self) -> int:
"""Generate next message ID."""
self._message_id += 1
return self._message_id
async def connect(self) -> None:
"""Establish WebSocket connection to Deribit."""
try:
logger.info(f"Connecting to Deribit WebSocket: {self.url}")
self.ws = await websockets.connect(
self.url,
ping_interval=20, # Send ping every 20 seconds
ping_timeout=10, # Wait 10 seconds for pong
close_timeout=10 # Wait 10 seconds for close
)
self._running = True
logger.info("Successfully connected to Deribit WebSocket")
# Start message handler
asyncio.create_task(self._message_handler())
# Authenticate if credentials are provided
if settings.deribit_api_key and settings.deribit_api_secret:
await self._authenticate()
except Exception as e:
logger.error(f"Failed to connect to Deribit WebSocket: {e}")
raise
async def ensure_connected(self) -> None:
"""Ensure WebSocket is connected, reconnect if needed."""
if not self.is_connected:
logger.warning("WebSocket not connected, attempting reconnection...")
try:
await self.connect()
except Exception as e:
logger.error(f"Reconnection failed: {e}")
raise
async def _authenticate(self) -> None:
"""Authenticate with Deribit API."""
msg_id = self._get_next_id()
auth_msg = {
"jsonrpc": "2.0",
"id": msg_id,
"method": "public/auth",
"params": {
"grant_type": "client_credentials",
"client_id": settings.deribit_api_key,
"client_secret": settings.deribit_api_secret,
},
}
future = asyncio.Future()
self._response_handlers[msg_id] = future
await self.ws.send(json.dumps(auth_msg))
try:
result = await asyncio.wait_for(future, timeout=10.0)
if "result" in result:
logger.info("Successfully authenticated with Deribit")
else:
logger.warning(f"Authentication response: {result}")
except asyncio.TimeoutError:
logger.error("Authentication timeout")
async def _message_handler(self) -> None:
"""Handle incoming WebSocket messages."""
try:
async for message in self.ws:
try:
data = json.loads(message)
# Handle RPC responses
if "id" in data and data["id"] in self._response_handlers:
future = self._response_handlers.pop(data["id"])
if not future.done():
future.set_result(data)
continue
# Handle subscription notifications
if "method" in data and data["method"] == "subscription":
params = data.get("params", {})
channel = params.get("channel", "")
# Extract instrument from channel (e.g., "ticker.BTC-PERPETUAL.raw")
if channel.startswith("ticker."):
parts = channel.split(".")
if len(parts) >= 2:
instrument = parts[1]
if instrument in self.subscriptions:
tick_data = params.get("data", {})
for callback in self.subscriptions[instrument]:
try:
await callback(instrument, tick_data)
except Exception as e:
logger.error(f"Error in subscription callback: {e}")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse WebSocket message: {e}")
except Exception as e:
logger.error(f"Error processing WebSocket message: {e}")
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket connection closed")
self._running = False
except Exception as e:
logger.error(f"Error in message handler: {e}")
self._running = False
async def subscribe_ticker(
self, instrument: str, callback: Callable[[str, Dict[str, Any]], None]
) -> None:
"""Subscribe to ticker updates for an instrument."""
# Ensure connection is active
await self.ensure_connected()
if instrument not in self.subscriptions:
self.subscriptions[instrument] = []
self.subscriptions[instrument].append(callback)
# Send subscription request
msg_id = self._get_next_id()
sub_msg = {
"jsonrpc": "2.0",
"id": msg_id,
"method": "public/subscribe",
"params": {"channels": [f"ticker.{instrument}.raw"]},
}
future = asyncio.Future()
self._response_handlers[msg_id] = future
try:
await self.ws.send(json.dumps(sub_msg))
result = await asyncio.wait_for(future, timeout=10.0)
logger.info(f"Subscribed to ticker for {instrument}")
except asyncio.TimeoutError:
logger.error(f"Subscription timeout for {instrument}")
raise
except Exception as e:
logger.error(f"Subscription error for {instrument}: {e}")
raise
async def unsubscribe_ticker(self, instrument: str) -> None:
"""Unsubscribe from ticker updates."""
if instrument in self.subscriptions:
del self.subscriptions[instrument]
msg_id = self._get_next_id()
unsub_msg = {
"jsonrpc": "2.0",
"id": msg_id,
"method": "public/unsubscribe",
"params": {"channels": [f"ticker.{instrument}.raw"]},
}
await self.ws.send(json.dumps(unsub_msg))
logger.info(f"Unsubscribed from ticker for {instrument}")
async def get_ticker(self, instrument: str) -> Dict[str, Any]:
"""Get current ticker data for an instrument."""
# Ensure connection is active
await self.ensure_connected()
msg_id = self._get_next_id()
ticker_msg = {
"jsonrpc": "2.0",
"id": msg_id,
"method": "public/ticker",
"params": {"instrument_name": instrument},
}
future = asyncio.Future()
self._response_handlers[msg_id] = future
try:
await self.ws.send(json.dumps(ticker_msg))
result = await asyncio.wait_for(future, timeout=10.0)
return result.get("result", {})
except asyncio.TimeoutError:
logger.error(f"Ticker request timeout for {instrument}")
raise
except Exception as e:
logger.error(f"Ticker request error for {instrument}: {e}")
raise
async def disconnect(self) -> None:
"""Close WebSocket connection."""
self._running = False
if self.ws:
await self.ws.close()
logger.info("Disconnected from Deribit WebSocket")
@property
def is_connected(self) -> bool:
"""Check if WebSocket is connected."""
if not self._running or not self.ws:
return False
try:
# Check if the websocket has a closed attribute and use it
return not getattr(self.ws, 'closed', True)
except Exception:
return False