Skip to main content
Glama
client.py9.05 kB
from __future__ import annotations import os import time from collections.abc import Mapping, Sequence from typing import Any import polars as pl from ...config import Config from ...core.models import ( QueryRequest, QueryResult, ResultMetadata, ResultPreview, SchemaMatch, TableColumn, TableDescription, TableSummary, ) from ...core.ports import CatalogExplorer, QueryExecutor from ...polars_utils import collect_preview from ..http_client import HttpClient, HttpClientConfig from . import extract, urls # Use wrapper to avoid FastMCP detecting overloads in extract.query() # Note: We still import extract for _determine_input_type and other non-overloaded functions from .query_wrapper import execute_query as _execute_dune_query class DuneAdapter(QueryExecutor, CatalogExplorer): """Thin façade around the vendored extract module.""" def __init__( self, config: Config, *, http_client: HttpClient | None = None, ): self.config = config http_config: HttpClientConfig = config.http self._http = http_client or HttpClient(http_config) # QueryExecutor ----------------------------------------------------------------- def execute(self, request: QueryRequest) -> QueryResult: self._ensure_api_key() start = time.time() q = request.query # Use native SHOW statements directly - they're faster than information_schema queries # See issue #10: https://github.com/Evan-Kim2028/spice-mcp/issues/10 # Removed rewrite to avoid performance issues with information_schema queries result = _execute_dune_query( query_or_execution=q, verbose=False, refresh=request.refresh, max_age=request.max_age, parameters=request.parameters, api_key=self._api_key(), performance=request.performance or "medium", poll=request.poll, timeout_seconds=request.timeout_seconds, limit=request.limit, offset=request.offset, sample_count=request.sample_count, sort_by=request.sort_by, columns=request.columns, cache_dir=self.config.cache.cache_dir, include_execution=request.include_execution, http_client=self._http, ) if request.include_execution: df, execution = result else: df = result execution = {} duration_ms = int((time.time() - start) * 1000) columns = list(df.columns) rowcount = int(len(df)) lazyframe = df.lazy() preview = _build_preview(lazyframe, columns, rowcount) del df # Use rewritten SQL for metadata determination too req_for_meta = request try: from dataclasses import replace if isinstance(q, str) and q != request.query: req_for_meta = replace(request, query=q) # type: ignore[arg-type] except Exception: pass meta = self.fetch_metadata(req_for_meta, execution=execution) meta.duration_ms = duration_ms return QueryResult(preview=preview, info=meta, lazyframe=lazyframe) def fetch_metadata( self, request: QueryRequest, *, execution: Mapping[str, Any] | None = None ) -> ResultMetadata: self._ensure_api_key() query_id, _, effective_params = extract._determine_input_type( # type: ignore[attr-defined] request.query, request.parameters, ) payload: dict[str, Any] = {} next_uri: str | None = None next_offset: int | None = None if query_id is not None: params: dict[str, Any] = {} if effective_params is not None: params["query_parameters"] = effective_params params.update( { "limit": request.limit, "offset": request.offset, "sample_count": request.sample_count, "sort_by": request.sort_by, "columns": list(request.columns) if request.columns else None, } ) if request.extras: try: params.update(request.extras) except Exception: pass url = urls.get_query_results_url(query_id, parameters=params, csv=False) from .user_agent import get_user_agent headers = { "X-Dune-API-Key": self._api_key(), "User-Agent": get_user_agent(), } try: resp = self._http.request("GET", url, headers=headers) data = resp.json() if isinstance(data, dict): payload = data.get("result", {}).get("metadata") or {} next_uri = data.get("next_uri") next_offset = data.get("next_offset") if "error" in data: payload = {**payload, "error": data["error"]} if "state" in data: payload = {**payload, "state": data["state"]} except Exception: payload = {} execution_meta: dict[str, Any] = {} if execution: execution_meta = dict(execution) return ResultMetadata( execution=execution_meta, duration_ms=0, metadata=payload or None, next_offset=next_offset, next_uri=next_uri, ) # CatalogExplorer --------------------------------------------------------------- def find_schemas(self, keyword: str) -> Sequence[SchemaMatch]: sql = f"SHOW SCHEMAS LIKE '%{keyword}%'" df = self._run_sql(sql) return [SchemaMatch(schema=row.get("Schema", "")) for row in df.to_dicts()] def list_tables(self, schema: str, limit: int | None = None) -> Sequence[TableSummary]: sql = f"SHOW TABLES FROM {schema}" df = self._run_sql(sql, limit=limit) return [ TableSummary(schema=schema, table=row.get("Table", row.get("name", ""))) for row in df.to_dicts() ] def describe_table(self, schema: str, table: str) -> TableDescription: fq = f"{schema}.{table}" try: df = self._run_sql(f"SHOW COLUMNS FROM {fq}") rows = df.to_dicts() columns = [ TableColumn( name=row.get("Column") or row.get("column_name") or "", dune_type=row.get("Type") or row.get("data_type"), polars_dtype=row.get("Type") or None, comment=row.get("Comment"), extra=row.get("Extra"), ) for row in rows ] return TableDescription(fully_qualified_name=fq, columns=columns) except Exception: df = self._run_sql(f"SELECT * FROM {fq} LIMIT 1") columns = [ TableColumn(name=name, polars_dtype=str(dtype)) for name, dtype in zip(df.columns, df.dtypes) ] return TableDescription(fully_qualified_name=fq, columns=columns) # Internal helpers -------------------------------------------------------------- def _run_sql(self, sql: str, *, limit: int | None = None) -> pl.DataFrame: self._ensure_api_key() # Use native SHOW statements directly - they're faster than information_schema queries # See issue #10: https://github.com/Evan-Kim2028/spice-mcp/issues/10 sql_eff = sql df = _execute_dune_query( query_or_execution=sql_eff, verbose=False, performance="medium", timeout_seconds=self.config.default_timeout_seconds, limit=limit, cache_dir=self.config.cache.cache_dir, http_client=self._http, ) if limit is not None and len(df) > limit: return df.head(limit) return df def _ensure_api_key(self) -> None: if not os.getenv("DUNE_API_KEY"): os.environ["DUNE_API_KEY"] = self._api_key() def _api_key(self) -> str: return self.config.dune.api_key def _build_preview(lf: pl.LazyFrame, columns: list[str], rowcount: int) -> ResultPreview: preview_rows = min(10, rowcount) data_preview = collect_preview(lf, preview_rows) return ResultPreview( rowcount=rowcount, columns=columns, data_preview=data_preview, ) def _maybe_rewrite_show_sql(sql: str) -> str | None: """DEPRECATED: This function is no longer used. Native SHOW statements are now used directly as they're faster than information_schema queries in Dune. See issue #10 for details. This function is kept for backward compatibility but is not called. """ # Function body kept for reference but not executed return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/spice-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server