Skip to main content
Glama
extract.py32.9 kB
from __future__ import annotations import io import os import time from typing import TYPE_CHECKING, overload from ..http_client import HttpClient from . import cache as _cache from . import urls as _urls from .transport import ( current_http_client, use_http_client, ) from .transport import ( get as _transport_get, ) from .transport import ( post as _transport_post, ) from .types import ( ExecuteKwargs, Execution, OutputKwargs, Performance, PollKwargs, Query, RetrievalKwargs, ) from .typing_utils import resolve_raw_sql_template_id # Keep local helper implementations for compatibility with tests ADAPTER_VERSION = "spice-mcp-adapter" # Runtime-configurable HTTP timeouts (helps avoid host-level timeouts) _GET_TIMEOUT: float = float(os.getenv("SPICE_DUNE_GET_TIMEOUT", os.getenv("SPICE_HTTP_TIMEOUT", "30"))) _POST_TIMEOUT: float = float(os.getenv("SPICE_DUNE_POST_TIMEOUT", os.getenv("SPICE_HTTP_TIMEOUT", "30"))) if TYPE_CHECKING: from collections.abc import Mapping, Sequence from typing import Any, Literal import polars as pl # --------------------------------------------------------------------------- # Internal helpers used by adapter code and tests def _is_sql(query: int | str) -> bool: if isinstance(query, int): return False if isinstance(query, str): if query.startswith('https://') or query.startswith('api.dune.com') or query.startswith('dune.com/queries'): return False try: int(query) return False except ValueError: return True raise Exception('invalid format for query: ' + str(type(query))) def _determine_input_type( query_or_execution: Query | Execution, parameters: Mapping[str, Any] | None = None, ) -> tuple[int | None, Execution | None, Mapping[str, Any] | None]: if isinstance(query_or_execution, str) and query_or_execution == '': raise Exception('empty query') if isinstance(query_or_execution, int | str): if _is_sql(query_or_execution): query_id = resolve_raw_sql_template_id() execution = None new_params = dict(parameters or {}) new_params.update({'query': query_or_execution}) parameters = new_params else: query_id = _urls.get_query_id(query_or_execution) execution = None elif isinstance(query_or_execution, dict) and 'execution_id' in query_or_execution: query_id = None execution = query_or_execution # type: ignore[assignment] else: raise Exception('input must be a query id, query url, or execution id') return query_id, execution, parameters def _http_get(url: str, *, headers: Mapping[str, str], timeout: float): import requests return requests.get(url, headers=headers, timeout=timeout) @overload def query( query_or_execution: Query | Execution, *, verbose: bool = True, refresh: bool = False, max_age: float | None = None, parameters: Mapping[str, Any] | None = None, api_key: str | None = None, performance: Performance = 'medium', poll: Literal[False], poll_interval: float = 1.0, timeout_seconds: float | None = None, limit: int | None = None, offset: int | None = None, sample_count: int | None = None, sort_by: str | None = None, columns: Sequence[str] | None = None, extras: Mapping[str, Any] | None = None, types: Sequence[type[pl.DataType]] | Mapping[str, type[pl.DataType]] | None = None, all_types: Sequence[type[pl.DataType]] | Mapping[str, type[pl.DataType]] | None = None, cache: bool = True, cache_dir: str | None = None, save_to_cache: bool = True, load_from_cache: bool = True, include_execution: bool = False, ) -> Execution: ... @overload def query( query_or_execution: Query | Execution, *, verbose: bool = True, refresh: bool = False, max_age: float | None = None, parameters: Mapping[str, Any] | None = None, api_key: str | None = None, performance: Performance = 'medium', poll: Literal[True] = True, poll_interval: float = 1.0, limit: int | None = None, offset: int | None = None, sample_count: int | None = None, sort_by: str | None = None, columns: Sequence[str] | None = None, extras: Mapping[str, Any] | None = None, types: Sequence[type[pl.DataType]] | Mapping[str, type[pl.DataType]] | None = None, all_types: Sequence[type[pl.DataType]] | Mapping[str, type[pl.DataType]] | None = None, cache: bool = True, cache_dir: str | None = None, save_to_cache: bool = True, load_from_cache: bool = True, include_execution: Literal[False] = False, ) -> pl.DataFrame: ... @overload def query( query_or_execution: Query | Execution, *, verbose: bool = True, refresh: bool = False, max_age: float | None = None, parameters: Mapping[str, Any] | None = None, api_key: str | None = None, performance: Performance = 'medium', poll: Literal[True] = True, poll_interval: float = 1.0, limit: int | None = None, offset: int | None = None, sample_count: int | None = None, sort_by: str | None = None, columns: Sequence[str] | None = None, extras: Mapping[str, Any] | None = None, types: Sequence[type[pl.DataType]] | Mapping[str, type[pl.DataType]] | None = None, all_types: Sequence[type[pl.DataType]] | Mapping[str, type[pl.DataType]] | None = None, cache: bool = True, cache_dir: str | None = None, save_to_cache: bool = True, load_from_cache: bool = True, include_execution: Literal[True], ) -> tuple[pl.DataFrame, Execution]: ... def query( query_or_execution: Query | Execution, *, verbose: bool = True, refresh: bool = False, max_age: float | None = None, parameters: Mapping[str, Any] | None = None, api_key: str | None = None, performance: Performance = 'medium', poll: bool = True, poll_interval: float = 1.0, timeout_seconds: float | None = None, limit: int | None = None, offset: int | None = None, sample_count: int | None = None, sort_by: str | None = None, columns: Sequence[str] | None = None, extras: Mapping[str, Any] | None = None, types: Sequence[type[pl.DataType]] | Mapping[str, type[pl.DataType]] | None = None, all_types: Sequence[type[pl.DataType]] | Mapping[str, type[pl.DataType]] | None = None, cache: bool = True, cache_dir: str | None = None, save_to_cache: bool = True, load_from_cache: bool = True, include_execution: bool = False, http_client: HttpClient | None = None, ) -> pl.DataFrame | Execution | tuple[pl.DataFrame, Execution]: """get results of query as dataframe # Parameters - query: query or execution to retrieve results of - verbose: whether to print verbose info - refresh: trigger a new execution instead of using most recent execution - max_age: max age of last execution in seconds, or trigger a new execution - parameters: dict of query parameters - api_key: dune api key, otherwise use DUNE_API_KEY env var - performance: performance level - poll: wait for result as DataFrame, or just return Execution handle - poll_interval: polling interval in seconds - limit: number of rows to query in result - offset: row number to start returning results from - sample_count: number of random samples from query to return - sort_by: an ORDER BY clause to sort data by - columns: columns to retrieve, by default retrieve all columns - extras: extra parameters used for fetching execution result - examples: ignore_max_datapoints_per_request, allow_partial_results - types: column types to use in output polars dataframe - all_types: like types, but must strictly include all columns in data - cache: whether to use cache for saving or loading - cache_dir: directory to use for cached data (create tmp_dir if None) - save_to_cache: whether to save to cache, set false to load only - load_from_cache: whether to load from cache, set false to save only - include_execution: return Execution metadata alongside query result """ with use_http_client(http_client): # Check if this is raw SQL and should use Execute SQL endpoint raw_sql_engine = os.getenv("SPICE_DUNE_RAW_SQL_ENGINE", "execution_sql") is_raw_sql = _is_sql(query_or_execution) if isinstance(query_or_execution, str) else False if is_raw_sql and raw_sql_engine == "execution_sql": # Use Execute SQL endpoint for raw SQL execution = _execute_raw_sql( query_or_execution, # type: ignore[arg-type] parameters=parameters, timeout_seconds=timeout_seconds, api_key=api_key, verbose=verbose, ) query_id = None else: # determine whether target is a query or an execution query_id, execution, parameters = _determine_input_type( query_or_execution, parameters, ) # gather arguments execute_kwargs: ExecuteKwargs = { 'query_id': query_id, 'api_key': api_key, 'parameters': parameters, 'performance': performance, } poll_kwargs: PollKwargs = { 'poll_interval': poll_interval, 'api_key': api_key, 'verbose': verbose, 'timeout_seconds': timeout_seconds, } result_kwargs: RetrievalKwargs = { 'limit': limit, 'offset': offset, 'sample_count': sample_count, 'sort_by': sort_by, 'columns': columns, 'extras': extras, 'types': types, 'all_types': all_types, 'verbose': verbose, } output_kwargs: OutputKwargs = { 'execute_kwargs': execute_kwargs, 'result_kwargs': result_kwargs, 'cache': cache, 'save_to_cache': save_to_cache, 'cache_dir': cache_dir, 'include_execution': include_execution, } # execute or retrieve query if query_id: # Check if this is a parameterized query (raw SQL via template or parameterized query) # For parameterized queries, results don't exist until execution, so skip GET attempt is_parameterized = ( parameters is not None and len(parameters) > 0 and ('query' in parameters or any(k != 'query' for k in parameters)) ) if cache and load_from_cache and not refresh: cache_result, cache_execution = _cache.load_from_cache( execute_kwargs, result_kwargs, output_kwargs ) if cache_result is not None: return cache_result if execution is None and cache_execution is not None: execution = cache_execution if max_age is not None and not refresh: age = get_query_latest_age(**execute_kwargs, verbose=verbose) # type: ignore if age is None or age > max_age: refresh = True # Skip GET results attempt for parameterized queries - they need execution first if not refresh and not is_parameterized: df = get_results(**execute_kwargs, **result_kwargs) if df is not None: return process_result(df, execution, **output_kwargs) try: execution = execute_query(**execute_kwargs, verbose=verbose) except Exception as e: # Re-raise with more context about the failure if verbose: print(f'execute_query failed for query_id={query_id}, parameters={parameters}') raise Exception(f'failed to execute query {query_id}: {e}') from e else: # query_id is None when using Execute SQL endpoint for raw SQL # execution should already be set from _execute_raw_sql() above if verbose and execution is None: print(f'query_id is None, execution={execution}, query_or_execution={query_or_execution}') # check execution status if execution is None: error_detail = f'query_id={query_id}, query_type={type(query_or_execution).__name__}' if isinstance(query_or_execution, str): error_detail += f', query_preview={query_or_execution[:100]}' raise Exception(f'could not determine execution ({error_detail})') if poll: poll_execution(execution, **poll_kwargs) df = get_results(execution, api_key, **result_kwargs) if df is not None: return process_result(df, execution, **output_kwargs) else: raise Exception('no successful execution for query') else: return execution if TYPE_CHECKING: @overload def _process_result( df: pl.DataFrame, execution: Execution | None, execute_kwargs: ExecuteKwargs, result_kwargs: RetrievalKwargs, cache: bool, save_to_cache: bool, cache_dir: str | None, include_execution: Literal[False], ) -> pl.DataFrame: ... @overload def _process_result( df: pl.DataFrame, execution: Execution | None, execute_kwargs: ExecuteKwargs, result_kwargs: RetrievalKwargs, cache: bool, save_to_cache: bool, cache_dir: str | None, include_execution: Literal[True], ) -> tuple[pl.DataFrame, Execution]: ... @overload def _process_result( df: pl.DataFrame, execution: Execution | None, execute_kwargs: ExecuteKwargs, result_kwargs: RetrievalKwargs, cache: bool, save_to_cache: bool, cache_dir: str | None, include_execution: bool, ) -> pl.DataFrame | tuple[pl.DataFrame, Execution]: ... def _process_result( df: pl.DataFrame, execution: Execution | None, execute_kwargs: ExecuteKwargs, result_kwargs: RetrievalKwargs, cache: bool, save_to_cache: bool, cache_dir: str | None, include_execution: bool, ) -> pl.DataFrame | tuple[pl.DataFrame, Execution]: if cache and save_to_cache and execute_kwargs['query_id'] is not None: if execution is None: execution = get_latest_execution(execute_kwargs) if execution is None: raise Exception('could not get execution') _cache.save_to_cache( df, execution, execute_kwargs, result_kwargs, cache_dir ) if include_execution: if execution is None: execution = get_latest_execution(execute_kwargs) if execution is None: raise Exception('could not get execution') return df, execution else: return df def _get_query_latest_age( query_id: int, *, verbose: bool = True, parameters: Mapping[str, Any] | None = None, performance: Performance = 'medium', api_key: str | None = None, ) -> float | None: import datetime import json # process inputs if api_key is None: api_key = _urls.get_api_key() headers = {'X-Dune-API-Key': api_key, 'User-Agent': get_user_agent()} data = {} if parameters is not None: data['query_parameters'] = parameters url = _urls.get_query_results_url(query_id, parameters=data, csv=False) # print summary if verbose: print('checking age of last execution, query_id = ' + str(query_id)) response = _transport_get(url, headers=headers, timeout=15.0) # check if result is error result = response.json() try: if 'error' in result: if ( result['error'] == 'not found: No execution found for the latest version of the given query' ): if verbose: print( 'no age for query, because no previous executions exist' ) return None raise Exception(result['error']) except json.JSONDecodeError: pass # process result if 'execution_started_at' in result: now = datetime.datetime.now(datetime.UTC).timestamp() started = _parse_timestamp(result['execution_started_at']) age = now - started if verbose: print('latest result age:', age) return age else: if verbose: print('no age for query, because no previous executions exist') return None def _parse_timestamp(timestamp: str) -> int: import datetime # reduce number of decimals in if len(timestamp) > 27 and timestamp[-1] == 'Z': timestamp = timestamp[:26] + 'Z' timestamp_float = ( datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ') .replace(tzinfo=datetime.UTC) .timestamp() ) return int(timestamp_float) def _execute_raw_sql( query_sql: str, *, parameters: Mapping[str, Any] | None = None, timeout_seconds: float | None = None, api_key: str | None = None, verbose: bool = True, ) -> Execution: """Execute raw SQL via POST /execution/sql endpoint.""" url = _urls.url_templates['execution_sql'] if api_key is None: api_key = _urls.get_api_key() headers = {'X-Dune-API-Key': api_key, 'User-Agent': get_user_agent()} data: dict[str, Any] = {'query_sql': query_sql} if parameters is not None: data['parameters'] = parameters if timeout_seconds is not None: data['timeout_seconds'] = timeout_seconds if verbose: print(f'executing raw SQL via /execution/sql, query_preview={query_sql[:100]}...') response = _transport_post(url, headers=headers, json=data, timeout=_POST_TIMEOUT) try: result: Mapping[str, Any] = response.json() except Exception as e: if verbose: print(f'failed to parse response JSON: {e}') print(f'response status: {response.status_code}') print(f'response text: {response.text[:500]}') raise Exception(f'failed to parse response: {e}') from e if 'execution_id' not in result: error_msg = result.get('error', f'response missing execution_id: {result}') if verbose: print(f'execution failed: {error_msg}') print(f'response status: {response.status_code}') print(f'full response: {result}') raise Exception(error_msg) execution_id = result['execution_id'] return {'execution_id': execution_id, 'timestamp': None} def _execute( query_id: int | str, *, parameters: Mapping[str, Any] | None = None, performance: Performance = 'medium', api_key: str | None = None, verbose: bool = True, ) -> Execution: # process inputs url = _urls.get_query_execute_url(query_id) if api_key is None: api_key = _urls.get_api_key() headers = {'X-Dune-API-Key': api_key, 'User-Agent': get_user_agent()} data = {} if parameters is not None: data['query_parameters'] = parameters data['performance'] = performance # print summary if verbose: print('executing query, query_id = ' + str(query_id)) # perform request response = _transport_post(url, headers=headers, json=data, timeout=_POST_TIMEOUT) # Parse response with better error handling try: result: Mapping[str, Any] = response.json() except Exception as e: if verbose: print(f'failed to parse response JSON: {e}') print(f'response status: {response.status_code}') print(f'response text: {response.text[:500]}') raise Exception(f'failed to parse response: {e}') from e # check for errors if 'execution_id' not in result: error_msg = result.get('error', f'response missing execution_id: {result}') if verbose: print(f'execution failed: {error_msg}') print(f'response status: {response.status_code}') print(f'full response: {result}') raise Exception(error_msg) # process result execution_id = result['execution_id'] return {'execution_id': execution_id, 'timestamp': None} def _get_results( execution: Execution | None = None, api_key: str | None = None, *, query_id: int | None = None, parameters: Mapping[str, Any] | None = None, performance: Performance = 'medium', limit: int | None = None, offset: int | None = None, sample_count: int | None = None, sort_by: str | None = None, columns: Sequence[str] | None = None, extras: Mapping[str, Any] | None = None, types: Sequence[type[pl.DataType]] | Mapping[str, type[pl.DataType]] | None = None, all_types: Sequence[type[pl.DataType]] | Mapping[str, type[pl.DataType]] | None = None, verbose: bool = True, ) -> pl.DataFrame | None: import random import time as _time import polars as pl # process inputs similar to upstream if api_key is None: api_key = _urls.get_api_key() headers = {'X-Dune-API-Key': api_key, 'User-Agent': get_user_agent()} params: dict[str, Any] = { 'limit': limit, 'offset': offset, 'sample_count': sample_count, 'sort_by': sort_by, 'columns': columns, } if extras is not None: params.update(extras) if performance is not None: params['performance'] = performance if parameters is not None: params['query_parameters'] = parameters if query_id is not None: url = _urls.get_query_results_url(query_id, parameters=params) elif execution is not None: url = _urls.get_execution_results_url(execution['execution_id'], params) else: raise Exception('must specify query_id or execution') # print summary if verbose: if query_id is not None: print('getting results, query_id = ' + str(query_id)) elif execution is not None: print('getting results, execution_id = ' + str(execution['execution_id'])) # perform request # GET with simple retry/backoff for 429/502 def _get_with_retries(u: str): client = current_http_client() if client is not None: return client.request("GET", u, headers=headers, timeout=_GET_TIMEOUT) attempts = 0 backoff = 0.5 while True: resp = _transport_get(u, headers=headers, timeout=_GET_TIMEOUT) if resp.status_code in (429, 502): attempts += 1 if attempts >= 3: return resp sleep_for = backoff * random.uniform(1.5, 2.5) _time.sleep(sleep_for) backoff = min(5.0, backoff * 2) continue return resp response = _get_with_retries(url) if response.status_code == 404: return None result = response.text # process result df = _process_raw_table(result, types=types, all_types=all_types) # support pagination when using limit response_headers = response.headers if limit is not None: n_rows = len(df) pages = [] while 'x-dune-next-uri' in response_headers and n_rows < limit: if verbose: offset_hdr = response.headers.get('x-dune-next-offset', 'unknown') print('gathering additional page, offset = ' + str(offset_hdr)) next_url = response_headers['x-dune-next-uri'] response = _get_with_retries(next_url) result = response.text response_headers = response.headers page = _process_raw_table(result, types=types, all_types=all_types) n_rows += len(page) pages.append(page) df = pl.concat([df, *pages]).limit(limit) return df def _process_raw_table( raw_csv: str, types: Sequence[type[pl.DataType] | None] | Mapping[str, type[pl.DataType] | None] | None, all_types: Sequence[type[pl.DataType]] | Mapping[str, type[pl.DataType]] | None = None, ) -> pl.DataFrame: import polars as pl # convert from map to sequence first_line = raw_csv.split('\n', maxsplit=1)[0] column_order = first_line.split(',') # parse data as csv df = pl.read_csv( io.StringIO(raw_csv), infer_schema_length=len(raw_csv), null_values='<nil>', truncate_ragged_lines=True, schema_overrides=[pl.String for column in column_order], ) # check if using all_types if all_types is not None and types is not None: raise Exception('cannot specify both types and all_types') elif all_types is not None: types = all_types # cast types new_types = [] for c, column in enumerate(df.columns): new_type = None if types is not None: if isinstance(types, list): if len(types) > c and types[c] is not None: new_type = types[c] elif isinstance(types, dict): if column in types and types[column] is not None: new_type = types[column] else: raise Exception('invalid format for types') if new_type is None: new_type = infer_type(df[column]) if new_type == pl.Datetime or isinstance(new_type, pl.Datetime): time_format = '%Y-%m-%d %H:%M:%S%.3f %Z' df = df.with_columns(pl.col(column).str.to_datetime(time_format)) new_type = None new_types.append(new_type) # check that all types were used if isinstance(types, dict): missing_columns = [ name for name in types.keys() if name not in df.columns ] if len(missing_columns) > 0: raise Exception( 'types specified for missing columns: ' + str(missing_columns) ) if all_types is not None: missing_columns = [name for name in df.columns if name not in all_types] if len(missing_columns) > 0: raise Exception( 'types not specified for columns: ' + str(missing_columns) ) new_columns = [] for column, type in zip(df.columns, new_types): if type is not None: if type == pl.Boolean: new_column = pl.col(column) == 'true' else: new_column = pl.col(column).cast(type) new_columns.append(new_column) df = df.with_columns(*new_columns) return df def infer_type(s: pl.Series) -> pl.DataType: import re import polars as pl # Heuristic: detect common UTC timestamp format used by Dune exports try: non_null = [v for v in s.to_list() if v is not None and v != '<nil>'] if non_null and all( isinstance(v, str) and re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} UTC$", v) for v in non_null ): return pl.Datetime except Exception: pass try: as_str = pl.DataFrame(s).write_csv(None) return pl.read_csv(io.StringIO(as_str))[s.name].dtype except Exception: return pl.String() def _poll_execution( execution: Execution, *, api_key: str | None, poll_interval: float, verbose: bool, timeout_seconds: float | None, ) -> None: import random import time # process inputs url = _urls.get_execution_status_url(execution['execution_id']) execution_id = execution['execution_id'] if api_key is None: api_key = _urls.get_api_key() headers = {'X-Dune-API-Key': api_key, 'User-Agent': get_user_agent()} # print summary t_start = time.time() # poll until completion sleep_amount = poll_interval while True: t_poll = time.time() # print summary if verbose: print( 'waiting for results, execution_id = ' + str(execution['execution_id']) + ', t = ' + '%.02f' % (t_poll - t_start) ) # poll response = _http_get(url, headers=headers, timeout=_GET_TIMEOUT) result = response.json() if ( 'is_execution_finished' not in result and response.status_code == 429 ): sleep_amount = sleep_amount * random.uniform(1, 2) time.sleep(sleep_amount) continue if result['is_execution_finished']: if result['state'] == 'QUERY_STATE_FAILED': # Enrich error message with state and any error details if present err_detail = '' try: if 'error' in result and result['error']: err_detail = f", error={result['error']}" except Exception: pass raise Exception( f"QUERY FAILED execution_id={execution_id} state={result.get('state')}{err_detail}" ) execution['timestamp'] = _parse_timestamp( result['execution_started_at'] ) break # timeout check if timeout_seconds is not None and (t_poll - t_start) > timeout_seconds: raise TimeoutError( f'query polling timed out after {timeout_seconds} seconds' ) # wait until polling interval t_wait = time.time() - t_poll if t_wait < poll_interval: time.sleep(poll_interval - t_wait) # check for errors if result['state'] == 'QUERY_STATE_FAILED': err_detail = '' try: if 'error' in result and result['error']: err_detail = f", error={result['error']}" except Exception: pass raise Exception( f"QUERY FAILED execution_id={execution_id} state={result.get('state')}{err_detail}" ) def get_latest_execution( execute_kwargs: ExecuteKwargs, *, allow_unfinished: bool = False, ) -> Execution | None: import json import random query_id = execute_kwargs['query_id'] api_key = execute_kwargs['api_key'] parameters = execute_kwargs['parameters'] if query_id is None: raise Exception('query_id required for get_latest_execution') # process inputs if api_key is None: api_key = _urls.get_api_key() headers = {'X-Dune-API-Key': api_key, 'User-Agent': get_user_agent()} data: dict[str, Any] = {} if parameters is not None: data['query_parameters'] = parameters data['limit'] = 0 url = _urls.get_query_results_url(query_id, parameters=data, csv=False) sleep_amount = 1.0 while True: # perform request response = _http_get(url, headers=headers, timeout=15.0) if response.status_code in (429, 502): sleep_amount = sleep_amount * random.uniform(1, 2) time.sleep(sleep_amount) continue # check if result is error result = response.json() try: if 'error' in result: if ( result['error'] == 'not found: No execution found for the latest version of the given query' ): return None if response.status_code == 429: sleep_amount = sleep_amount * random.uniform(1, 2) time.sleep(sleep_amount) raise Exception(result['error']) except json.JSONDecodeError: pass break # process result if not result['is_execution_finished'] and not allow_unfinished: return None execution: Execution = {'execution_id': result['execution_id']} if 'execution_started_at' in result: execution['timestamp'] = int( _parse_timestamp(result['execution_started_at']) ) return execution def get_user_agent() -> str: # Identify as spice-mcp vendored spice client return 'spice-mcp/' + ADAPTER_VERSION # --------------------------------------------------------------------------- # Public aliases expected by adapter/tests (non-underscored names) determine_input_type = _determine_input_type get_query_latest_age = _get_query_latest_age execute_query = _execute get_results = _get_results process_result = _process_result poll_execution = _poll_execution

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/spice-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server