// circuit-breaker.ts (16.2 kB)
/**
* Circuit Breaker Pattern Implementation for GEPA
*
* Provides robust failure handling with automatic recovery mechanisms
* for LLM adapter calls, trajectory operations, and other critical services.
*/
import { EventEmitter } from 'events';
import { MemoryLeakIntegration } from '../memory-leak-detector';
/**
 * Lifecycle states of a {@link CircuitBreaker}.
 */
export enum CircuitBreakerState {
  /** Healthy: requests flow through to the wrapped operation. */
  CLOSED = 'CLOSED',
  /** Tripped: requests are rejected immediately until the recovery timeout elapses. */
  OPEN = 'OPEN',
  /** Probing: trial requests are let through to decide whether the breaker can close again. */
  HALF_OPEN = 'HALF_OPEN',
}
/**
 * Tuning knobs for a {@link CircuitBreaker}. All durations are in milliseconds.
 */
export interface CircuitBreakerConfig {
  /** Identifier used in emitted events, error messages and memory tracking. */
  name: string;
  /** How many failures trip the breaker from CLOSED to OPEN. */
  failureThreshold: number;
  /** How long the breaker stays OPEN before a trial request is let through. */
  recoveryTimeout: number;
  /** Successful trial requests required to move from HALF_OPEN back to CLOSED. */
  successThreshold: number;
  /** Upper bound on a single attempt before it is treated as failed. */
  timeout: number;
  /** Length of the rolling window over which failures are tracked. */
  monitoringWindow: number;
  /** Extra attempts after the first one fails (so up to maxRetries + 1 attempts per request). */
  maxRetries: number;
  /** When true, the delay between retries doubles with every attempt. */
  exponentialBackoff: boolean;
  /** When true, retry delays are randomized so that many clients do not retry in lockstep. */
  jitterEnabled: boolean;
}
/**
 * Point-in-time snapshot returned by {@link CircuitBreaker.getMetrics}.
 */
export interface CircuitBreakerMetrics {
  /** Current breaker state. */
  state: CircuitBreakerState;
  /** Failures counted since the breaker last closed or was reset. */
  failureCount: number;
  /** Successes counted by the breaker's current success counter. */
  successCount: number;
  /** Number of calls made to execute(), including rejected ones. */
  totalRequests: number;
  /** Time of the most recent failure still inside the monitoring window, if any. */
  lastFailureTime?: Date;
  /** Time of the most recent success, when known. */
  lastSuccessTime?: Date;
  /** Milliseconds elapsed since the last state change. */
  uptime: number;
  /** Failures inside the monitoring window divided by total requests. */
  errorRate: number;
  /** Summary of recorded response times, in milliseconds. */
  responseTime: {
    /** Mean response time. */
    average: number;
    /** 95th-percentile response time. */
    p95: number;
    /** 99th-percentile response time. */
    p99: number;
  };
}
/**
 * Outcome of a single timed attempt of an operation.
 *
 * `result` is meaningful only when `success` is true, `error` only when it is false.
 */
interface ExecutionResult<T> {
  /** Whether the attempt settled successfully before the timeout fired. */
  success: boolean;
  /** Resolved value of the operation on success. */
  result?: T;
  /** Rejection reason or timeout error on failure. */
  error?: Error;
  /** Wall-clock duration of the attempt in milliseconds. */
  executionTime: number;
  /** Retry index of the attempt; the circuit breaker currently always records 0. */
  retryCount: number;
}
/**
 * A recorded failure, retained while it falls inside the monitoring window.
 */
