/**
* Connection Manager
* Manages WebSocket connections with health monitoring and automatic recovery
*/
import { EventEmitter } from 'events';
import { ExponentialBackoff } from './exponential-backoff.js';
/**
 * Lifecycle states reported by ConnectionManager.
 *
 * The object is frozen so that a consumer cannot accidentally rename or add
 * states that every other importer of this module would then observe.
 */
export const ConnectionState = Object.freeze({
  DISCONNECTED: 'disconnected',
  CONNECTING: 'connecting',
  CONNECTED: 'connected',
  RECONNECTING: 'reconnecting',
  FAILED: 'failed',
});
/**
 * Tracks the lifecycle of a connection that is opened by a caller-supplied
 * `connectFn`, adding a connection timeout, backoff-driven reconnects,
 * periodic health checks and a queue for operations that must wait for a
 * live connection.
 *
 * Events emitted:
 *  - `stateChange` ({ from, to })      whenever the state actually changes
 *  - `connected`                       after a successful attempt
 *  - `connectionFailed` (error)        when an attempt fails and no retry follows
 *  - `maxReconnectAttemptsReached`     when the backoff has no delay left
 *  - `disconnected`                    after disconnect()
 *  - `unexpectedDisconnect`            when a drop is detected and no reconnect is scheduled
 *  - `healthCheckSuccess` / `healthCheckFailed` (error)
 */
