// timeout-manager.ts
/**
* Timeout Manager for GEPA Resilience
*
* Provides configurable timeouts, cascading timeout prevention,
* timeout escalation policies, and emergency abort mechanisms.
*/
import { EventEmitter } from 'events';
import { MemoryLeakIntegration } from '../memory-leak-detector';
/**
* Timeout configuration for one operation type.
*
* Configurations are registered under `name` via `TimeoutManager.registerConfig`
* and selected by that name when calling `TimeoutManager.executeWithTimeout`.
* Registration rejects configs where `baseTimeout <= 0`,
* `maxTimeout <= baseTimeout`, `escalationFactor <= 1` or `maxEscalations < 0`.
*/
export interface TimeoutConfig {
/** Unique key under which the configuration is registered and looked up. */
name: string;
/** Starting timeout in milliseconds, before any priority scaling. */
baseTimeout: number;
/** Upper bound in milliseconds applied to scaled and escalated timeouts. */
maxTimeout: number;
/** Multiplier applied to the current timeout on each escalation. */
escalationFactor: number;
/** Maximum number of escalations allowed for a single operation. */
maxEscalations: number;
/** When true, new operations are rejected while a timeout cascade is detected. */
cascadePreventionEnabled: boolean;
/** When true, new operations are rejected once `emergencyAbortThreshold` is reached. */
emergencyAbortEnabled: boolean;
/** Number of concurrently tracked operations at which new operations are rejected. */
emergencyAbortThreshold: number;
/** Fraction of the timeout (0-1) after which a warning is due. */
warningThreshold: number;
}
/**
* Bookkeeping record for one in-flight operation.
*
* A tracker is created per `executeWithTimeout` call and stored in the
* manager's active-timeout map, keyed by `id`, while the operation runs.
*/
interface TimeoutTracker {
/** Unique identifier produced by the manager's id generator. */
id: string;
/** Name taken from the operation context, or the config name when none is given. */
operationName: string;
/** `Date.now()` value captured when the tracker was created. */
startTime: number;
/** Effective timeout for this attempt, in milliseconds. */
timeout: number;
/** Number of escalations already applied to reach this tracker. */
escalationCount: number;
/** Controller whose signal is handed to the operation and aborted on timeout or removal. */
abortController: AbortController;
/** Set once a `timeoutWarning` event has been emitted for this tracker. */
warningEmitted: boolean;
/** Tracker of the parent operation, when the context named one that was active. */
parentTracker?: TimeoutTracker;
/** Trackers that registered this tracker as their parent; aborted when this one is removed. */
childTrackers: Set<TimeoutTracker>;
}
/**
* Timeout escalation policy.
*
* Policies are evaluated in ascending `priority` order; the first policy whose
* `condition` returns true has its `action` awaited and approves the escalation.
*/
interface EscalationPolicy {
/** Returns true when the timed-out operation described by `tracker` may be escalated. */
condition: (tracker: TimeoutTracker, currentTime: number) => boolean;
/** Side effect to run (and await) before the escalated attempt starts. */
action: (tracker: TimeoutTracker) => Promise<void>;
/** Sort key; lower numbers are evaluated first. */
priority: number;
}
/**
* Caller-supplied context that influences timeout decisions for one operation.
*/
export interface OperationContext {
/** Operation name used for tracking and events; also inspected by the escalation policies. */
name: string;
/**
* Scales the base timeout: `critical` x2, `high` x1.5, `low` x0.7;
* `medium` leaves it unchanged. The result is capped at the config's `maxTimeout`.
*/
priority: 'low' | 'medium' | 'high' | 'critical';
/** NOTE(review): not read anywhere in this file; presumably states whether the operation honors the abort signal - confirm. */
canAbort: boolean;
/** Operation name of an active tracker to link as this operation's parent. */
parentOperation?: string;
/** Free-form caller data; not read anywhere in this file. */
metadata?: Record<string, any>;
}
/**
* Payload of the `timeout` and `timeoutEscalation` events.
*/
export interface TimeoutEvent {
/** Identifier of the tracker the event refers to. */
id: string;
/** Name of the tracked operation. */
operation: string;
/** Timeout, in milliseconds, that applied to the attempt. */
timeout: number;
/** Milliseconds between the tracker's start time and the event. */
elapsed: number;
/** Number of escalations applied so far. */
escalationCount: number;
/** Why the event was raised; the manager in this file emits `timeout` and `escalation`. */
reason: 'timeout' | 'escalation' | 'emergency_abort' | 'cascade_prevention';
}
/**
 * Timeout Manager implementation
 *
 * Runs asynchronous operations under a per-configuration timeout and reports
 * progress through events: `timeout`, `timeoutWarning`, `operationComplete`,
 * `timeoutEscalation`, `criticalEscalation`, `priorityEscalation`,
 * `healthCheck`, `emergencyAbort` and `configRegistered`.
 *
 * Constructing an instance registers the default configurations and starts two
 * background intervals (health monitoring and stale-tracker cleanup); call
 * `cleanup()` to stop them.
 */
