# redis_cache.py (4.12 kB)
"""Redis-based caching implementation."""
import json
from typing import Any, Optional
import redis.asyncio as redis
from src.utils.logger import get_logger
logger = get_logger(__name__)
class CacheManager:
    """
    Redis-based cache manager with TTL support.

    Values are serialized with ``json.dumps`` on write and parsed with
    ``json.loads`` on read, so only JSON-serializable values can be stored.
    ``get``, ``set``, ``delete`` and ``exists`` catch and log any exception
    and report failure through their return value instead of raising;
    ``connect`` logs and re-raises.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        ssl: bool = False,
    ):
        """
        Initialize cache manager.

        Only stores the connection settings; no client is created until
        ``connect()`` is called.

        Args:
            host: Redis host
            port: Redis port
            db: Redis database number
            password: Redis password (optional)
            ssl: Use SSL connection
        """
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.ssl = ssl
        # None means "not connected"; every operation checks this first.
        self.client: Optional[redis.Redis] = None

    async def connect(self) -> None:
        """
        Establish connection to Redis.

        Builds the client from the stored settings and sends a PING to
        verify that the server answers.

        Raises:
            Exception: Any error raised while creating the client or
                pinging the server is logged and re-raised unchanged.
        """
        try:
            self.client = await redis.Redis(
                host=self.host,
                port=self.port,
                db=self.db,
                password=self.password,
                ssl=self.ssl,
                # Ask the client to decode replies to str instead of bytes.
                decode_responses=True,
            )
            await self.client.ping()
            logger.info("redis_connected", host=self.host, port=self.port)
        except Exception as e:
            logger.error("redis_connection_failed", error=str(e))
            raise

    async def close(self) -> None:
        """
        Close Redis connection.

        Does nothing when no client is set. Otherwise the client reference
        is cleared, so the manager reports "not connected" afterwards and a
        repeated ``close()`` is a no-op.
        """
        client = self.client
        if client is None:
            return
        # Clear the reference before closing so the manager never keeps
        # pointing at a client that is closed (or failed while closing).
        self.client = None
        # Newer redis-py deprecates close() on asyncio clients in favour of
        # aclose(); fall back to close() when aclose() is not available.
        closer = getattr(client, "aclose", None) or client.close
        await closer()
        logger.info("redis_connection_closed")

    async def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.

        Args:
            key: Cache key

        Returns:
            The JSON-decoded cached value, or None if the key is not found,
            the manager is not connected, or reading/decoding fails. A
            cached JSON ``null`` is also returned as None and therefore
            cannot be told apart from a miss.
        """
        if self.client is None:
            logger.warning("redis_not_connected")
            return None
        try:
            raw = await self.client.get(key)
            # Only None means "key absent"; any other reply (even an empty
            # string) is an existing entry and must go through decoding.
            if raw is None:
                logger.debug("cache_miss", key=key)
                return None
            logger.debug("cache_hit", key=key)
            return json.loads(raw)
        except Exception as e:
            logger.error("cache_get_error", key=key, error=str(e))
            return None

    async def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
        """
        Set value in cache with TTL.

        Args:
            key: Cache key
            value: Value to cache (must be JSON-serializable)
            ttl: Time to live in seconds

        Returns:
            True if successful, False if the manager is not connected or
            serialization / the Redis call fails.
        """
        if self.client is None:
            logger.warning("redis_not_connected")
            return False
        try:
            serialized = json.dumps(value)
            await self.client.setex(key, ttl, serialized)
            logger.debug("cache_set", key=key, ttl=ttl)
            return True
        except Exception as e:
            logger.error("cache_set_error", key=key, error=str(e))
            return False

    async def delete(self, key: str) -> bool:
        """
        Delete key from cache.

        Args:
            key: Cache key

        Returns:
            True if the delete call completed (its result is not inspected,
            so this does not tell whether the key existed), False if the
            manager is not connected or the call fails.
        """
        if self.client is None:
            logger.warning("redis_not_connected")
            return False
        try:
            await self.client.delete(key)
            logger.debug("cache_delete", key=key)
            return True
        except Exception as e:
            logger.error("cache_delete_error", key=key, error=str(e))
            return False

    async def exists(self, key: str) -> bool:
        """
        Check if key exists in cache.

        Args:
            key: Cache key

        Returns:
            True if exists, False otherwise. Also False (without logging)
            when the manager is not connected, and False when the Redis
            call fails.
        """
        if self.client is None:
            return False
        try:
            return bool(await self.client.exists(key))
        except Exception as e:
            logger.error("cache_exists_error", key=key, error=str(e))
            return False