We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
import time
from contextlib import contextmanager
from math import exp, isclose
from typing import Any, Callable, Iterator, List, Optional
from unittest import mock
import pytest
from phoenix.client.utils.rate_limiters import (
AdaptiveTokenBucket,
UnavailableTokensError,
)
@contextmanager
def freeze_time(frozen_time: Optional[float] = None) -> Iterator[Callable[[], None]]:
    """Pin ``time.time`` to a single value for the duration of the ``with`` block.

    Args:
        frozen_time: The value every ``time.time()`` call should return. When
            ``None``, the real current time (read before patching) is used.

    Yields:
        The mock object that stands in for ``time.time`` while the block runs.
    """
    if frozen_time is None:
        frozen_time = time.time()
    # Keyword arguments given to ``patch`` are forwarded to the mock it creates.
    with mock.patch("time.time", return_value=frozen_time) as patched_clock:
        yield patched_clock
@contextmanager
def warp_time(start: Optional[float]) -> Iterator[Any]:
    """Replace ``time.time`` and ``time.sleep`` with a simulated, non-blocking clock.

    Each patched ``time.sleep(x)`` call returns immediately and records ``x``.
    Each patched ``time.time()`` call consumes the most recently recorded
    (not yet consumed) sleep, adds it to the simulated clock, and returns the
    clock; with nothing pending the clock is returned unchanged.

    Args:
        start: Initial value of the simulated clock. When ``None``, the real
            current time (read before patching) is used.

    Yields:
        The mock standing in for ``time.sleep`` so callers can inspect the
        requested sleep durations.
    """
    simulated_now = time.time() if start is None else start
    # Seeded with a zero-length sleep, so the first clock read adds nothing.
    pending_sleeps: List[float] = [0]

    def record_sleep(seconds: float) -> None:
        pending_sleeps.append(seconds)

    def read_clock() -> float:
        nonlocal simulated_now
        if pending_sleeps:
            # Last-in, first-out: only one pending sleep is applied per read.
            simulated_now += pending_sleeps.pop()
        return simulated_now

    with mock.patch("time.time", side_effect=read_clock):
        with mock.patch("time.sleep", side_effect=record_sleep) as patched_sleep:
            yield patched_sleep
@contextmanager
def async_warp_time(start: Optional[float]) -> Iterator[Any]:
    """Replace ``time.time`` and ``asyncio.sleep`` with a simulated, non-blocking clock.

    Each patched ``asyncio.sleep(x)`` call records ``x`` instead of waiting.
    Each patched ``time.time()`` call consumes the most recently recorded
    (not yet consumed) sleep, adds it to the simulated clock, and returns the
    clock; with nothing pending the clock is returned unchanged.

    Args:
        start: Initial value of the simulated clock. When ``None``, the real
            current time (read before patching) is used.

    Yields:
        The mock standing in for ``asyncio.sleep`` so callers can inspect the
        requested sleep durations.
    """
    simulated_now = time.time() if start is None else start
    # Seeded with a zero-length sleep, so the first clock read adds nothing.
    pending_sleeps: List[float] = [0]

    def record_sleep(seconds: float) -> None:
        pending_sleeps.append(seconds)

    def read_clock() -> float:
        nonlocal simulated_now
        if pending_sleeps:
            # Last-in, first-out: only one pending sleep is applied per read.
            simulated_now += pending_sleeps.pop()
        return simulated_now

    with mock.patch("time.time", side_effect=read_clock):
        with mock.patch("asyncio.sleep", side_effect=record_sleep) as patched_sleep:
            yield patched_sleep
def test_token_bucket_gains_tokens_over_time() -> None:
    """At 1 request/s, the bucket should report ~5 requests after 5s and ~10 after 10s."""
    t0 = time.time()
    settings = dict(
        initial_per_second_request_rate=1,
        maximum_per_second_request_rate=1,
        enforcement_window_minutes=1,
        rate_reduction_factor=1,
        rate_increase_factor=0,
        cooldown_seconds=5,
    )
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(**settings)

    # With a rate of one per second, the expected count equals the elapsed seconds.
    for elapsed in (5, 10):
        with freeze_time(t0 + elapsed):
            assert isclose(limiter.available_requests(), elapsed)
def test_token_rate_limiter_can_max_out_on_requests() -> None:
    """With a 2-minute window at 1 request/s, the available count should stop growing at 120."""
    t0 = time.time()
    settings = dict(
        initial_per_second_request_rate=1,
        maximum_per_second_request_rate=1,
        enforcement_window_minutes=2,
        rate_reduction_factor=1,
        rate_increase_factor=0,
        cooldown_seconds=5,
    )
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(**settings)

    # (seconds since creation, expected available requests)
    checkpoints = ((30, 30), (120, 120), (130, 120))
    for offset, expected in checkpoints:
        with freeze_time(t0 + offset):
            assert limiter.available_requests() == expected
