import { EventEmitter } from 'events';
import { createLogger } from './logger.js';
import { structuredLogger } from './structured-logger.js';
export enum CircuitState {
CLOSED = 'CLOSED', // Normal operation
OPEN = 'OPEN', // Failing, rejecting requests
HALF_OPEN = 'HALF_OPEN' // Testing if service recovered
}
export interface CircuitBreakerOptions {
/**
* Number of failures before opening the circuit
*/
failureThreshold?: number;
/**
* Time window in ms to count failures
*/
failureWindow?: number;
/**
* Time in ms before attempting to close the circuit
*/
resetTimeout?: number;
/**
* Number of successful calls needed to close from half-open
*/
successThreshold?: number;
/**
* Timeout for individual calls in ms
*/
timeout?: number;
/**
* Name for logging
*/
name?: string;
/**
* Custom error filter to determine if an error should trip the breaker
*/
errorFilter?: (error: Error) => boolean;
/**
* Fallback function when circuit is open
*/
fallback?: () => Promise<any>;
}
export interface CircuitBreakerStats {
state: CircuitState;
failures: number;
successes: number;
consecutiveSuccesses: number;
consecutiveFailures: number;
lastFailureTime?: Date;
lastSuccessTime?: Date;
totalRequests: number;
rejectedRequests: number;
timeoutRequests: number;
fallbackRequests: number;
}
/**
* Circuit Breaker implementation for fault tolerance
*/
export class CircuitBreaker extends EventEmitter {
private state: CircuitState = CircuitState.CLOSED;
private failures = 0;
private successes = 0;
private consecutiveSuccesses = 0;
private consecutiveFailures = 0;
private lastFailureTime?: Date;
private lastSuccessTime?: Date;
private totalRequests = 0;
private rejectedRequests = 0;
private timeoutRequests = 0;
private fallbackRequests = 0;
private resetTimer?: NodeJS.Timeout;
private failureTimestamps: number[] = [];
private readonly options: Required<CircuitBreakerOptions>;
private readonly logger;
constructor(options: CircuitBreakerOptions = {}) {
super();
this.options = {
failureThreshold: options.failureThreshold || 5,
failureWindow: options.failureWindow || 60000, // 1 minute
resetTimeout: options.resetTimeout || 60000, // 1 minute
successThreshold: options.successThreshold || 3,
timeout: options.timeout || 30000, // 30 seconds
name: options.name || 'CircuitBreaker',
errorFilter: options.errorFilter || (() => true),
fallback: options.fallback || (() => Promise.reject(new Error('Circuit breaker is OPEN')))
};
this.logger = createLogger({ component: `CircuitBreaker:${this.options.name}` });
}
/**
* Execute a function with circuit breaker protection
*/
async execute<T>(
fn: () => Promise<T>,
correlationId?: string
): Promise<T> {
this.totalRequests++;
const logger = correlationId
? structuredLogger.createCorrelated(`circuit-breaker.${this.options.name}`, correlationId)
: this.logger;
// Check circuit state
if (this.state === CircuitState.OPEN) {
this.rejectedRequests++;
this.fallbackRequests++;
logger.warn('Circuit is OPEN, using fallback', {
failures: this.failures,
lastFailure: this.lastFailureTime
});
this.emit('rejected', { state: this.state, stats: this.getStats() });
// Use fallback if available
return this.options.fallback() as Promise<T>;
}
// Create timeout promise
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => {
reject(new Error(`Operation timed out after ${this.options.timeout}ms`));
}, this.options.timeout);
});
try {
// Execute the function with timeout
const result = await Promise.race([
fn(),
timeoutPromise
]);
this.onSuccess();
logger.debug('Operation succeeded', {
state: this.state,
consecutiveSuccesses: this.consecutiveSuccesses
});
return result;
} catch (error) {
const err = error as Error;
// Check if this is a timeout
if (err.message.includes('timed out')) {
this.timeoutRequests++;
}
// Check if error should trip the breaker
if (this.options.errorFilter(err)) {
this.onFailure();
logger.error('Operation failed', err, {
state: this.state,
consecutiveFailures: this.consecutiveFailures
});
} else {
logger.debug('Error ignored by filter', { error: err.message });
}
throw error;
}
}
/**
* Handle successful execution
*/
private onSuccess(): void {
this.successes++;
this.consecutiveSuccesses++;
this.consecutiveFailures = 0;
this.lastSuccessTime = new Date();
if (this.state === CircuitState.HALF_OPEN) {
if (this.consecutiveSuccesses >= this.options.successThreshold) {
this.close();
}
}
this.emit('success', { state: this.state, stats: this.getStats() });
}
/**
* Handle failed execution
*/
private onFailure(): void {
this.failures++;
this.consecutiveFailures++;
this.consecutiveSuccesses = 0;
this.lastFailureTime = new Date();
// Add timestamp to sliding window
const now = Date.now();
this.failureTimestamps.push(now);
// Remove old timestamps outside the window
const cutoff = now - this.options.failureWindow;
this.failureTimestamps = this.failureTimestamps.filter(ts => ts > cutoff);
// Check if we should open the circuit
if (this.state === CircuitState.CLOSED) {
if (this.failureTimestamps.length >= this.options.failureThreshold) {
this.open();
}
} else if (this.state === CircuitState.HALF_OPEN) {
// Single failure in half-open state reopens the circuit
this.open();
}
this.emit('failure', { state: this.state, stats: this.getStats() });
}
/**
* Open the circuit
*/
private open(): void {
this.state = CircuitState.OPEN;
this.logger.warn('Circuit opened', {
failures: this.failures,
threshold: this.options.failureThreshold,
window: this.options.failureWindow
});
// Clear any existing timer
if (this.resetTimer) {
clearTimeout(this.resetTimer);
}
// Set timer to try half-open
this.resetTimer = setTimeout(() => {
this.halfOpen();
}, this.options.resetTimeout);
this.emit('open', { stats: this.getStats() });
}
/**
* Move to half-open state
*/
private halfOpen(): void {
this.state = CircuitState.HALF_OPEN;
this.consecutiveSuccesses = 0;
this.consecutiveFailures = 0;
this.logger.info('Circuit half-open, testing recovery');
this.emit('half-open', { stats: this.getStats() });
}
/**
* Close the circuit
*/
private close(): void {
this.state = CircuitState.CLOSED;
this.failures = 0;
this.failureTimestamps = [];
this.logger.info('Circuit closed, service recovered');
this.emit('close', { stats: this.getStats() });
}
/**
* Get current statistics
*/
getStats(): CircuitBreakerStats {
return {
state: this.state,
failures: this.failures,
successes: this.successes,
consecutiveSuccesses: this.consecutiveSuccesses,
consecutiveFailures: this.consecutiveFailures,
lastFailureTime: this.lastFailureTime,
lastSuccessTime: this.lastSuccessTime,
totalRequests: this.totalRequests,
rejectedRequests: this.rejectedRequests,
timeoutRequests: this.timeoutRequests,
fallbackRequests: this.fallbackRequests
};
}
/**
* Reset the circuit breaker
*/
reset(): void {
this.state = CircuitState.CLOSED;
this.failures = 0;
this.successes = 0;
this.consecutiveSuccesses = 0;
this.consecutiveFailures = 0;
this.failureTimestamps = [];
this.lastFailureTime = undefined;
this.lastSuccessTime = undefined;
if (this.resetTimer) {
clearTimeout(this.resetTimer);
this.resetTimer = undefined;
}
this.logger.info('Circuit breaker reset');
this.emit('reset', { stats: this.getStats() });
}
/**
* Force the circuit to open
*/
forceOpen(): void {
this.open();
}
/**
* Force the circuit to close
*/
forceClose(): void {
this.close();
}
/**
* Check if circuit is available
*/
isAvailable(): boolean {
return this.state !== CircuitState.OPEN;
}
}
/**
* Factory function to create circuit breakers for different services
*/
export function createCircuitBreaker(
serviceName: string,
options?: Partial<CircuitBreakerOptions>
): CircuitBreaker {
return new CircuitBreaker({
name: serviceName,
...options
});
}
/**
* Circuit breaker manager for managing multiple breakers
*/
export class CircuitBreakerManager {
private breakers = new Map<string, CircuitBreaker>();
private logger = createLogger({ component: 'CircuitBreakerManager' });
/**
* Get or create a circuit breaker for a service
*/
getBreaker(
serviceName: string,
options?: Partial<CircuitBreakerOptions>
): CircuitBreaker {
if (!this.breakers.has(serviceName)) {
const breaker = createCircuitBreaker(serviceName, options);
this.breakers.set(serviceName, breaker);
// Log circuit breaker events
breaker.on('open', ({ stats }) => {
this.logger.warn(`Circuit opened for ${serviceName}`, stats);
});
breaker.on('close', ({ stats }) => {
this.logger.info(`Circuit closed for ${serviceName}`, stats);
});
}
return this.breakers.get(serviceName)!;
}
/**
* Get statistics for all breakers
*/
getAllStats(): Record<string, CircuitBreakerStats> {
const stats: Record<string, CircuitBreakerStats> = {};
for (const [name, breaker] of this.breakers) {
stats[name] = breaker.getStats();
}
return stats;
}
/**
* Reset all circuit breakers
*/
resetAll(): void {
for (const breaker of this.breakers.values()) {
breaker.reset();
}
}
/**
* Check overall health
*/
isHealthy(): boolean {
for (const breaker of this.breakers.values()) {
if (!breaker.isAvailable()) {
return false;
}
}
return true;
}
}
// Export singleton manager
export const circuitBreakerManager = new CircuitBreakerManager();