// retry-manager.ts (16.7 kB)
/**
 * Retry Manager for GEPA Resilience
 * 
 * Provides intelligent retry mechanisms with exponential backoff,
 * jittered delays, context-aware policies, and circuit breaker integration.
 */
import { EventEmitter } from 'events';
import { CircuitBreaker } from './circuit-breaker';
import { MemoryLeakIntegration } from '../memory-leak-detector';
/**
 * Retry policy configuration.
 *
 * Policies are stored by `name` via `RetryManager.registerPolicy` and looked
 * up by that name in `RetryManager.executeWithRetry`. The numeric fields are
 * range-checked when the policy is registered.
 */
export interface RetryPolicy {
  name: string;                         // Key under which the policy is registered and looked up
  maxRetries: number;                   // Retries after the first attempt (total attempts = maxRetries + 1)
  baseDelay: number;                    // Base delay in ms (the delay before the first retry, pre-jitter)
  maxDelay: number;                     // Maximum delay in ms (cap is applied before jitter is added)
  backoffMultiplier: number;            // Exponential backoff multiplier
  jitterEnabled: boolean;               // Add randomization to prevent thundering herd
  jitterMax: number;                    // Maximum jitter as a fraction of the delay (0-1), applied as +/-
  retryableErrors: string[];            // Case-insensitive substrings (of error message/name/constructor name) that trigger a retry
  nonRetryableErrors: string[];         // Same matching as above; checked first and always stop retrying
  contextualRules: ContextualRule[];    // Context-specific retry rules
}
/**
 * Contextual retry rule, keyed by operation context name.
 *
 * A rule plays two roles in `RetryManager`:
 *  - its `overrides` are merged over the base policy whenever `context`
 *    equals the `OperationContext.name` of the call (`condition` is not
 *    consulted for this, and only the first matching rule is used);
 *  - its `condition` can force a retry for an error that matched neither the
 *    retryable nor the non-retryable pattern lists.
 */
export interface ContextualRule {
  context: string;                      // Operation context (e.g., 'llm-call', 'db-query'); compared to OperationContext.name
  // Receives the caught error and the zero-based index of the failed attempt;
  // returning true requests a retry.
  condition: (error: Error, attempt: number) => boolean;
  overrides: Partial<RetryPolicy>;     // Policy overrides for this context
}
/**
 * Record of a single failed attempt made by `RetryManager`.
 */
export interface RetryAttempt {
  attemptNumber: number;                // 1-based index of the attempt that failed
  totalAttempts: number;                // Maximum attempts allowed (maxRetries + 1 of the effective policy)
  delay: number;                        // Delay in ms waited after this attempt; 0 when no retry followed
  error: Error;                         // Error caught for this attempt
  context?: string | undefined;         // OperationContext.name of the call, if one was supplied
  timestamp: Date;                      // Time at which the failure was recorded
}
/**
 * Outcome of a complete retry sequence.
 */
export interface RetryResult<T> {
  success: boolean;                     // True when one of the attempts resolved
  result?: T;                           // Value of the successful attempt (set only when success is true)
  finalError?: Error;                   // Last error caught (set only when success is false)
  // One entry per failed attempt; empty when the first attempt succeeds.
  // A rejection that is recognized as "circuit breaker open" is not recorded.
  attempts: RetryAttempt[];
  totalTime: number;                    // Elapsed wall-clock time of the whole sequence in ms
  circuitBreakerTriggered: boolean;     // True when retries were stopped by an open circuit breaker
}
/**
 * Operation context for retry decisions.
 *
 * Within this file only `name` is read: it is matched against
 * `ContextualRule.context`, used as the prefix of the generated operation ID,
 * and copied into retry attempt records and events. The remaining fields are
 * carried for callers and are not consulted by the retry logic shown here.
 */
