Skip to main content
Glama
backpressure.py (3.64 kB)
"""Utilities for detecting and reporting backpressure conditions.""" from __future__ import annotations from dataclasses import dataclass, field from threading import Lock from typing import Callable NoticeWriter = Callable[[str], None] @dataclass(slots=True) class BackpressureMonitor: """Track outstanding work items and emit notices when buffering occurs.""" name: str threshold: int notice_writer: NoticeWriter recovery_threshold: int | None = None enter_message_factory: Callable[[int], str] | None = None exit_message_factory: Callable[[], str] | None = None _lock: Lock = field(default_factory=Lock, init=False, repr=False) _pending: int = field(default=0, init=False, repr=False) _in_backpressure: bool = field(default=False, init=False, repr=False) def __post_init__(self) -> None: if self.threshold <= 0: raise ValueError("threshold must be positive") if self.recovery_threshold is None: self.recovery_threshold = max(0, self.threshold // 2) if self.recovery_threshold < 0: raise ValueError("recovery_threshold must be non-negative") if self.exit_message_factory is None: self.exit_message_factory = lambda: f"[{self.name}] backlog cleared; operations resumed in real time." if self.enter_message_factory is None: self.enter_message_factory = ( lambda count: ( f"[{self.name}] buffering {count} pending item(s); processing will resume shortly." 
) ) def increment(self, amount: int = 1) -> int: """Increase the tracked count and emit a notice if threshold is crossed.""" if amount <= 0: raise ValueError("amount must be positive") message: str | None = None with self._lock: self._pending += amount count = self._pending if not self._in_backpressure and count >= self.threshold: self._in_backpressure = True message = self.enter_message_factory(count) if message: self.notice_writer(message) return count def decrement(self, amount: int = 1) -> int: """Decrease the tracked count and emit a notice when backpressure clears.""" if amount <= 0: raise ValueError("amount must be positive") message: str | None = None with self._lock: self._pending = max(0, self._pending - amount) count = self._pending if self._in_backpressure and count <= self.recovery_threshold: self._in_backpressure = False message = self.exit_message_factory() if message: self.notice_writer(message) return self._pending def reset(self) -> None: """Reset the monitor to an idle state, emitting notices if necessary.""" message: str | None = None with self._lock: self._pending = 0 if self._in_backpressure: self._in_backpressure = False message = self.exit_message_factory() if message: self.notice_writer(message) @property def pending(self) -> int: """Return the number of outstanding items currently tracked.""" with self._lock: return self._pending @property def in_backpressure(self) -> bool: """Return whether the monitor is currently signalling backpressure.""" with self._lock: return self._in_backpressure __all__ = ["BackpressureMonitor", "NoticeWriter"]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/FreddyE1982/mcp2term'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.