import inspect
import json
from datetime import timedelta
from typing import Any, List, Optional

import redis.asyncio as redis
import structlog

from src.config import get_settings
# Module-level structlog logger used for the cache's event logs.
logger = structlog.get_logger()
# Settings are resolved once, at import time; the cache reads REDIS_URL and
# REDIS_CACHE_TTL from this object.
settings = get_settings()
class RedisCache:
    """JSON-serializing cache on top of an asyncio Redis client.

    Values are stored as JSON strings with an expiry. Every operation that
    talks to Redis is best-effort: when no client is attached, or when the
    call raises, the error is logged and a neutral fallback (``None``,
    ``False`` or ``[]``) is returned instead of propagating the exception.
    """

    def __init__(self):
        """Create an unconnected cache; ``connect`` must be awaited before use."""
        self.client: Optional[redis.Redis] = None
        self.default_ttl = settings.REDIS_CACHE_TTL

    async def connect(self):
        """Create the Redis client from ``settings.REDIS_URL``.

        Responses are decoded as UTF-8 text so cached payloads come back as
        ``str`` and can be handed straight to ``json.loads``.
        """
        self.client = await redis.from_url(
            settings.REDIS_URL,
            encoding="utf-8",
            decode_responses=True,
        )
        logger.info("redis_connected")

    async def disconnect(self):
        """Close the Redis client, if any, and detach it from the cache.

        The reference is cleared before closing so that later cache calls hit
        the "no client" guards instead of reusing a closed client.
        """
        if self.client:
            client, self.client = self.client, None
            # redis-py deprecated the async ``close()`` in favour of
            # ``aclose()``; fall back for versions that predate it.
            close = getattr(client, "aclose", None) or client.close
            await close()
            logger.info("redis_disconnected")

    async def get(self, key: str) -> Optional[Any]:
        """Return the decoded JSON value stored under ``key``.

        Returns ``None`` when there is no client, the key is missing or
        empty, or the lookup/decoding fails (the failure is logged).
        """
        if not self.client:
            return None
        try:
            raw = await self.client.get(key)
            if raw:
                return json.loads(raw)
        except Exception as e:
            logger.error("cache_get_error", key=key, error=str(e))
        return None

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ) -> bool:
        """Store ``value`` under ``key`` as JSON with an expiry.

        Args:
            key: Cache key.
            value: Object to serialize. Anything ``json`` cannot encode is
                converted with ``str``, so such values do not round-trip to
                their original type.
            ttl: Expiry passed to ``SETEX``; a falsy value (``None`` or ``0``)
                selects ``self.default_ttl``.

        Returns:
            ``True`` when the value was written, ``False`` when there is no
            client or the write failed (the failure is logged).
        """
        if not self.client:
            return False
        try:
            expires_in = ttl or self.default_ttl
            payload = json.dumps(value, default=str)
            await self.client.setex(key, expires_in, payload)
            return True
        except Exception as e:
            logger.error("cache_set_error", key=key, error=str(e))
            return False

    async def delete(self, key: str) -> bool:
        """Delete ``key``.

        Returns ``True`` when the delete command completed (whether or not
        the key existed), ``False`` when there is no client or it failed.
        """
        if not self.client:
            return False
        try:
            await self.client.delete(key)
            return True
        except Exception as e:
            logger.error("cache_delete_error", key=key, error=str(e))
            return False

    async def exists(self, key: str) -> bool:
        """Return whether ``key`` exists; ``False`` without a client or on error."""
        if not self.client:
            return False
        try:
            return bool(await self.client.exists(key))
        except Exception as e:
            logger.error("cache_exists_error", key=key, error=str(e))
            return False

    async def ping(self) -> bool:
        """Return whether the server answered a ping; ``False`` on any failure."""
        if not self.client:
            return False
        try:
            # Coerce so the declared ``bool`` return type always holds.
            return bool(await self.client.ping())
        except Exception as e:
            logger.error("cache_ping_error", error=str(e))
            return False

    async def get_or_set(
        self,
        key: str,
        factory_func,
        ttl: Optional[int] = None
    ) -> Optional[Any]:
        """Return the cached value for ``key``, computing and caching it on a miss.

        Args:
            key: Cache key.
            factory_func: Zero-argument callable producing the value. It may
                be a coroutine function or a plain function; an awaitable
                result is awaited.
            ttl: Optional expiry forwarded to ``set``.

        Returns:
            The cached value, or the factory's result. A ``None`` result is
            returned but never cached. Exceptions raised by the factory
            propagate to the caller.
        """
        cached = await self.get(key)
        if cached is not None:
            return cached
        value = factory_func()
        if inspect.isawaitable(value):
            value = await value
        if value is not None:
            await self.set(key, value, ttl)
        return value

    async def scan(self, pattern: str) -> List[str]:
        """Return all keys matching ``pattern``; ``[]`` without a client or on error."""
        if not self.client:
            return []
        try:
            return [key async for key in self.client.scan_iter(match=pattern)]
        except Exception as e:
            logger.error("cache_scan_error", pattern=pattern, error=str(e))
            return []
# Module-level cache instance. Constructing it opens no connection: `client`
# stays None until `connect()` is awaited.
cache = RedisCache()