export class TimeoutManager extends EventEmitter {
  /** Warning fraction used when a config's `warningThreshold` is outside (0, 1). */
  private static readonly DEFAULT_WARNING_THRESHOLD = 0.8;

  private static instance: TimeoutManager | undefined;

  private configs = new Map<string, TimeoutConfig>();
  private activeTimeouts = new Map<string, TimeoutTracker>();
  private escalationPolicies: EscalationPolicy[] = [];
  private monitoringInterval?: ReturnType<typeof setInterval>;
  private cleanupInterval?: ReturnType<typeof setInterval>;
  private cascadeDetectionWindow = 5000; // 5 seconds
  /** More than this many timeouts inside the detection window counts as a cascade. */
  private cascadeThreshold = 5;
  /** `Date.now()` stamps of timeouts that fired inside the detection window. */
  private recentTimeouts: number[] = [];
  /** Configuration each tracker was created from (operation names are not config names). */
  private trackerConfigs = new WeakMap<TimeoutTracker, TimeoutConfig>();
  /** Trackers whose own timeout timer fired, as opposed to being aborted from outside. */
  private timedOutTrackers = new WeakSet<TimeoutTracker>();

  constructor() {
    super();
    this.setupDefaultConfigs();
    this.setupEscalationPolicies();
    this.startMonitoring();
    this.setupMemoryManagement();
  }

  /**
   * Get singleton instance
   *
   * A new instance is created on first use and again after the previous
   * singleton has been shut down with `cleanup()`.
   */
  static getInstance(): TimeoutManager {
    if (!TimeoutManager.instance) {
      TimeoutManager.instance = new TimeoutManager();
    }
    return TimeoutManager.instance;
  }

  /**
   * Execute operation with timeout protection
   *
   * @param operation - Work to run; receives an abort signal that is aborted on
   *   timeout, emergency abort, or when the tracker is removed.
   * @param configName - Name of a registered timeout configuration.
   * @param context - Optional name, priority and parent linkage for this call.
   * @returns The value the operation resolves with.
   * @throws Error when the configuration is unknown, a cascade or overload is
   *   detected, the operation times out (after any escalation), or it is aborted.
   */
  async executeWithTimeout<T>(
    operation: (abortSignal?: AbortSignal) => Promise<T>,
    configName: string,
    context?: OperationContext
  ): Promise<T> {
    const config = this.getConfig(configName);
    const tracker = this.createTracker(config, context);
    try {
      // Check for cascading timeouts
      if (config.cascadePreventionEnabled && this.detectCascade(tracker)) {
        throw new Error('Cascading timeout detected - operation prevented');
      }
      // Check emergency abort conditions
      if (config.emergencyAbortEnabled && this.shouldEmergencyAbort(config)) {
        throw new Error('Emergency abort triggered - system overloaded');
      }
      return await this.executeWithTimeoutInternal(operation, tracker);
    } finally {
      // Release by object: the tracker may never have been registered (a check
      // above threw) yet still be linked into its parent's child set.
      this.releaseTracker(tracker);
    }
  }

