manager.py • 10.3 kB
"""Cache manager implementation with in-memory and optional Redis support.""" import asyncio import hashlib import json import logging import time from typing import Any, Optional, Dict, Callable from functools import lru_cache from dataclasses import dataclass, field logger = logging.getLogger(__name__) @dataclass class CacheEntry: """Cache entry with value and metadata.""" value: Any timestamp: float ttl: int access_count: int = 0 last_access: float = field(default_factory=time.time) def is_expired(self) -> bool: """Check if cache entry is expired.""" return time.time() - self.timestamp > self.ttl def access(self) -> Any: """Access the cached value and update metadata.""" self.access_count += 1 self.last_access = time.time() return self.value @dataclass class CacheStats: """Cache statistics.""" hits: int = 0 misses: int = 0 sets: int = 0 deletes: int = 0 evictions: int = 0 size: int = 0 @property def hit_rate(self) -> float: """Calculate cache hit rate.""" total = self.hits + self.misses return (self.hits / total * 100) if total > 0 else 0.0 def to_dict(self) -> Dict[str, Any]: """Convert stats to dictionary.""" return { "hits": self.hits, "misses": self.misses, "sets": self.sets, "deletes": self.deletes, "evictions": self.evictions, "size": self.size, "hit_rate": round(self.hit_rate, 2), } class CacheManager: """ Cache manager with TTL support and automatic eviction. Features: - In-memory caching with TTL - LRU eviction when max size is reached - Cache statistics - Async support - Optional Redis backend (future) """ def __init__(self, max_size: int = 1000, default_ttl: int = 300): """ Initialize cache manager. Args: max_size: Maximum number of cache entries default_ttl: Default TTL in seconds """ self.max_size = max_size self.default_ttl = default_ttl self._cache: Dict[str, CacheEntry] = {} self._stats = CacheStats() self._lock = asyncio.Lock() self._cleanup_task: Optional[asyncio.Task] = None logger.info(f"Cache manager initialized (max_size={max_size}, default_ttl={default_ttl})") async def start(self): """Start background cleanup task.""" if self._cleanup_task is None: self._cleanup_task = asyncio.create_task(self._cleanup_loop()) logger.info("Cache cleanup task started") async def stop(self): """Stop background cleanup task.""" if self._cleanup_task is not None: self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass self._cleanup_task = None logger.info("Cache cleanup task stopped") async def _cleanup_loop(self): """Background task to clean up expired entries.""" while True: try: await asyncio.sleep(60) # Run every minute await self._cleanup_expired() except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in cache cleanup: {e}") async def _cleanup_expired(self): """Remove expired entries from cache.""" async with self._lock: expired_keys = [key for key, entry in self._cache.items() if entry.is_expired()] for key in expired_keys: del self._cache[key] self._stats.evictions += 1 if expired_keys: self._stats.size = len(self._cache) logger.debug(f"Cleaned up {len(expired_keys)} expired cache entries") def _make_key(self, key: str, namespace: Optional[str] = None) -> str: """ Create cache key with optional namespace. Args: key: Cache key namespace: Optional namespace prefix Returns: Namespaced cache key """ if namespace: return f"{namespace}:{key}" return key def _hash_key(self, *args, **kwargs) -> str: """ Generate cache key from function arguments. 
Args: *args: Positional arguments **kwargs: Keyword arguments Returns: Hash of arguments as cache key """ key_data = { "args": args, "kwargs": sorted(kwargs.items()), } key_str = json.dumps(key_data, sort_keys=True, default=str) return hashlib.md5(key_str.encode()).hexdigest() async def get( self, key: str, namespace: Optional[str] = None, default: Any = None ) -> Optional[Any]: """ Get value from cache. Args: key: Cache key namespace: Optional namespace default: Default value if not found Returns: Cached value or default """ cache_key = self._make_key(key, namespace) async with self._lock: entry = self._cache.get(cache_key) if entry is None: self._stats.misses += 1 logger.debug(f"Cache miss: {cache_key}") return default if entry.is_expired(): del self._cache[cache_key] self._stats.misses += 1 self._stats.evictions += 1 self._stats.size = len(self._cache) logger.debug(f"Cache expired: {cache_key}") return default self._stats.hits += 1 logger.debug(f"Cache hit: {cache_key} (access_count={entry.access_count + 1})") return entry.access() async def set( self, key: str, value: Any, ttl: Optional[int] = None, namespace: Optional[str] = None ): """ Set value in cache. Args: key: Cache key value: Value to cache ttl: Time to live in seconds (uses default if not provided) namespace: Optional namespace """ cache_key = self._make_key(key, namespace) ttl = ttl or self.default_ttl async with self._lock: # Evict LRU entry if cache is full if len(self._cache) >= self.max_size and cache_key not in self._cache: await self._evict_lru() entry = CacheEntry(value=value, timestamp=time.time(), ttl=ttl) self._cache[cache_key] = entry self._stats.sets += 1 self._stats.size = len(self._cache) logger.debug(f"Cache set: {cache_key} (ttl={ttl}s)") async def _evict_lru(self): """Evict least recently used entry.""" if not self._cache: return # Find LRU entry lru_key = min(self._cache.items(), key=lambda x: x[1].last_access)[0] del self._cache[lru_key] self._stats.evictions += 1 logger.debug(f"Cache evicted (LRU): {lru_key}") async def delete(self, key: str, namespace: Optional[str] = None) -> bool: """ Delete value from cache. Args: key: Cache key namespace: Optional namespace Returns: True if deleted, False if not found """ cache_key = self._make_key(key, namespace) async with self._lock: if cache_key in self._cache: del self._cache[cache_key] self._stats.deletes += 1 self._stats.size = len(self._cache) logger.debug(f"Cache deleted: {cache_key}") return True return False async def clear(self, namespace: Optional[str] = None): """ Clear cache entries. Args: namespace: If provided, only clear entries in this namespace """ async with self._lock: if namespace: # Clear only entries in namespace prefix = f"{namespace}:" keys_to_delete = [k for k in self._cache.keys() if k.startswith(prefix)] for key in keys_to_delete: del self._cache[key] logger.info(f"Cleared {len(keys_to_delete)} entries from namespace: {namespace}") else: # Clear all entries count = len(self._cache) self._cache.clear() logger.info(f"Cleared all {count} cache entries") self._stats.size = len(self._cache) async def exists(self, key: str, namespace: Optional[str] = None) -> bool: """ Check if key exists in cache. 
Args: key: Cache key namespace: Optional namespace Returns: True if key exists and not expired """ cache_key = self._make_key(key, namespace) async with self._lock: entry = self._cache.get(cache_key) if entry is None: return False if entry.is_expired(): del self._cache[cache_key] self._stats.evictions += 1 self._stats.size = len(self._cache) return False return True def get_stats(self) -> Dict[str, Any]: """Get cache statistics.""" return self._stats.to_dict() def reset_stats(self): """Reset cache statistics.""" self._stats = CacheStats(size=len(self._cache)) logger.info("Cache statistics reset") # Global cache manager instance _cache_manager: Optional[CacheManager] = None @lru_cache(maxsize=1) def get_cache_manager() -> CacheManager: """ Get or create global cache manager instance. Returns: Global CacheManager instance """ global _cache_manager if _cache_manager is None: from ..config import get_settings settings = get_settings() _cache_manager = CacheManager( max_size=settings.cache_max_size, default_ttl=settings.cache_ttl ) logger.info("Created global cache manager") return _cache_manager
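
For context, here is a minimal usage sketch of the class above. The import path (`manager`) and the keys, values, and TTLs are illustrative assumptions, not part of the file:

import asyncio

from manager import CacheManager  # hypothetical import path; adjust to your package layout


async def main() -> None:
    cache = CacheManager(max_size=100, default_ttl=60)
    await cache.start()  # launch the background cleanup loop

    # Store a value under a namespace with a 30-second TTL, then read it back.
    await cache.set("user:42", {"name": "Ada"}, ttl=30, namespace="profiles")
    profile = await cache.get("user:42", namespace="profiles")
    print(profile)  # {'name': 'Ada'}

    # Hit/miss/eviction counters accumulated by the manager.
    print(cache.get_stats())

    await cache.stop()  # cancel the cleanup task cleanly


asyncio.run(main())

Note that `start()` must be called from inside a running event loop, since it schedules the cleanup loop with `asyncio.create_task`.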

MCP directory API

We provide all of the information about MCP servers via our MCP directory API. For example:

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vespo92/TrueNasCoreMCP'
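
The equivalent request from Python, as a minimal sketch using only the standard library (the URL is taken from the curl example above; the shape of the JSON response is defined by the API and not assumed here):

import json
import urllib.request

# Fetch a server's directory entry from the MCP directory API.
url = "https://glama.ai/api/mcp/v1/servers/vespo92/TrueNasCoreMCP"
with urllib.request.urlopen(url) as response:
    data = json.load(response)  # parse the JSON body

print(json.dumps(data, indent=2))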

If you have feedback or need assistance with the MCP directory API, please join our Discord server.