// websocket-client.ts
/**
* ByteBot WebSocket Client
*
* Provides WebSocket client for real-time task status updates
* Includes automatic reconnection with exponential backoff
*/
import WebSocket from 'ws';
import { WebSocketMessage, WebSocketEventType } from '../types/bytebot.js';
import { EnvironmentConfig } from '../types/mcp.js';
import { logError } from '../utils/error-handler.js';
/**
 * Callback invoked with a WebSocket message that was dispatched to a
 * registered listener. Its return value is ignored.
 */
export type WebSocketEventListener = (message: WebSocketMessage) => void;
/**
 * Connection lifecycle states reported by the WebSocket client.
 */
type WebSocketState =
  | 'disconnected'
  | 'connecting'
  | 'connected'
  | 'reconnecting';
/**
 * WebSocket client for ByteBot task status updates.
 *
 * Connects to `${config.wsUrl}/tasks/subscribe`, dispatches every parsed
 * message to listeners registered with `on()`, pings the server every
 * 30 seconds while connected, and re-establishes the connection with
 * exponential backoff after an unexpected close (1 s base delay, doubled per
 * attempt, capped at 30 s, at most 10 consecutive attempts).
 *
 * Per-task subscriptions are remembered and replayed after every successful
 * (re)connect. Calling `disconnect()` is final: it never schedules a
 * reconnect.
 */
export class WebSocketClient {
  private ws: WebSocket | null = null;
  private config: EnvironmentConfig;
  private state: WebSocketState = 'disconnected';
  private eventListeners: Map<WebSocketEventType | '*', Set<WebSocketEventListener>> = new Map();
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;
  private reconnectDelay = 1000;
  private reconnectBackoffMultiplier = 2;
  private maxReconnectDelay = 30000;
  private reconnectTimer: NodeJS.Timeout | null = null;
  private pingInterval: NodeJS.Timeout | null = null;
  private subscribedTaskIds: Set<string> = new Set();

  /**
   * @param config Environment configuration; `wsUrl` is read when connecting.
   */
  constructor(config: EnvironmentConfig) {
    this.config = config;
  }

  /**
   * Connect to the WebSocket server.
   *
   * Resolves once the socket is open. Returns immediately (with a warning)
   * when a connection is already open or in progress.
   *
   * @throws Error (as a rejection) when `config.wsUrl` is not set, when the
   *   socket reports an error before opening, or when it is closed before
   *   the connection was established.
   */
  async connect(): Promise<void> {
    if (this.state === 'connected' || this.state === 'connecting') {
      console.warn('[ByteBot MCP] WebSocket already connected or connecting');
      return;
    }
    if (!this.config.wsUrl) {
      throw new Error('WebSocket URL not configured');
    }

    // An explicit connect() supersedes a reconnect that is still pending.
    this.clearReconnectTimer();

    this.state = 'connecting';
    console.log(`[ByteBot MCP] Connecting to WebSocket: ${this.config.wsUrl}`);

    return new Promise<void>((resolve, reject) => {
      let socket: WebSocket;
      try {
        socket = new WebSocket(`${this.config.wsUrl}/tasks/subscribe`);
      } catch (error) {
        this.state = 'disconnected';
        reject(error);
        return;
      }
      this.ws = socket;

      // Tracks whether this particular socket ever opened, so the promise is
      // always settled even if the client state changed in the meantime.
      let opened = false;

      socket.on('open', () => {
        if (this.ws !== socket) {
          return; // superseded or disconnected while the handshake was running
        }
        opened = true;
        this.state = 'connected';
        this.reconnectAttempts = 0;
        console.log('[ByteBot MCP] WebSocket connected');
        // Start ping interval to keep connection alive
        this.startPingInterval();
        // Resubscribe to tasks if any
        this.resubscribeToTasks();
        resolve();
      });

      socket.on('message', (data: WebSocket.Data) => {
        if (this.ws !== socket) {
          return; // ignore traffic from a socket this client no longer owns
        }
        try {
          const parsed: unknown = JSON.parse(data.toString());
          if (typeof parsed !== 'object' || parsed === null) {
            throw new Error('WebSocket message is not a JSON object');
          }
          this.handleMessage(parsed as WebSocketMessage);
        } catch (error) {
          logError('WebSocket message parsing', error);
        }
      });

      socket.on('error', (error) => {
        logError('WebSocket error', error);
        if (!opened) {
          reject(error);
        }
      });

      socket.on('close', (code, reason) => {
        console.log(
          `[ByteBot MCP] WebSocket disconnected: code=${code}, reason=${reason}`
        );
        if (!opened) {
          // No-op when an 'error' event already rejected the promise.
          reject(
            new Error(
              `WebSocket closed before the connection was established (code=${code})`
            )
          );
        }
        if (this.ws !== socket) {
          // Closed through disconnect() or replaced by a newer socket:
          // this close must not trigger a reconnect.
          return;
        }
        this.handleDisconnect();
      });
    });
  }

  /**
   * Disconnect from the WebSocket server and cancel any pending reconnect.
   *
   * The socket is detached before it is closed so that its 'close' event is
   * recognised as intentional and does not start the reconnect cycle. The
   * reconnect attempt counter is reset so a later `connect()` starts fresh.
   */
  disconnect(): void {
    console.log('[ByteBot MCP] Disconnecting WebSocket');
    this.clearReconnectTimer();
    this.stopPingInterval();

    const socket = this.ws;
    this.ws = null;
    this.state = 'disconnected';
    this.reconnectAttempts = 0;

    if (socket) {
      socket.close();
    }
  }