  /**
   * Internal execution with timeout handling
   *
   * Runs one attempt and, if that attempt's own timeout fired and a policy
   * approves, retries the operation with an escalated timeout.
   */
  private async executeWithTimeoutInternal<T>(
    operation: (abortSignal?: AbortSignal) => Promise<T>,
    tracker: TimeoutTracker
  ): Promise<T> {
    try {
      const result = await this.raceAgainstTimeout(operation, tracker);
      // Operation completed successfully
      this.emit('operationComplete', {
        id: tracker.id,
        operation: tracker.operationName,
        elapsed: Date.now() - tracker.startTime
      });
      return result;
    } catch (error) {
      // Only a genuine timeout may be escalated; an external abort (emergency
      // abort, parent removal, stale cleanup) must not re-run the operation.
      if (this.timedOutTrackers.has(tracker) && (await this.shouldEscalate(tracker))) {
        return await this.escalateTimeout(operation, tracker);
      }
      throw error;
    }
  }

  /**
   * Registers the tracker, starts its timeout and warning timers and races the
   * operation against them. Both timers and the abort listener are always
   * released before this method settles.
   */
  private async raceAgainstTimeout<T>(
    operation: (abortSignal?: AbortSignal) => Promise<T>,
    tracker: TimeoutTracker
  ): Promise<T> {
    const { signal } = tracker.abortController;
    this.activeTimeouts.set(tracker.id, tracker);

    let rejectGuard!: (reason: Error) => void;
    const guard = new Promise<never>((_, reject) => {
      rejectGuard = reject;
    });

    // An abort that does not come from our own timer still has to settle the
    // race, otherwise an operation that ignores the signal would hang forever.
    const onExternalAbort = (): void => {
      rejectGuard(new Error(`Operation '${tracker.operationName}' was aborted`));
    };
    signal.addEventListener('abort', onExternalAbort, { once: true });

    const timeoutHandle = setTimeout(() => {
      this.timedOutTrackers.add(tracker);
      this.recordTimeout();
      const event: TimeoutEvent = {
        id: tracker.id,
        operation: tracker.operationName,
        timeout: tracker.timeout,
        elapsed: Date.now() - tracker.startTime,
        escalationCount: tracker.escalationCount,
        reason: 'timeout'
      };
      this.emit('timeout', event);
      // Detach first so the abort below is not reported as an external abort.
      signal.removeEventListener('abort', onExternalAbort);
      tracker.abortController.abort();
      rejectGuard(
        new Error(`Operation '${tracker.operationName}' timed out after ${tracker.timeout}ms`)
      );
    }, tracker.timeout);

    const warningHandle = setTimeout(() => {
      if (tracker.warningEmitted || signal.aborted) {
        return;
      }
      tracker.warningEmitted = true;
      const elapsed = Date.now() - tracker.startTime;
      this.emit('timeoutWarning', {
        id: tracker.id,
        operation: tracker.operationName,
        elapsed,
        remaining: tracker.timeout - elapsed
      });
    }, tracker.timeout * this.warningThresholdFor(tracker));

    try {
      return await Promise.race([operation(signal), guard]);
    } finally {
      clearTimeout(timeoutHandle);
      clearTimeout(warningHandle);
      signal.removeEventListener('abort', onExternalAbort);
    }
  }

  /**
   * Escalate timeout for critical operations
   *
   * Re-runs the operation under a longer timeout (current timeout times the
   * config's escalation factor, capped at `maxTimeout`) using a fresh tracker id
   * and abort controller. The superseded tracker is unregistered first so one
   * operation is never counted twice.
   *
   * @throws Error when no further escalation is allowed for the tracker.
   */
  private async escalateTimeout<T>(
    operation: (abortSignal?: AbortSignal) => Promise<T>,
    tracker: TimeoutTracker
  ): Promise<T> {
    const config = this.trackerConfigs.get(tracker);
    if (!config || tracker.escalationCount >= config.maxEscalations) {
      throw new Error(`Maximum escalations reached for ${tracker.operationName}`);
    }

    // Create escalated tracker (shares start time, parent and child set)
    const escalatedTracker: TimeoutTracker = {
      ...tracker,
      id: this.generateId(),
      timeout: Math.min(tracker.timeout * config.escalationFactor, config.maxTimeout),
      escalationCount: tracker.escalationCount + 1,
      abortController: new AbortController(),
      warningEmitted: false
    };
    this.trackerConfigs.set(escalatedTracker, config);
    this.activeTimeouts.delete(tracker.id);

    const event: TimeoutEvent = {
      id: escalatedTracker.id,
      operation: escalatedTracker.operationName,
      timeout: escalatedTracker.timeout,
      elapsed: Date.now() - escalatedTracker.startTime,
      escalationCount: escalatedTracker.escalationCount,
      reason: 'escalation'
    };
    this.emit('timeoutEscalation', event);

    // Execute with escalated timeout
    try {
      return await this.executeWithTimeoutInternal(operation, escalatedTracker);
    } finally {
      this.activeTimeouts.delete(escalatedTracker.id);
    }
  }

