cache.py•3.99 kB
"""캐시 관리"""
import redis.asyncio as redis
import json
import logging
from typing import Dict, Any, Optional, Union
from src.exceptions import CacheError
class CacheManager:
"""Redis 캐시 관리자"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.redis_client: Optional[redis.Redis] = None
self.default_ttl = 300 # 5분
self.logger = logging.getLogger(__name__)
@property
def connection_url(self) -> str:
"""Redis 연결 URL 생성"""
auth = f":{self.config['password']}@" if self.config['password'] else ""
return f"redis://{auth}{self.config['host']}:{self.config['port']}/{self.config['db']}"
@property
def is_connected(self) -> bool:
"""연결 상태 확인"""
return self.redis_client is not None
async def connect(self) -> None:
"""Redis 연결"""
try:
self.redis_client = redis.from_url(
self.connection_url,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5
)
# 연결 테스트
await self.redis_client.ping()
self.logger.info("Redis 연결 성공")
except Exception as e:
self.logger.error(f"Redis 연결 실패: {e}")
self.redis_client = None
raise CacheError(f"Redis 연결 실패: {e}")
async def disconnect(self) -> None:
"""Redis 연결 해제"""
if self.redis_client:
await self.redis_client.close()
self.redis_client = None
self.logger.info("Redis 연결 해제")
async def get(self, key: str) -> Optional[Any]:
"""캐시에서 데이터 조회"""
if not self.is_connected:
raise CacheError("Redis에 연결되지 않음")
try:
data = await self.redis_client.get(key)
return json.loads(data) if data else None
except Exception as e:
self.logger.error(f"캐시 조회 실패: {e}")
return None
async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
"""캐시에 데이터 저장"""
if not self.is_connected:
raise CacheError("Redis에 연결되지 않음")
try:
ttl = ttl or self.default_ttl
serialized = json.dumps(value, ensure_ascii=False)
result = await self.redis_client.setex(key, ttl, serialized)
return bool(result)
except Exception as e:
self.logger.error(f"캐시 저장 실패: {e}")
return False
async def delete(self, key: str) -> bool:
"""캐시에서 키 삭제"""
if not self.is_connected:
raise CacheError("Redis에 연결되지 않음")
try:
result = await self.redis_client.delete(key)
return result > 0
except Exception as e:
self.logger.error(f"캐시 삭제 실패: {e}")
return False
async def invalidate_pattern(self, pattern: str) -> int:
"""패턴 매칭으로 캐시 무효화"""
if not self.is_connected:
raise CacheError("Redis에 연결되지 않음")
try:
keys = await self.redis_client.keys(pattern)
if keys:
return await self.redis_client.delete(*keys)
return 0
except Exception as e:
self.logger.error(f"캐시 무효화 실패: {e}")
return 0
async def health_check(self) -> bool:
"""Redis 상태 확인"""
try:
if not self.is_connected:
return False
await self.redis_client.ping()
return True
except Exception as e:
self.logger.error(f"Redis 헬스체크 실패: {e}")
return False