Skip to main content
Glama

MCP Bitget Trading Server

by gagarinyury
websocket-client.ts•8.24 kB
/** * Bitget WebSocket Client * Handles real-time data streaming from Bitget exchange */ import WebSocket from 'ws'; import { EventEmitter } from 'events'; import { BitgetConfig, WSSubscription, WSMessage } from '../types/bitget.js'; import { logger } from '../utils/logger.js'; export interface WSClientConfig { url: string; pingInterval?: number; reconnectInterval?: number; maxReconnects?: number; } export class BitgetWebSocketClient extends EventEmitter { private ws: WebSocket | null = null; private config: WSClientConfig; private subscriptions: Set<string> = new Set(); private pingTimer: NodeJS.Timeout | null = null; private reconnectTimer: NodeJS.Timeout | null = null; private reconnectCount = 0; private isConnected = false; private isConnecting = false; constructor(config: WSClientConfig) { super(); this.config = { pingInterval: 30000, // 30 seconds reconnectInterval: 5000, // 5 seconds maxReconnects: 10, ...config }; } /** * Connect to WebSocket */ async connect(): Promise<void> { if (this.isConnected || this.isConnecting) { return; } this.isConnecting = true; logger.info('Connecting to Bitget WebSocket', { url: this.config.url }); try { this.ws = new WebSocket(this.config.url); this.setupEventHandlers(); return new Promise((resolve, reject) => { const timeout = setTimeout(() => { reject(new Error('WebSocket connection timeout')); }, 10000); this.ws!.once('open', () => { clearTimeout(timeout); this.isConnected = true; this.isConnecting = false; this.reconnectCount = 0; logger.info('WebSocket connected successfully'); this.startPing(); this.resubscribeAll(); resolve(); }); this.ws!.once('error', (error) => { clearTimeout(timeout); this.isConnecting = false; reject(error); }); }); } catch (error) { this.isConnecting = false; throw error; } } /** * Disconnect from WebSocket */ disconnect(): void { logger.info('Disconnecting WebSocket'); this.isConnected = false; this.isConnecting = false; if (this.pingTimer) { clearInterval(this.pingTimer); this.pingTimer = null; } if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } if (this.ws) { this.ws.close(); this.ws = null; } this.subscriptions.clear(); } /** * Subscribe to a channel */ subscribe(channel: string, symbol: string, instType: 'SPOT' | 'UMCBL' | 'DMCBL' = 'SPOT'): void { const subscription: WSSubscription = { op: 'subscribe', args: [{ instType, channel, instId: symbol }] }; const subKey = `${instType}:${channel}:${symbol}`; this.subscriptions.add(subKey); if (this.isConnected && this.ws) { logger.debug('Subscribing to channel', { channel, symbol, instType }); this.ws.send(JSON.stringify(subscription)); } else { logger.warn('WebSocket not connected, subscription will be sent on reconnect', { channel, symbol }); } } /** * Unsubscribe from a channel */ unsubscribe(channel: string, symbol: string, instType: 'SPOT' | 'UMCBL' | 'DMCBL' = 'SPOT'): void { const subscription: WSSubscription = { op: 'unsubscribe', args: [{ instType, channel, instId: symbol }] }; const subKey = `${instType}:${channel}:${symbol}`; this.subscriptions.delete(subKey); if (this.isConnected && this.ws) { logger.debug('Unsubscribing from channel', { channel, symbol, instType }); this.ws.send(JSON.stringify(subscription)); } } /** * Setup WebSocket event handlers */ private setupEventHandlers(): void { if (!this.ws) return; this.ws.on('open', () => { logger.debug('WebSocket connection opened'); this.emit('connected'); }); this.ws.on('message', (data: Buffer) => { try { const message = JSON.parse(data.toString()); this.handleMessage(message); } catch (error) { logger.error('Failed to parse WebSocket message', { error: (error as any).message }); } }); this.ws.on('error', (error) => { logger.error('WebSocket error', { error: error.message }); this.emit('error', error); }); this.ws.on('close', (code, reason) => { logger.warn('WebSocket connection closed', { code, reason: reason.toString() }); this.isConnected = false; this.stopPing(); this.emit('disconnected', { code, reason }); this.scheduleReconnect(); }); this.ws.on('pong', () => { logger.debug('Received pong'); }); } /** * Handle incoming messages */ private handleMessage(message: any): void { logger.debug('Received WebSocket message', { type: message.event || message.action }); if (message.event === 'subscribe') { logger.info('Subscription confirmed', { channel: message.arg }); this.emit('subscribed', message.arg); } else if (message.event === 'unsubscribe') { logger.info('Unsubscription confirmed', { channel: message.arg }); this.emit('unsubscribed', message.arg); } else if (message.event === 'error') { logger.error('WebSocket subscription error', { error: message }); this.emit('subscriptionError', message); } else if (message.data && message.arg) { // Data update const wsMessage: WSMessage = { action: message.action || 'update', arg: message.arg, data: message.data, ts: message.ts || Date.now() }; this.emit('data', wsMessage); // Emit specific channel events const { instType, channel, instId } = message.arg; this.emit(`${instType}:${channel}:${instId}`, wsMessage); } else if (message.ping) { // Respond to ping if (this.ws && this.isConnected) { this.ws.send(JSON.stringify({ pong: message.ping })); } } } /** * Start ping mechanism */ private startPing(): void { if (this.pingTimer) { clearInterval(this.pingTimer); } this.pingTimer = setInterval(() => { if (this.ws && this.isConnected) { this.ws.ping(); logger.debug('Sent ping'); } }, this.config.pingInterval!); } /** * Stop ping mechanism */ private stopPing(): void { if (this.pingTimer) { clearInterval(this.pingTimer); this.pingTimer = null; } } /** * Schedule reconnection */ private scheduleReconnect(): void { if (this.reconnectCount >= this.config.maxReconnects!) { logger.error('Max reconnection attempts reached'); this.emit('maxReconnectsReached'); return; } this.reconnectTimer = setTimeout(async () => { this.reconnectCount++; logger.info(`Attempting to reconnect (${this.reconnectCount}/${this.config.maxReconnects})`); try { await this.connect(); } catch (error) { logger.error('Reconnection failed', { error: (error as any).message }); this.scheduleReconnect(); } }, this.config.reconnectInterval!); } /** * Resubscribe to all channels after reconnection */ private resubscribeAll(): void { logger.info(`Resubscribing to ${this.subscriptions.size} channels`); for (const subKey of this.subscriptions) { const [instType, channel, symbol] = subKey.split(':'); this.subscribe(channel, symbol, instType as any); } } /** * Get connection status */ isWebSocketConnected(): boolean { return this.isConnected; } /** * Get subscription count */ getSubscriptionCount(): number { return this.subscriptions.size; } } /** * Factory function to create WebSocket client for Bitget */ export function createBitgetWebSocketClient(config: BitgetConfig): BitgetWebSocketClient { return new BitgetWebSocketClient({ url: config.wsUrl, pingInterval: 30000, reconnectInterval: 5000, maxReconnects: 10 }); }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gagarinyury/MCP-bitget-trading'

If you have feedback or need assistance with the MCP directory API, please join our Discord server