rate_limiter.py
"""Rate limiting for consolidation agents. Implements a token bucket algorithm to prevent agent operations from overwhelming the system. Each agent has its own rate limiter. Design Decision (from research.md): Token bucket is simple, well-understood, and allows bursting while maintaining overall rate limits. Per-agent limits allow parallelism while preventing storms. """ from __future__ import annotations import logging import time from collections import deque from threading import Lock logger = logging.getLogger(__name__) class RateLimiter: """Token bucket rate limiter for consolidation agents. Implements a sliding window rate limiter that allows up to `max_ops` operations within any `window_seconds` period. Attributes: max_ops: Maximum operations allowed per window window: Window size in seconds remaining: Current number of tokens available Example: >>> limiter = RateLimiter(max_ops=100, window_seconds=60) >>> if limiter.acquire(): ... # Perform operation ... pass >>> # Or block until ready: >>> limiter.wait_and_acquire() """ def __init__(self, max_ops: int = 100, window_seconds: int = 60) -> None: """Initialize rate limiter. Args: max_ops: Maximum operations per window (default: 100) window_seconds: Window size in seconds (default: 60) Raises: ValueError: If max_ops < 1 or window_seconds < 1 """ if max_ops < 1: raise ValueError("max_ops must be >= 1") if window_seconds < 1: raise ValueError("window_seconds must be >= 1") self.max_ops = max_ops self.window = window_seconds self._timestamps: deque[float] = deque() self._lock = Lock() @property def remaining(self) -> int: """Number of tokens remaining in current window.""" with self._lock: self._cleanup() return self.max_ops - len(self._timestamps) def _cleanup(self) -> None: """Remove expired timestamps from the window.""" now = time.time() cutoff = now - self.window while self._timestamps and self._timestamps[0] < cutoff: self._timestamps.popleft() def acquire(self) -> bool: """Try to acquire a token. Returns: True if token acquired, False if rate limit exceeded Thread-safe: Can be called from multiple threads. """ with self._lock: self._cleanup() if len(self._timestamps) < self.max_ops: self._timestamps.append(time.time()) logger.debug( f"Rate limiter: acquired token ({self.max_ops - len(self._timestamps)} remaining)" ) return True logger.debug(f"Rate limiter: limit exceeded ({self.max_ops} ops/{self.window}s)") return False def wait_and_acquire(self, timeout: float | None = None) -> bool: """Block until a token is available. Args: timeout: Maximum seconds to wait (None = wait forever) Returns: True if token acquired, False if timeout reached Thread-safe: Can be called from multiple threads. """ start_time = time.time() poll_interval = 0.1 # 100ms between checks while True: if self.acquire(): return True # Check timeout if timeout is not None: elapsed = time.time() - start_time if elapsed >= timeout: logger.debug(f"Rate limiter: timeout after {elapsed:.2f}s") return False time.sleep(poll_interval) def reset(self) -> None: """Reset the rate limiter, clearing all timestamps. Useful for testing or administrative purposes. """ with self._lock: self._timestamps.clear() logger.debug("Rate limiter: reset") def time_until_available(self) -> float: """Calculate seconds until next token is available. 
Returns: Seconds to wait (0 if token available now) """ with self._lock: self._cleanup() if len(self._timestamps) < self.max_ops: return 0.0 # Oldest timestamp will expire first oldest = self._timestamps[0] wait_time = (oldest + self.window) - time.time() return max(0.0, wait_time) def __repr__(self) -> str: """Return string representation.""" return f"RateLimiter(max_ops={self.max_ops}, window={self.window}s, remaining={self.remaining})" class AgentRateLimiters: """Manager for per-agent rate limiters. Provides centralized access to rate limiters for each agent type, ensuring consistent rate limiting across the consolidation system. Example: >>> limiters = AgentRateLimiters(default_ops=100) >>> if limiters.get("decay").acquire(): ... # Run decay analyzer ... pass """ def __init__( self, default_ops: int = 100, default_window: int = 60, per_agent_limits: dict[str, int] | None = None, ) -> None: """Initialize agent rate limiters. Args: default_ops: Default max operations per minute default_window: Default window in seconds per_agent_limits: Optional per-agent operation limits """ self._default_ops = default_ops self._default_window = default_window self._per_agent_limits = per_agent_limits or {} self._limiters: dict[str, RateLimiter] = {} self._lock = Lock() def get(self, agent: str) -> RateLimiter: """Get or create rate limiter for an agent. Args: agent: Agent name (decay, cluster, merge, promote, relations) Returns: RateLimiter instance for the agent """ with self._lock: if agent not in self._limiters: max_ops = self._per_agent_limits.get(agent, self._default_ops) self._limiters[agent] = RateLimiter( max_ops=max_ops, window_seconds=self._default_window, ) logger.debug( f"Created rate limiter for {agent}: {max_ops} ops/{self._default_window}s" ) return self._limiters[agent] def status(self) -> dict[str, dict[str, int | float]]: """Get status of all active rate limiters. Returns: Dict mapping agent name to status (remaining, max_ops, wait_time) """ with self._lock: return { agent: { "remaining": limiter.remaining, "max_ops": limiter.max_ops, "wait_time": limiter.time_until_available(), } for agent, limiter in self._limiters.items() } def reset_all(self) -> None: """Reset all rate limiters.""" with self._lock: for limiter in self._limiters.values(): limiter.reset() logger.info("Reset all agent rate limiters")
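
For context, a minimal usage sketch of how a consolidation scheduler might drive these classes. The import path, the run_agent_cycle/run_agent_cycle_blocking helpers, and the specific per-agent limit values are illustrative assumptions; only the RateLimiter/AgentRateLimiters API (get, acquire, wait_and_acquire, time_until_available, status) comes from the module above.

    # Sketch: wiring AgentRateLimiters into a consolidation loop.
    # Assumes the module is importable as `rate_limiter`; helper names and
    # limit values below are hypothetical.
    from rate_limiter import AgentRateLimiters

    limiters = AgentRateLimiters(
        default_ops=100,
        default_window=60,
        # Assumed values: tighter budgets for heavier agents.
        per_agent_limits={"merge": 20, "promote": 10},
    )

    def run_agent_cycle(agent: str) -> None:
        """Hypothetical non-blocking driver: skip this cycle if over the limit."""
        limiter = limiters.get(agent)
        if not limiter.acquire():
            print(f"{agent}: rate limited, retry in {limiter.time_until_available():.1f}s")
            return
        # ... perform the agent's work here ...

    def run_agent_cycle_blocking(agent: str) -> None:
        """Hypothetical blocking driver: wait up to 30s for a token."""
        if limiters.get(agent).wait_and_acquire(timeout=30.0):
            pass  # ... perform the agent's work here ...
        else:
            print(f"{agent}: gave up waiting for a token")

    # Periodic capacity report across all active limiters.
    for agent, info in limiters.status().items():
        print(
            f"{agent}: {info['remaining']}/{info['max_ops']} remaining, "
            f"wait {info['wait_time']:.1f}s"
        )

The two drivers reflect the two acquisition styles the class exposes: the non-blocking acquire() suits schedulers that can simply skip a cycle, while wait_and_acquire() fits one-shot runs that should back off rather than fail.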