export interface OperationContext {
  name: string;
  priority: 'low' | 'medium' | 'high' | 'critical';
  timeout?: number;
  metadata?: Record<string, any>;
}
/**
 * Retry Manager implementation with intelligent backoff and context awareness.
 *
 * Holds a registry of named {@link RetryPolicy} objects and runs operations
 * under them, retrying failed attempts with capped exponential backoff and
 * optional jitter. Per-policy statistics are kept in memory and trimmed
 * periodically by a background timer; call {@link RetryManager.cleanup} to
 * stop that timer and release all state.
 *
 * Events emitted:
 *  - `policyRegistered` `{ name }` when a policy is registered;
 *  - `retryAttempt` `{ operationId, attempt, delay, error, context }` before
 *    sleeping ahead of a retry;
 *  - `statsClear` when statistics are cleared.
 */
export class RetryManager extends EventEmitter {
  /** Interval between periodic statistics clean-ups, in ms (5 minutes). */
  private static readonly STATS_CLEANUP_INTERVAL_MS = 300000;

  private static instance: RetryManager | undefined;
  private policies = new Map<string, RetryPolicy>();
  private activeRetries = new Set<string>();
  private retryStats = new Map<string, {
    totalAttempts: number;
    successfulRetries: number;
    failedRetries: number;
    averageDelay: number;
  }>();
  /** Handle of the periodic statistics clean-up timer; cleared by `cleanup()`. */
  private cleanupTimer: ReturnType<typeof setInterval> | undefined;

  constructor() {
    super();
    this.setupDefaultPolicies();
    this.setupMemoryManagement();
  }

  /**
   * Get singleton instance, creating it on first use.
   */
  static getInstance(): RetryManager {
    if (!this.instance) {
      this.instance = new RetryManager();
    }
    return this.instance;
  }

  /**
   * Execute operation with retry logic.
   *
   * @param operation - Async operation to run; it is re-invoked for each retry.
   * @param policyName - Name of a registered retry policy.
   * @param context - Optional operation context; its `name` selects contextual rules.
   * @param circuitBreaker - Optional breaker through which every attempt is executed.
   * @returns The value resolved by the first successful attempt.
   * @throws Error if `policyName` is not registered, or the last error caught
   *   once retrying stops without a success.
   */
  async executeWithRetry<T>(
    operation: () => Promise<T>,
    policyName: string,
    context?: OperationContext,
    circuitBreaker?: CircuitBreaker
  ): Promise<T> {
    const policy = this.getPolicy(policyName);
    const operationId = this.generateOperationId(context?.name || 'unknown');
    const startTime = Date.now();

    this.activeRetries.add(operationId);

    try {
      const result = await this.performRetries(
        operation,
        policy,
        context,
        circuitBreaker,
        operationId
      );

      // Record the real outcome so failed operations are counted as failures.
      this.updateStats(policyName, result.attempts.length, result.success, Date.now() - startTime);

      if (!result.success) {
        throw result.finalError || new Error('Retry attempts exhausted');
      }

      // `success` guarantees `result` was assigned (it may legitimately be undefined for T = void).
      return result.result as T;
    } finally {
      this.activeRetries.delete(operationId);
    }
  }