def test_token_rate_limiter_spends_tokens() -> None:
    """Making one request should lower the available count from 3 to 2."""
    t0 = time.time()
    settings = dict(
        initial_per_second_request_rate=1,
        maximum_per_second_request_rate=1,
        enforcement_window_minutes=1,
        rate_reduction_factor=1,
        rate_increase_factor=0,
        cooldown_seconds=5,
    )
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(**settings)

    three_seconds_later = t0 + 3
    with freeze_time(three_seconds_later):
        assert limiter.available_requests() == 3
        limiter.make_request_if_ready()
        assert limiter.available_requests() == 2
def test_token_rate_limiter_cannot_spend_unavailable_tokens() -> None:
    """A freshly created bucket has no requests available and refuses an immediate request."""
    t0 = time.time()
    settings = dict(
        initial_per_second_request_rate=1,
        maximum_per_second_request_rate=1,
        enforcement_window_minutes=2,
        rate_reduction_factor=1,
        rate_increase_factor=0,
        cooldown_seconds=5,
    )
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(**settings)
        # The clock is still frozen at creation time, so nothing has accrued.
        assert limiter.available_requests() == 0
        with pytest.raises(UnavailableTokensError):
            limiter.make_request_if_ready()
def test_token_rate_limiter_can_block_until_tokens_are_available() -> None:
    """``wait_until_ready`` should sleep for roughly ``1 / rate`` seconds in total from an empty bucket."""
    t0 = time.time()
    rate = 0.5
    settings = dict(
        initial_per_second_request_rate=rate,
        maximum_per_second_request_rate=rate * 2,
        enforcement_window_minutes=2,
        rate_reduction_factor=1,
        rate_increase_factor=0,
        cooldown_seconds=5,
    )
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(**settings)

    with warp_time(t0) as mock_sleep:
        assert limiter.available_requests() == 0
        limiter.wait_until_ready()
        # Total simulated time spent sleeping, taken from the recorded sleep calls.
        total_slept = sum(call.args[0] for call in mock_sleep.call_args_list)
        expected_wait = 1 / rate
        assert isclose(total_slept, expected_wait, rel_tol=0.2)
async def test_token_rate_limiter_async_waits_until_tokens_are_available() -> None:
    """``async_wait_until_ready`` should sleep for roughly ``1 / rate`` seconds in total from an empty bucket."""
    t0 = time.time()
    rate = 0.5
    settings = dict(
        initial_per_second_request_rate=rate,
        maximum_per_second_request_rate=rate * 2,
        enforcement_window_minutes=2,
        rate_reduction_factor=1,
        rate_increase_factor=0,
        cooldown_seconds=5,
    )
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(**settings)

    with async_warp_time(t0) as mock_sleep:
        assert limiter.available_requests() == 0
        await limiter.async_wait_until_ready()
        # Total simulated time spent sleeping, taken from the recorded sleep calls.
        total_slept = sum(call.args[0] for call in mock_sleep.call_args_list)
        expected_wait = 1 / rate
        assert isclose(total_slept, expected_wait, rel_tol=0.2)
def test_token_rate_limiter_can_accumulate_tokens_before_waiting() -> None:
    """Tokens accrued before waiting should shorten the wait by the time already elapsed."""
    t0 = time.time()
    rate = 0.1
    head_start = 5  # seconds that pass between creation and the wait
    settings = dict(
        initial_per_second_request_rate=rate,
        maximum_per_second_request_rate=rate * 2,
        enforcement_window_minutes=2,
        rate_reduction_factor=1,
        rate_increase_factor=0,
        cooldown_seconds=5,
    )
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(**settings)

    with warp_time(t0 + head_start) as mock_sleep:
        assert limiter.available_requests() == 0.5, "should have accumulated half a request"
        limiter.wait_until_ready()
        total_slept = sum(call.args[0] for call in mock_sleep.call_args_list)
        expected_wait = (1 / rate) - head_start
        assert isclose(total_slept, expected_wait, rel_tol=0.2)
async def test_token_rate_limiter_can_async_accumulate_tokens_before_waiting() -> None:
    """Async variant: tokens accrued before waiting should shorten the wait by the time already elapsed."""
    t0 = time.time()
    rate = 0.1
    head_start = 5  # seconds that pass between creation and the wait
    settings = dict(
        initial_per_second_request_rate=rate,
        maximum_per_second_request_rate=rate * 2,
        enforcement_window_minutes=2,
        rate_reduction_factor=1,
        rate_increase_factor=0,
        cooldown_seconds=5,
    )
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(**settings)

    with async_warp_time(t0 + head_start) as mock_sleep:
        assert limiter.available_requests() == 0.5, "should have accumulated half a request"
        await limiter.async_wait_until_ready()
        total_slept = sum(call.args[0] for call in mock_sleep.call_args_list)
        expected_wait = (1 / rate) - head_start
        assert isclose(total_slept, expected_wait, rel_tol=0.2)
