Skip to main content
Glama
cache.py — 15.1 kB
"""Cache Management Module. Provides intelligent caching for file metadata, Excel structures, and execution results to improve performance. """ import os import json import pickle import hashlib import time from typing import Dict, Any, Optional, Union, List from pathlib import Path from dataclasses import dataclass, asdict from threading import Lock try: from core.config import get_config CORE_AVAILABLE = True except ImportError: CORE_AVAILABLE = False def get_config(): return {'cache': {'enabled': True, 'ttl': 3600, 'max_size': 100}} @dataclass class CacheEntry: """Cache entry with metadata.""" key: str value: Any created_at: float accessed_at: float access_count: int size_bytes: int ttl: Optional[float] = None class CacheManager: """Intelligent cache manager with TTL and size limits.""" def __init__(self, cache_dir: Optional[str] = None, max_memory_size: int = 100 * 1024 * 1024, # 100MB default_ttl: int = 3600): """Initialize cache manager. Args: cache_dir: Directory for persistent cache max_memory_size: Maximum memory cache size in bytes default_ttl: Default TTL in seconds """ self.cache_dir = Path(cache_dir) if cache_dir else Path.home() / '.chatexcel_cache' self.max_memory_size = max_memory_size self.default_ttl = default_ttl # In-memory cache self._memory_cache: Dict[str, CacheEntry] = {} self._memory_size = 0 self._lock = Lock() # Ensure cache directory exists self.cache_dir.mkdir(parents=True, exist_ok=True) # Load configuration if CORE_AVAILABLE: config = get_config() cache_config = config.get('cache', {}) self.enabled = cache_config.get('enabled', True) self.default_ttl = cache_config.get('ttl', default_ttl) self.max_memory_size = cache_config.get('max_size', max_memory_size) * 1024 * 1024 else: self.enabled = True def get(self, key: str, default: Any = None) -> Any: """Get value from cache. 
Args: key: Cache key default: Default value if not found Returns: Cached value or default """ if not self.enabled: return default with self._lock: # Check memory cache first if key in self._memory_cache: entry = self._memory_cache[key] # Check TTL if self._is_expired(entry): self._remove_from_memory(key) return default # Update access info entry.accessed_at = time.time() entry.access_count += 1 return entry.value # Check persistent cache persistent_value = self._get_from_disk(key) if persistent_value is not None: # Add to memory cache self._add_to_memory(key, persistent_value) return persistent_value return default def set(self, key: str, value: Any, ttl: Optional[int] = None, persist: bool = False) -> bool: """Set value in cache. Args: key: Cache key value: Value to cache ttl: Time to live in seconds persist: Whether to persist to disk Returns: True if successfully cached """ if not self.enabled: return False try: with self._lock: # Add to memory cache success = self._add_to_memory(key, value, ttl) # Persist to disk if requested if persist and success: self._save_to_disk(key, value, ttl) return success except Exception: return False def delete(self, key: str) -> bool: """Delete value from cache. Args: key: Cache key Returns: True if successfully deleted """ if not self.enabled: return False with self._lock: # Remove from memory memory_removed = self._remove_from_memory(key) # Remove from disk disk_removed = self._remove_from_disk(key) return memory_removed or disk_removed def clear(self, memory_only: bool = False) -> None: """Clear cache. Args: memory_only: If True, only clear memory cache """ with self._lock: # Clear memory cache self._memory_cache.clear() self._memory_size = 0 # Clear disk cache if not memory_only: try: for cache_file in self.cache_dir.glob('*.cache'): cache_file.unlink() except Exception: pass def get_stats(self) -> Dict[str, Any]: """Get cache statistics. 
Returns: Dictionary with cache statistics """ with self._lock: memory_entries = len(self._memory_cache) disk_entries = len(list(self.cache_dir.glob('*.cache'))) # Calculate hit rates (simplified) total_accesses = sum(entry.access_count for entry in self._memory_cache.values()) return { 'enabled': self.enabled, 'memory_entries': memory_entries, 'disk_entries': disk_entries, 'memory_size_bytes': self._memory_size, 'memory_size_mb': self._memory_size / (1024 * 1024), 'max_memory_size_mb': self.max_memory_size / (1024 * 1024), 'total_accesses': total_accesses, 'cache_dir': str(self.cache_dir) } def cleanup_expired(self) -> int: """Clean up expired entries. Returns: Number of entries removed """ if not self.enabled: return 0 removed_count = 0 with self._lock: # Clean memory cache expired_keys = [] for key, entry in self._memory_cache.items(): if self._is_expired(entry): expired_keys.append(key) for key in expired_keys: self._remove_from_memory(key) removed_count += 1 # Clean disk cache try: for cache_file in self.cache_dir.glob('*.cache'): try: with open(cache_file, 'rb') as f: entry_data = pickle.load(f) if self._is_disk_entry_expired(entry_data): cache_file.unlink() removed_count += 1 except Exception: # Remove corrupted cache files cache_file.unlink() removed_count += 1 except Exception: pass return removed_count def cache_file_info(self, file_path: str) -> str: """Generate cache key for file info. Args: file_path: Path to file Returns: Cache key for file info """ return f"file_info:{self._hash_key(file_path)}" def cache_excel_structure(self, file_path: str) -> str: """Generate cache key for Excel structure. Args: file_path: Path to Excel file Returns: Cache key for Excel structure """ return f"excel_structure:{self._hash_key(file_path)}" def cache_execution_result(self, code: str, file_path: str) -> str: """Generate cache key for execution result. 
Args: code: Python code file_path: Path to file Returns: Cache key for execution result """ combined = f"{code}:{file_path}" return f"execution:{self._hash_key(combined)}" def _add_to_memory(self, key: str, value: Any, ttl: Optional[int] = None) -> bool: """Add entry to memory cache.""" try: # Calculate size size_bytes = self._calculate_size(value) # Check if we need to make space while (self._memory_size + size_bytes > self.max_memory_size and self._memory_cache): self._evict_lru() # Don't cache if too large if size_bytes > self.max_memory_size: return False # Create entry entry = CacheEntry( key=key, value=value, created_at=time.time(), accessed_at=time.time(), access_count=1, size_bytes=size_bytes, ttl=ttl or self.default_ttl ) # Remove existing entry if present if key in self._memory_cache: self._remove_from_memory(key) # Add new entry self._memory_cache[key] = entry self._memory_size += size_bytes return True except Exception: return False def _remove_from_memory(self, key: str) -> bool: """Remove entry from memory cache.""" if key in self._memory_cache: entry = self._memory_cache.pop(key) self._memory_size -= entry.size_bytes return True return False def _evict_lru(self) -> None: """Evict least recently used entry.""" if not self._memory_cache: return # Find LRU entry lru_key = min(self._memory_cache.keys(), key=lambda k: self._memory_cache[k].accessed_at) self._remove_from_memory(lru_key) def _is_expired(self, entry: CacheEntry) -> bool: """Check if cache entry is expired.""" if entry.ttl is None: return False return time.time() - entry.created_at > entry.ttl def _save_to_disk(self, key: str, value: Any, ttl: Optional[int] = None) -> bool: """Save entry to disk.""" try: cache_file = self.cache_dir / f"{self._hash_key(key)}.cache" entry_data = { 'key': key, 'value': value, 'created_at': time.time(), 'ttl': ttl or self.default_ttl } with open(cache_file, 'wb') as f: pickle.dump(entry_data, f) return True except Exception: return False def _get_from_disk(self, 
key: str) -> Any: """Get entry from disk.""" try: cache_file = self.cache_dir / f"{self._hash_key(key)}.cache" if not cache_file.exists(): return None with open(cache_file, 'rb') as f: entry_data = pickle.load(f) # Check expiration if self._is_disk_entry_expired(entry_data): cache_file.unlink() return None return entry_data['value'] except Exception: return None def _remove_from_disk(self, key: str) -> bool: """Remove entry from disk.""" try: cache_file = self.cache_dir / f"{self._hash_key(key)}.cache" if cache_file.exists(): cache_file.unlink() return True except Exception: pass return False def _is_disk_entry_expired(self, entry_data: Dict[str, Any]) -> bool: """Check if disk entry is expired.""" ttl = entry_data.get('ttl') if ttl is None: return False created_at = entry_data.get('created_at', 0) return time.time() - created_at > ttl def _hash_key(self, key: str) -> str: """Generate hash for cache key.""" return hashlib.md5(key.encode('utf-8')).hexdigest() def _calculate_size(self, obj: Any) -> int: """Calculate approximate size of object in bytes.""" try: return len(pickle.dumps(obj)) except Exception: # Fallback estimation if isinstance(obj, str): return len(obj.encode('utf-8')) elif isinstance(obj, (int, float)): return 8 elif isinstance(obj, bool): return 1 elif isinstance(obj, (list, tuple)): return sum(self._calculate_size(item) for item in obj) elif isinstance(obj, dict): return sum(self._calculate_size(k) + self._calculate_size(v) for k, v in obj.items()) else: return 1024 # Default estimate # Global cache instance _global_cache = None def get_cache_manager() -> CacheManager: """Get global cache manager instance. Returns: Global CacheManager instance """ global _global_cache if _global_cache is None: _global_cache = CacheManager() return _global_cache def cache_result(key_func=None, ttl: Optional[int] = None, persist: bool = False): """Decorator for caching function results. 
Args: key_func: Function to generate cache key ttl: Time to live in seconds persist: Whether to persist to disk Returns: Decorated function """ def decorator(func): def wrapper(*args, **kwargs): cache = get_cache_manager() # Generate cache key if key_func: cache_key = key_func(*args, **kwargs) else: # Default key generation key_parts = [func.__name__] key_parts.extend(str(arg) for arg in args) key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items())) cache_key = ":".join(key_parts) # Try to get from cache result = cache.get(cache_key) if result is not None: return result # Execute function and cache result result = func(*args, **kwargs) cache.set(cache_key, result, ttl=ttl, persist=persist) return result return wrapper return decorator

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Lillard01/chatExcel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.