Skip to main content
Glama
httpx_util.py (2.05 kB)
from typing import Optional, Tuple

import httpx
from httpx import RequestError
from tenacity import retry, stop_after_attempt, wait_fixed


class FetchRequestError(RequestError):
    """Raised by the helpers in this module when an HTTP request fails."""


def _failure_message(exc: Exception, response: Optional[httpx.Response]) -> str:
    """Build the error text for a failed request.

    ``response`` is ``None`` when the failure happened before any response
    was received (e.g. the request itself raised), in which case there is
    no body or status code to report.
    """
    if response is None:
        return f"{exc}, no response received"
    return f"{exc}, {response.text} -> {response.status_code}"


@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def request(
    method: str,
    url: str,
    cookies: Optional[dict] = None,
    headers: Optional[dict] = None,
    json_data: Optional[dict] = None,
    timeout: int = 30,
    **kwargs,
) -> Tuple[int, dict]:
    """Send an HTTP request and return its status code and decoded JSON body.

    The call is attempted up to 3 times with a fixed 1 second wait between
    attempts (see the ``retry`` decorator).

    Args:
        method: HTTP method passed to ``httpx.AsyncClient.request``.
        url: Target URL.
        cookies: Optional cookies to send.
        headers: Optional headers to send.
        json_data: Optional payload sent via the ``json=`` argument.
        timeout: Timeout forwarded to httpx.
        **kwargs: Extra keyword arguments forwarded to
            ``httpx.AsyncClient.request``.

    Returns:
        A ``(status_code, json_body)`` tuple.

    Raises:
        FetchRequestError: On every failed attempt (request failure or a
            body that cannot be decoded as JSON). No ``reraise`` option is
            configured on the decorator, so what reaches the caller after
            the last attempt follows tenacity's default behaviour.
    """
    async with httpx.AsyncClient() as client:
        # Bound up front so the handler can tell "no response at all" apart
        # from "response received but unusable" without an UnboundLocalError.
        response = None
        try:
            response = await client.request(
                method,
                url,
                cookies=cookies,
                headers=headers,
                json=json_data,
                timeout=timeout,
                **kwargs,
            )
            return response.status_code, response.json()
        except Exception as e:
            raise FetchRequestError(_failure_message(e, response)) from e


@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def request_img(
    method: str, url: str, timeout: int = 30, **kwargs
) -> Tuple[str, int, bytes]:
    """Send an HTTP request and return the raw response bytes.

    The call is attempted up to 3 times with a fixed 1 second wait between
    attempts (see the ``retry`` decorator).

    Args:
        method: HTTP method passed to ``httpx.AsyncClient.request``.
        url: Target URL; echoed back as the first element of the result.
        timeout: Timeout forwarded to httpx.
        **kwargs: Extra keyword arguments forwarded to
            ``httpx.AsyncClient.request``.

    Returns:
        A ``(url, status_code, content)`` tuple where ``content`` is the
        undecoded response body.

    Raises:
        FetchRequestError: On every failed attempt. No ``reraise`` option is
            configured on the decorator, so what reaches the caller after
            the last attempt follows tenacity's default behaviour.
    """
    async with httpx.AsyncClient() as client:
        # See ``request``: keeps the error path safe when no response exists.
        response = None
        try:
            response = await client.request(
                method, url, timeout=timeout, **kwargs
            )
            return url, response.status_code, response.content
        except Exception as e:
            raise FetchRequestError(_failure_message(e, response)) from e


async def request_test(
    method: str, url: str, timeout: int = 10, **kwargs
) -> Tuple[int, dict]:
    """Send a single (non-retried) HTTP request and return status and JSON.

    Args:
        method: HTTP method passed to ``httpx.AsyncClient.request``.
        url: Target URL.
        timeout: Timeout forwarded to httpx.
        **kwargs: Extra keyword arguments forwarded to
            ``httpx.AsyncClient.request``.

    Returns:
        A ``(status_code, json_body)`` tuple.

    Raises:
        FetchRequestError: If the request fails or the body cannot be
            decoded as JSON.
    """
    async with httpx.AsyncClient() as client:
        # See ``request``: keeps the error path safe when no response exists.
        response = None
        try:
            response = await client.request(method, url, timeout=timeout, **kwargs)
            return response.status_code, response.json()
        except Exception as e:
            raise FetchRequestError(_failure_message(e, response)) from e

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Cyanty/Arcs-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.