Skip to main content
Glama
helpers.py1.85 kB
import asyncio from functools import partial from typing import Any, Awaitable, Callable from aiohttp import ClientSession async def retry_async( f: Callable[[], Awaitable[None]], name: str, attempts: int, sleep_s: float ): print(f"trying {name}...") for attempt in range(attempts): try: await f() print(f"{name} succeeded") return except Exception as ex: print( f"{name} failed: will wait {sleep_s} seconds, then try again ({attempt + 1}/{attempts}) (error: {ex})" ) await asyncio.sleep(sleep_s) raise Exception(f"{name} failed: exhausted all {attempts} retry attempts") async def healthcheck( method: str, url: str, status_predicate: Callable[[int], bool], attempts: int, sleep_s: float, ): return await retry_async( f=partial(_single_healthcheck, method, url, status_predicate), name=f"{method}:::{url} healthcheck", attempts=attempts, sleep_s=sleep_s, ) async def _single_healthcheck( method: str, url: str, status_predicate: Callable[[int], bool], ): async with ClientSession() as session: async with session.request(method=method, url=url) as resp: status = resp.status resp_body = await resp.text() print(f"healthcheck response: {status} {resp_body}") if status_predicate(status): return raise Exception("healthcheck failed ") def parse_int(text: str) -> int: return int(text) def get_key_path(obj: dict[str, Any], key_path: str) -> tuple[Any, bool]: key_path_parts = key_path.split(".") value = obj for key in key_path_parts: if key not in value: return None, False value = value[key] return value, True

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server