  /**
   * Determine if operation should be escalated
   *
   * Policies are consulted in ascending priority order; the first matching
   * policy has its action awaited and approves the escalation.
   */
  private async shouldEscalate(tracker: TimeoutTracker): Promise<boolean> {
    const config = this.trackerConfigs.get(tracker);
    if (!config) return false;
    // Check escalation count
    if (tracker.escalationCount >= config.maxEscalations) {
      return false;
    }
    // Apply escalation policies (sort a copy; the registered order stays untouched)
    const orderedPolicies = [...this.escalationPolicies].sort((a, b) => a.priority - b.priority);
    for (const policy of orderedPolicies) {
      if (policy.condition(tracker, Date.now())) {
        await policy.action(tracker);
        return true;
      }
    }
    return false;
  }

  /**
   * Detect cascading timeouts
   *
   * @returns true when more than `cascadeThreshold` timeouts fired within the
   *   last `cascadeDetectionWindow` milliseconds.
   */
  private detectCascade(_tracker: TimeoutTracker): boolean {
    this.pruneRecentTimeouts(Date.now());
    return this.recentTimeouts.length > this.cascadeThreshold;
  }

  /** Records that a timeout fired now, for cascade detection. */
  private recordTimeout(): void {
    const now = Date.now();
    this.pruneRecentTimeouts(now);
    this.recentTimeouts.push(now);
  }

  /** Drops recorded timeouts that fell out of the cascade detection window. */
  private pruneRecentTimeouts(now: number): void {
    const cutoff = now - this.cascadeDetectionWindow;
    this.recentTimeouts = this.recentTimeouts.filter((firedAt) => firedAt > cutoff);
  }

  /**
   * Check if emergency abort should be triggered
   *
   * @returns true when the number of tracked operations has reached the
   *   config's `emergencyAbortThreshold`.
   */
  private shouldEmergencyAbort(config: TimeoutConfig): boolean {
    if (!config.emergencyAbortEnabled) return false;
    return this.activeTimeouts.size >= config.emergencyAbortThreshold;
  }

  /**
   * Create operation tracker
   *
   * The tracker is linked to the first active tracker whose operation name
   * equals `context.parentOperation`, when there is one.
   */
  private createTracker(config: TimeoutConfig, context?: OperationContext): TimeoutTracker {
    const tracker: TimeoutTracker = {
      id: this.generateId(),
      operationName: context?.name || config.name,
      startTime: Date.now(),
      timeout: this.calculateTimeout(config, context),
      escalationCount: 0,
      abortController: new AbortController(),
      warningEmitted: false,
      childTrackers: new Set()
    };
    this.trackerConfigs.set(tracker, config);

    // Link to parent if specified
    const parentName = context?.parentOperation;
    if (parentName) {
      for (const candidate of this.activeTimeouts.values()) {
        if (candidate.operationName === parentName) {
          tracker.parentTracker = candidate;
          candidate.childTrackers.add(tracker);
          break;
        }
      }
    }
    return tracker;
  }

  /**
   * Calculate effective timeout based on context
   *
   * @returns The base timeout scaled by priority, capped at `maxTimeout`.
   */
  private calculateTimeout(config: TimeoutConfig, context?: OperationContext): number {
    let timeout = config.baseTimeout;
    // Adjust based on priority
    switch (context?.priority) {
      case 'critical':
        timeout *= 2; // Allow more time for critical operations
        break;
      case 'high':
        timeout *= 1.5;
        break;
      case 'low':
        timeout *= 0.7; // Less time for low priority
        break;
      default:
        break; // 'medium' or no context: keep the base timeout
    }
    return Math.min(timeout, config.maxTimeout);
  }

  /** Warning fraction for a tracker: its config's value when inside (0, 1), else the default. */
  private warningThresholdFor(tracker: TimeoutTracker): number {
    const configured = this.trackerConfigs.get(tracker)?.warningThreshold;
    if (typeof configured === 'number' && configured > 0 && configured < 1) {
      return configured;
    }
    return TimeoutManager.DEFAULT_WARNING_THRESHOLD;
  }

