# test_rate_limiters.py
import time
from contextlib import contextmanager
from math import exp, isclose
from typing import Any, Callable, Iterator, List, Optional
from unittest import mock
import pytest
from phoenix.client.utils.rate_limiters import (
AdaptiveTokenBucket,
UnavailableTokensError,
)
@contextmanager
def freeze_time(frozen_time: Optional[float] = None) -> Iterator[Any]:
    """Pin ``time.time()`` to one fixed value while the context is active.

    Args:
        frozen_time: The value every ``time.time()`` call should return. When
            ``None``, the real wall-clock time at entry is captured and used.

    Yields:
        The mock object that replaced ``time.time``.
    """
    if frozen_time is None:
        # Capture the real clock before it gets patched.
        frozen_time = time.time()
    with mock.patch("time.time") as mock_time:
        mock_time.return_value = frozen_time
        yield mock_time
@contextmanager
def warp_time(start: Optional[float] = None) -> Iterator[Any]:
    """Replace ``time.time`` and ``time.sleep`` with a non-blocking fake clock.

    ``time.sleep`` never blocks; it only records the requested duration. Each
    call to ``time.time()`` consumes at most one recorded duration (the most
    recently recorded one), adds it to the fake clock, and returns the result.

    Args:
        start: Initial value of the fake clock. When ``None`` (the default),
            the real wall-clock time at entry is used.

    Yields:
        The mock that replaced ``time.sleep``; its ``call_args_list`` holds
        every duration the code under test asked to sleep for.
    """
    # Seeded with a zero so the very first clock read returns ``start`` unchanged.
    pending_sleeps: List[float] = [0]
    current_time = time.time() if start is None else start

    def instant_sleep(sleep_time: float) -> None:
        # Queue the duration instead of actually waiting.
        pending_sleeps.append(sleep_time)

    def time_warp() -> float:
        nonlocal current_time
        if pending_sleeps:
            current_time += pending_sleeps.pop()
        return current_time

    with mock.patch("time.time") as mock_time, mock.patch("time.sleep") as mock_sleep:
        mock_sleep.side_effect = instant_sleep
        mock_time.side_effect = time_warp
        yield mock_sleep
@contextmanager
def async_warp_time(start: Optional[float] = None) -> Iterator[Any]:
    """Replace ``time.time`` and ``asyncio.sleep`` with a non-blocking fake clock.

    The patched ``asyncio.sleep`` only records the requested duration. Each
    call to ``time.time()`` consumes at most one recorded duration (the most
    recently recorded one), adds it to the fake clock, and returns the result.

    Args:
        start: Initial value of the fake clock. When ``None`` (the default),
            the real wall-clock time at entry is used.

    Yields:
        The mock that replaced ``asyncio.sleep``; its ``call_args_list`` holds
        every duration the code under test asked to sleep for.
    """
    # Seeded with a zero so the very first clock read returns ``start`` unchanged.
    pending_sleeps: List[float] = [0]
    current_time = time.time() if start is None else start

    def instant_sleep(sleep_time: float) -> None:
        # Queue the duration instead of actually waiting.
        pending_sleeps.append(sleep_time)

    def time_warp() -> float:
        nonlocal current_time
        if pending_sleeps:
            current_time += pending_sleeps.pop()
        return current_time

    with mock.patch("time.time") as mock_time, mock.patch("asyncio.sleep") as mock_sleep:
        mock_sleep.side_effect = instant_sleep
        mock_time.side_effect = time_warp
        yield mock_sleep
def test_token_bucket_gains_tokens_over_time() -> None:
    """With a rate of 1 request/second, n seconds of elapsed time yield n requests."""
    t0 = time.time()

    # Construct the bucket while the clock is pinned at t0.
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=1,
            maximum_per_second_request_rate=1,
            enforcement_window_minutes=1,
            rate_reduction_factor=1,
            rate_increase_factor=0,
            cooldown_seconds=5,
        )

    for elapsed in (5, 10):
        with freeze_time(t0 + elapsed):
            assert isclose(limiter.available_requests(), elapsed)
def test_token_rate_limiter_can_max_out_on_requests() -> None:
    """Available requests stop growing at 120 with a 2-minute window at 1 request/second."""
    began = time.time()

    with freeze_time(began):
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=1,
            maximum_per_second_request_rate=1,
            enforcement_window_minutes=2,
            rate_reduction_factor=1,
            rate_increase_factor=0,
            cooldown_seconds=5,
        )

    # (seconds elapsed, expected available requests); the last pair shows the cap.
    checkpoints = ((30, 30), (120, 120), (130, 120))
    for elapsed, expected in checkpoints:
        with freeze_time(began + elapsed):
            assert limiter.available_requests() == expected
