Skip to main content
Glama
cache.py7.03 kB
""" Caching service with Redis and in-memory TTL fallback. """ import asyncio import json import logging import time from abc import ABC, abstractmethod from typing import Any, Generic, Optional, TypeVar import redis.asyncio as redis from app.config import settings from app.utils.errors import CacheError logger = logging.getLogger(__name__) T = TypeVar("T") class CacheBackend(ABC, Generic[T]): """Abstract base class for cache backends.""" @abstractmethod async def get(self, key: str) -> Optional[T]: """Get value from cache. Args: key: Cache key Returns: Cached value or None """ pass @abstractmethod async def set(self, key: str, value: T, ttl: int) -> None: """Set value in cache. Args: key: Cache key value: Value to cache ttl: Time to live in seconds """ pass @abstractmethod async def delete(self, key: str) -> None: """Delete value from cache. Args: key: Cache key """ pass @abstractmethod async def clear(self) -> None: """Clear all cache.""" pass class RedisCache(CacheBackend[T]): """Redis-based cache implementation.""" def __init__(self, redis_url: str): """Initialize Redis cache. Args: redis_url: Redis connection URL """ self.redis_url = redis_url self.client: Optional[redis.Redis] = None async def connect(self) -> None: """Connect to Redis.""" try: self.client = await redis.from_url(self.redis_url, decode_responses=True) await self.client.ping() logger.info("Connected to Redis") except Exception as e: logger.error(f"Failed to connect to Redis: {str(e)}") raise CacheError(f"Redis connection failed: {str(e)}") async def disconnect(self) -> None: """Disconnect from Redis.""" if self.client: await self.client.close() logger.info("Disconnected from Redis") async def get(self, key: str) -> Optional[T]: """Get value from Redis. Args: key: Cache key Returns: Cached value or None """ if not self.client: return None try: value = await self.client.get(key) if value: return json.loads(value) return None except Exception as e: logger.warning(f"Redis get error for key {key}: {str(e)}") return None async def set(self, key: str, value: T, ttl: int) -> None: """Set value in Redis. Args: key: Cache key value: Value to cache ttl: Time to live in seconds """ if not self.client: return try: await self.client.setex(key, ttl, json.dumps(value, default=str)) except Exception as e: logger.warning(f"Redis set error for key {key}: {str(e)}") async def delete(self, key: str) -> None: """Delete value from Redis. Args: key: Cache key """ if not self.client: return try: await self.client.delete(key) except Exception as e: logger.warning(f"Redis delete error for key {key}: {str(e)}") async def clear(self) -> None: """Clear all Redis cache.""" if not self.client: return try: await self.client.flushdb() except Exception as e: logger.warning(f"Redis clear error: {str(e)}") class InMemoryCache(CacheBackend[T]): """In-memory cache with TTL support.""" def __init__(self): """Initialize in-memory cache.""" self.cache: dict[str, tuple[Any, float]] = {} async def get(self, key: str) -> Optional[T]: """Get value from cache. Args: key: Cache key Returns: Cached value or None """ if key not in self.cache: return None value, expiry = self.cache[key] if time.time() > expiry: del self.cache[key] return None return value async def set(self, key: str, value: T, ttl: int) -> None: """Set value in cache. Args: key: Cache key value: Value to cache ttl: Time to live in seconds """ expiry = time.time() + ttl self.cache[key] = (value, expiry) async def delete(self, key: str) -> None: """Delete value from cache. Args: key: Cache key """ if key in self.cache: del self.cache[key] async def clear(self) -> None: """Clear all cache.""" self.cache.clear() class CacheManager: """Cache manager with Redis fallback to in-memory.""" def __init__(self): """Initialize cache manager.""" self.backend: CacheBackend = InMemoryCache() self._redis_available = False async def initialize(self) -> None: """Initialize cache backend.""" if settings.redis_enabled: try: redis_cache = RedisCache(settings.redis_url) await redis_cache.connect() self.backend = redis_cache self._redis_available = True logger.info("Using Redis cache") except Exception as e: logger.warning(f"Redis initialization failed, using in-memory cache: {str(e)}") self.backend = InMemoryCache() else: logger.info("Using in-memory cache") self.backend = InMemoryCache() async def shutdown(self) -> None: """Shutdown cache backend.""" if isinstance(self.backend, RedisCache): await self.backend.disconnect() async def get(self, key: str) -> Optional[Any]: """Get value from cache. Args: key: Cache key Returns: Cached value or None """ return await self.backend.get(key) async def set(self, key: str, value: Any, ttl: int) -> None: """Set value in cache. Args: key: Cache key value: Value to cache ttl: Time to live in seconds """ await self.backend.set(key, value, ttl) async def delete(self, key: str) -> None: """Delete value from cache. Args: key: Cache key """ await self.backend.delete(key) async def clear(self) -> None: """Clear all cache.""" await self.backend.clear() @property def is_redis_available(self) -> bool: """Check if Redis is available. Returns: True if Redis is available """ return self._redis_available # Global cache manager instance cache_manager = CacheManager()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wintersoldier3912-a11y/Project-1'

If you have feedback or need assistance with the MCP directory API, please join our Discord server