interface FailureEntry {
  /** Moment the failure was recorded. */
  timestamp: Date;
  /** Error the failed request ended with. */
  error: Error;
  /** Execution time recorded alongside the failure. */
  executionTime: number;
}
/**
 * Circuit Breaker implementation with intelligent failure detection.
 *
 * Each call to {@link CircuitBreaker.execute} is attempted up to
 * `maxRetries + 1` times, each attempt bounded by `timeout`. Only the final
 * outcome of a request is recorded as a success or a failure.
 *
 * State machine:
 * - CLOSED -> OPEN when the failures inside the rolling `monitoringWindow`
 *   reach `failureThreshold`, or when more than 50% of requests failed once at
 *   least `failureThreshold` requests have been seen.
 * - OPEN -> HALF_OPEN on the first request after `recoveryTimeout` elapsed.
 * - HALF_OPEN -> CLOSED after `successThreshold` successes; any failure while
 *   HALF_OPEN re-opens the breaker.
 *
 * Events emitted: `success`, `failure`, `rejected`, `stateChange`, `reset`.
 *
 * Notes: HALF_OPEN does not cap how many trial requests run concurrently.
 * Call {@link CircuitBreaker.cleanup} when the breaker is no longer needed so
 * the background monitoring timer is stopped.
 */
export class CircuitBreaker extends EventEmitter {
  private state: CircuitBreakerState = CircuitBreakerState.CLOSED;
  /** Failures since the breaker last closed or was reset. */
  private failureCount = 0;
  /** Successes since the counter was last reset (on HALF_OPEN entry, close, or reset). */
  private successCount = 0;
  /** Every call to execute(), including requests rejected while OPEN. */
  private totalRequests = 0;
  /** Epoch milliseconds before which an OPEN breaker rejects requests. */
  private nextAttempt = 0;
  /** Failures still inside the monitoring window (pruned by cleanupMetrics). */
  private failures: FailureEntry[] = [];
  /** Execution times of recent successful requests, capped at 100 entries. */
  private responseTimes: number[] = [];
  private lastStateChange = Date.now();
  private lastSuccessTime: Date | undefined;
  /** Handle of the periodic monitoring timer; cleared by cleanup(). */
  private monitorTimer: ReturnType<typeof setInterval> | undefined;

  constructor(private readonly config: CircuitBreakerConfig) {
    super();
    this.validateConfig();
    this.setupMemoryManagement();
    this.startMonitoring();
  }

  /**
   * Execute operation with circuit breaker protection.
   *
   * @param operation - Async work to run; it is invoked once per attempt.
   * @returns The value the operation resolved with.
   * @throws Error when the breaker is OPEN, or the last attempt's error
   *   (including the timeout error) once all retries are exhausted.
   */
  async execute<R>(operation: () => Promise<R>): Promise<R> {
    this.totalRequests++;

    if (this.state === CircuitBreakerState.OPEN) {
      const waitMs = this.nextAttempt - Date.now();
      if (waitMs > 0) {
        const error = new Error(`Circuit breaker '${this.config.name}' is OPEN. Next attempt in ${waitMs}ms`);
        this.emit('rejected', { name: this.config.name, error });
        throw error;
      }
      // Recovery timeout has elapsed: let this request probe the service.
      this.transitionToHalfOpen();
    }

    return this.executeWithRetry(operation);
  }

  /**
   * Run the operation with per-attempt timeout, retrying failed attempts with
   * the configured backoff. Records exactly one success or one failure.
   */
  private async executeWithRetry<R>(operation: () => Promise<R>): Promise<R> {
    const maxRetries = this.config.maxRetries;
    let lastError: Error | undefined;
    let lastExecutionTime = 0;

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      const outcome = await this.executeWithTimeout(operation);
      if (outcome.success) {
        this.onSuccess(outcome.executionTime);
        return outcome.result as R;
      }
      lastError = outcome.error ?? new Error('Operation failed without an error value');
      lastExecutionTime = outcome.executionTime;
      // Back off only when another attempt will actually follow.
      if (attempt + 1 <= maxRetries) {
        await this.sleep(this.calculateRetryDelay(attempt + 1));
      }
    }