def test_token_rate_limiter_spends_tokens() -> None:
    """Making one request lowers the available count from 3 to 2."""
    t0 = time.time()

    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=1,
            maximum_per_second_request_rate=1,
            enforcement_window_minutes=1,
            rate_reduction_factor=1,
            rate_increase_factor=0,
            cooldown_seconds=5,
        )

    with freeze_time(t0 + 3):
        before_request = limiter.available_requests()
        assert before_request == 3
        limiter.make_request_if_ready()
        after_request = limiter.available_requests()
        assert after_request == 2
def test_token_rate_limiter_cannot_spend_unavailable_tokens() -> None:
    """An immediate request on a freshly built bucket raises ``UnavailableTokensError``."""
    t0 = time.time()

    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=1,
            maximum_per_second_request_rate=1,
            enforcement_window_minutes=2,
            rate_reduction_factor=1,
            rate_increase_factor=0,
            cooldown_seconds=5,
        )
        # The clock is still frozen at construction time, so nothing is available yet.
        assert limiter.available_requests() == 0
        with pytest.raises(UnavailableTokensError):
            limiter.make_request_if_ready()
def test_token_rate_limiter_can_block_until_tokens_are_available() -> None:
    """``wait_until_ready`` on an empty bucket sleeps about ``1 / rate`` seconds in total."""
    t0 = time.time()
    rate = 0.5

    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=rate,
            maximum_per_second_request_rate=rate * 2,
            enforcement_window_minutes=2,
            rate_reduction_factor=1,
            rate_increase_factor=0,
            cooldown_seconds=5,
        )

    with warp_time(t0) as fake_sleep:
        assert limiter.available_requests() == 0
        limiter.wait_until_ready()
        # Add up every duration the limiter asked to sleep for.
        total_slept = sum(call.args[0] for call in fake_sleep.call_args_list)
        expected_wait = 1 / rate
        assert isclose(total_slept, expected_wait, rel_tol=0.2)
async def test_token_rate_limiter_async_waits_until_tokens_are_available() -> None:
    """``async_wait_until_ready`` on an empty bucket sleeps about ``1 / rate`` seconds."""
    t0 = time.time()
    rate = 0.5

    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=rate,
            maximum_per_second_request_rate=rate * 2,
            enforcement_window_minutes=2,
            rate_reduction_factor=1,
            rate_increase_factor=0,
            cooldown_seconds=5,
        )

    with async_warp_time(t0) as fake_sleep:
        assert limiter.available_requests() == 0
        await limiter.async_wait_until_ready()
        # Add up every duration the limiter asked to sleep for.
        total_slept = sum(call.args[0] for call in fake_sleep.call_args_list)
        expected_wait = 1 / rate
        assert isclose(total_slept, expected_wait, rel_tol=0.2)
def test_token_rate_limiter_can_accumulate_tokens_before_waiting() -> None:
    """Time already elapsed before waiting is deducted from the total sleep time."""
    t0 = time.time()
    rate = 0.1
    head_start = 5  # seconds that pass between construction and the wait

    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=rate,
            maximum_per_second_request_rate=rate * 2,
            enforcement_window_minutes=2,
            rate_reduction_factor=1,
            rate_increase_factor=0,
            cooldown_seconds=5,
        )

    with warp_time(t0 + head_start) as fake_sleep:
        assert limiter.available_requests() == 0.5, "should have accumulated half a request"
        limiter.wait_until_ready()
        total_slept = sum(call.args[0] for call in fake_sleep.call_args_list)
        expected_wait = (1 / rate) - head_start
        assert isclose(total_slept, expected_wait, rel_tol=0.2)
async def test_token_rate_limiter_can_async_accumulate_tokens_before_waiting() -> None:
    """Async variant: time elapsed before waiting is deducted from the total sleep time."""
    t0 = time.time()
    rate = 0.1
    head_start = 5  # seconds that pass between construction and the wait

    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=rate,
            maximum_per_second_request_rate=rate * 2,
            enforcement_window_minutes=2,
            rate_reduction_factor=1,
            rate_increase_factor=0,
            cooldown_seconds=5,
        )

    with async_warp_time(t0 + head_start) as fake_sleep:
        assert limiter.available_requests() == 0.5, "should have accumulated half a request"
        await limiter.async_wait_until_ready()
        total_slept = sum(call.args[0] for call in fake_sleep.call_args_list)
        expected_wait = (1 / rate) - head_start
        assert isclose(total_slept, expected_wait, rel_tol=0.2)