def test_token_bucket_adaptively_increases_rate_over_time() -> None:
    """After waiting, the rate is expected to equal ``0.1 * exp(0.01 * elapsed_seconds)``."""
    t0 = time.time()
    rate = 0.1
    settings = dict(
        initial_per_second_request_rate=rate,
        maximum_per_second_request_rate=rate * 2,
        enforcement_window_minutes=1,
        rate_reduction_factor=1,
        rate_increase_factor=0.01,
        cooldown_seconds=5,
    )
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(**settings)

    with warp_time(t0 + 5) as mock_sleep:
        assert limiter.available_requests() == 0.5, "should have accumulated half a request"
        limiter.wait_until_ready()
        total_slept = sum(call.args[0] for call in mock_sleep.call_args_list)
        # Elapsed time since creation: the 5-second head start plus everything slept.
        elapsed_seconds = total_slept + 5
        expected_rate = 0.1 * exp(0.01 * elapsed_seconds)
        assert isclose(limiter.rate, expected_rate)
def test_token_bucket_does_not_increase_rate_past_maximum() -> None:
    """Even with a very large increase factor, the rate should end at the configured maximum."""
    t0 = time.time()
    rate = 0.1
    ceiling = rate * 2
    settings = dict(
        initial_per_second_request_rate=rate,
        maximum_per_second_request_rate=ceiling,
        enforcement_window_minutes=1,
        rate_reduction_factor=1,
        rate_increase_factor=100,
        cooldown_seconds=5,
    )
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(**settings)

    with warp_time(t0 + 5):
        assert limiter.available_requests() == 0.5, "should have accumulated half a request"
        limiter.wait_until_ready()
        assert isclose(limiter.rate, ceiling)
def test_token_bucket_resets_rate_after_inactivity() -> None:
    """The rate should reach the maximum, then be back at the initial rate after a long idle gap."""
    t0 = time.time()
    rate = 0.1
    ceiling = rate * 2
    settings = dict(
        initial_per_second_request_rate=rate,
        maximum_per_second_request_rate=ceiling,
        enforcement_window_minutes=1,
        rate_reduction_factor=1,
        rate_increase_factor=100,
        cooldown_seconds=5,
    )
    with freeze_time(t0):
        limiter = AdaptiveTokenBucket(**settings)

    # First wait: the large increase factor should push the rate to its maximum.
    with warp_time(t0 + 5):
        assert limiter.available_requests() == 0.5, "should have accumulated half a request"
        limiter.wait_until_ready()
        assert isclose(limiter.rate, ceiling)

    # Second wait, 100 seconds after creation: the rate should be the initial one again.
    with warp_time(t0 + 100):
        limiter.wait_until_ready()
        assert isclose(limiter.rate, rate)
def test_token_bucket_decreases_rate() -> None:
    """A rate-limit error should cut the rate from 100 to 25 and leave the bucket empty.

    The simulated clock is also expected to have advanced by 5 seconds, which
    matches ``cooldown_seconds`` here (presumably the cooldown wait; confirm
    against the implementation).
    """
    t0 = time.time()
    rate = 100
    settings = dict(
        initial_per_second_request_rate=rate,
        maximum_per_second_request_rate=rate * 2,
        enforcement_window_minutes=1,
        rate_reduction_factor=0.25,
        rate_increase_factor=0.01,
        cooldown_seconds=5,
    )
    # Every time.time() call inside warp_time mutates the simulated clock, so
    # the order of the statements below matters.
    with warp_time(t0):
        limiter = AdaptiveTokenBucket(**settings)
        limiter.on_rate_limit_error(request_start_time=time.time())
        assert isclose(limiter.rate, 25)
        assert limiter.tokens == 0
        assert time.time() == t0 + 5
def test_token_bucket_decreases_rate_once_per_cooldown_period() -> None:
    """Repeated rate-limit errors should reduce the rate only once per cooldown period.

    Errors whose request started 3 seconds after, or 6 seconds before, the
    first error are expected to leave the rate at 25; one starting 6 seconds
    after should reduce it again to 6.25.
    """
    t0 = time.time()
    rate = 100
    settings = dict(
        initial_per_second_request_rate=rate,
        maximum_per_second_request_rate=rate * 2,
        enforcement_window_minutes=1,
        rate_reduction_factor=0.25,
        rate_increase_factor=0.01,
        cooldown_seconds=5,
    )
    # Every time.time() call inside warp_time mutates the simulated clock, so
    # the order of the statements in each block matters.
    with warp_time(t0):
        limiter = AdaptiveTokenBucket(**settings)
        limiter.on_rate_limit_error(request_start_time=time.time())
        assert isclose(limiter.rate, 25)

    with warp_time(t0 + 3):
        limiter.on_rate_limit_error(request_start_time=time.time())
        assert isclose(limiter.rate, 25), "3 seconds is still within the cooldown period"

    with warp_time(t0 - 6):
        limiter.on_rate_limit_error(request_start_time=time.time())
        assert isclose(limiter.rate, 25), "requests before the rate limited request are ignored"

    with warp_time(t0 + 6):
        limiter.on_rate_limit_error(request_start_time=time.time())
        assert isclose(limiter.rate, 6.25)