Skip to main content
Glama
retry.py4.22 kB
"""Retry utilities with exponential backoff. This module provides decorators and utilities for retrying failed operations with configurable backoff strategies. """ import asyncio from functools import wraps from typing import TypeVar, Callable, Any import logging logger = logging.getLogger(__name__) T = TypeVar("T") def async_retry( max_attempts: int = 3, backoff_factor: float = 2.0, initial_delay: float = 1.0, max_delay: float = 60.0, exceptions: tuple[type[Exception], ...] = (Exception,), ): """Decorator for async functions with exponential backoff retry. Args: max_attempts: Maximum number of attempts (including initial) backoff_factor: Multiplier for exponential backoff initial_delay: Initial delay in seconds max_delay: Maximum delay between retries exceptions: Tuple of exception types to catch and retry Returns: Decorated function with retry logic Example: @async_retry(max_attempts=3, exceptions=(ConnectionError,)) async def fetch_data(): return await api.get("/data") """ def decorator(func: Callable[..., T]) -> Callable[..., T]: @wraps(func) async def wrapper(*args: Any, **kwargs: Any) -> T: last_exception = None for attempt in range(max_attempts): try: return await func(*args, **kwargs) except exceptions as e: last_exception = e # If this was the last attempt, re-raise if attempt == max_attempts - 1: logger.error( f"{func.__name__} failed after {max_attempts} attempts: {e}" ) raise # Calculate delay with exponential backoff delay = min(initial_delay * (backoff_factor**attempt), max_delay) logger.warning( f"{func.__name__} failed (attempt {attempt + 1}/{max_attempts}): " f"{type(e).__name__}: {e}. Retrying in {delay:.1f}s..." ) await asyncio.sleep(delay) # This should never be reached, but just in case if last_exception: raise last_exception return wrapper return decorator class RetryContext: """Context manager for retry operations with more control. This provides an alternative to the decorator when you need more fine-grained control over retry logic. Example: retry = RetryContext(max_attempts=3) async with retry: result = await some_operation() """ def __init__( self, max_attempts: int = 3, backoff_factor: float = 2.0, initial_delay: float = 1.0, max_delay: float = 60.0, exceptions: tuple[type[Exception], ...] = (Exception,), ): self.max_attempts = max_attempts self.backoff_factor = backoff_factor self.initial_delay = initial_delay self.max_delay = max_delay self.exceptions = exceptions self.attempt = 0 self.last_exception: Exception | None = None async def __aenter__(self): """Enter retry context.""" self.attempt += 1 return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Exit retry context and handle retries.""" if exc_type is None: return True if not issubclass(exc_type, self.exceptions): return False self.last_exception = exc_val if self.attempt >= self.max_attempts: logger.error( f"Operation failed after {self.max_attempts} attempts: {exc_val}" ) return False delay = min( self.initial_delay * (self.backoff_factor ** (self.attempt - 1)), self.max_delay, ) logger.warning( f"Attempt {self.attempt}/{self.max_attempts} failed: " f"{exc_type.__name__}: {exc_val}. Retrying in {delay:.1f}s..." ) await asyncio.sleep(delay) return True

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server