"""
Caching service with Redis and in-memory TTL fallback.
"""
import asyncio
import json
import logging
import time
from abc import ABC, abstractmethod
from typing import Any, Generic, Optional, TypeVar
import redis.asyncio as redis
from app.config import settings
from app.utils.errors import CacheError
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
# Type variable for the kind of value a cache backend stores and returns.
T = TypeVar("T")
class CacheBackend(ABC, Generic[T]):
    """Interface that every cache backend must implement.

    All four operations are coroutines. Concrete subclasses have to override
    each of them before they can be instantiated.
    """

    @abstractmethod
    async def get(self, key: str) -> Optional[T]:
        """Look up the value stored under ``key``.

        Args:
            key: Cache key

        Returns:
            The cached value, or None when nothing is stored for the key
        """

    @abstractmethod
    async def set(self, key: str, value: T, ttl: int) -> None:
        """Store ``value`` under ``key`` for a limited time.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds
        """

    @abstractmethod
    async def delete(self, key: str) -> None:
        """Remove the entry stored under ``key``.

        Args:
            key: Cache key
        """

    @abstractmethod
    async def clear(self) -> None:
        """Remove every entry from the cache."""
class RedisCache(CacheBackend[T]):
    """Redis-based cache implementation.

    Values are written with ``json.dumps`` and read back with ``json.loads``,
    so a read returns the JSON round-trip of what was stored. Apart from
    :meth:`connect`, every operation is best-effort: it is a no-op while no
    client is connected, and Redis errors are logged as warnings instead of
    being raised.
    """

    def __init__(self, redis_url: str):
        """Initialize Redis cache.

        No connection is opened here; call :meth:`connect` first.

        Args:
            redis_url: Redis connection URL
        """
        self.redis_url = redis_url
        # Stays None until connect() has verified the connection with a ping.
        self.client: Optional[redis.Redis] = None

    async def connect(self) -> None:
        """Connect to Redis and verify the connection with a ping.

        ``self.client`` is only assigned once the ping has succeeded, so a
        failed attempt never leaves a half-initialised client behind.

        Raises:
            CacheError: If the client cannot be created or the ping fails.
                The original exception is attached as ``__cause__``.
        """
        client: Optional[redis.Redis] = None
        try:
            client = await redis.from_url(self.redis_url, decode_responses=True)
            await client.ping()
        except Exception as e:
            logger.error("Failed to connect to Redis: %s", e)
            if client is not None:
                # Release the connection pool of the client that failed its
                # ping; a cleanup error must not mask the original failure.
                try:
                    await client.close()
                except Exception:
                    logger.debug(
                        "Error while closing Redis client after failed connect",
                        exc_info=True,
                    )
            raise CacheError(f"Redis connection failed: {str(e)}") from e
        self.client = client
        logger.info("Connected to Redis")

    async def disconnect(self) -> None:
        """Disconnect from Redis.

        Does nothing when no client is connected.
        """
        if self.client is not None:
            await self.client.close()
            logger.info("Disconnected from Redis")

    async def get(self, key: str) -> Optional[T]:
        """Get value from Redis.

        Args:
            key: Cache key

        Returns:
            The JSON-decoded cached value, or None when there is no client,
            the key is missing, the stored payload is empty, or any error
            occurs while reading or decoding.
        """
        if self.client is None:
            return None
        try:
            raw = await self.client.get(key)
            # A missing key (None) and an empty payload both count as a miss.
            if not raw:
                return None
            return json.loads(raw)
        except Exception as e:
            logger.warning("Redis get error for key %s: %s", key, e)
            return None

    async def set(self, key: str, value: T, ttl: int) -> None:
        """Set value in Redis.

        Errors are logged and swallowed; the value is then simply not cached.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds
        """
        if self.client is None:
            return
        try:
            # NOTE: default=str silently stringifies anything json cannot
            # encode natively, so such values come back from get() as strings.
            payload = json.dumps(value, default=str)
            await self.client.setex(key, ttl, payload)
        except Exception as e:
            logger.warning("Redis set error for key %s: %s", key, e)

    async def delete(self, key: str) -> None:
        """Delete value from Redis.

        Args:
            key: Cache key
        """
        if self.client is None:
            return
        try:
            await self.client.delete(key)
        except Exception as e:
            logger.warning("Redis delete error for key %s: %s", key, e)

    async def clear(self) -> None:
        """Clear all Redis cache.

        NOTE(review): this issues FLUSHDB, which removes every key in the
        selected Redis database, not only keys written through this class.
        Confirm the database is dedicated to this cache.
        """
        if self.client is None:
            return
        try:
            await self.client.flushdb()
        except Exception as e:
            logger.warning("Redis clear error: %s", e)
