"""Authentication and rate limiting utilities for BigBugAI MCP server.
- API key check against BIGBUGAI_MCP_API_KEY
- Per-key rate limiting using `limits` moving window strategy
Environment variables:
- BIGBUGAI_MCP_API_KEY: required API key value
- MCP_RATE_LIMIT: rate limit string (e.g., "60/hour"), default "60/hour"
"""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from functools import wraps
from os import getenv
from typing import Any, TypeVar, cast
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter
from limits.util import parse as parse_limit
class RateLimitError(Exception):
"""Raised when the rate limit is exceeded."""
class AuthConfigError(PermissionError):
"""Raised when server auth is not properly configured."""
class RateLimiter:
"""Async rate limiter using in-memory storage and moving window strategy.
This approximates a token bucket for our purposes.
"""
def __init__(self, rate: str) -> None:
self.rate_str = rate
# Parse a single rate string like "60/hour"
self.item = parse_limit(rate)
self.storage = MemoryStorage()
self.strategy = MovingWindowRateLimiter(self.storage)
async def hit(self, key: str) -> None:
allowed = await self.strategy.test(self.item, key)
if not allowed:
reset, remaining = await self.strategy.get_window_stats(self.item, key)
# reset is epoch seconds
raise RateLimitError(
f"Rate limit exceeded ({self.rate_str}) for key. Remaining={remaining}, "
f"resets_at={int(reset)}"
)
await self.strategy.hit(self.item, key)
# Global limiter instance configured from env; tests can reconfigure via function below
_rate_limiter: RateLimiter | None = None
def _get_rate_string() -> str:
return getenv("MCP_RATE_LIMIT", "60/hour")
def configure_rate_limiter_from_env() -> None:
global _rate_limiter
_rate_limiter = RateLimiter(_get_rate_string())
def get_rate_limiter() -> RateLimiter:
global _rate_limiter
if _rate_limiter is None:
configure_rate_limiter_from_env()
assert _rate_limiter is not None
return _rate_limiter
def require_key(key: str | None) -> None:
"""Validate the provided key against BIGBUGAI_MCP_API_KEY.
Raises PermissionError if invalid or server not configured.
"""
expected = getenv("BIGBUGAI_MCP_API_KEY")
if not expected:
raise AuthConfigError(
"BIGBUGAI_MCP_API_KEY is not set on the server; auth is not configured."
)
if not key or key != expected:
raise PermissionError("Invalid API key provided.")
F = TypeVar("F", bound=Callable[..., Awaitable[Any]])
def guarded(func: F) -> F:
"""Decorator to enforce server-level API key presence and rate limiting.
The API key is sourced from the environment variable BIGBUGAI_MCP_API_KEY.
Tool payloads no longer need to include an `api_key` field.
"""
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
# Ensure server is configured with an API key
expected = getenv("BIGBUGAI_MCP_API_KEY")
if not expected:
raise AuthConfigError(
"BIGBUGAI_MCP_API_KEY is not set on the server; auth is not configured."
)
# Rate limit keyed by the configured API key
limiter = get_rate_limiter()
await limiter.hit(expected)
return await func(*args, **kwargs)
return cast(F, wrapper)