    // validateConfig guarantees at least one attempt; the fallback only satisfies the type checker.
    const error = lastError ?? new Error(`Circuit breaker '${this.config.name}' made no attempts`);
    this.onFailure(error, lastExecutionTime);
    throw error;
  }

  /**
   * Run one attempt bounded by `config.timeout`. Never throws: failures and
   * timeouts are reported through `success: false`. The timeout timer is
   * always cleared so it cannot outlive the attempt.
   */
  private async executeWithTimeout<R>(operation: () => Promise<R>): Promise<ExecutionResult<R>> {
    const startTime = Date.now();
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(() => {
        reject(new Error(`Operation timeout after ${this.config.timeout}ms`));
      }, this.config.timeout);
    });

    try {
      const result = await Promise.race([operation(), timeoutPromise]);
      return { success: true, result, executionTime: Date.now() - startTime, retryCount: 0 };
    } catch (error) {
      return {
        success: false,
        error: CircuitBreaker.toError(error),
        executionTime: Date.now() - startTime,
        retryCount: 0,
      };
    } finally {
      clearTimeout(timer);
    }
  }

  /** Normalize any thrown/rejected value into an Error instance. */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value));
  }

  /**
   * Record a successful request and close the breaker once enough trial
   * requests have succeeded while HALF_OPEN.
   */
  private onSuccess(executionTime: number): void {
    this.successCount++;
    this.lastSuccessTime = new Date();
    this.responseTimes.push(executionTime);
    this.cleanupMetrics();

    if (this.state === CircuitBreakerState.HALF_OPEN && this.successCount >= this.config.successThreshold) {
      this.transitionToClosed();
    }

    this.emit('success', {
      name: this.config.name,
      executionTime,
      state: this.state,
    });
  }

  /**
   * Record a failed request (after all retries) and open the breaker when
   * the opening rules in shouldOpenCircuit are met.
   */
  private onFailure(error: Error, executionTime: number): void {
    this.failures.push({ timestamp: new Date(), error, executionTime });
    this.failureCount++;
    this.cleanupMetrics();

    if (this.shouldOpenCircuit()) {
      this.transitionToOpen();
    }

    this.emit('failure', {
      name: this.config.name,
      error,
      executionTime,
      state: this.state,
      failureCount: this.failureCount,
    });
  }

  /**
   * Decide whether the breaker should open after a failure.
   *
   * - OPEN: never (already open).
   * - HALF_OPEN: always, a failed trial request re-opens the breaker.
   * - CLOSED: when failures inside the monitoring window reach the threshold,
   *   or when more than 50% of requests failed and at least
   *   `failureThreshold` requests have been seen (so a single early failure
   *   cannot trip the breaker).
   */
  private shouldOpenCircuit(): boolean {
    if (this.state === CircuitBreakerState.OPEN) {
      return false;
    }
    if (this.state === CircuitBreakerState.HALF_OPEN) {
      return true;
    }

    const recentFailureCount = this.getRecentFailures().length;
    if (recentFailureCount >= this.config.failureThreshold) {
      return true;
    }
    if (this.totalRequests < this.config.failureThreshold) {
      return false;
    }
    return recentFailureCount / this.totalRequests > 0.5;
  }

  /** Failures whose timestamp lies inside the monitoring window. */
  private getRecentFailures(): FailureEntry[] {
    const cutoff = Date.now() - this.config.monitoringWindow;
    return this.failures.filter((f) => f.timestamp.getTime() > cutoff);
  }

  /** Open the breaker and schedule the next recovery attempt. */
  private transitionToOpen(): void {
    const from = this.state;
    const recentFailureCount = this.getRecentFailures().length;
    let reason: string;
    if (from === CircuitBreakerState.HALF_OPEN) {
      reason = 'Trial request failed while HALF_OPEN';
    } else if (recentFailureCount >= this.config.failureThreshold) {
      reason = `Failure threshold reached: ${recentFailureCount}/${this.config.failureThreshold}`;
    } else {
      reason = `Failure rate exceeded 50%: ${recentFailureCount}/${this.totalRequests} requests failed`;
    }

    this.state = CircuitBreakerState.OPEN;
    this.nextAttempt = Date.now() + this.config.recoveryTimeout;
    this.lastStateChange = Date.now();
    this.emit('stateChange', { name: this.config.name, from, to: CircuitBreakerState.OPEN, reason });
  }

  /** Move from OPEN to HALF_OPEN and start counting trial successes from zero. */
  private transitionToHalfOpen(): void {
    const from = this.state;
    this.state = CircuitBreakerState.HALF_OPEN;
    this.successCount = 0;
    this.lastStateChange = Date.now();
    this.emit('stateChange', {
      name: this.config.name,
      from,
      to: CircuitBreakerState.HALF_OPEN,
      reason: 'Recovery timeout elapsed, testing circuit',
    });
  }

  /** Close the breaker and clear all failure bookkeeping. */
  private transitionToClosed(): void {
    const from = this.state;
    // Capture before the counters are reset so the reason reports the real value.
    const successes = this.successCount;
    this.state = CircuitBreakerState.CLOSED;
    this.failureCount = 0;
    this.successCount = 0;
    this.failures = [];
    this.lastStateChange = Date.now();
    this.emit('stateChange', {
      name: this.config.name,
      from,
      to: CircuitBreakerState.CLOSED,
      reason: `Success threshold reached: ${successes}/${this.config.successThreshold}`,
    });
  }

  /**
   * Delay before retry number `retryCount` (1-based): 1s base, doubled per
   * retry and capped at 30s when exponential backoff is on, with optional
   * +/-25% jitter, never below 100ms.
   */
  private calculateRetryDelay(retryCount: number): number {
    let delay = 1000; // Base delay 1 second
    if (this.config.exponentialBackoff) {
      delay = Math.min(delay * Math.pow(2, retryCount - 1), 30000); // Max 30 seconds
    }
    if (this.config.jitterEnabled) {
      // Uniform jitter in [-25%, +25%] of the delay.
      delay += delay * 0.25 * (Math.random() - 0.5) * 2;
    }
    return Math.max(delay, 100); // Minimum 100ms
  }

  /**
   * Get current circuit breaker metrics.
   *
   * `lastFailureTime` is only present while a failure remains inside the
   * monitoring window; `lastSuccessTime` is present once any request succeeded
   * since construction or the last reset().
   */
  getMetrics(): CircuitBreakerMetrics {
    const recentFailures = this.getRecentFailures();
    const metrics: CircuitBreakerMetrics = {
      state: this.state,
      failureCount: this.failureCount,
      successCount: this.successCount,
      totalRequests: this.totalRequests,
      uptime: Date.now() - this.lastStateChange,
      errorRate: this.totalRequests > 0 ? recentFailures.length / this.totalRequests : 0,
      responseTime: this.calculateResponseTimeMetrics(),
    };

    // Only add optional properties when they have values.
    const lastFailure = recentFailures[recentFailures.length - 1];
    if (lastFailure) {
      metrics.lastFailureTime = lastFailure.timestamp;
    }
    if (this.lastSuccessTime) {
      metrics.lastSuccessTime = new Date(this.lastSuccessTime.getTime());
    }
    return metrics;
  }

  /** Average and nearest-rank p95/p99 of the retained response times. */
  private calculateResponseTimeMetrics(): CircuitBreakerMetrics['responseTime'] {
    const count = this.responseTimes.length;
    if (count === 0) {
      return { average: 0, p95: 0, p99: 0 };
    }
    const sorted = [...this.responseTimes].sort((a, b) => a - b);
    const total = sorted.reduce((sum, time) => sum + time, 0);
    // Nearest-rank percentile: the smallest value with at least p of the samples at or below it.
    const percentile = (p: number): number => sorted[Math.max(0, Math.ceil(p * count) - 1)] ?? 0;
    return {
      average: Math.round(total / count),
      p95: percentile(0.95),
      p99: percentile(0.99),
    };
  }

  /**
   * Reset circuit breaker to initial state
   */
  reset(): void {
    this.state = CircuitBreakerState.CLOSED;
    this.failureCount = 0;
    this.successCount = 0;
    this.totalRequests = 0;
    this.nextAttempt = 0;
    this.failures = [];
    this.responseTimes = [];
    this.lastSuccessTime = undefined;
    this.lastStateChange = Date.now();
    this.emit('reset', { name: this.config.name });
  }

  /**
   * Force circuit breaker state (for testing).
   *
   * Performs the same bookkeeping as the natural transition into that state:
   * OPEN schedules the recovery timeout, HALF_OPEN restarts the trial-success
   * count, and CLOSED clears failure/success counters.
   */
  forceState(state: CircuitBreakerState): void {
    const oldState = this.state;
    this.state = state;
    this.lastStateChange = Date.now();
    if (state === CircuitBreakerState.OPEN) {
      this.nextAttempt = Date.now() + this.config.recoveryTimeout;
    } else if (state === CircuitBreakerState.HALF_OPEN) {
      this.successCount = 0;
    } else {
      this.failureCount = 0;
      this.successCount = 0;
      this.failures = [];
    }
    this.emit('stateChange', {
      name: this.config.name,
      from: oldState,
      to: state,
      reason: 'Forced state change',
    });
  }

  /**
   * Drop failures older than the monitoring window and keep only the last 100
   * response times, bounding memory use.
   */
  private cleanupMetrics(): void {
    const cutoff = Date.now() - this.config.monitoringWindow;
    this.failures = this.failures.filter((f) => f.timestamp.getTime() > cutoff);
    if (this.responseTimes.length > 100) {
      this.responseTimes = this.responseTimes.slice(-100);
    }
  }

  /**
   * Validate configuration. The negated comparisons also reject NaN.
   *
   * @throws Error describing the first invalid field.
   */
  private validateConfig(): void {
    if (!this.config.name) {
      throw new Error('Circuit breaker name is required');
    }
    if (!(this.config.failureThreshold > 0)) {
      throw new Error('Failure threshold must be positive');
    }
    if (!(this.config.recoveryTimeout > 0)) {
      throw new Error('Recovery timeout must be positive');
    }
    if (!(this.config.successThreshold > 0)) {
      throw new Error('Success threshold must be positive');
    }
    if (!(this.config.timeout > 0)) {
      throw new Error('Timeout must be positive');
    }
    if (!(this.config.monitoringWindow > 0)) {
      throw new Error('Monitoring window must be positive');
    }
    if (!(this.config.maxRetries >= 0)) {
      throw new Error('Max retries must be non-negative');
    }
  }

  /**
   * Setup memory management integration
   */
  private setupMemoryManagement(): void {
    MemoryLeakIntegration.initialize();
    // Track circuit breaker creation
    MemoryLeakIntegration.trackCircuitBreaker('create', this.config.name, this.estimateMemoryUsage());
  }

  /**
   * Start the once-a-minute housekeeping timer (metric pruning and memory
   * tracking). The handle is kept so cleanup() can stop it.
   */
  private startMonitoring(): void {
    this.monitorTimer = setInterval(() => {
      this.cleanupMetrics();
      MemoryLeakIntegration.trackCircuitBreaker('monitor', this.config.name, this.estimateMemoryUsage());
    }, 60000); // Every minute
    // Housekeeping alone should not keep the Node.js process alive.
    this.monitorTimer.unref?.();
  }

  /**
   * Rough byte estimate of the retained metric data (heuristic sizes, not measured).
   */
  private estimateMemoryUsage(): number {
    const base = 1024; // Base object size
    const failures = this.failures.length * 200; // Approximate size per failure
    const responseTimes = this.responseTimes.length * 8; // Number array
    return base + failures + responseTimes;
  }

  /**
   * Sleep utility
   */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Release resources: stop the monitoring timer, drop listeners and metric
   * buffers, and report the cleanup to memory tracking.
   */
  async cleanup(): Promise<void> {
    if (this.monitorTimer !== undefined) {
      clearInterval(this.monitorTimer);
      this.monitorTimer = undefined;
    }
    this.removeAllListeners();
    this.failures = [];
    this.responseTimes = [];
    // Track cleanup
    MemoryLeakIntegration.trackCircuitBreaker('cleanup', this.config.name, 0);
  }
}
/**
 * Circuit breaker factory and registry for pre-configured, named instances.
 *
 * Instances are cached by name: any `create*` call with a name that is
 * already registered returns the existing breaker unchanged, regardless of
 * which factory method created it or which overrides are passed now.
 */
