"""레이트 리밋 관리 - Token Bucket 알고리즘 구현"""
import asyncio
import time
from .types import RateLimitConfig, RateLimitResult, RateLimitStatus
class TokenBucket:
"""Token Bucket 알고리즘 구현"""
def __init__(self, capacity: int, refill_rate: float):
"""
Args:
capacity: 최대 토큰 수
refill_rate: 초당 토큰 충전량
"""
self.capacity = capacity
self.refill_rate = refill_rate
self.tokens = float(capacity)
self.last_refill = time.time()
self._lock = asyncio.Lock()
def _refill(self) -> None:
"""토큰 충전"""
now = time.time()
elapsed = now - self.last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
self.last_refill = now
async def acquire(self, tokens: int = 1) -> bool:
"""
토큰 획득 시도
Args:
tokens: 필요한 토큰 수
Returns:
획득 성공 여부
"""
async with self._lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def get_available(self) -> int:
"""사용 가능한 토큰 수 반환"""
self._refill()
return int(self.tokens)
class TokenBucketRateLimiter:
"""멀티 레벨 Token Bucket 레이트 리미터"""
def __init__(self, config: RateLimitConfig | None = None):
"""
Args:
config: 레이트 리밋 설정
"""
config = config or RateLimitConfig()
# 초당/분당/시간당 버킷 생성
self._second_bucket = TokenBucket(
capacity=config.requests_per_second,
refill_rate=config.requests_per_second, # 1초에 전체 충전
)
self._minute_bucket = TokenBucket(
capacity=config.requests_per_minute,
refill_rate=config.requests_per_minute / 60, # 60초에 전체 충전
)
self._hour_bucket = TokenBucket(
capacity=config.requests_per_hour,
refill_rate=config.requests_per_hour / 3600, # 3600초에 전체 충전
)
async def acquire(self) -> RateLimitResult:
"""
요청 허용 여부 확인 및 토큰 획득
Returns:
RateLimitResult: 허용 여부, 남은 토큰, 리셋 시간
"""
# 모든 버킷에서 동시에 토큰 획득 시도
second_ok = await self._second_bucket.acquire()
minute_ok = await self._minute_bucket.acquire()
hour_ok = await self._hour_bucket.acquire()
allowed = second_ok and minute_ok and hour_ok
# 거부된 경우 획득한 토큰 반환은 생략 (보수적 접근)
# 실제 환경에서는 롤백 로직 추가 가능
# 가장 적은 토큰을 가진 버킷 기준으로 remaining 계산
remaining = min(
self._second_bucket.get_available(),
self._minute_bucket.get_available(),
self._hour_bucket.get_available(),
)
# 리셋 시간: 1초 후 (초당 버킷 기준)
reset_at = time.time() + 1.0
return RateLimitResult(
allowed=allowed,
remaining=remaining,
reset_at=reset_at,
)
def get_status(self) -> RateLimitStatus:
"""현재 레이트 리밋 상태 조회"""
available = min(
self._second_bucket.get_available(),
self._minute_bucket.get_available(),
self._hour_bucket.get_available(),
)
return RateLimitStatus(
available=available,
reset_at=time.time() + 1.0,
)