Binance MCP Server
by qeinfinity
- src
- connectors
import WebSocket from 'ws';
import { config } from '../config.js';
import { logger } from '../utils/logger.js';
import {
WebSocketMessage,
StreamEventType,
StreamEventData,
TradeData,
TickerData,
BookTickerData,
KlineData,
ForceOrderData,
MarkPriceData,
OpenInterestData
} from '../types/ws-stream.js';
type WSReadyState = number;
interface StreamSubscription {
symbol: string;
type: 'spot' | 'futures';
streams: StreamEventType[];
reconnectAttempts: number;
reconnectTimeout?: NodeJS.Timeout;
}
type MessageHandler = (data: StreamEventData) => void;
export class BinanceWebSocketManager {
private connections: Map<string, WebSocket>;
private pingIntervals: Map<string, NodeJS.Timeout>;
private messageCallbacks: Map<string, Map<StreamEventType, MessageHandler[]>>;
private subscriptions: Map<string, StreamSubscription>;
private readonly MAX_RECONNECT_ATTEMPTS = 5;
private readonly RECONNECT_DELAY = config.WS_RECONNECT_DELAY || 5000;
constructor() {
this.connections = new Map();
this.pingIntervals = new Map();
this.messageCallbacks = new Map();
this.subscriptions = new Map();
}
public subscribe(symbol: string, type: 'spot' | 'futures', streams: StreamEventType[]): void {
const subscription: StreamSubscription = {
symbol,
type,
streams,
reconnectAttempts: 0
};
if (!this.messageCallbacks.has(symbol)) {
this.messageCallbacks.set(symbol, new Map());
}
const symbolCallbacks = this.messageCallbacks.get(symbol)!;
streams.forEach(stream => {
if (!symbolCallbacks.has(stream)) {
symbolCallbacks.set(stream, []);
}
});
this.subscriptions.set(symbol, subscription);
this.connectWebSocket(subscription);
}
private connectWebSocket(subscription: StreamSubscription): void {
const { symbol, type, streams } = subscription;
const wsUrl = type === 'spot' ? config.SPOT_WS_URL : config.FUTURES_WS_URL;
// Handle special futures streams
const streamNames = streams.map(stream => {
if (type === 'futures') {
switch (stream) {
case 'forceOrder':
return `${symbol.toLowerCase()}@forceOrder`;
case 'markPrice':
return `${symbol.toLowerCase()}@markPrice@1s`; // 1s update frequency
case 'openInterest':
return `${symbol.toLowerCase()}@openInterest@1s`;
default:
return `${symbol.toLowerCase()}@${stream}`;
}
}
return `${symbol.toLowerCase()}@${stream}`;
});
try {
const ws = new WebSocket(`${wsUrl}/${streamNames.join('/')}`);
ws.on('open', () => {
logger.info(`WebSocket connected for ${symbol} ${streams.join(', ')}`);
subscription.reconnectAttempts = 0;
this.setupPingInterval(symbol, ws);
});
ws.on('message', (data: WebSocket.Data) => {
try {
const message = JSON.parse(data.toString()) as WebSocketMessage<StreamEventData>;
this.handleStreamMessage(symbol, message);
} catch (error) {
logger.error('Error parsing WebSocket message:', error);
}
});
ws.on('error', (error: Error) => {
logger.error(`WebSocket error for ${symbol}:`, error);
});
ws.on('close', () => {
logger.info(`WebSocket closed for ${symbol}`);
this.cleanup(symbol);
this.handleReconnection(subscription);
});
ws.on('pong', () => {
logger.debug(`Received pong from ${symbol} WebSocket`);
});
this.connections.set(symbol, ws);
} catch (error) {
logger.error(`Error creating WebSocket connection for ${symbol}:`, error);
this.handleReconnection(subscription);
throw error;
}
}
private handleStreamMessage(symbol: string, message: WebSocketMessage<StreamEventData>): void {
const symbolCallbacks = this.messageCallbacks.get(symbol);
if (!symbolCallbacks) return;
// Extract stream type from the stream name
const streamParts = message.stream.split('@');
if (streamParts.length < 2) return;
let streamType = streamParts[1] as StreamEventType;
// Handle special cases where the stream name has additional parts (e.g., markPrice@1s)
if (streamParts.length > 2) {
streamType = streamParts[1].split('@')[0] as StreamEventType;
}
const handlers = symbolCallbacks.get(streamType);
if (handlers) {
handlers.forEach(handler => {
try {
handler(message.data);
} catch (error) {
logger.error(`Error in message handler for ${symbol} ${streamType}:`, error);
}
});
}
}
public onStreamData(symbol: string, streamType: StreamEventType, handler: MessageHandler): void {
const symbolCallbacks = this.messageCallbacks.get(symbol);
if (!symbolCallbacks) {
logger.error(`No callbacks registered for symbol ${symbol}`);
return;
}
const handlers = symbolCallbacks.get(streamType) || [];
handlers.push(handler);
symbolCallbacks.set(streamType, handlers);
}
private handleReconnection(subscription: StreamSubscription): void {
const { symbol, reconnectAttempts } = subscription;
if (reconnectAttempts >= this.MAX_RECONNECT_ATTEMPTS) {
logger.error(`Max reconnection attempts reached for ${symbol}`);
return;
}
subscription.reconnectAttempts++;
const delay = this.RECONNECT_DELAY * Math.pow(2, reconnectAttempts - 1); // Exponential backoff
logger.info(`Attempting to reconnect ${symbol} in ${delay}ms (attempt ${reconnectAttempts})`);
subscription.reconnectTimeout = setTimeout(() => {
this.connectWebSocket(subscription);
}, delay);
}
private setupPingInterval(symbol: string, ws: WebSocket): void {
const interval = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.ping((error?: Error) => {
if (error) {
logger.error(`Error sending ping for ${symbol}:`, error);
}
});
}
}, config.WS_PING_INTERVAL);
this.pingIntervals.set(symbol, interval);
}
private cleanup(symbol: string): void {
const interval = this.pingIntervals.get(symbol);
if (interval) {
clearInterval(interval);
this.pingIntervals.delete(symbol);
}
const subscription = this.subscriptions.get(symbol);
if (subscription?.reconnectTimeout) {
clearTimeout(subscription.reconnectTimeout);
}
this.connections.delete(symbol);
}
public unsubscribe(symbol: string): void {
const ws = this.connections.get(symbol);
if (ws) {
ws.close();
}
this.cleanup(symbol);
this.subscriptions.delete(symbol);
this.messageCallbacks.delete(symbol);
}
public close(): void {
this.connections.forEach((ws, symbol) => {
ws.close();
this.cleanup(symbol);
});
this.subscriptions.clear();
this.messageCallbacks.clear();
}
public getConnectionState(symbol: string): WSReadyState | undefined {
const ws = this.connections.get(symbol);
return ws?.readyState;
}
public isSubscribed(symbol: string, streamType: StreamEventType): boolean {
const subscription = this.subscriptions.get(symbol);
return subscription?.streams.includes(streamType) || false;
}
}