export class CircuitBreakerFactory {
  private static readonly instances = new Map<string, CircuitBreaker>();

  /**
   * Create (or fetch) an LLM adapter circuit breaker: tolerant 5-minute
   * per-attempt timeout, 5 failures to open, 3 successes to close.
   */
  static createLLMCircuitBreaker(name = 'llm-adapter'): CircuitBreaker {
    return CircuitBreakerFactory.getOrCreate(name, () => ({
      name,
      failureThreshold: 5, // 5 failures
      recoveryTimeout: 30000, // 30 seconds
      successThreshold: 3, // 3 successes to close
      timeout: 300000, // 5 minutes (for LLM calls)
      monitoringWindow: 300000, // 5 minute window
      maxRetries: 3,
      exponentialBackoff: true,
      jitterEnabled: true,
    }));
  }

  /**
   * Create (or fetch) a trajectory store circuit breaker: more failure
   * tolerant (10 failures) with a short 10-second recovery timeout.
   */
  static createTrajectoryCircuitBreaker(name = 'trajectory-store'): CircuitBreaker {
    return CircuitBreakerFactory.getOrCreate(name, () => ({
      name,
      failureThreshold: 10, // 10 failures (database more tolerant)
      recoveryTimeout: 10000, // 10 seconds
      successThreshold: 2, // 2 successes to close
      timeout: 30000, // 30 seconds
      monitoringWindow: 120000, // 2 minute window
      maxRetries: 2,
      exponentialBackoff: true,
      jitterEnabled: true,
    }));
  }