  /**
   * Perform retry attempts with intelligent backoff.
   *
   * Runs up to `maxRetries + 1` attempts of the effective policy (base policy
   * plus contextual overrides). Never rejects because of the operation itself:
   * failures are reported through the returned {@link RetryResult}.
   */
  private async performRetries<T>(
    operation: () => Promise<T>,
    policy: RetryPolicy,
    context?: OperationContext,
    circuitBreaker?: CircuitBreaker,
    operationId?: string
  ): Promise<RetryResult<T>> {
    const attempts: RetryAttempt[] = [];
    const startTime = Date.now();
    let lastError: Error | undefined;
    let circuitBreakerTriggered = false;

    // Apply contextual overrides if applicable
    const effectivePolicy = this.applyContextualRules(policy, context);

    for (let attempt = 1; attempt <= effectivePolicy.maxRetries + 1; attempt++) {
      try {
        // Execute through circuit breaker if provided
        const result = circuitBreaker
          ? await circuitBreaker.execute(operation)
          : await operation();

        // Success - return result
        return {
          success: true,
          result,
          attempts,
          totalTime: Date.now() - startTime,
          circuitBreakerTriggered
        };
      } catch (error) {
        // Anything can be thrown in JavaScript; normalize so the matching
        // logic below can rely on `message` and `name` being present.
        const failure = error instanceof Error ? error : new Error(String(error));
        lastError = failure;

        // An open circuit breaker is identified by its error message; stop
        // immediately without recording the rejection as an attempt.
        if (failure.message.includes('Circuit breaker') && failure.message.includes('OPEN')) {
          circuitBreakerTriggered = true;
          break;
        }

        // Record attempt
        const retryAttempt: RetryAttempt = {
          attemptNumber: attempt,
          totalAttempts: effectivePolicy.maxRetries + 1,
          delay: 0,
          error: failure,
          context: context?.name || undefined,
          timestamp: new Date()
        };
        attempts.push(retryAttempt);

        // Check if we should retry this error (shouldRetry takes a zero-based index)
        if (!this.shouldRetry(failure, attempt - 1, effectivePolicy, context)) {
          break;
        }

        // Don't delay after the last attempt
        if (attempt <= effectivePolicy.maxRetries) {
          const delay = this.calculateDelay(attempt - 1, effectivePolicy);
          retryAttempt.delay = delay;

          this.emit('retryAttempt', {
            operationId,
            attempt,
            delay,
            error: failure,
            context: context?.name
          });

          await this.sleep(delay);
        }
      }
    }

    return {
      success: false,
      // Undefined only if the loop never ran (e.g. an override made maxRetries
      // negative); the caller then falls back to a generic error.
      ...(lastError !== undefined ? { finalError: lastError } : {}),
      attempts,
      totalTime: Date.now() - startTime,
      circuitBreakerTriggered
    };
  }

  /**
   * Determine if an error should trigger a retry.
   *
   * Checks, in order: the retry budget, the non-retryable patterns, the
   * retryable patterns, the contextual rule conditions, and finally the
   * built-in transient-error heuristics.
   *
   * @param attemptNumber - Zero-based index of the attempt that just failed.
   */
  private shouldRetry(
    error: Error,
    attemptNumber: number,
    policy: RetryPolicy,
    context?: OperationContext
  ): boolean {
    // Check maximum retries
    if (attemptNumber >= policy.maxRetries) {
      return false;
    }

    // Check non-retryable errors first
    for (const nonRetryable of policy.nonRetryableErrors) {
      if (this.errorMatches(error, nonRetryable)) {
        return false;
      }
    }

    // Check retryable errors
    for (const retryable of policy.retryableErrors) {
      if (this.errorMatches(error, retryable)) {
        return true;
      }
    }

    // Check contextual rules
    for (const rule of policy.contextualRules) {
      if (context?.name === rule.context && rule.condition(error, attemptNumber)) {
        return true;
      }
    }

    // Default: retry transient errors
    return this.isTransientError(error);
  }

  /**
   * Check if error matches pattern.
   *
   * The match is a case-insensitive substring test against the error's
   * message, its `name`, and its constructor name.
   */
  private errorMatches(error: Error, pattern: string): boolean {
    const errorString = error.message.toLowerCase();
    const patternLower = pattern.toLowerCase();

    return errorString.includes(patternLower) ||
           error.name.toLowerCase().includes(patternLower) ||
           error.constructor.name.toLowerCase().includes(patternLower);
  }

  /**
   * Determine if error is transient (likely to succeed on retry), based on a
   * fixed list of substrings looked up in the lower-cased error message.
   */
  private isTransientError(error: Error): boolean {
    const transientPatterns = [
      'timeout',
      'econnreset',
      'enotfound',
      'network',
      'temporary',
      'rate limit',
      'throttle',
      '502',
      '503',
      '504',
      'gateway'
    ];

    const errorString = error.message.toLowerCase();
    return transientPatterns.some(pattern => errorString.includes(pattern));
  }