  /**
   * Remove tracker and cleanup
   *
   * No-op when no tracker is registered under `id`.
   */
  private removeTracker(id: string): void {
    const tracker = this.activeTimeouts.get(id);
    if (tracker) {
      this.releaseTracker(tracker);
    }
  }

  /**
   * Aborts and unregisters a tracker and its children and unlinks it from its
   * parent. Works whether or not the tracker is currently registered.
   */
  private releaseTracker(tracker: TimeoutTracker): void {
    // Cleanup child trackers
    for (const child of tracker.childTrackers) {
      child.abortController.abort();
      this.activeTimeouts.delete(child.id);
    }
    // Remove from parent
    tracker.parentTracker?.childTrackers.delete(tracker);
    // Cleanup
    tracker.abortController.abort();
    this.activeTimeouts.delete(tracker.id);
  }

  /**
   * Register timeout configuration
   *
   * Replaces any configuration already registered under the same name and
   * emits `configRegistered`.
   *
   * @throws Error when the configuration fails validation.
   */
  registerConfig(config: TimeoutConfig): void {
    this.validateConfig(config);
    this.configs.set(config.name, config);
    this.emit('configRegistered', { name: config.name });
  }

  /**
   * Get timeout configuration
   *
   * @throws Error when no configuration is registered under `name`.
   */
  getConfig(name: string): TimeoutConfig {
    const config = this.configs.get(name);
    if (!config) {
      throw new Error(`Timeout configuration '${name}' not found`);
    }
    return config;
  }

  /**
   * Get active timeout statistics
   *
   * @returns A new map from tracker id to `{ operation, elapsed, timeout,
   *   escalationCount, progress }`, where `progress` is elapsed / timeout.
   */
  getActiveTimeouts(): Map<string, any> {
    const stats = new Map<string, any>();
    const now = Date.now();
    for (const [id, tracker] of this.activeTimeouts) {
      const elapsed = now - tracker.startTime;
      stats.set(id, {
        operation: tracker.operationName,
        elapsed,
        timeout: tracker.timeout,
        escalationCount: tracker.escalationCount,
        progress: elapsed / tracker.timeout
      });
    }
    return stats;
  }

  /**
   * Emergency abort all operations
   *
   * Emits `emergencyAbort`, aborts every tracked operation's signal (which
   * rejects the pending `executeWithTimeout` calls) and clears the registry.
   */
  emergencyAbort(reason: string): void {
    this.emit('emergencyAbort', { reason, activeOperations: this.activeTimeouts.size });
    // Abort all active operations
    for (const tracker of this.activeTimeouts.values()) {
      tracker.abortController.abort();
    }
    // Clear all trackers
    this.activeTimeouts.clear();
  }

  /**
   * Setup default timeout configurations
   */
  private setupDefaultConfigs(): void {
    // LLM Adapter Configuration
    this.registerConfig({
      name: 'llm-adapter',
      baseTimeout: 300000, // 5 minutes base
      maxTimeout: 600000, // 10 minutes max
      escalationFactor: 1.5,
      maxEscalations: 2,
      cascadePreventionEnabled: true,
      emergencyAbortEnabled: true,
      emergencyAbortThreshold: 10,
      warningThreshold: 0.8
    });
    // Trajectory Store Configuration
    this.registerConfig({
      name: 'trajectory-store',
      baseTimeout: 30000, // 30 seconds base
      maxTimeout: 120000, // 2 minutes max
      escalationFactor: 2,
      maxEscalations: 1,
      cascadePreventionEnabled: true,
      emergencyAbortEnabled: true,
      emergencyAbortThreshold: 20,
      warningThreshold: 0.7
    });
    // Pareto Frontier Configuration
    this.registerConfig({
      name: 'pareto-frontier',
      baseTimeout: 60000, // 1 minute base
      maxTimeout: 300000, // 5 minutes max
      escalationFactor: 1.5,
      maxEscalations: 1,
      cascadePreventionEnabled: false,
      emergencyAbortEnabled: true,
      emergencyAbortThreshold: 5,
      warningThreshold: 0.8
    });
    // Reflection Engine Configuration
    this.registerConfig({
      name: 'reflection-engine',
      baseTimeout: 120000, // 2 minutes base
      maxTimeout: 300000, // 5 minutes max
      escalationFactor: 1.5,
      maxEscalations: 1,
      cascadePreventionEnabled: true,
      emergencyAbortEnabled: true,
      emergencyAbortThreshold: 8,
      warningThreshold: 0.75
    });
    // Generic Configuration
    this.registerConfig({
      name: 'generic',
      baseTimeout: 30000, // 30 seconds base
      maxTimeout: 120000, // 2 minutes max
      escalationFactor: 2,
      maxEscalations: 1,
      cascadePreventionEnabled: true,
      emergencyAbortEnabled: true,
      emergencyAbortThreshold: 15,
      warningThreshold: 0.8
    });
  }