  /**
   * Create (or fetch) a generic service circuit breaker.
   *
   * @param name - Registry key and breaker name; it takes precedence over
   *   `overrides.name` so the registry key always matches the breaker's name.
   * @param overrides - Values replacing the service defaults; ignored when a
   *   breaker named `name` already exists.
   */
  static createServiceCircuitBreaker(
    name: string,
    overrides: Partial<CircuitBreakerConfig> = {}
  ): CircuitBreaker {
    return CircuitBreakerFactory.getOrCreate(name, () => ({
      failureThreshold: 5,
      recoveryTimeout: 15000, // 15 seconds
      successThreshold: 2,
      timeout: 30000, // 30 seconds
      monitoringWindow: 180000, // 3 minute window
      maxRetries: 2,
      exponentialBackoff: true,
      jitterEnabled: true,
      ...overrides,
      name,
    }));
  }

  /**
   * Get all circuit breaker instances (a copy of the registry).
   */
  static getAllInstances(): Map<string, CircuitBreaker> {
    return new Map(CircuitBreakerFactory.instances);
  }

  /**
   * Clean up all registered circuit breakers and empty the registry.
   *
   * The registry is emptied before awaiting, so it is cleared even if one
   * breaker's cleanup rejects; that rejection still propagates to the caller.
   */
  static async cleanup(): Promise<void> {
    const breakers = Array.from(CircuitBreakerFactory.instances.values());
    CircuitBreakerFactory.instances.clear();
    await Promise.all(breakers.map((breaker) => breaker.cleanup()));
  }

  /**
   * Return the breaker registered under `name`, or build one from
   * `buildConfig()` and register it. The class is referenced explicitly
   * (not via `this`) so factory methods keep working when passed around as
   * detached callbacks.
   */
  private static getOrCreate(name: string, buildConfig: () => CircuitBreakerConfig): CircuitBreaker {
    const existing = CircuitBreakerFactory.instances.get(name);
    if (existing) {
      return existing;
    }
    const breaker = new CircuitBreaker(buildConfig());
    CircuitBreakerFactory.instances.set(name, breaker);
    return breaker;
  }
}