export class ConnectionManager extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.maxReconnectAttempts=10]
   * @param {number} [options.reconnectDelay=1000] Initial backoff delay (ms).
   * @param {number} [options.maxReconnectDelay=30000] Backoff ceiling (ms).
   * @param {number} [options.heartbeatInterval=30000] Health-check period (ms); <= 0 disables it.
   * @param {number} [options.connectionTimeout=10000] Per-attempt timeout (ms).
   * @param {boolean} [options.enableAutoReconnect=true]
   * Any additional keys are copied onto `this.config` unchanged.
   */
  constructor(options = {}) {
    super();
    this.state = ConnectionState.DISCONNECTED;
    this.connectionAttempts = 0;
    this.lastError = null;
    this.healthCheckInterval = null;
    this.reconnectTimer = null;

    // Remembered so that internally triggered recovery (a failed health
    // check, a restart after reconnect) can run without the caller having to
    // pass the functions again.
    this._connectFn = null;
    this._healthCheckFn = null;
    // Bumped by disconnect() so an attempt that was already in flight can
    // tell that it has been cancelled.
    this._epoch = 0;

    // Destructuring defaults apply only to `undefined`, so an explicit
    // `{ reconnectDelay: undefined }` no longer wipes out the default while
    // deliberate values such as 0 or false are still honoured.
    const {
      maxReconnectAttempts = 10,
      reconnectDelay = 1000,
      maxReconnectDelay = 30000,
      heartbeatInterval = 30000,
      connectionTimeout = 10000,
      enableAutoReconnect = true,
      ...extraOptions
    } = options;
    this.config = {
      ...extraOptions,
      maxReconnectAttempts,
      reconnectDelay,
      maxReconnectDelay,
      heartbeatInterval,
      connectionTimeout,
      enableAutoReconnect,
    };

    this.backoff = new ExponentialBackoff({
      initialDelay: this.config.reconnectDelay,
      maxDelay: this.config.maxReconnectDelay,
      maxAttempts: this.config.maxReconnectAttempts,
    });

    // Operations queued while no connection is available.
    this.operationQueue = [];
    this.isProcessingQueue = false;
  }

  /** @returns {boolean} True only in the CONNECTED state. */
  get isConnected() {
    return this.state === ConnectionState.CONNECTED;
  }

  /** @returns {boolean} True while a first attempt or a reconnect cycle is underway. */
  get isConnecting() {
    return this.state === ConnectionState.CONNECTING ||
      this.state === ConnectionState.RECONNECTING;
  }

  /**
   * Records the new state and emits `stateChange` when it differs from the
   * previous one.
   * @param {string} newState One of the ConnectionState values.
   */
  setState(newState) {
    const oldState = this.state;
    this.state = newState;
    if (oldState !== newState) {
      this.emit('stateChange', { from: oldState, to: newState });
      console.log(`Connection state changed: ${oldState} -> ${newState}`);
    }
  }

  /**
   * Opens the connection unless one is already open or being opened.
   *
   * @param {Function} [connectFn] Function returning a promise that settles
   *   when the underlying connection is established. When omitted, the
   *   function from the previous connect()/scheduleReconnect() call is reused.
   * @returns {Promise<boolean>} Resolves to true once connected.
   * @throws The attempt's error when it fails (a retry may still have been
   *   scheduled), or a TypeError when no connect function is available.
   */
  async connect(connectFn) {
    if (this.isConnected) {
      return true;
    }
    if (this.isConnecting) {
      // An attempt or reconnect cycle is already running; share its outcome.
      return this.waitForConnection();
    }
    return this._attemptConnect(connectFn);
  }

  /**
   * Performs one connection attempt. Unlike connect(), this does not bail out
   * while the state is RECONNECTING, which is what lets the reconnect timer
   * actually retry.
   * @private
   */
  async _attemptConnect(connectFn) {
    const fn = connectFn || this._connectFn;
    if (typeof fn !== 'function') {
      throw new TypeError('connect() requires a connectFn function');
    }
    this._connectFn = fn;
    const epoch = this._epoch;

    // Retries keep reporting RECONNECTING; only a fresh attempt is CONNECTING.
    if (this.state !== ConnectionState.RECONNECTING) {
      this.setState(ConnectionState.CONNECTING);
    }
    this.connectionAttempts++;

    let timeoutId = null;
    try {
      const timeoutPromise = new Promise((_, reject) => {
        timeoutId = setTimeout(
          () => reject(new Error('Connection timeout')),
          this.config.connectionTimeout
        );
      });
      // Whichever settles first decides the outcome of the attempt.
      await Promise.race([fn(), timeoutPromise]);
    } catch (error) {
      if (epoch !== this._epoch) {
        // disconnect() ran while this attempt was in flight: the caller asked
        // to stop, so do not record the failure or schedule further retries.
        throw error;
      }
      this.lastError = error;
      const reason = error instanceof Error ? error.message : String(error);
      console.error(`Connection failed (attempt ${this.connectionAttempts}):`, reason);
      if (this.config.enableAutoReconnect && this.backoff.canRetry) {
        this.scheduleReconnect(fn);
      } else {
        this.setState(ConnectionState.FAILED);
        this.emit('connectionFailed', error);
      }
      throw error;
    } finally {
      // Without this the timeout timer would keep the event loop alive for
      // the full connectionTimeout even after the attempt has settled.
      clearTimeout(timeoutId);
    }

    // Success bookkeeping lives outside the try block so that an exception
    // thrown by an event listener is not mistaken for a connection failure.
    this.setState(ConnectionState.CONNECTED);
    this.connectionAttempts = 0;
    this.lastError = null;
    this.backoff.reset();
    // Resumes the health check registered earlier, if any.
    this.startHealthCheck();
    // Intentionally not awaited: the queue drains in the background and each
    // operation settles its own promise.
    void this.processOperationQueue();
    this.emit('connected');
    return true;
  }

  /**
   * Schedules the next reconnect attempt after the backoff's next delay, or
   * moves to FAILED and emits `maxReconnectAttemptsReached` when the backoff
   * reports no delay (null).
   *
   * @param {Function} [connectFn] Defaults to the last known connect function.
   * @throws {TypeError} When no connect function is available.
   */
  scheduleReconnect(connectFn) {
    const fn = connectFn || this._connectFn;
    if (typeof fn !== 'function') {
      throw new TypeError('scheduleReconnect() requires a connectFn function');
    }
    this._connectFn = fn;

    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    const delay = this.backoff.nextDelay;
    if (delay === null) {
      this.setState(ConnectionState.FAILED);
      this.emit('maxReconnectAttemptsReached');
      return;
    }

    this.setState(ConnectionState.RECONNECTING);
    console.log(`Reconnecting in ${delay}ms (attempt ${this.backoff.attempt}/${this.backoff.maxAttempts})`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this._attemptConnect(fn).catch(() => {
        // Nothing to do here: the attempt has already logged the error and
        // either scheduled the next retry or moved to FAILED.
      });
    }, delay);
  }

  /**
   * Deliberately shuts the manager down: stops health checks and pending
   * retries, resets the backoff, rejects every queued operation and emits
   * `disconnected`.
   */
  disconnect() {
    this._epoch++;
    this.stopHealthCheck();
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    this.setState(ConnectionState.DISCONNECTED);
    this.backoff.reset();
    this.connectionAttempts = 0;

    // Settle the promises handed out by queueOperation(); silently dropping
    // the queue would leave their callers waiting forever.
    const abandoned = this.operationQueue;
    this.operationQueue = [];
    for (const { reject } of abandoned) {
      reject(new Error('Disconnected before queued operation could run'));
    }

    this.emit('disconnected');
  }

  /**
   * Reacts to a connection that dropped without disconnect() being called.
   * Schedules a reconnect when auto-reconnect is enabled and a connect
   * function is known; otherwise moves to DISCONNECTED and emits
   * `unexpectedDisconnect`.
   *
   * @param {Function} [connectFn] Defaults to the last known connect function.
   */
  handleUnexpectedDisconnect(connectFn) {
    console.warn('Unexpected disconnect detected');
    this.stopHealthCheck();
    const reconnectFn = connectFn || this._connectFn;
    if (this.config.enableAutoReconnect && reconnectFn) {
      this.scheduleReconnect(reconnectFn);
    } else {
      this.setState(ConnectionState.DISCONNECTED);
      this.emit('unexpectedDisconnect');
    }
  }

  /**
   * (Re)starts the periodic health check.
   *
   * @param {Function} [checkFn] Function returning a promise that rejects when
   *   the connection is unhealthy. It is remembered, so calling this method
   *   without an argument (as connect() does) resumes the last registered
   *   check. Nothing is started when no check is known or when
   *   `heartbeatInterval` is <= 0.
   */
  startHealthCheck(checkFn) {
    this.stopHealthCheck();
    if (checkFn) {
      this._healthCheckFn = checkFn;
    }
    const check = this._healthCheckFn;
    if (!check || this.config.heartbeatInterval <= 0) {
      return;
    }
    this.healthCheckInterval = setInterval(async () => {
      if (!this.isConnected) {
        return;
      }
      try {
        await check();
        this.emit('healthCheckSuccess');
      } catch (error) {
        const reason = error instanceof Error ? error.message : String(error);
        console.error('Health check failed:', reason);
        this.emit('healthCheckFailed', error);
        // Recover only if nothing else changed the state while the check was
        // pending; this also stops overlapping failed checks from scheduling
        // several reconnects.
        if (this.isConnected) {
          this.handleUnexpectedDisconnect();
        }
      }
    }, this.config.heartbeatInterval);
  }

  /** Stops the periodic health check, if one is running. */
  stopHealthCheck() {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
      this.healthCheckInterval = null;
    }
  }

  /**
   * Waits for the ongoing connection effort to finish.
   *
   * @param {number} [timeout=30000] Maximum wait in milliseconds.
   * @returns {Promise<boolean>} Resolves to true on `connected` (or at once
   *   when already connected). Rejects on `connectionFailed`, on
   *   `maxReconnectAttemptsReached`, on `disconnected`, or with
   *   'Connection wait timeout' when the timeout elapses first.
   */
  waitForConnection(timeout = 30000) {
    if (this.isConnected) {
      return Promise.resolve(true);
    }
    return new Promise((resolve, reject) => {
      const cleanup = () => {
        clearTimeout(timer);
        this.removeListener('connected', onConnect);
        this.removeListener('connectionFailed', onFail);
        this.removeListener('maxReconnectAttemptsReached', onGiveUp);
        this.removeListener('disconnected', onDisconnect);
      };
      const onConnect = () => {
        cleanup();
        resolve(true);
      };
      const onFail = (error) => {
        cleanup();
        reject(error);
      };
      const onGiveUp = () => {
        cleanup();
        reject(this.lastError || new Error('Max reconnect attempts reached'));
      };
      const onDisconnect = () => {
        cleanup();
        reject(new Error('Disconnected while waiting for connection'));
      };
      const timer = setTimeout(() => {
        cleanup();
        reject(new Error('Connection wait timeout'));
      }, timeout);

      this.on('connected', onConnect);
      this.on('connectionFailed', onFail);
      this.on('maxReconnectAttemptsReached', onGiveUp);
      this.on('disconnected', onDisconnect);
    });
  }

  /**
   * Queues an operation to run once a connection is available; it runs right
   * away (after anything queued before it) when already connected.
   *
   * @param {Function} operation Function whose (possibly async) result settles
   *   the returned promise.
   * @returns {Promise<*>} Settles with the operation's outcome, or rejects if
   *   disconnect() is called before the operation runs.
   */
  queueOperation(operation) {
    return new Promise((resolve, reject) => {
      this.operationQueue.push({ operation, resolve, reject });
      if (this.isConnected) {
        void this.processOperationQueue();
      }
    });
  }

  /**
   * Runs queued operations one at a time, in order, for as long as the
   * connection stays up. Re-entrant calls return immediately.
   */
  async processOperationQueue() {
    if (this.isProcessingQueue || !this.isConnected || this.operationQueue.length === 0) {
      return;
    }
    this.isProcessingQueue = true;
    try {
      while (this.operationQueue.length > 0 && this.isConnected) {
        const { operation, resolve, reject } = this.operationQueue.shift();
        try {
          resolve(await operation());
        } catch (error) {
          reject(error);
        }
      }
    } finally {
      // Always release the flag, otherwise the queue could never drain again.
      this.isProcessingQueue = false;
    }
  }

  /**
   * @returns {{state: string, isConnected: boolean, connectionAttempts: number,
   *   lastError: (string|null), queuedOperations: number, attemptsRemaining: *}}
   *   A snapshot of the manager's current state.
   */
  getStatus() {
    return {
      state: this.state,
      isConnected: this.isConnected,
      connectionAttempts: this.connectionAttempts,
      lastError: this.lastError ? this.lastError.message : null,
      queuedOperations: this.operationQueue.length,
      attemptsRemaining: this.backoff.attemptsRemaining,
    };
  }
}
/**
 * Circuit breaker for guarding calls to an unreliable dependency.
 *
 * States:
 *  - CLOSED:    calls pass through; consecutive failures are counted.
 *  - OPEN:      calls are rejected immediately until `resetTimeout` ms have
 *               passed since the last failure.
 *  - HALF_OPEN: trial calls pass through; `halfOpenRequests` successes close
 *               the circuit, any failure opens it again.
 */
