CFM Tips - Cost Optimization MCP Server

by aws-samples
intelligent_cache.py (21.8 kB)
""" Intelligent Cache System for CFM Tips MCP Server Provides intelligent caching for frequently accessed data including pricing, bucket metadata, and analysis results with automatic expiration and memory management. """ import logging import time import threading import hashlib import json import pickle from typing import Dict, List, Any, Optional, Callable, Union, Tuple from datetime import datetime, timedelta from dataclasses import dataclass, field from collections import OrderedDict import weakref logger = logging.getLogger(__name__) @dataclass class CacheEntry: """Individual cache entry with metadata.""" key: str value: Any created_at: datetime last_accessed: datetime access_count: int = 0 ttl_seconds: Optional[int] = None size_bytes: int = 0 tags: Dict[str, str] = field(default_factory=dict) def is_expired(self) -> bool: """Check if the cache entry has expired.""" if self.ttl_seconds is None: return False age_seconds = (datetime.now() - self.created_at).total_seconds() return age_seconds > self.ttl_seconds def update_access(self): """Update access statistics.""" self.last_accessed = datetime.now() self.access_count += 1 def calculate_size(self): """Calculate approximate size of the cached value.""" try: if isinstance(self.value, (str, bytes)): self.size_bytes = len(self.value) elif isinstance(self.value, (dict, list)): # Approximate size using JSON serialization self.size_bytes = len(json.dumps(self.value, default=str)) else: # Use pickle for other types self.size_bytes = len(pickle.dumps(self.value)) except Exception: # Fallback to a rough estimate self.size_bytes = 1024 # 1KB default class IntelligentCache: """ Intelligent caching system with advanced features: - LRU eviction with access frequency consideration - TTL-based expiration - Memory usage monitoring and limits - Cache warming and preloading - Performance metrics integration - Tag-based cache invalidation - Automatic cleanup and optimization """ def __init__(self, max_size_mb: int = 100, max_entries: int = 10000, default_ttl_seconds: int = 3600, cleanup_interval_minutes: int = 15): """ Initialize IntelligentCache. 

class IntelligentCache:
    """
    Intelligent caching system with advanced features:
    - LRU eviction with access frequency consideration
    - TTL-based expiration
    - Memory usage monitoring and limits
    - Cache warming and preloading
    - Performance metrics integration
    - Tag-based cache invalidation
    - Automatic cleanup and optimization
    """

    def __init__(self,
                 max_size_mb: int = 100,
                 max_entries: int = 10000,
                 default_ttl_seconds: int = 3600,
                 cleanup_interval_minutes: int = 15):
        """
        Initialize IntelligentCache.

        Args:
            max_size_mb: Maximum cache size in megabytes
            max_entries: Maximum number of cache entries
            default_ttl_seconds: Default TTL for cache entries
            cleanup_interval_minutes: Interval for automatic cleanup
        """
        self.max_size_bytes = max_size_mb * 1024 * 1024
        self.max_entries = max_entries
        self.default_ttl_seconds = default_ttl_seconds
        self.cleanup_interval_minutes = cleanup_interval_minutes

        # Cache storage (OrderedDict for LRU behavior)
        self.cache: OrderedDict[str, CacheEntry] = OrderedDict()

        # Cache statistics
        self.stats = {
            "hits": 0,
            "misses": 0,
            "evictions": 0,
            "expirations": 0,
            "total_size_bytes": 0,
            "cleanup_runs": 0
        }

        # Thread safety
        self._lock = threading.RLock()

        # Background cleanup (an Event gives the worker an interruptible
        # sleep, so shutdown() can stop it promptly)
        self._stop_event = threading.Event()
        self._cleanup_thread = None

        # Cache warming functions
        self._warming_functions: Dict[str, Callable] = {}

        # Performance monitor integration
        self._performance_monitor = None

        # Start background cleanup
        self._start_cleanup_thread()

        logger.info(f"IntelligentCache initialized (max_size: {max_size_mb}MB, max_entries: {max_entries})")

    def set_performance_monitor(self, performance_monitor):
        """Set performance monitor for metrics integration."""
        self._performance_monitor = performance_monitor

    def _start_cleanup_thread(self):
        """Start background cleanup thread."""
        self._cleanup_thread = threading.Thread(
            target=self._cleanup_worker,
            daemon=True,
            name="CacheCleanup"
        )
        self._cleanup_thread.start()
        logger.info("Cache cleanup thread started")

    def _cleanup_worker(self):
        """Background worker for cache cleanup."""
        # wait() returns False on timeout (run a cleanup pass) and True once
        # shutdown() sets the event, ending the loop immediately.
        while not self._stop_event.wait(self.cleanup_interval_minutes * 60):
            try:
                self._cleanup_expired_entries()
                self._optimize_cache()
            except Exception as e:
                logger.error(f"Error in cache cleanup: {e}")

    def _generate_key(self, key_parts: Union[str, List[Any]]) -> str:
        """Generate a consistent cache key from various inputs."""
        if isinstance(key_parts, str):
            return key_parts

        # Convert to string and hash for consistency
        key_str = json.dumps(key_parts, sort_keys=True, default=str)
        return hashlib.md5(key_str.encode()).hexdigest()
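
    # Example: composite keys are hashed deterministically, so
    #     cache.put(["s3_pricing", "us-east-1", "standard"], data)
    #     cache.get(["s3_pricing", "us-east-1", "standard"])
    # address the same entry: both reduce to the md5 hex digest of the
    # sorted-key JSON encoding of the list.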

    def get(self, key: Union[str, List[Any]], default: Any = None) -> Any:
        """
        Get value from cache.

        Args:
            key: Cache key (string or list of components)
            default: Default value if key not found

        Returns:
            Cached value or default
        """
        cache_key = self._generate_key(key)

        with self._lock:
            if cache_key in self.cache:
                entry = self.cache[cache_key]

                # Check if expired
                if entry.is_expired():
                    del self.cache[cache_key]
                    self.stats["expirations"] += 1
                    self._update_total_size()

                    # Record cache miss
                    self.stats["misses"] += 1
                    if self._performance_monitor:
                        self._performance_monitor.record_cache_miss("intelligent_cache")

                    return default

                # Update access statistics
                entry.update_access()

                # Move to end (most recently used)
                self.cache.move_to_end(cache_key)

                # Record cache hit
                self.stats["hits"] += 1
                if self._performance_monitor:
                    self._performance_monitor.record_cache_hit("intelligent_cache")

                return entry.value

            # Cache miss
            self.stats["misses"] += 1
            if self._performance_monitor:
                self._performance_monitor.record_cache_miss("intelligent_cache")

            return default

    def put(self, key: Union[str, List[Any]], value: Any,
            ttl_seconds: Optional[int] = None,
            tags: Optional[Dict[str, str]] = None) -> bool:
        """
        Put value in cache.

        Args:
            key: Cache key (string or list of components)
            value: Value to cache
            ttl_seconds: Time to live in seconds (None for default)
            tags: Optional tags for the entry

        Returns:
            True if successfully cached, False otherwise
        """
        cache_key = self._generate_key(key)

        with self._lock:
            # Create cache entry ("is not None" so an explicit ttl_seconds=0
            # is honored instead of silently falling back to the default)
            entry = CacheEntry(
                key=cache_key,
                value=value,
                created_at=datetime.now(),
                last_accessed=datetime.now(),
                ttl_seconds=ttl_seconds if ttl_seconds is not None else self.default_ttl_seconds,
                tags=tags or {}
            )

            # Calculate size
            entry.calculate_size()

            # Check if we need to make space
            if not self._make_space_for_entry(entry):
                logger.warning(f"Could not make space for cache entry: {cache_key}")
                return False

            # Add to cache
            self.cache[cache_key] = entry
            self.cache.move_to_end(cache_key)  # Mark as most recently used
            self._update_total_size()

            logger.debug(f"Cached entry: {cache_key} (size: {entry.size_bytes} bytes)")
            return True

    def _make_space_for_entry(self, new_entry: CacheEntry) -> bool:
        """Make space for a new cache entry by evicting old ones."""
        # Check entry count limit
        while len(self.cache) >= self.max_entries:
            if not self._evict_lru_entry():
                return False

        # Check size limit
        while (self.stats["total_size_bytes"] + new_entry.size_bytes) > self.max_size_bytes:
            if not self._evict_lru_entry():
                return False

        return True

    def _evict_lru_entry(self) -> bool:
        """Evict the least recently used entry."""
        if not self.cache:
            return False

        # Remove the LRU entry (first in OrderedDict) and keep the size
        # counter in sync so the size-limit loop in _make_space_for_entry
        # sees the freed space instead of looping on a stale total
        lru_key, lru_entry = self.cache.popitem(last=False)
        self.stats["total_size_bytes"] = max(
            0, self.stats["total_size_bytes"] - lru_entry.size_bytes
        )
        self.stats["evictions"] += 1

        logger.debug(f"Evicted LRU entry: {lru_key} (size: {lru_entry.size_bytes} bytes)")
        return True

    def _update_total_size(self):
        """Update total cache size statistics."""
        self.stats["total_size_bytes"] = sum(entry.size_bytes for entry in self.cache.values())

    def _cleanup_expired_entries(self):
        """Clean up expired cache entries."""
        with self._lock:
            expired_keys = [key for key, entry in self.cache.items() if entry.is_expired()]

            for key in expired_keys:
                del self.cache[key]
                self.stats["expirations"] += 1

            if expired_keys:
                self._update_total_size()
                logger.info(f"Cleaned up {len(expired_keys)} expired cache entries")

            self.stats["cleanup_runs"] += 1

    def _optimize_cache(self):
        """Optimize cache by analyzing access patterns."""
        with self._lock:
            if not self.cache:
                return

            # Find entries with very low access frequency
            current_time = datetime.now()
            low_access_keys = []

            for key, entry in self.cache.items():
                age_hours = (current_time - entry.created_at).total_seconds() / 3600
                if age_hours > 1:  # Only consider entries older than 1 hour
                    access_rate = entry.access_count / age_hours
                    if access_rate < 0.1:  # Less than 0.1 accesses per hour
                        low_access_keys.append(key)

            # Remove low-access entries only if we're near capacity
            removed = 0
            if len(self.cache) > (self.max_entries * 0.8):  # 80% capacity
                for key in low_access_keys[:len(low_access_keys) // 2]:  # Remove half
                    del self.cache[key]
                    self.stats["evictions"] += 1
                    removed += 1

            if removed:
                self._update_total_size()
                logger.info(f"Optimized cache by removing {removed} low-access entries")

    def invalidate(self, key: Union[str, List[Any]]) -> bool:
        """
        Invalidate a specific cache entry.

        Args:
            key: Cache key to invalidate

        Returns:
            True if entry was found and removed, False otherwise
        """
        cache_key = self._generate_key(key)

        with self._lock:
            if cache_key in self.cache:
                del self.cache[cache_key]
                self._update_total_size()
                logger.debug(f"Invalidated cache entry: {cache_key}")
                return True

            return False

    def invalidate_by_tags(self, tags: Dict[str, str]) -> int:
        """
        Invalidate cache entries by tags.

        Args:
            tags: Tags to match for invalidation

        Returns:
            Number of entries invalidated
        """
        with self._lock:
            keys_to_remove = []

            for key, entry in self.cache.items():
                # Check if all specified tags match
                if all(entry.tags.get(tag_key) == tag_value
                       for tag_key, tag_value in tags.items()):
                    keys_to_remove.append(key)

            for key in keys_to_remove:
                del self.cache[key]

            if keys_to_remove:
                self._update_total_size()
                logger.info(f"Invalidated {len(keys_to_remove)} cache entries by tags: {tags}")

            return len(keys_to_remove)
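
    # Example: tags attached at put() time allow group invalidation, e.g.
    #     cache.put(["bucket_meta", "my-bucket"], meta,
    #               tags={"bucket": "my-bucket"})
    #     cache.invalidate_by_tags({"bucket": "my-bucket"})  # returns 1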

    def clear(self):
        """Clear all cache entries."""
        with self._lock:
            entry_count = len(self.cache)
            self.cache.clear()
            self.stats["total_size_bytes"] = 0
            logger.info(f"Cleared {entry_count} cache entries")

    def warm_cache(self, warming_key: str, *args, **kwargs) -> bool:
        """
        Warm cache using a registered warming function.

        Args:
            warming_key: Key for the warming function
            *args, **kwargs: Arguments for the warming function

        Returns:
            True if warming was successful, False otherwise
        """
        if warming_key not in self._warming_functions:
            logger.warning(f"No warming function registered for key: {warming_key}")
            return False

        try:
            warming_func = self._warming_functions[warming_key]
            warming_func(self, *args, **kwargs)
            logger.info(f"Cache warming completed for: {warming_key}")
            return True
        except Exception as e:
            logger.error(f"Error warming cache for {warming_key}: {e}")
            return False

    def register_warming_function(self, key: str, func: Callable):
        """
        Register a cache warming function.

        Args:
            key: Unique key for the warming function
            func: Function that takes (cache, *args, **kwargs) and warms the cache
        """
        self._warming_functions[key] = func
        logger.info(f"Registered cache warming function: {key}")
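
    # Example: a warming function receives the cache itself as its first
    # argument; fetch_prices() below is a hypothetical loader, shown only to
    # illustrate the expected signature.
    #
    #     def warm_pricing(cache, regions):
    #         for region in regions:
    #             cache.put(["pricing", region], fetch_prices(region))
    #
    #     cache.register_warming_function("pricing", warm_pricing)
    #     cache.warm_cache("pricing", ["us-east-1", "eu-west-1"])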

    def get_statistics(self) -> Dict[str, Any]:
        """Get comprehensive cache statistics."""
        with self._lock:
            total_requests = self.stats["hits"] + self.stats["misses"]
            hit_rate = (self.stats["hits"] / total_requests * 100) if total_requests > 0 else 0

            # Calculate size distribution
            size_distribution = {}
            if self.cache:
                sizes = [entry.size_bytes for entry in self.cache.values()]
                size_distribution = {
                    "min_size_bytes": min(sizes),
                    "max_size_bytes": max(sizes),
                    "avg_size_bytes": sum(sizes) / len(sizes)
                }

            # Calculate access frequency distribution
            access_distribution = {}
            if self.cache:
                access_counts = [entry.access_count for entry in self.cache.values()]
                access_distribution = {
                    "min_access_count": min(access_counts),
                    "max_access_count": max(access_counts),
                    "avg_access_count": sum(access_counts) / len(access_counts)
                }

            return {
                "timestamp": datetime.now().isoformat(),
                "cache_performance": {
                    "hit_rate_percent": hit_rate,
                    "total_hits": self.stats["hits"],
                    "total_misses": self.stats["misses"],
                    "total_requests": total_requests
                },
                "cache_size": {
                    "current_entries": len(self.cache),
                    "max_entries": self.max_entries,
                    "current_size_bytes": self.stats["total_size_bytes"],
                    "current_size_mb": self.stats["total_size_bytes"] / 1024 / 1024,
                    "max_size_mb": self.max_size_bytes / 1024 / 1024,
                    "utilization_percent": (len(self.cache) / self.max_entries * 100) if self.max_entries > 0 else 0
                },
                "cache_operations": {
                    "evictions": self.stats["evictions"],
                    "expirations": self.stats["expirations"],
                    "cleanup_runs": self.stats["cleanup_runs"]
                },
                "size_distribution": size_distribution,
                "access_distribution": access_distribution,
                "warming_functions": list(self._warming_functions.keys())
            }

    def get_cache_contents(self, include_values: bool = False) -> List[Dict[str, Any]]:
        """
        Get information about current cache contents.

        Args:
            include_values: Whether to include actual cached values

        Returns:
            List of cache entry information
        """
        with self._lock:
            contents = []

            for key, entry in self.cache.items():
                entry_info = {
                    "key": key,
                    "created_at": entry.created_at.isoformat(),
                    "last_accessed": entry.last_accessed.isoformat(),
                    "access_count": entry.access_count,
                    "ttl_seconds": entry.ttl_seconds,
                    "size_bytes": entry.size_bytes,
                    "tags": entry.tags,
                    "is_expired": entry.is_expired()
                }

                if include_values:
                    entry_info["value"] = entry.value

                contents.append(entry_info)

            return contents

    def shutdown(self):
        """Shutdown the cache system."""
        logger.info("Shutting down IntelligentCache")

        # Signal the cleanup thread to exit its wait() immediately
        self._stop_event.set()
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            self._cleanup_thread.join(timeout=5)

        with self._lock:
            self.cache.clear()

        logger.info("IntelligentCache shutdown complete")
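
# Minimal usage sketch (illustrative only, not called anywhere in the
# module): exercises put/get and the statistics counters on a throwaway
# instance.
def _example_basic_usage():
    cache = IntelligentCache(max_size_mb=1, max_entries=100, default_ttl_seconds=60)
    cache.put(["pricing", "s3", "us-east-1"], {"standard_gb": 0.023},
              tags={"service": "s3"})
    price = cache.get(["pricing", "s3", "us-east-1"])        # hit
    missing = cache.get(["pricing", "s3", "eu-west-1"], {})  # miss -> {}
    stats = cache.get_statistics()
    assert stats["cache_performance"]["hit_rate_percent"] == 50.0  # 1 hit, 1 miss
    cache.shutdown()
    return price, missing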

# Specialized cache implementations for common use cases

class PricingCache(IntelligentCache):
    """Specialized cache for AWS pricing data."""

    def __init__(self):
        # Pricing data changes infrequently, so longer TTL
        super().__init__(
            max_size_mb=50,
            max_entries=5000,
            default_ttl_seconds=3600 * 6,  # 6 hours
            cleanup_interval_minutes=30
        )

        # Register warming functions
        self.register_warming_function("s3_pricing", self._warm_s3_pricing)
        self.register_warming_function("ec2_pricing", self._warm_ec2_pricing)

    def _warm_s3_pricing(self, cache, region: str = "us-east-1"):
        """Warm cache with common S3 pricing data."""
        # This would be implemented to preload common S3 pricing queries
        logger.info(f"Warming S3 pricing cache for region: {region}")

    def _warm_ec2_pricing(self, cache, region: str = "us-east-1"):
        """Warm cache with common EC2 pricing data."""
        # This would be implemented to preload common EC2 pricing queries
        logger.info(f"Warming EC2 pricing cache for region: {region}")


class BucketMetadataCache(IntelligentCache):
    """Specialized cache for S3 bucket metadata."""

    def __init__(self):
        # Bucket metadata changes more frequently
        super().__init__(
            max_size_mb=30,
            max_entries=3000,
            default_ttl_seconds=1800,  # 30 minutes
            cleanup_interval_minutes=15
        )


class AnalysisResultsCache(IntelligentCache):
    """Specialized cache for analysis results."""

    def __init__(self):
        # Analysis results can be large but are valuable to cache
        super().__init__(
            max_size_mb=200,
            max_entries=1000,
            default_ttl_seconds=3600,  # 1 hour
            cleanup_interval_minutes=20
        )


# Global cache instances
_pricing_cache = None
_bucket_metadata_cache = None
_analysis_results_cache = None


def get_pricing_cache() -> PricingCache:
    """Get the global pricing cache instance."""
    global _pricing_cache
    if _pricing_cache is None:
        _pricing_cache = PricingCache()
    return _pricing_cache


def get_bucket_metadata_cache() -> BucketMetadataCache:
    """Get the global bucket metadata cache instance."""
    global _bucket_metadata_cache
    if _bucket_metadata_cache is None:
        _bucket_metadata_cache = BucketMetadataCache()
    return _bucket_metadata_cache


def get_analysis_results_cache() -> AnalysisResultsCache:
    """Get the global analysis results cache instance."""
    global _analysis_results_cache
    if _analysis_results_cache is None:
        _analysis_results_cache = AnalysisResultsCache()
    return _analysis_results_cache
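
The lazily initialized singletons above appear to be the intended entry points for callers elsewhere in the server. A minimal sketch of that pattern (the import path, key layout, and stand-in pricing value are assumptions for illustration, not taken from the server's actual handlers):

from intelligent_cache import get_pricing_cache

pricing = get_pricing_cache()  # created on first use; 6-hour default TTL

key = ["s3_pricing", "us-east-1", "standard"]
price = pricing.get(key)
if price is None:
    price = {"storage_gb_month": 0.023}  # stand-in for a real Pricing API call
    pricing.put(key, price, tags={"service": "s3", "region": "us-east-1"})

# After a pricing refresh, drop every entry tagged with the region:
pricing.invalidate_by_tags({"region": "us-east-1"})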