  /**
   * Setup escalation policies
   *
   * Both default policies match on the operation name: names containing
   * `critical` may escalate while fewer than two escalations were applied,
   * names containing `high` while none were applied.
   */
  private setupEscalationPolicies(): void {
    // Critical operation escalation
    this.escalationPolicies.push({
      condition: (tracker, _currentTime) =>
        tracker.operationName.includes('critical') && tracker.escalationCount < 2,
      action: async (tracker) => {
        this.emit('criticalEscalation', {
          operation: tracker.operationName,
          escalationCount: tracker.escalationCount
        });
      },
      priority: 1
    });
    // High-priority operation escalation
    this.escalationPolicies.push({
      condition: (tracker, _currentTime) =>
        tracker.operationName.includes('high') && tracker.escalationCount < 1,
      action: async (tracker) => {
        this.emit('priorityEscalation', {
          operation: tracker.operationName,
          escalationCount: tracker.escalationCount
        });
      },
      priority: 2
    });
  }

  /**
   * Start monitoring active timeouts
   */
  private startMonitoring(): void {
    this.monitoringInterval = setInterval(() => {
      this.checkTimeoutHealth();
    }, 5000); // Check every 5 seconds
    TimeoutManager.detachTimer(this.monitoringInterval);
  }

  /**
   * Check health of active timeouts
   *
   * Emits `healthCheck` when any tracked operation has passed its warning
   * threshold without a warning having been emitted, or has been running for
   * more than five minutes.
   */
  private checkTimeoutHealth(): void {
    let warnings = 0;
    let longRunning = 0;
    const now = Date.now();
    for (const tracker of this.activeTimeouts.values()) {
      const elapsed = now - tracker.startTime;
      const progress = elapsed / tracker.timeout;
      // Check for warning threshold
      if (progress > this.warningThresholdFor(tracker) && !tracker.warningEmitted) {
        warnings += 1;
      }
      // Check for long-running operations
      if (elapsed > 300000) { // 5 minutes
        longRunning += 1;
      }
    }
    if (warnings > 0 || longRunning > 0) {
      this.emit('healthCheck', {
        warnings,
        longRunning,
        total: this.activeTimeouts.size
      });
    }
  }

  /**
   * Validate timeout configuration
   *
   * @throws Error describing the first rule the configuration violates.
   */
  private validateConfig(config: TimeoutConfig): void {
    if (!config.name) {
      throw new Error('Timeout config name is required');
    }
    if (config.baseTimeout <= 0) {
      throw new Error('Base timeout must be positive');
    }
    if (config.maxTimeout <= config.baseTimeout) {
      throw new Error('Max timeout must be greater than base timeout');
    }
    if (config.escalationFactor <= 1) {
      throw new Error('Escalation factor must be greater than 1');
    }
    if (config.maxEscalations < 0) {
      throw new Error('Max escalations must be non-negative');
    }
  }

  /**
   * Generate unique operation ID
   */
  private generateId(): string {
    return `timeout-${Date.now()}-${Math.random().toString(36).substring(2)}`;
  }

  /**
   * Setup memory management
   *
   * Initializes the memory-leak integration and schedules the periodic sweep
   * for stale trackers; the interval handle is kept so `cleanup()` can stop it.
   */
  private setupMemoryManagement(): void {
    MemoryLeakIntegration.initialize();
    // Periodic cleanup
    this.cleanupInterval = setInterval(() => {
      this.cleanupStaleTimeouts();
    }, 60000); // Every minute
    TimeoutManager.detachTimer(this.cleanupInterval);
  }