export class CircuitBreaker {
  /**
   * @param {object} [options]
   * @param {number} [options.failureThreshold=5] Consecutive failures that open the circuit.
   * @param {number} [options.resetTimeout=60000] Time spent OPEN before trial calls are allowed (ms).
   * @param {number} [options.halfOpenRequests=1] Trial successes required to close the circuit.
   */
  constructor(options = {}) {
    this.failureThreshold = options.failureThreshold || 5;
    this.resetTimeout = options.resetTimeout || 60000; // 1 minute
    this.halfOpenRequests = options.halfOpenRequests || 1;
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0;
    this.lastFailureTime = null;
    this.successCount = 0;
    this.resetTimer = null;
  }

  /**
   * Runs `fn` through the breaker.
   *
   * @param {Function} fn Function returning a value or a promise.
   * @returns {Promise<*>} The result of `fn`.
   * @throws {Error} 'Circuit breaker is OPEN' while the circuit is open and
   *   the reset timeout has not elapsed; otherwise whatever `fn` throws.
   */
  async execute(fn) {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime < this.resetTimeout) {
        throw new Error('Circuit breaker is OPEN');
      }
      // The timeout has elapsed but the timer has not fired yet.
      this._enterHalfOpen();
    }
    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Records a successful call and closes the circuit after enough trial successes. */
  onSuccess() {
    this.failureCount = 0;
    if (this.state === 'HALF_OPEN') {
      this.successCount++;
      if (this.successCount >= this.halfOpenRequests) {
        this.state = 'CLOSED';
        console.log('Circuit breaker closed');
      }
    }
  }

  /**
   * Records a failed call. Opens the circuit when the failure threshold is
   * reached, or on any failure during HALF_OPEN: a failed trial call means the
   * dependency has not recovered, regardless of how many trial calls
   * succeeded before it.
   */
  onFailure() {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
      this._open();
    }
  }

  /** Forces the breaker back to a pristine CLOSED state and cancels the reset timer. */
  reset() {
    this.state = 'CLOSED';
    this.failureCount = 0;
    this.successCount = 0;
    this.lastFailureTime = null;
    this._clearResetTimer();
  }

  /**
   * @returns {{state: string, failureCount: number, successCount: number, isOpen: boolean}}
   *   A snapshot of the breaker's counters and state.
   */
  getStatus() {
    return {
      state: this.state,
      failureCount: this.failureCount,
      successCount: this.successCount,
      isOpen: this.state === 'OPEN',
    };
  }

  /**
   * Opens the circuit and (re)arms the timer that moves it to HALF_OPEN.
   * @private
   */
  _open() {
    this.state = 'OPEN';
    console.log('Circuit breaker opened');
    this._clearResetTimer();
    this.resetTimer = setTimeout(() => {
      this.resetTimer = null;
      // execute() or reset() may already have moved the breaker on; never
      // drag a CLOSED breaker back to HALF_OPEN.
      if (this.state === 'OPEN') {
        this._enterHalfOpen();
      }
    }, this.resetTimeout);
    // The timer only updates state, so it should not keep the process alive.
    if (typeof this.resetTimer.unref === 'function') {
      this.resetTimer.unref();
    }
  }

  /**
   * Moves to HALF_OPEN with a fresh trial-success counter.
   * @private
   */
  _enterHalfOpen() {
    this._clearResetTimer();
    this.state = 'HALF_OPEN';
    this.successCount = 0;
    console.log('Circuit breaker half-open');
  }

  /**
   * Cancels the pending reset timer, if any.
   * @private
   */
  _clearResetTimer() {
    if (this.resetTimer) {
      clearTimeout(this.resetTimer);
      this.resetTimer = null;
    }
  }
}