Skip to main content
Glama

FastAPI MCP Server

by arrehman3
_utils.py2.81 kB
from __future__ import annotations import functools import sys from collections.abc import Awaitable, Generator from contextlib import AbstractAsyncContextManager, contextmanager from typing import Any, Callable, Generic, Protocol, TypeVar, overload from starlette.types import Scope if sys.version_info >= (3, 13): # pragma: no cover from inspect import iscoroutinefunction from typing import TypeIs else: # pragma: no cover from asyncio import iscoroutinefunction from typing_extensions import TypeIs has_exceptiongroups = True if sys.version_info < (3, 11): # pragma: no cover try: from exceptiongroup import BaseExceptionGroup # type: ignore[unused-ignore,import-not-found] except ImportError: has_exceptiongroups = False T = TypeVar("T") AwaitableCallable = Callable[..., Awaitable[T]] @overload def is_async_callable(obj: AwaitableCallable[T]) -> TypeIs[AwaitableCallable[T]]: ... @overload def is_async_callable(obj: Any) -> TypeIs[AwaitableCallable[Any]]: ... def is_async_callable(obj: Any) -> Any: while isinstance(obj, functools.partial): obj = obj.func return iscoroutinefunction(obj) or (callable(obj) and iscoroutinefunction(obj.__call__)) T_co = TypeVar("T_co", covariant=True) class AwaitableOrContextManager(Awaitable[T_co], AbstractAsyncContextManager[T_co], Protocol[T_co]): ... class SupportsAsyncClose(Protocol): async def close(self) -> None: ... # pragma: no cover SupportsAsyncCloseType = TypeVar("SupportsAsyncCloseType", bound=SupportsAsyncClose, covariant=False) class AwaitableOrContextManagerWrapper(Generic[SupportsAsyncCloseType]): __slots__ = ("aw", "entered") def __init__(self, aw: Awaitable[SupportsAsyncCloseType]) -> None: self.aw = aw def __await__(self) -> Generator[Any, None, SupportsAsyncCloseType]: return self.aw.__await__() async def __aenter__(self) -> SupportsAsyncCloseType: self.entered = await self.aw return self.entered async def __aexit__(self, *args: Any) -> None | bool: await self.entered.close() return None @contextmanager def collapse_excgroups() -> Generator[None, None, None]: try: yield except BaseException as exc: if has_exceptiongroups: # pragma: no cover while isinstance(exc, BaseExceptionGroup) and len(exc.exceptions) == 1: exc = exc.exceptions[0] raise exc def get_route_path(scope: Scope) -> str: path: str = scope["path"] root_path = scope.get("root_path", "") if not root_path: return path if not path.startswith(root_path): return path if path == root_path: return "" if path[len(root_path)] == "/": return path[len(root_path) :] return path

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/arrehman3/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server