Skip to main content
Glama
rate_limiter.py6.64 kB
""" Token Bucket Rate Limiter for Reddit API. Implements a token bucket algorithm to ensure we never exceed Reddit's 100 requests per 60 seconds limit (free tier). """ import asyncio import logging from collections import deque from datetime import datetime, timedelta from typing import Optional import structlog logger = structlog.get_logger(__name__) class TokenBucketRateLimiter: """ Token bucket rate limiter for Reddit API requests. Implements a sliding window approach using a deque to track call timestamps. Ensures we never exceed max_calls within period_seconds window. Thread-safe using asyncio.Lock. """ def __init__(self, max_calls: int = 100, period_seconds: int = 60) -> None: """ Initialize the rate limiter. Args: max_calls: Maximum number of calls allowed in the time period (default: 100) period_seconds: Time period in seconds (default: 60) """ self.max_calls = max_calls self.period = timedelta(seconds=period_seconds) self.period_seconds = period_seconds self.calls: deque[datetime] = deque() # Store timestamps of API calls self.lock = asyncio.Lock() logger.info( "rate_limiter_initialized", max_calls=max_calls, period_seconds=period_seconds ) async def acquire(self, priority: int = 0) -> bool: """ Acquire permission to make an API call. Blocks (waits) if rate limit is currently exceeded, then grants permission. Implements a sliding window algorithm. Args: priority: Request priority (0=normal, 1=high, -1=low). Currently for future use - all priorities are treated equally in MVP. Returns: True when permission is granted (always returns True, waits if needed) Example: >>> rate_limiter = TokenBucketRateLimiter(max_calls=100, period_seconds=60) >>> await rate_limiter.acquire() # Returns immediately if capacity available True >>> # After 100 calls in 60 seconds: >>> await rate_limiter.acquire() # Waits until oldest call expires True """ async with self.lock: now = datetime.utcnow() # Remove calls outside the sliding window while self.calls and now - self.calls[0] > self.period: self.calls.popleft() # Check if we have capacity if len(self.calls) < self.max_calls: self.calls.append(now) remaining = self.max_calls - len(self.calls) logger.debug( "rate_limit_acquired", calls_made=len(self.calls), remaining=remaining, priority=priority ) # Log warning when approaching limit (>90% used) if len(self.calls) > (self.max_calls * 0.9): logger.warning( "rate_limit_approaching", calls_made=len(self.calls), max_calls=self.max_calls, remaining=remaining ) return True # Rate limit exceeded - calculate wait time oldest_call = self.calls[0] wait_until = oldest_call + self.period wait_time = (wait_until - now).total_seconds() # Add small buffer to avoid edge cases wait_time_buffered = wait_time + 0.1 logger.warning( "rate_limit_hit", calls_made=len(self.calls), max_calls=self.max_calls, wait_seconds=round(wait_time_buffered, 2), oldest_call=oldest_call.isoformat(), priority=priority ) # Release lock while waiting to allow other operations await asyncio.sleep(wait_time_buffered) # Recursively retry after waiting return await self.acquire(priority) def get_remaining(self) -> int: """ Get the number of available calls in the current window. This is a synchronous method for quick status checks. For accurate results during concurrent operations, consider the race condition between checking and acquiring. Returns: Number of calls available before hitting rate limit Example: >>> rate_limiter = TokenBucketRateLimiter(max_calls=100, period_seconds=60) >>> rate_limiter.get_remaining() 100 >>> await rate_limiter.acquire() >>> rate_limiter.get_remaining() 99 """ now = datetime.utcnow() # Count valid calls within the current window valid_calls = [c for c in self.calls if now - c <= self.period] remaining = self.max_calls - len(valid_calls) return max(0, remaining) # Never return negative async def reset(self) -> None: """ Reset the rate limiter state. Useful for testing or manual intervention. """ async with self.lock: self.calls.clear() logger.info("rate_limiter_reset") def get_stats(self) -> dict[str, any]: """ Get current rate limiter statistics. Returns: Dictionary with current stats including calls made, remaining capacity, window size, and oldest call timestamp. Example: >>> stats = rate_limiter.get_stats() >>> print(stats) { 'calls_made': 45, 'remaining': 55, 'max_calls': 100, 'period_seconds': 60, 'oldest_call': '2025-11-05T12:34:56.789012', 'utilization_percent': 45.0 } """ now = datetime.utcnow() valid_calls = [c for c in self.calls if now - c <= self.period] calls_made = len(valid_calls) remaining = self.max_calls - calls_made oldest_call: Optional[str] = None if valid_calls: oldest_call = min(valid_calls).isoformat() utilization = (calls_made / self.max_calls * 100) if self.max_calls > 0 else 0 return { "calls_made": calls_made, "remaining": remaining, "max_calls": self.max_calls, "period_seconds": self.period_seconds, "oldest_call": oldest_call, "utilization_percent": round(utilization, 2) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/padak/apify-actor-reddit-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server