Skip to main content
Glama

dune_query

Execute Dune Analytics queries to analyze blockchain data with agent-optimized previews, parameter support, and flexible formatting options.

Instructions

Execute Dune queries and return agent-optimized preview.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
queryYes
parametersNo
refreshNo
max_ageNo
limitNo
offsetNo
sample_countNo
sort_byNo
columnsNo
formatNopreview
extrasNo
timeout_secondsNo

Implementation Reference

  • Registration of the 'dune_query' tool using FastMCP @app.tool decorator.
    @app.tool( name="dune_query", title="Run Dune Query", description="Execute Dune queries and return agent-optimized preview.", tags={"dune", "query"}, )
  • Thin wrapper handler for dune_query tool, normalizes inputs and delegates to internal impl.
    def dune_query( query: str, parameters: dict[str, Any] | None = None, refresh: bool = False, max_age: float | None = None, limit: int | None = None, offset: int | None = None, sample_count: int | None = None, sort_by: str | None = None, columns: list[str] | None = None, format: Literal["preview", "raw", "metadata", "poll"] = "preview", extras: dict[str, Any] | None = None, timeout_seconds: float | None = None, ) -> dict[str, Any]: """Execute Dune queries (by ID, URL, or raw SQL) and return agent-optimized preview. ⚠️ IMPORTANT: ALWAYS use dune_discover FIRST to find verified table names. Do not guess table names or query information_schema directly. The query parameter accepts: - Query IDs (e.g., "123456") - Query URLs (e.g., "https://dune.com/queries/123456") - Raw SQL using tables discovered via dune_discover For Spellbook models, use the 'dune_table' field returned by dune_discover. Example: dune_discover(keyword="walrus") → returns dune_table="sui_walrus.base_table" Then use: dune_query(query="SELECT * FROM sui_walrus.base_table LIMIT 10") This wrapper ensures FastMCP doesn't detect overloads in imported functions. """ # Always ensure parameters is explicitly passed (even if None) to avoid FastMCP # overload detection when the keyword is omitted return _dune_query_impl( query=query, parameters=parameters, refresh=refresh, max_age=max_age, limit=limit, offset=offset, sample_count=sample_count, sort_by=sort_by, columns=columns, format=format, extras=extras, timeout_seconds=timeout_seconds, )
  • Internal impl that normalizes params and calls ExecuteQueryTool.execute().
    def _dune_query_impl( query: str, parameters: dict[str, Any] | None = None, refresh: bool = False, max_age: float | None = None, limit: int | None = None, offset: int | None = None, sample_count: int | None = None, sort_by: str | None = None, columns: list[str] | None = None, format: Literal["preview", "raw", "metadata", "poll"] = "preview", extras: dict[str, Any] | None = None, timeout_seconds: float | None = None, ) -> dict[str, Any]: """Internal implementation of dune_query to avoid FastMCP overload detection.""" _ensure_initialized() assert EXECUTE_QUERY_TOOL is not None # Normalize parameters: handle case where MCP client passes JSON string # This can happen if FastMCP's schema generation doesn't match client expectations normalized_parameters = parameters if isinstance(parameters, str): try: import json normalized_parameters = json.loads(parameters) except (json.JSONDecodeError, TypeError): return error_response( ValueError(f"parameters must be a dict or JSON string, got {type(parameters).__name__}"), context={ "tool": "dune_query", "query": query, "parameters_type": type(parameters).__name__, } ) # Normalize extras similarly normalized_extras = extras if isinstance(extras, str): try: import json normalized_extras = json.loads(extras) except (json.JSONDecodeError, TypeError): normalized_extras = None try: # Execute query synchronously return EXECUTE_QUERY_TOOL.execute( query=query, parameters=normalized_parameters, refresh=refresh, max_age=max_age, limit=limit, offset=offset, sample_count=sample_count, sort_by=sort_by, columns=columns, format=format, extras=normalized_extras, timeout_seconds=timeout_seconds, ) except Exception as e: return error_response(e, context={ "tool": "dune_query", "query": query, "limit": limit, "offset": offset, })
  • Core execution handler in ExecuteQueryTool.execute(): handles formats, executes via QueryService, logs to history, formats output.
    def execute( self, *, query: str, parameters: dict[str, Any] | None = None, refresh: bool = False, max_age: float | None = None, limit: int | None = None, offset: int | None = None, sample_count: int | None = None, sort_by: str | None = None, columns: list[str] | None = None, performance: str = "medium", format: str = "preview", extras: dict[str, Any] | None = None, timeout_seconds: float | None = None, ) -> dict[str, Any]: t0 = time.time() try: # Use native SHOW statements directly - they're faster than information_schema queries # See issue #10: https://github.com/Evan-Kim2028/spice-mcp/issues/10 # Removed rewrite to avoid performance issues with information_schema queries q_use = query # Poll-only: return execution handle without fetching results if format == "poll": exec_obj = execute_dune_query( q_use, parameters=parameters, api_key=self.config.dune.api_key, performance=performance, poll=False, http_client=self._http, ) execution_id = exec_obj.get("execution_id", "unknown") q_type = _categorize_query(q_use) q_sha = self._persist_query_sql(q_use, q_type) # Determine IDs for history q_id_value = None template_id_value = None raw_sql_engine = os.getenv("SPICE_DUNE_RAW_SQL_ENGINE", "execution_sql") try: if q_type == "query_id": q_id_value = int(q_use) elif q_type == "url": q_id_value = dune_urls.get_query_id(q_use) else: # raw SQL: only use template id if using template engine if raw_sql_engine == "template": template_id_value = int(os.getenv("SPICE_RAW_SQL_QUERY_ID", "4060379")) except Exception: pass extra_fields: dict[str, Any] = {} if q_id_value is not None: extra_fields["query_id"] = q_id_value if template_id_value is not None: extra_fields["template_query_id"] = template_id_value self.query_history.record( execution_id=execution_id, query_type=q_type, query_preview=query, status="submitted", duration_ms=int((time.time() - t0) * 1000), query_sha256=q_sha, cache_hit=False, **extra_fields, ) return {"type": "execution", "execution_id": execution_id, "status": "submitted"} if format == "metadata": meta = self.query_service.fetch_metadata( query=q_use, parameters=parameters, max_age=max_age, limit=limit, offset=offset, sample_count=sample_count, sort_by=sort_by, columns=columns, extras=extras, performance=performance, ) return { "type": "metadata", "metadata": meta.get("metadata"), "next_uri": meta.get("next_uri"), "next_offset": meta.get("next_offset"), } result = self.query_service.execute( query=q_use, parameters=parameters, refresh=refresh, max_age=max_age, poll=True, timeout_seconds=timeout_seconds, limit=limit, offset=offset, sample_count=sample_count, sort_by=sort_by, columns=columns, extras=extras, include_execution=True, performance=performance, return_raw=format == "raw", ) duration_ms = int((time.time() - t0) * 1000) execution = result.get("execution", {}) execution_id = execution.get("execution_id", "unknown") q_type = _categorize_query(q_use) q_sha = self._persist_query_sql(q_use, q_type) # Determine query id (or template id for raw SQL) for history q_id_value: int | None = None template_id_value: int | None = None raw_sql_engine = os.getenv("SPICE_DUNE_RAW_SQL_ENGINE", "execution_sql") try: if q_type == "query_id": q_id_value = int(q_use) elif q_type == "url": q_id_value = dune_urls.get_query_id(q_use) else: # raw SQL: only use template id if using template engine if raw_sql_engine == "template": tmpl = os.getenv("SPICE_RAW_SQL_QUERY_ID", "4060379") template_id_value = int(tmpl) except Exception: pass extra_fields: dict[str, Any] = {} if q_id_value is not None: extra_fields["query_id"] = q_id_value if template_id_value is not None: extra_fields["template_query_id"] = template_id_value self.query_history.record( execution_id=execution_id, query_type=q_type, query_preview=query, status="success", duration_ms=duration_ms, rowcount=result.get("rowcount"), query_sha256=q_sha, cache_hit=False, **extra_fields, ) # Derive ids for payload and history q_id_value = None template_id_value = None raw_sql_engine = os.getenv("SPICE_DUNE_RAW_SQL_ENGINE", "execution_sql") try: if q_type == "query_id": q_id_value = int(query) elif q_type == "url": q_id_value = dune_urls.get_query_id(query) else: # raw SQL: only use template id if using template engine if raw_sql_engine == "template": template_id_value = int(os.getenv("SPICE_RAW_SQL_QUERY_ID", "4060379")) except Exception: pass payload = {**result} payload["execution_id"] = execution_id payload["duration_ms"] = result.get("duration_ms") if result.get("duration_ms") is not None else duration_ms if q_id_value is not None: payload["query_id"] = q_id_value if template_id_value is not None: payload["template_query_id"] = template_id_value # User-friendly web URL for the query (or template for raw SQL) q_for_url = q_id_value if q_id_value is not None else template_id_value if q_for_url is not None: payload["query_url"] = f"https://dune.com/queries/{q_for_url}" if "metadata" not in payload: payload["metadata"] = None if format == "raw": payload["type"] = "raw" payload["data"] = result.get("data") or result.get("data_preview") else: payload["type"] = "preview" return payload except Exception as exc: duration_ms = int((time.time() - t0) * 1000) # Try to include query id info on error as well q_type = _categorize_query(q_use) q_id_value: int | None = None template_id_value: int | None = None raw_sql_engine = os.getenv("SPICE_DUNE_RAW_SQL_ENGINE", "execution_sql") try: if q_type == "query_id": q_id_value = int(q_use) elif q_type == "url": q_id_value = dune_urls.get_query_id(q_use) else: # raw SQL: only use template id if using template engine if raw_sql_engine == "template": tmpl = os.getenv("SPICE_RAW_SQL_QUERY_ID", "4060379") template_id_value = int(tmpl) except Exception: pass extra_fields: dict[str, Any] = {} if q_id_value is not None: extra_fields["query_id"] = q_id_value if template_id_value is not None: extra_fields["template_query_id"] = template_id_value # Compute query SHA256 for better debugging q_sha = None try: q_sha = self._persist_query_sql(q_use, q_type) if q_sha: extra_fields["query_sha256"] = q_sha except Exception: pass self.query_history.record( execution_id="unknown", query_type=_categorize_query(query), query_preview=query, status="error", duration_ms=duration_ms, error=str(exc), **extra_fields, ) enriched = self._enrich_error(exc) context = {"tool": "dune_query", "query": q_use, "query_type": _categorize_query(q_use)} if enriched: context.update(enriched) # Add debugging information for raw SQL failures if q_type == "raw_sql" and "could not determine execution" in str(exc): context.update({ "debug_info": "Raw SQL execution failed - check template query configuration and API key", "template_query_id": template_id_value, "environment_vars": { "SPICE_RAW_SQL_QUERY_ID": os.getenv("SPICE_RAW_SQL_QUERY_ID"), "DUNE_API_KEY_present": bool(os.getenv("DUNE_API_KEY")) }, "suggested_action": "Retry or check if the template query (4060379) is accessible" }) return error_response(exc, context=context)
  • Explicit JSON schema definition for dune_query tool parameters in ExecuteQueryTool (matches function signature).
    def get_parameter_schema(self) -> dict[str, Any]: return { "type": "object", "properties": { "query": {"type": "string", "description": "Query ID, URL, or raw SQL"}, "parameters": {"type": "object", "description": "Query parameters"}, "refresh": {"type": "boolean", "default": False}, "max_age": {"type": "number"}, "limit": {"type": "integer"}, "offset": {"type": "integer"}, "sample_count": {"type": "integer"}, "sort_by": {"type": "string"}, "columns": {"type": "array", "items": {"type": "string"}}, "performance": { "type": "string", "enum": ["medium", "large"], "default": "medium", "description": "Request the medium (default) or large performance tier.", }, "format": { "type": "string", "enum": ["preview", "raw", "metadata", "poll"], "default": "preview", "description": "Preview returns compact data; raw returns all rows; poll returns execution handle only", }, "extras": { "type": "object", "description": "Optional advanced flags passed through to results (e.g., allow_partial_results)", "additionalProperties": True, "properties": { "allow_partial_results": {"type": "boolean"}, "ignore_max_datapoints_per_request": {"type": "boolean"}, }, }, "timeout_seconds": {"type": "number", "description": "Polling timeout in seconds"}, }, "required": ["query"], "additionalProperties": False, }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/spice-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server