  /**
   * Calculate retry delay with exponential backoff and jitter.
   *
   * @param attemptNumber - Zero-based index of the failed attempt (0 yields `baseDelay`).
   * @returns Delay in ms, never below 100. Jitter is added after the
   *   `maxDelay` cap, so the result may exceed `maxDelay` by up to `jitterMax`.
   */
  private calculateDelay(attemptNumber: number, policy: RetryPolicy): number {
    // Base exponential backoff
    let delay = policy.baseDelay * Math.pow(policy.backoffMultiplier, attemptNumber);

    // Apply maximum delay limit
    delay = Math.min(delay, policy.maxDelay);

    // Add jitter if enabled: uniform in [-jitterRange, +jitterRange)
    if (policy.jitterEnabled) {
      const jitterRange = delay * policy.jitterMax;
      const jitter = (Math.random() - 0.5) * 2 * jitterRange;
      delay += jitter;
    }

    return Math.max(delay, 100); // Minimum 100ms delay
  }

  /**
   * Apply contextual rules to policy.
   *
   * Returns the base policy merged with the overrides of the first rule whose
   * `context` equals `context.name`; the rule's `condition` is not evaluated
   * here. Returns the base policy unchanged when nothing matches.
   */
  private applyContextualRules(
    basePolicy: RetryPolicy,
    context?: OperationContext
  ): RetryPolicy {
    if (!context) {
      return basePolicy;
    }

    // Find matching contextual rule
    const matchingRule = basePolicy.contextualRules.find(rule =>
      rule.context === context.name
    );

    if (!matchingRule) {
      return basePolicy;
    }

    // Apply overrides
    return {
      ...basePolicy,
      ...matchingRule.overrides
    };
  }

  /**
   * Register a new retry policy, replacing any policy with the same name.
   *
   * @throws Error if the policy fails validation.
   */
  registerPolicy(policy: RetryPolicy): void {
    this.validatePolicy(policy);
    this.policies.set(policy.name, policy);
    this.emit('policyRegistered', { name: policy.name });
  }

  /**
   * Get retry policy by name.
   *
   * @throws Error if no policy is registered under `name`.
   */
  getPolicy(name: string): RetryPolicy {
    const policy = this.policies.get(name);
    if (!policy) {
      throw new Error(`Retry policy '${name}' not found`);
    }
    return policy;
  }

  /**
   * Get all available policies as a new map (the policy objects are shared).
   */
  getPolicies(): Map<string, RetryPolicy> {
    return new Map(this.policies);
  }

  /**
   * Get retry statistics as a new map keyed by policy name (the statistics
   * objects themselves are shared with the manager, not copied).
   */
  getRetryStats(): Map<string, any> {
    return new Map(this.retryStats);
  }

  /**
   * Clear statistics
   */
  clearStats(): void {
    this.retryStats.clear();
    this.emit('statsClear');
  }

  /**
   * Setup default retry policies: `llm-adapter`, `trajectory-store`,
   * `pareto-frontier` and `generic`.
   */
  private setupDefaultPolicies(): void {
    // LLM Adapter Policy - More tolerant for API calls
    this.registerPolicy({
      name: 'llm-adapter',
      maxRetries: 3,
      baseDelay: 2000,
      maxDelay: 30000,
      backoffMultiplier: 2,
      jitterEnabled: true,
      jitterMax: 0.3,
      retryableErrors: [
        'timeout',
        'rate limit',
        'throttle',
        '502',
        '503',
        '504',
        'gateway',
        'network'
      ],
      nonRetryableErrors: [
        'authentication',
        'unauthorized',
        '401',
        '403',
        'invalid prompt',
        'malformed request'
      ],
      contextualRules: [
        {
          context: 'evaluation',
          condition: (error, attempt) => attempt < 2 && error.message.includes('timeout'),
          overrides: {
            maxRetries: 5,
            maxDelay: 60000
          }
        }
      ]
    });

    // Trajectory Store Policy - Fast retries for database operations
    this.registerPolicy({
      name: 'trajectory-store',
      maxRetries: 2,
      baseDelay: 500,
      maxDelay: 5000,
      backoffMultiplier: 2,
      jitterEnabled: true,
      jitterMax: 0.2,
      retryableErrors: [
        'connection',
        'timeout',
        'lock',
        'deadlock',
        'busy',
        'temporary'
      ],
      nonRetryableErrors: [
        'constraint',
        'foreign key',
        'unique',
        'syntax error',
        'permission'
      ],
      contextualRules: []
    });

    // Pareto Frontier Policy - Conservative retries for optimization
    this.registerPolicy({
      name: 'pareto-frontier',
      maxRetries: 2,
      baseDelay: 1000,
      maxDelay: 10000,
      backoffMultiplier: 2,
      jitterEnabled: true,
      jitterMax: 0.25,
      retryableErrors: [
        'memory',
        'computation',
        'overflow',
        'temporary'
      ],
      nonRetryableErrors: [
        'invalid candidate',
        'malformed data',
        'configuration error'
      ],
      contextualRules: []
    });

    // Generic Service Policy
    this.registerPolicy({
      name: 'generic',
      maxRetries: 2,
      baseDelay: 1000,
      maxDelay: 15000,
      backoffMultiplier: 2,
      jitterEnabled: true,
      jitterMax: 0.25,
      retryableErrors: ['timeout', 'network', 'temporary', '502', '503', '504'],
      nonRetryableErrors: ['401', '403', '404', 'invalid', 'malformed'],
      contextualRules: []
    });
  }

