Skip to main content
Glama
binance_ws_api.py14.7 kB
# binance_ws_api.py import json import asyncio import logging from typing import Dict, List, Optional, Callable, Any, Union import websockets from websockets.exceptions import ConnectionClosed # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # WebSocket endpoints MARKET_WS_BASE_URL = "wss://stream.binance.com:9443" API_WS_BASE_URL = "wss://ws-api.binance.com:443/ws-api/v3" # Global connection and subscription state ws_connections = {} subscriptions = {} class BinanceWebSocketManager: """Manager for Binance WebSocket connections and subscriptions.""" def __init__(self): """Initialize the WebSocket manager.""" self.connections = {} # Map of connection_id -> websocket connection self.subscriptions = {} # Map of stream_name -> connection_id self.callbacks = {} # Map of stream_name -> callback function self.running_tasks = set() # Set of running tasks async def connect(self, connection_id: str, base_url: str = MARKET_WS_BASE_URL) -> bool: """Establish a WebSocket connection. Args: connection_id: Unique identifier for this connection base_url: WebSocket endpoint to connect to Returns: True if connection was successful, False otherwise """ try: connection = await websockets.connect(base_url) self.connections[connection_id] = connection logger.info(f"Established WebSocket connection: {connection_id}") # Start a task to receive messages receive_task = asyncio.create_task(self._receive_messages(connection_id)) self.running_tasks.add(receive_task) receive_task.add_done_callback(self.running_tasks.discard) return True except Exception as e: logger.error(f"Failed to establish WebSocket connection: {e}") return False async def disconnect(self, connection_id: str) -> bool: """Close a WebSocket connection. Args: connection_id: Identifier for the connection to close Returns: True if disconnection was successful, False otherwise """ if connection_id not in self.connections: logger.warning(f"Connection not found: {connection_id}") return False try: connection = self.connections[connection_id] await connection.close() del self.connections[connection_id] # Remove any subscriptions using this connection to_remove = [] for stream, conn_id in self.subscriptions.items(): if conn_id == connection_id: to_remove.append(stream) for stream in to_remove: del self.subscriptions[stream] if stream in self.callbacks: del self.callbacks[stream] logger.info(f"Closed WebSocket connection: {connection_id}") return True except Exception as e: logger.error(f"Failed to close WebSocket connection: {e}") return False async def _receive_messages(self, connection_id: str): """Background task to receive and process messages for a connection. Args: connection_id: Identifier for the connection to process messages from """ if connection_id not in self.connections: logger.error(f"Connection not found for receiver: {connection_id}") return connection = self.connections[connection_id] try: async for message in connection: await self._process_message(connection_id, message) except ConnectionClosed: logger.info(f"Connection closed: {connection_id}") # Handle reconnection logic if needed except Exception as e: logger.error(f"Error in message receiver: {e}") finally: # Clean up if the connection was lost unexpectedly if connection_id in self.connections: await self.disconnect(connection_id) async def _process_message(self, connection_id: str, message: str): """Process an incoming WebSocket message. Args: connection_id: Identifier for the connection the message came from message: The raw message string to process """ try: # Parse the message data = json.loads(message) # Handle ping/pong frames for connection keep-alive if "ping" in data: await self._send_pong(connection_id, data["ping"]) return # Handle combined stream messages if "stream" in data and "data" in data: stream_name = data["stream"] stream_data = data["data"] # Call the appropriate callback if registered if stream_name in self.callbacks: callback = self.callbacks[stream_name] await callback(stream_data) # Handle single stream messages (raw streams) else: # Try to determine which subscription this is for for stream_name, conn_id in self.subscriptions.items(): if conn_id == connection_id and stream_name in self.callbacks: callback = self.callbacks[stream_name] await callback(data) break except json.JSONDecodeError: logger.error(f"Failed to parse message as JSON: {message[:100]}...") except Exception as e: logger.error(f"Error processing message: {e}") async def _send_pong(self, connection_id: str, ping_payload: Any): """Send a pong response to a ping. Args: connection_id: Identifier for the connection to send the pong to ping_payload: The payload from the ping message to echo back """ if connection_id not in self.connections: return try: connection = self.connections[connection_id] pong_message = {"pong": ping_payload} await connection.send(json.dumps(pong_message)) except Exception as e: logger.error(f"Error sending pong: {e}") async def subscribe(self, stream_name: str, callback: Callable[[dict], Any], connection_id: Optional[str] = None, use_combined_stream: bool = True) -> bool: """Subscribe to a WebSocket stream. Args: stream_name: Name of the stream to subscribe to (e.g. 'btcusdt@trade') callback: Async function to call when a message is received connection_id: Optional identifier for an existing connection to use use_combined_stream: Whether to use the combined stream endpoint Returns: True if subscription was successful, False otherwise """ # If no connection_id is provided, create a new one if not connection_id: connection_id = f"conn_{stream_name}_{id(callback)}" # If the connection doesn't exist yet, create it if connection_id not in self.connections: if use_combined_stream: base_url = f"{MARKET_WS_BASE_URL}/stream" else: base_url = f"{MARKET_WS_BASE_URL}/ws/{stream_name}" connected = await self.connect(connection_id, base_url) if not connected: return False # Register the subscription and callback self.subscriptions[stream_name] = connection_id self.callbacks[stream_name] = callback # If using a combined stream, send the subscription request if use_combined_stream: try: connection = self.connections[connection_id] subscribe_msg = { "method": "SUBSCRIBE", "params": [stream_name], "id": id(callback) } await connection.send(json.dumps(subscribe_msg)) logger.info(f"Sent subscription request for: {stream_name}") return True except Exception as e: logger.error(f"Failed to send subscription request: {e}") return False # If using a raw stream, the subscription is already active by connecting to the stream URL return True async def unsubscribe(self, stream_name: str) -> bool: """Unsubscribe from a WebSocket stream. Args: stream_name: Name of the stream to unsubscribe from Returns: True if unsubscription was successful, False otherwise """ if stream_name not in self.subscriptions: logger.warning(f"No active subscription for: {stream_name}") return False connection_id = self.subscriptions[stream_name] # Check if we're using a combined stream connection = self.connections.get(connection_id) if not connection: logger.warning(f"Connection not found for: {connection_id}") return False # Check if this is a combined stream by looking at the connection URI if "/stream" in str(connection.uri): try: unsubscribe_msg = { "method": "UNSUBSCRIBE", "params": [stream_name], "id": id(self.callbacks.get(stream_name, lambda: None)) } await connection.send(json.dumps(unsubscribe_msg)) # Remove the subscription and callback del self.subscriptions[stream_name] if stream_name in self.callbacks: del self.callbacks[stream_name] logger.info(f"Unsubscribed from: {stream_name}") return True except Exception as e: logger.error(f"Failed to unsubscribe: {e}") return False else: # For raw streams, just disconnect the connection return await self.disconnect(connection_id) # Initialize a singleton WebSocket manager ws_manager = BinanceWebSocketManager() # Helper functions for common stream types async def subscribe_to_trade_stream(symbol: str, callback: Callable[[dict], Any]) -> bool: """Subscribe to the trade stream for a symbol. Args: symbol: Trading pair symbol in lowercase (e.g., 'btcusdt') callback: Async function to call when a trade message is received Returns: True if subscription was successful, False otherwise """ stream_name = f"{symbol.lower()}@trade" return await ws_manager.subscribe(stream_name, callback) async def subscribe_to_kline_stream(symbol: str, interval: str, callback: Callable[[dict], Any]) -> bool: """Subscribe to the kline/candlestick stream for a symbol. Args: symbol: Trading pair symbol in lowercase (e.g., 'btcusdt') interval: Kline interval (e.g., '1m', '1h', '1d') callback: Async function to call when a kline message is received Returns: True if subscription was successful, False otherwise """ stream_name = f"{symbol.lower()}@kline_{interval}" return await ws_manager.subscribe(stream_name, callback) async def subscribe_to_ticker_stream(symbol: str, callback: Callable[[dict], Any]) -> bool: """Subscribe to the ticker stream for a symbol. Args: symbol: Trading pair symbol in lowercase (e.g., 'btcusdt') callback: Async function to call when a ticker message is received Returns: True if subscription was successful, False otherwise """ stream_name = f"{symbol.lower()}@ticker" return await ws_manager.subscribe(stream_name, callback) async def subscribe_to_book_ticker_stream(symbol: str, callback: Callable[[dict], Any]) -> bool: """Subscribe to the book ticker stream for a symbol. Args: symbol: Trading pair symbol in lowercase (e.g., 'btcusdt') callback: Async function to call when a book ticker message is received Returns: True if subscription was successful, False otherwise """ stream_name = f"{symbol.lower()}@bookTicker" return await ws_manager.subscribe(stream_name, callback) async def subscribe_to_all_market_tickers(callback: Callable[[dict], Any]) -> bool: """Subscribe to ticker streams for all market pairs. Args: callback: Async function to call when market ticker messages are received Returns: True if subscription was successful, False otherwise """ stream_name = "!ticker@arr" return await ws_manager.subscribe(stream_name, callback) async def subscribe_to_depth_stream(symbol: str, callback: Callable[[dict], Any], levels: Optional[int] = None, update_speed: Optional[int] = None) -> bool: """Subscribe to the depth (order book) stream for a symbol. Args: symbol: Trading pair symbol in lowercase (e.g., 'btcusdt') callback: Async function to call when depth messages are received levels: Optional number of levels to include (5, 10, or 20) update_speed: Optional update speed in ms (1000 or 100) Returns: True if subscription was successful, False otherwise """ stream_name = f"{symbol.lower()}@depth" if levels: if levels not in (5, 10, 20): logger.warning(f"Invalid depth levels: {levels}. Using default.") else: stream_name = f"{symbol.lower()}@depth{levels}" if update_speed == 100: stream_name = f"{stream_name}@100ms" return await ws_manager.subscribe(stream_name, callback) # Cleanup function for application shutdown async def cleanup(): """Close all WebSocket connections when the application is shutting down.""" for connection_id in list(ws_manager.connections.keys()): await ws_manager.disconnect(connection_id)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tienan92it/binance-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server