class InMemoryCache(CacheBackend[T]):
    """In-memory cache with TTL support.

    Entries are kept in a plain dict that maps each key to a
    ``(value, deadline)`` pair. Deadlines are taken from ``time.monotonic()``
    so that wall-clock adjustments cannot expire entries early or keep them
    alive past their TTL. Expired entries are removed lazily, when ``get``
    encounters them.
    """

    def __init__(self):
        """Initialize in-memory cache."""
        # key -> (value, monotonic deadline after which the entry is expired)
        self.cache: dict[str, tuple[Any, float]] = {}

    async def get(self, key: str) -> Optional[T]:
        """Get value from cache.

        Args:
            key: Cache key

        Returns:
            Cached value, or None when the key is absent or has expired
        """
        entry = self.cache.get(key)
        if entry is None:
            return None
        value, deadline = entry
        # ">=" so an entry stored with ttl <= 0 is never served.
        if time.monotonic() >= deadline:
            self.cache.pop(key, None)
            return None
        return value

    async def set(self, key: str, value: T, ttl: int) -> None:
        """Set value in cache.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds
        """
        self.cache[key] = (value, time.monotonic() + ttl)

    async def delete(self, key: str) -> None:
        """Delete value from cache.

        Deleting a key that is not present is a no-op.

        Args:
            key: Cache key
        """
        self.cache.pop(key, None)

    async def clear(self) -> None:
        """Clear all cache."""
        self.cache.clear()
class CacheManager:
    """Cache manager with Redis fallback to in-memory.

    A new manager starts on an in-memory backend. ``initialize`` switches to
    Redis when it is enabled in settings and reachable, and otherwise installs
    a fresh in-memory backend.
    """

    def __init__(self):
        """Initialize cache manager with an in-memory backend."""
        self.backend: CacheBackend = InMemoryCache()
        self._redis_available = False

    async def initialize(self) -> None:
        """Initialize cache backend.

        Uses Redis when ``settings.redis_enabled`` is true and the connection
        succeeds. In every other case a new in-memory backend is installed and
        the Redis-availability flag is cleared, so the flag stays correct when
        this method is called more than once.
        """
        if not settings.redis_enabled:
            logger.info("Using in-memory cache")
            self.backend = InMemoryCache()
            self._redis_available = False
            return

        try:
            redis_cache = RedisCache(settings.redis_url)
            await redis_cache.connect()
        except Exception as e:
            # Deliberate best-effort: any Redis failure degrades to the
            # in-memory backend instead of aborting start-up.
            logger.warning(
                "Redis initialization failed, using in-memory cache: %s", e
            )
            self.backend = InMemoryCache()
            self._redis_available = False
            return

        self.backend = redis_cache
        self._redis_available = True
        logger.info("Using Redis cache")

    async def shutdown(self) -> None:
        """Shutdown cache backend.

        Disconnects the Redis backend if one is active. The availability flag
        is cleared even when the disconnect raises.
        """
        if not isinstance(self.backend, RedisCache):
            return
        try:
            await self.backend.disconnect()
        finally:
            self._redis_available = False

    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache.

        Args:
            key: Cache key

        Returns:
            Cached value or None
        """
        return await self.backend.get(key)

    async def set(self, key: str, value: Any, ttl: int) -> None:
        """Set value in cache.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds
        """
        await self.backend.set(key, value, ttl)

    async def delete(self, key: str) -> None:
        """Delete value from cache.

        Args:
            key: Cache key
        """
        await self.backend.delete(key)

    async def clear(self) -> None:
        """Clear all cache."""
        await self.backend.clear()

    @property
    def is_redis_available(self) -> bool:
        """Check if Redis is available.

        Returns:
            True if the last ``initialize`` call connected to Redis and
            ``shutdown`` has not disconnected it since
        """
        return self._redis_available
# Global cache manager instance shared by importers of this module.
# It is created at import time on the in-memory backend; Redis is only
# attempted once `await cache_manager.initialize()` has been called.
cache_manager = CacheManager()