  /**
   * Lets the process exit while only this housekeeping timer is pending.
   * Timer handles without `unref` (for example numeric browser handles) are
   * left untouched.
   */
  private static detachTimer(handle: unknown): void {
    const timer = handle as { unref?: () => void } | null | undefined;
    if (timer && typeof timer.unref === 'function') {
      timer.unref();
    }
  }

  /**
   * Cleanup stale timeouts
   *
   * Removes (and thereby aborts) trackers that started more than an hour ago.
   */
  private cleanupStaleTimeouts(): void {
    const now = Date.now();
    const staleThreshold = 3600000; // 1 hour
    for (const [id, tracker] of this.activeTimeouts) {
      if (now - tracker.startTime > staleThreshold) {
        this.removeTracker(id);
      }
    }
  }

  /**
   * Cleanup resources
   *
   * Stops both background intervals, aborts every active operation, removes all
   * listeners and clears configurations and policies. When this instance is the
   * singleton, the cached reference is dropped so `getInstance()` builds a
   * fresh, working manager afterwards.
   */
  async cleanup(): Promise<void> {
    if (this.monitoringInterval) {
      clearInterval(this.monitoringInterval);
      this.monitoringInterval = undefined;
    }
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = undefined;
    }
    // Abort all active operations
    this.emergencyAbort('Cleanup initiated');
    this.removeAllListeners();
    this.configs.clear();
    this.escalationPolicies = [];
    this.recentTimeouts = [];
    if (TimeoutManager.instance === this) {
      TimeoutManager.instance = undefined;
    }
  }
}
/**
 * Timeout helper utilities
 *
 * Static shortcuts that run an operation through the shared `TimeoutManager`
 * under one of the default configurations. The manager is resolved on every
 * call rather than cached when this class is defined, so importing the module
 * does not create the singleton and the helpers always use the manager that
 * `TimeoutManager.getInstance()` currently returns.
 */
export class TimeoutHelper {
  /** Shared manager, looked up lazily on each access. */
  private static get timeoutManager(): TimeoutManager {
    return TimeoutManager.getInstance();
  }

  /**
   * Execute LLM operation with timeout
   *
   * Uses the `llm-adapter` configuration.
   *
   * @param operation - Work to run; receives the abort signal for this call.
   * @param context - Optional name, priority and parent linkage.
   * @returns The value the operation resolves with.
   */
  static async withLLMTimeout<T>(
    operation: (abortSignal?: AbortSignal) => Promise<T>,
    context?: OperationContext
  ): Promise<T> {
    return TimeoutHelper.timeoutManager.executeWithTimeout(operation, 'llm-adapter', context);
  }

  /**
   * Execute database operation with timeout
   *
   * Uses the `trajectory-store` configuration.
   *
   * @param operation - Work to run; receives the abort signal for this call.
   * @param context - Optional name, priority and parent linkage.
   * @returns The value the operation resolves with.
   */
  static async withDatabaseTimeout<T>(
    operation: (abortSignal?: AbortSignal) => Promise<T>,
    context?: OperationContext
  ): Promise<T> {
    return TimeoutHelper.timeoutManager.executeWithTimeout(operation, 'trajectory-store', context);
  }

  /**
   * Execute computation with timeout
   *
   * Uses the `pareto-frontier` configuration.
   *
   * @param operation - Work to run; receives the abort signal for this call.
   * @param context - Optional name, priority and parent linkage.
   * @returns The value the operation resolves with.
   */
  static async withComputationTimeout<T>(
    operation: (abortSignal?: AbortSignal) => Promise<T>,
    context?: OperationContext
  ): Promise<T> {
    return TimeoutHelper.timeoutManager.executeWithTimeout(operation, 'pareto-frontier', context);
  }

  /**
   * Generic timeout wrapper
   *
   * Uses the `generic` configuration.
   *
   * @param operation - Work to run; receives the abort signal for this call.
   * @param context - Optional name, priority and parent linkage.
   * @returns The value the operation resolves with.
   */
  static async withTimeout<T>(
    operation: (abortSignal?: AbortSignal) => Promise<T>,
    context?: OperationContext
  ): Promise<T> {
    return TimeoutHelper.timeoutManager.executeWithTimeout(operation, 'generic', context);
  }
}