  /**
   * Validate retry policy configuration.
   *
   * @throws Error describing the first field that is out of range.
   */
  private validatePolicy(policy: RetryPolicy): void {
    if (!policy.name) {
      throw new Error('Policy name is required');
    }
    if (policy.maxRetries < 0) {
      throw new Error('Max retries must be non-negative');
    }
    if (policy.baseDelay <= 0) {
      throw new Error('Base delay must be positive');
    }
    if (policy.maxDelay <= 0) {
      throw new Error('Max delay must be positive');
    }
    if (policy.backoffMultiplier <= 0) {
      throw new Error('Backoff multiplier must be positive');
    }
    if (policy.jitterMax < 0 || policy.jitterMax > 1) {
      throw new Error('Jitter max must be between 0 and 1');
    }
  }

  /**
   * Update retry statistics for one finished operation.
   *
   * @param attempts - Number of failed attempts recorded for the operation.
   * @param success - Whether the operation eventually succeeded.
   * @param totalTime - Elapsed time of the whole operation in ms; this is what
   *   `averageDelay` averages over operations.
   */
  private updateStats(
    policyName: string,
    attempts: number,
    success: boolean,
    totalTime: number
  ): void {
    let stats = this.retryStats.get(policyName);
    if (!stats) {
      stats = {
        totalAttempts: 0,
        successfulRetries: 0,
        failedRetries: 0,
        averageDelay: 0
      };
      this.retryStats.set(policyName, stats);
    }

    stats.totalAttempts += attempts;
    if (success) {
      stats.successfulRetries++;
    } else {
      stats.failedRetries++;
    }

    // Update running average over all recorded operations
    const totalRetries = stats.successfulRetries + stats.failedRetries;
    stats.averageDelay = ((stats.averageDelay * (totalRetries - 1)) + totalTime) / totalRetries;
  }

  /**
   * Generate unique operation ID from a base name, the current time and a
   * random base-36 suffix.
   */
  private generateOperationId(baseName: string): string {
    return `${baseName}-${Date.now()}-${Math.random().toString(36).substring(2)}`;
  }

  /**
   * Setup memory management: initializes the memory-leak integration and
   * starts the periodic statistics clean-up timer.
   */
  private setupMemoryManagement(): void {
    MemoryLeakIntegration.initialize();

    // Periodic cleanup of old statistics
    const timer = setInterval(() => {
      this.cleanupStats();
    }, RetryManager.STATS_CLEANUP_INTERVAL_MS);
    this.cleanupTimer = timer;

    // Housekeeping must not keep the process alive on its own. Node timer
    // handles expose unref(); other runtimes return a plain number, hence the
    // runtime check instead of a direct call.
    const handle: unknown = timer;
    if (typeof handle === 'object' && handle !== null) {
      const { unref } = handle as { unref?: unknown };
      if (typeof unref === 'function') {
        unref.call(handle);
      }
    }
  }