  /**
   * Subscribe to updates for one task.
   *
   * The task id is always remembered; the subscribe request is sent right
   * away only when the connection is open, otherwise it is sent on the next
   * successful (re)connect.
   */
  subscribeToTask(taskId: string): void {
    this.subscribedTaskIds.add(taskId);
    if (this.sendJson({ action: 'subscribe', taskId })) {
      console.log(`[ByteBot MCP] Subscribed to task updates: ${taskId}`);
    }
  }

  /**
   * Subscribe to updates for all tasks.
   *
   * Does nothing when the connection is not open. This subscription is not
   * remembered, so it is not replayed after a reconnect.
   */
  subscribeToAllTasks(): void {
    if (this.sendJson({ action: 'subscribe' })) {
      console.log('[ByteBot MCP] Subscribed to all task updates');
    }
  }

  /**
   * Unsubscribe from updates for one task.
   *
   * The task id is always forgotten; the unsubscribe request is sent only
   * when the connection is open.
   */
  unsubscribeFromTask(taskId: string): void {
    this.subscribedTaskIds.delete(taskId);
    if (this.sendJson({ action: 'unsubscribe', taskId })) {
      console.log(`[ByteBot MCP] Unsubscribed from task updates: ${taskId}`);
    }
  }

  /**
   * Register a listener for one message type, or `'*'` for every message.
   */
  on(event: WebSocketEventType | '*', listener: WebSocketEventListener): void {
    let listeners = this.eventListeners.get(event);
    if (!listeners) {
      listeners = new Set();
      this.eventListeners.set(event, listeners);
    }
    listeners.add(listener);
  }

  /**
   * Remove a listener previously registered with `on()`.
   */
  off(event: WebSocketEventType | '*', listener: WebSocketEventListener): void {
    this.eventListeners.get(event)?.delete(listener);
  }

  /**
   * Get the current connection state.
   */
  getState(): WebSocketState {
    return this.state;
  }

  /**
   * Check whether the client is currently in the 'connected' state.
   */
  isConnected(): boolean {
    return this.state === 'connected';
  }

  /**
   * Dispatch an incoming message to type-specific listeners first, then to
   * wildcard listeners.
   */
  private handleMessage(message: WebSocketMessage): void {
    console.log(
      `[ByteBot MCP] WebSocket message: type=${message.type}, taskId=${message.taskId}`
    );
    this.notify(this.eventListeners.get(message.type), message);
    this.notify(this.eventListeners.get('*'), message);
  }

  /**
   * Invoke each listener, logging (not propagating) listener exceptions so
   * one faulty listener cannot starve the remaining ones.
   */
  private notify(
    listeners: Set<WebSocketEventListener> | undefined,
    message: WebSocketMessage
  ): void {
    if (!listeners) {
      return;
    }
    for (const listener of listeners) {
      try {
        listener(message);
      } catch (error) {
        logError('WebSocket event listener', error);
      }
    }
  }

  /**
   * Handle an unexpected close of the active socket: release it, stop
   * pinging, and schedule a reconnect unless the attempt budget is spent.
   */
  private handleDisconnect(): void {
    this.state = 'disconnected';
    this.ws = null;
    this.stopPingInterval();

    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.attemptReconnect();
    } else {
      console.error(
        `[ByteBot MCP] Max reconnect attempts (${this.maxReconnectAttempts}) reached`
      );
    }
  }

  /**
   * Schedule a reconnect with exponential backoff.
   *
   * A failed attempt is only logged here; the failed socket's own 'close'
   * event schedules the next attempt.
   */
  private attemptReconnect(): void {
    this.clearReconnectTimer();
    this.reconnectAttempts++;
    this.state = 'reconnecting';

    const delay = Math.min(
      this.reconnectDelay *
        Math.pow(this.reconnectBackoffMultiplier, this.reconnectAttempts - 1),
      this.maxReconnectDelay
    );
    console.log(
      `[ByteBot MCP] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`
    );

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect().catch((error: unknown) => {
        logError('WebSocket reconnection', error);
      });
    }, delay);
  }

  /**
   * Re-send a subscribe request for every remembered task id.
   */
  private resubscribeToTasks(): void {
    if (this.subscribedTaskIds.size === 0) {
      return;
    }
    console.log(
      `[ByteBot MCP] Resubscribing to ${this.subscribedTaskIds.size} tasks`
    );
    this.subscribedTaskIds.forEach((taskId) => {
      this.subscribeToTask(taskId);
    });
  }

  /**
   * Start pinging the server every 30 seconds, replacing any interval that
   * is already running so timers cannot accumulate.
   */
  private startPingInterval(): void {
    this.stopPingInterval();
    this.pingInterval = setInterval(() => {
      const socket = this.ws;
      if (
        socket &&
        this.state === 'connected' &&
        socket.readyState === WebSocket.OPEN
      ) {
        socket.ping();
      }
    }, 30000); // Ping every 30 seconds
  }

  /**
   * Stop the keep-alive ping interval, if one is running.
   */
  private stopPingInterval(): void {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
  }

  /**
   * Cancel the pending reconnect timer, if any.
   */
  private clearReconnectTimer(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Serialize and send a payload when the socket is open.
   *
   * @returns `true` when the payload was handed to the socket, `false` when
   *   the client is not connected or the socket is not in the OPEN state.
   */
  private sendJson(payload: Record<string, unknown>): boolean {
    const socket = this.ws;
    if (
      this.state !== 'connected' ||
      !socket ||
      socket.readyState !== WebSocket.OPEN
    ) {
      return false;
    }
    socket.send(JSON.stringify(payload));
    return true;
  }
}