Skip to main content
Glama
utils.py1.56 kB
# Copyright (c) Meta Platforms, Inc. and affiliates. # # This source code is licensed under both the MIT license found in the # LICENSE-MIT file in the root directory of this source tree and the Apache # License, Version 2.0 found in the LICENSE-APACHE file in the root directory # of this source tree. # pyre-strict import asyncio import json import shlex from typing import List from .timeouts import DEFAULT_OPERATION_TIMEOUT async def _read_until_valid_json(stream: asyncio.StreamReader) -> object: buffer = b"" while True: data = await stream.readuntil(b"}") buffer += data try: return json.loads(buffer.decode()) except json.JSONDecodeError: pass raise RuntimeError( "Should not be reachable since either the valid JSON is there or `asyncio.IncompleteReadError` is raised." ) async def execute_generic_text_producing_command( name: str, cmd: List[str], timeout: float = DEFAULT_OPERATION_TIMEOUT ) -> str: process = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout) if process.returncode != 0: raise RuntimeError( f"Failed to {name} with command:\n```\n{shlex.join(cmd)}\n```\nstdout:\n```\n{stdout.decode(errors='ignore')}\n```\nstdout:\n```\n{stderr.decode(errors='ignore')}\n```\n" ) return stdout.decode()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server