  /**
   * Cleanup old statistics to keep counters bounded.
   *
   * Once a policy has accumulated more than 10000 attempts its counters are
   * rescaled to 1000 while preserving the success/failure ratio of operations.
   */
  private cleanupStats(): void {
    for (const stats of this.retryStats.values()) {
      if (stats.totalAttempts > 10000) {
        // The ratio must be taken over operations (successes + failures), not
        // over attempts: the two are counted in different units, and mixing
        // them can yield a rate above 1 and a negative failure count.
        const operations = stats.successfulRetries + stats.failedRetries;
        const successRate = operations > 0 ? stats.successfulRetries / operations : 0;
        stats.totalAttempts = 1000;
        stats.successfulRetries = Math.floor(1000 * successRate);
        stats.failedRetries = 1000 - stats.successfulRetries;
      }
    }
  }

  /**
   * Sleep utility: resolves after `ms` milliseconds.
   */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Cleanup resources: stops the periodic clean-up timer, removes all event
   * listeners and clears policies, statistics and active-operation tracking.
   * The instance has no policies afterwards, including the default ones.
   */
  async cleanup(): Promise<void> {
    if (this.cleanupTimer !== undefined) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = undefined;
    }
    this.removeAllListeners();
    this.policies.clear();
    this.retryStats.clear();
    this.activeRetries.clear();
  }
}
/**
 * Convenience wrapper for common retry patterns.
 *
 * Each static method runs the operation through the shared
 * {@link RetryManager} singleton under a fixed, pre-registered policy. The
 * methods reference the class explicitly rather than `this`, so they can be
 * passed around detached (e.g. as callbacks) without losing their binding.
 */
export class RetryHelper {
  private static retryManager = RetryManager.getInstance();

  /**
   * Run `operation` under the named policy on the shared manager.
   */
  private static run<T>(
    policyName: string,
    operation: () => Promise<T>,
    context?: OperationContext,
    circuitBreaker?: CircuitBreaker
  ): Promise<T> {
    return RetryHelper.retryManager.executeWithRetry(
      operation,
      policyName,
      context,
      circuitBreaker
    );
  }

  /**
   * Retry LLM operations with the `llm-adapter` policy.
   *
   * @param operation - Async operation to run and retry.
   * @param context - Optional operation context.
   * @param circuitBreaker - Optional breaker through which attempts are executed.
   * @returns The value resolved by the first successful attempt.
   */
  static async retryLLMOperation<T>(
    operation: () => Promise<T>,
    context?: OperationContext,
    circuitBreaker?: CircuitBreaker
  ): Promise<T> {
    return RetryHelper.run('llm-adapter', operation, context, circuitBreaker);
  }

  /**
   * Retry database operations with the `trajectory-store` policy.
   *
   * @param operation - Async operation to run and retry.
   * @param context - Optional operation context.
   * @param circuitBreaker - Optional breaker through which attempts are executed.
   * @returns The value resolved by the first successful attempt.
   */
  static async retryDatabaseOperation<T>(
    operation: () => Promise<T>,
    context?: OperationContext,
    circuitBreaker?: CircuitBreaker
  ): Promise<T> {
    return RetryHelper.run('trajectory-store', operation, context, circuitBreaker);
  }

  /**
   * Retry computation operations with the `pareto-frontier` policy.
   *
   * @param operation - Async operation to run and retry.
   * @param context - Optional operation context.
   * @returns The value resolved by the first successful attempt.
   */
  static async retryComputationOperation<T>(
    operation: () => Promise<T>,
    context?: OperationContext
  ): Promise<T> {
    return RetryHelper.run('pareto-frontier', operation, context);
  }

  /**
   * Generic retry with the `generic` policy.
   *
   * @param operation - Async operation to run and retry.
   * @param context - Optional operation context.
   * @returns The value resolved by the first successful attempt.
   */
  static async retry<T>(
    operation: () => Promise<T>,
    context?: OperationContext
  ): Promise<T> {
    return RetryHelper.run('generic', operation, context);
  }
}