CFM Tips - Cost Optimization MCP Server

by aws-samples
progressive_timeout.py (21.1 kB)
""" Progressive Timeout Handler for CFM Tips MCP Server Provides intelligent timeout management based on analysis complexity, historical performance, and system resource availability. """ import logging import time import threading from typing import Dict, List, Any, Optional, Callable, Union from datetime import datetime, timedelta from dataclasses import dataclass, field from enum import Enum import math logger = logging.getLogger(__name__) class ComplexityLevel(Enum): """Analysis complexity levels.""" VERY_LOW = 1 LOW = 2 MEDIUM = 3 HIGH = 4 VERY_HIGH = 5 @dataclass class TimeoutConfiguration: """Configuration for timeout calculation.""" base_timeout: float = 30.0 # Base timeout in seconds complexity_multiplier: Dict[ComplexityLevel, float] = field(default_factory=lambda: { ComplexityLevel.VERY_LOW: 0.5, ComplexityLevel.LOW: 1.0, ComplexityLevel.MEDIUM: 1.5, ComplexityLevel.HIGH: 2.5, ComplexityLevel.VERY_HIGH: 4.0 }) data_size_multiplier: float = 0.1 # Additional seconds per MB of data bucket_count_multiplier: float = 2.0 # Additional seconds per bucket historical_performance_weight: float = 0.3 # Weight for historical performance system_load_weight: float = 0.2 # Weight for system load adjustment min_timeout: float = 10.0 # Minimum timeout max_timeout: float = 300.0 # Maximum timeout (5 minutes) grace_period: float = 15.0 # Additional grace period @dataclass class AnalysisContext: """Context information for timeout calculation.""" analysis_type: str complexity_level: ComplexityLevel = ComplexityLevel.MEDIUM estimated_data_size_mb: float = 0.0 bucket_count: int = 0 region: Optional[str] = None include_cost_analysis: bool = True lookback_days: int = 30 parallel_execution: bool = True custom_parameters: Dict[str, Any] = field(default_factory=dict) @dataclass class TimeoutResult: """Result of timeout calculation.""" calculated_timeout: float base_timeout: float complexity_adjustment: float data_size_adjustment: float bucket_count_adjustment: float historical_adjustment: float system_load_adjustment: float final_timeout: float reasoning: List[str] = field(default_factory=list) class ProgressiveTimeoutHandler: """ Intelligent timeout handler that calculates timeouts based on: - Analysis complexity and type - Historical performance data - System resource availability - Data size and scope - Current system load """ def __init__(self, config: Optional[TimeoutConfiguration] = None): """ Initialize ProgressiveTimeoutHandler. Args: config: Timeout configuration (uses default if None) """ self.config = config or TimeoutConfiguration() # Historical performance tracking self.performance_history: Dict[str, List[float]] = {} self.complexity_history: Dict[str, List[Tuple[ComplexityLevel, float]]] = {} # System monitoring self.system_load_samples: List[Tuple[datetime, float]] = [] self.max_load_samples = 100 # Thread safety self._lock = threading.RLock() # Performance monitor integration self._performance_monitor = None logger.info("ProgressiveTimeoutHandler initialized") def set_performance_monitor(self, performance_monitor): """Set performance monitor for integration.""" self._performance_monitor = performance_monitor def calculate_timeout(self, context: AnalysisContext) -> TimeoutResult: """ Calculate intelligent timeout based on analysis context. 
Args: context: Analysis context information Returns: TimeoutResult with calculated timeout and reasoning """ with self._lock: reasoning = [] # Start with base timeout base_timeout = self.config.base_timeout reasoning.append(f"Base timeout: {base_timeout}s") # Apply complexity multiplier complexity_multiplier = self.config.complexity_multiplier.get( context.complexity_level, 1.0 ) complexity_adjustment = base_timeout * (complexity_multiplier - 1.0) reasoning.append( f"Complexity adjustment ({context.complexity_level.name}): " f"{complexity_adjustment:+.1f}s (multiplier: {complexity_multiplier})" ) # Apply data size adjustment data_size_adjustment = context.estimated_data_size_mb * self.config.data_size_multiplier reasoning.append(f"Data size adjustment ({context.estimated_data_size_mb:.1f}MB): {data_size_adjustment:+.1f}s") # Apply bucket count adjustment bucket_count_adjustment = context.bucket_count * self.config.bucket_count_multiplier reasoning.append(f"Bucket count adjustment ({context.bucket_count} buckets): {bucket_count_adjustment:+.1f}s") # Calculate base adjusted timeout calculated_timeout = ( base_timeout + complexity_adjustment + data_size_adjustment + bucket_count_adjustment ) # Apply historical performance adjustment historical_adjustment = self._calculate_historical_adjustment( context.analysis_type, context.complexity_level, calculated_timeout ) reasoning.append(f"Historical performance adjustment: {historical_adjustment:+.1f}s") # Apply system load adjustment system_load_adjustment = self._calculate_system_load_adjustment(calculated_timeout) reasoning.append(f"System load adjustment: {system_load_adjustment:+.1f}s") # Calculate final timeout final_timeout = ( calculated_timeout + historical_adjustment + system_load_adjustment + self.config.grace_period ) # Apply min/max constraints constrained_timeout = max( self.config.min_timeout, min(final_timeout, self.config.max_timeout) ) if constrained_timeout != final_timeout: reasoning.append( f"Applied constraints: {final_timeout:.1f}s -> {constrained_timeout:.1f}s " f"(min: {self.config.min_timeout}s, max: {self.config.max_timeout}s)" ) reasoning.append(f"Grace period: +{self.config.grace_period}s") reasoning.append(f"Final timeout: {constrained_timeout:.1f}s") result = TimeoutResult( calculated_timeout=calculated_timeout, base_timeout=base_timeout, complexity_adjustment=complexity_adjustment, data_size_adjustment=data_size_adjustment, bucket_count_adjustment=bucket_count_adjustment, historical_adjustment=historical_adjustment, system_load_adjustment=system_load_adjustment, final_timeout=constrained_timeout, reasoning=reasoning ) logger.debug(f"Calculated timeout for {context.analysis_type}: {constrained_timeout:.1f}s") return result def _calculate_historical_adjustment(self, analysis_type: str, complexity_level: ComplexityLevel, base_timeout: float) -> float: """Calculate adjustment based on historical performance.""" if analysis_type not in self.performance_history: return 0.0 history = self.performance_history[analysis_type] if not history: return 0.0 # Calculate average historical execution time avg_execution_time = sum(history) / len(history) # Calculate adjustment based on difference from base timeout performance_ratio = avg_execution_time / base_timeout # Apply weight and calculate adjustment adjustment = (avg_execution_time - base_timeout) * self.config.historical_performance_weight # Limit adjustment to reasonable bounds max_adjustment = base_timeout * 0.5 # Max 50% adjustment adjustment = max(-max_adjustment, 
min(adjustment, max_adjustment)) return adjustment def _calculate_system_load_adjustment(self, base_timeout: float) -> float: """Calculate adjustment based on current system load.""" if not self.system_load_samples: return 0.0 # Get recent load samples (last 5 minutes) cutoff_time = datetime.now() - timedelta(minutes=5) recent_samples = [ load for timestamp, load in self.system_load_samples if timestamp >= cutoff_time ] if not recent_samples: return 0.0 # Calculate average recent load avg_load = sum(recent_samples) / len(recent_samples) # High load increases timeout, low load decreases it if avg_load > 80: # High load load_multiplier = 1.0 + ((avg_load - 80) / 100) # Up to 20% increase elif avg_load < 30: # Low load load_multiplier = 0.9 # 10% decrease else: load_multiplier = 1.0 # No adjustment adjustment = base_timeout * (load_multiplier - 1.0) * self.config.system_load_weight # Limit adjustment max_adjustment = base_timeout * 0.3 # Max 30% adjustment adjustment = max(-max_adjustment, min(adjustment, max_adjustment)) return adjustment def record_execution_time(self, analysis_type: str, execution_time: float, complexity_level: ComplexityLevel = ComplexityLevel.MEDIUM): """ Record execution time for historical analysis. Args: analysis_type: Type of analysis execution_time: Actual execution time in seconds complexity_level: Complexity level of the analysis """ with self._lock: # Record in performance history if analysis_type not in self.performance_history: self.performance_history[analysis_type] = [] self.performance_history[analysis_type].append(execution_time) # Keep only recent history (last 100 executions) if len(self.performance_history[analysis_type]) > 100: self.performance_history[analysis_type] = self.performance_history[analysis_type][-100:] # Record complexity history if analysis_type not in self.complexity_history: self.complexity_history[analysis_type] = [] self.complexity_history[analysis_type].append((complexity_level, execution_time)) # Keep only recent complexity history if len(self.complexity_history[analysis_type]) > 100: self.complexity_history[analysis_type] = self.complexity_history[analysis_type][-100:] logger.debug(f"Recorded execution time for {analysis_type}: {execution_time:.2f}s") def record_system_load(self, cpu_percent: float): """ Record system load for load-based adjustments. Args: cpu_percent: CPU usage percentage """ with self._lock: timestamp = datetime.now() self.system_load_samples.append((timestamp, cpu_percent)) # Keep only recent samples if len(self.system_load_samples) > self.max_load_samples: self.system_load_samples = self.system_load_samples[-self.max_load_samples:] def get_complexity_level(self, analysis_type: str, **context) -> ComplexityLevel: """ Determine complexity level based on analysis type and context. 
Args: analysis_type: Type of analysis **context: Additional context parameters Returns: Estimated complexity level """ # Base complexity by analysis type base_complexity = { "general_spend": ComplexityLevel.HIGH, "storage_class": ComplexityLevel.HIGH, "archive_optimization": ComplexityLevel.MEDIUM, "api_cost": ComplexityLevel.MEDIUM, "multipart_cleanup": ComplexityLevel.LOW, "governance": ComplexityLevel.LOW, "comprehensive": ComplexityLevel.VERY_HIGH }.get(analysis_type, ComplexityLevel.MEDIUM) # Adjust based on context bucket_count = context.get("bucket_count", 0) lookback_days = context.get("lookback_days", 30) include_cost_analysis = context.get("include_cost_analysis", True) complexity_score = base_complexity.value # Adjust for bucket count if bucket_count > 100: complexity_score += 1 elif bucket_count > 1000: complexity_score += 2 # Adjust for lookback period if lookback_days > 90: complexity_score += 1 elif lookback_days > 365: complexity_score += 2 # Adjust for cost analysis if include_cost_analysis: complexity_score += 0.5 # Convert back to enum final_complexity = min(ComplexityLevel.VERY_HIGH.value, max(ComplexityLevel.VERY_LOW.value, int(complexity_score))) return ComplexityLevel(final_complexity) def create_analysis_context(self, analysis_type: str, **kwargs) -> AnalysisContext: """ Create analysis context from parameters. Args: analysis_type: Type of analysis **kwargs: Analysis parameters Returns: AnalysisContext object """ # Estimate data size based on parameters bucket_count = len(kwargs.get("bucket_names", [])) if kwargs.get("bucket_names") else 10 lookback_days = kwargs.get("lookback_days", 30) # Rough estimation of data size estimated_data_size_mb = bucket_count * lookback_days * 0.1 # 0.1MB per bucket per day # Determine complexity (remove bucket_count from kwargs to avoid conflict) context_kwargs = kwargs.copy() context_kwargs.pop('bucket_names', None) # Remove to avoid conflicts complexity_level = self.get_complexity_level(analysis_type, bucket_count=bucket_count, **context_kwargs) return AnalysisContext( analysis_type=analysis_type, complexity_level=complexity_level, estimated_data_size_mb=estimated_data_size_mb, bucket_count=bucket_count, region=kwargs.get("region"), include_cost_analysis=kwargs.get("include_cost_analysis", True), lookback_days=lookback_days, parallel_execution=kwargs.get("parallel_execution", True), custom_parameters=kwargs ) def get_timeout_for_analysis(self, analysis_type: str, **kwargs) -> float: """ Convenience method to get timeout for an analysis. 
Args: analysis_type: Type of analysis **kwargs: Analysis parameters Returns: Calculated timeout in seconds """ context = self.create_analysis_context(analysis_type, **kwargs) result = self.calculate_timeout(context) return result.final_timeout def get_performance_statistics(self) -> Dict[str, Any]: """Get performance statistics and timeout effectiveness.""" with self._lock: stats = { "timestamp": datetime.now().isoformat(), "configuration": { "base_timeout": self.config.base_timeout, "min_timeout": self.config.min_timeout, "max_timeout": self.config.max_timeout, "grace_period": self.config.grace_period }, "historical_data": {}, "system_load": { "samples_count": len(self.system_load_samples), "recent_avg_load": 0.0 } } # Calculate historical statistics for analysis_type, history in self.performance_history.items(): if history: stats["historical_data"][analysis_type] = { "execution_count": len(history), "avg_execution_time": sum(history) / len(history), "min_execution_time": min(history), "max_execution_time": max(history), "recent_executions": history[-10:] if len(history) >= 10 else history } # Calculate recent system load if self.system_load_samples: cutoff_time = datetime.now() - timedelta(minutes=5) recent_loads = [ load for timestamp, load in self.system_load_samples if timestamp >= cutoff_time ] if recent_loads: stats["system_load"]["recent_avg_load"] = sum(recent_loads) / len(recent_loads) return stats def optimize_configuration(self): """Optimize timeout configuration based on historical data.""" with self._lock: if not self.performance_history: return # Analyze timeout effectiveness total_executions = sum(len(history) for history in self.performance_history.values()) if total_executions < 50: # Need sufficient data return # Calculate average execution times by complexity complexity_averages = {} for analysis_type, complexity_history in self.complexity_history.items(): for complexity_level, execution_time in complexity_history: if complexity_level not in complexity_averages: complexity_averages[complexity_level] = [] complexity_averages[complexity_level].append(execution_time) # Update complexity multipliers based on actual performance for complexity_level, execution_times in complexity_averages.items(): if len(execution_times) >= 10: # Need sufficient samples avg_time = sum(execution_times) / len(execution_times) base_avg = sum( sum(history) / len(history) for history in self.performance_history.values() ) / len(self.performance_history) # Calculate optimal multiplier optimal_multiplier = avg_time / base_avg if base_avg > 0 else 1.0 # Gradually adjust current multiplier towards optimal current_multiplier = self.config.complexity_multiplier[complexity_level] new_multiplier = current_multiplier * 0.8 + optimal_multiplier * 0.2 self.config.complexity_multiplier[complexity_level] = new_multiplier logger.info( f"Optimized complexity multiplier for {complexity_level.name}: " f"{current_multiplier:.2f} -> {new_multiplier:.2f}" ) def shutdown(self): """Shutdown the timeout handler.""" logger.info("Shutting down ProgressiveTimeoutHandler") with self._lock: # Optionally save historical data for persistence self.performance_history.clear() self.complexity_history.clear() self.system_load_samples.clear() logger.info("ProgressiveTimeoutHandler shutdown complete") # Global timeout handler instance _timeout_handler = None def get_timeout_handler() -> ProgressiveTimeoutHandler: """Get the global timeout handler instance.""" global _timeout_handler if _timeout_handler is None: _timeout_handler = 
ProgressiveTimeoutHandler() return _timeout_handler
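
A minimal usage sketch follows (not part of progressive_timeout.py above). It assumes the module is importable as `progressive_timeout` and uses a hypothetical `run_storage_class_analysis` call standing in for whatever analysis the timeout guards; the numbers in the comments are worked out from the default TimeoutConfiguration values in the file.

# Usage sketch, assuming the module is importable as `progressive_timeout`.
import time

from progressive_timeout import get_timeout_handler

handler = get_timeout_handler()

# Ask for a timeout before running an analysis. With no recorded history or
# load samples, a "storage_class" analysis over 5 buckets with a 30-day
# lookback resolves to roughly 101.5s under the default configuration:
# 30s base + 45s complexity (HIGH, 2.5x) + 1.5s data size + 10s buckets + 15s grace.
timeout = handler.get_timeout_for_analysis(
    "storage_class",
    bucket_names=["bucket-a", "bucket-b", "bucket-c", "bucket-d", "bucket-e"],
    lookback_days=30,
)

start = time.monotonic()
try:
    run_storage_class_analysis(timeout=timeout)  # hypothetical analysis call
finally:
    # Feed the actual duration back so later timeouts adapt via the
    # historical-performance adjustment.
    handler.record_execution_time(
        "storage_class",
        time.monotonic() - start,
        complexity_level=handler.get_complexity_level("storage_class", bucket_count=5),
    )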

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aws-samples/sample-cfm-tips-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.