def test_token_bucket_adaptively_increases_rate_over_time() -> None:
    """After waiting, the rate is expected to equal ``0.1 * exp(0.01 * elapsed)``."""
    t0 = time.time()
    rate = 0.1

    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=rate,
            maximum_per_second_request_rate=rate * 2,
            enforcement_window_minutes=1,
            rate_reduction_factor=1,
            rate_increase_factor=0.01,
            cooldown_seconds=5,
        )

    with warp_time(t0 + 5) as fake_sleep:
        assert limiter.available_requests() == 0.5, "should have accumulated half a request"
        limiter.wait_until_ready()
        # Total elapsed time is the time slept plus the 5-second head start.
        slept = sum(call.args[0] for call in fake_sleep.call_args_list)
        elapsed_time = slept + 5
        expected_rate = 0.1 * exp(0.01 * elapsed_time)
        assert isclose(limiter.rate, expected_rate)
def test_token_bucket_does_not_increase_rate_past_maximum() -> None:
    """Even with a very large increase factor, the rate ends at the configured maximum."""
    t0 = time.time()
    rate = 0.1
    ceiling = rate * 2

    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=rate,
            maximum_per_second_request_rate=ceiling,
            enforcement_window_minutes=1,
            rate_reduction_factor=1,
            rate_increase_factor=100,
            cooldown_seconds=5,
        )

    with warp_time(t0 + 5):
        assert limiter.available_requests() == 0.5, "should have accumulated half a request"
        limiter.wait_until_ready()
        assert isclose(limiter.rate, ceiling)
def test_token_bucket_resets_rate_after_inactivity() -> None:
    """The rate climbs to the maximum, then is back at the initial rate after a long gap."""
    t0 = time.time()
    rate = 0.1
    ceiling = rate * 2

    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=rate,
            maximum_per_second_request_rate=ceiling,
            enforcement_window_minutes=1,
            rate_reduction_factor=1,
            rate_increase_factor=100,
            cooldown_seconds=5,
        )

    # First wait: the rate is expected to reach the maximum.
    with warp_time(t0 + 5):
        assert limiter.available_requests() == 0.5, "should have accumulated half a request"
        limiter.wait_until_ready()
        assert isclose(limiter.rate, ceiling)

    # Second wait, 100 seconds after construction: the rate is expected to be reset.
    with warp_time(t0 + 100):
        limiter.wait_until_ready()
        assert isclose(limiter.rate, rate)
def test_token_bucket_decreases_rate() -> None:
    """A rate-limit error should cut the rate to 25, empty the bucket, and wait 5 seconds."""
    t0 = time.time()

    with warp_time(t0):
        rate = 100
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=rate,
            maximum_per_second_request_rate=rate * 2,
            enforcement_window_minutes=1,
            rate_reduction_factor=0.25,
            rate_increase_factor=0.01,
            cooldown_seconds=5,
        )
        # Read the warped clock only after the bucket exists, as the request start.
        request_started_at = time.time()
        limiter.on_rate_limit_error(request_start_time=request_started_at)

        assert isclose(limiter.rate, 25)
        assert limiter.tokens == 0
        # The warped clock should now sit 5 seconds past the starting point.
        assert time.time() == t0 + 5
def test_token_bucket_decreases_rate_once_per_cooldown_period() -> None:
    """Repeated rate-limit errors only reduce the rate again once the cooldown has passed."""
    t0 = time.time()

    # First error: the rate is expected to drop from 100 to 25.
    with warp_time(t0):
        rate = 100
        limiter = AdaptiveTokenBucket(
            initial_per_second_request_rate=rate,
            maximum_per_second_request_rate=rate * 2,
            enforcement_window_minutes=1,
            rate_reduction_factor=0.25,
            rate_increase_factor=0.01,
            cooldown_seconds=5,
        )
        limiter.on_rate_limit_error(request_start_time=time.time())
        assert isclose(limiter.rate, 25)

    # Errors for requests started at these times must leave the rate untouched.
    with warp_time(t0 + 3):
        request_started_at = time.time()
        limiter.on_rate_limit_error(request_start_time=request_started_at)
        assert isclose(limiter.rate, 25), "3 seconds is still within the cooldown period"

    with warp_time(t0 - 6):
        request_started_at = time.time()
        limiter.on_rate_limit_error(request_start_time=request_started_at)
        assert isclose(limiter.rate, 25), "requests before the rate limited request are ignored"

    # Six seconds after the first error a second reduction is expected.
    with warp_time(t0 + 6):
        request_started_at = time.time()
        limiter.on_rate_limit_error(request_start_time=request_started_at)
        assert isclose(limiter.rate, 6.25)