Skip to main content
Glama

dune_query

Execute Dune Analytics queries to analyze blockchain data with agent-optimized previews and customizable parameters.

Instructions

Execute Dune queries and return agent-optimized preview.

Input Schema

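Table | JSON Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| query | Yes | | |
| parameters | No | | |
| refresh | No | | |
| max_age | No | | |
| limit | No | | |
| offset | No | | |
| sample_count | No | | |
| sort_by | No | | |
| columns | No | | |
| format | No | | preview |
| extras | No | | |
| timeout_seconds | No | | |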

Implementation Reference

  • The core handler function ExecuteQueryTool.execute() that implements the logic for executing Dune queries, handling parameters, caching, polling, formatting results, logging to history, and comprehensive error handling with context enrichment.
    def execute(
        self,
        *,
        query: str,
        parameters: dict[str, Any] | None = None,
        refresh: bool = False,
        max_age: float | None = None,
        limit: int | None = None,
        offset: int | None = None,
        sample_count: int | None = None,
        sort_by: str | None = None,
        columns: list[str] | None = None,
        performance: str = "medium",
        format: str = "preview",
        extras: dict[str, Any] | None = None,
        timeout_seconds: float | None = None,
    ) -> dict[str, Any]:
        """Run a Dune query and return a payload whose shape depends on ``format``.

        Behaviour by ``format``:

        * ``"poll"``: submits the query through ``execute_dune_query`` with
          ``poll=False``, records a ``"submitted"`` history entry and returns
          ``{"type": "execution", "execution_id": ..., "status": "submitted"}``.
        * ``"metadata"``: returns ``metadata``, ``next_uri`` and ``next_offset``
          from ``query_service.fetch_metadata``; no history entry is written.
        * ``"raw"``: runs ``query_service.execute`` with ``return_raw=True`` and
          returns the result with ``"type": "raw"`` and ``"data"`` taken from the
          result's ``data`` (falling back to ``data_preview``).
        * any other value: same as raw but with ``return_raw=False`` and
          ``"type": "preview"``.

        Args:
            query: Query ID, query URL, or raw SQL text.
            parameters: Query parameters forwarded to the execution call.
            refresh, max_age, limit, offset, sample_count, sort_by, columns,
            extras, timeout_seconds: Forwarded unchanged to the query service
                (``refresh`` and ``timeout_seconds`` only on the execute path).
            performance: Performance tier forwarded to the execution call.
            format: Output mode, see above.

        Returns:
            The payload described above, or the value of ``error_response`` when
            an exception is raised while executing. In the error case a history
            entry with status ``"error"`` is recorded first.
        """
        t0 = time.time()
        # Use native SHOW statements directly - they're faster than information_schema queries
        # See issue #10: https://github.com/Evan-Kim2028/spice-mcp/issues/10
        # Removed rewrite to avoid performance issues with information_schema queries
        # Bound before the ``try`` so the error handler can always reference it.
        q_use = query
        try:
            # Poll-only: return execution handle without fetching results
            if format == "poll":
                exec_obj = execute_dune_query(
                    q_use,
                    parameters=parameters,
                    api_key=self.config.dune.api_key,
                    performance=performance,
                    poll=False,
                    http_client=self._http,
                )
                execution_id = exec_obj.get("execution_id", "unknown")
                q_type = _categorize_query(q_use)
                q_sha = self._persist_query_sql(q_use, q_type)
                q_id_value, template_id_value = self._resolve_query_ids(q_use, q_type)
                self.query_history.record(
                    execution_id=execution_id,
                    query_type=q_type,
                    query_preview=query,
                    status="submitted",
                    duration_ms=int((time.time() - t0) * 1000),
                    query_sha256=q_sha,
                    cache_hit=False,
                    **self._history_id_fields(q_id_value, template_id_value),
                )
                return {
                    "type": "execution",
                    "execution_id": execution_id,
                    "status": "submitted",
                }

            if format == "metadata":
                meta = self.query_service.fetch_metadata(
                    query=q_use,
                    parameters=parameters,
                    max_age=max_age,
                    limit=limit,
                    offset=offset,
                    sample_count=sample_count,
                    sort_by=sort_by,
                    columns=columns,
                    extras=extras,
                    performance=performance,
                )
                return {
                    "type": "metadata",
                    "metadata": meta.get("metadata"),
                    "next_uri": meta.get("next_uri"),
                    "next_offset": meta.get("next_offset"),
                }

            result = self.query_service.execute(
                query=q_use,
                parameters=parameters,
                refresh=refresh,
                max_age=max_age,
                poll=True,
                timeout_seconds=timeout_seconds,
                limit=limit,
                offset=offset,
                sample_count=sample_count,
                sort_by=sort_by,
                columns=columns,
                extras=extras,
                include_execution=True,
                performance=performance,
                return_raw=format == "raw",
            )

            duration_ms = int((time.time() - t0) * 1000)
            execution = result.get("execution", {})
            execution_id = execution.get("execution_id", "unknown")
            q_type = _categorize_query(q_use)
            q_sha = self._persist_query_sql(q_use, q_type)

            # Query id (or template id for raw SQL), shared by history and payload.
            q_id_value, template_id_value = self._resolve_query_ids(q_use, q_type)
            self.query_history.record(
                execution_id=execution_id,
                query_type=q_type,
                query_preview=query,
                status="success",
                duration_ms=duration_ms,
                rowcount=result.get("rowcount"),
                query_sha256=q_sha,
                cache_hit=False,
                **self._history_id_fields(q_id_value, template_id_value),
            )

            payload = {**result}
            payload["execution_id"] = execution_id
            # Prefer the duration reported in the result; fall back to wall-clock time.
            reported_duration = result.get("duration_ms")
            payload["duration_ms"] = (
                reported_duration if reported_duration is not None else duration_ms
            )
            if q_id_value is not None:
                payload["query_id"] = q_id_value
            if template_id_value is not None:
                payload["template_query_id"] = template_id_value

            # User-friendly web URL for the query (or template for raw SQL)
            q_for_url = q_id_value if q_id_value is not None else template_id_value
            if q_for_url is not None:
                payload["query_url"] = f"https://dune.com/queries/{q_for_url}"

            if "metadata" not in payload:
                payload["metadata"] = None

            if format == "raw":
                payload["type"] = "raw"
                payload["data"] = result.get("data") or result.get("data_preview")
            else:
                payload["type"] = "preview"

            return payload
        except Exception as exc:
            duration_ms = int((time.time() - t0) * 1000)

            # Try to include query id info on error as well
            q_type = _categorize_query(q_use)
            q_id_value, template_id_value = self._resolve_query_ids(q_use, q_type)
            extra_fields = self._history_id_fields(q_id_value, template_id_value)

            # Compute query SHA256 for better debugging; best-effort only, a
            # failure here must not hide the original error.
            try:
                q_sha = self._persist_query_sql(q_use, q_type)
                if q_sha:
                    extra_fields["query_sha256"] = q_sha
            except Exception:
                pass

            self.query_history.record(
                execution_id="unknown",
                query_type=q_type,
                query_preview=query,
                status="error",
                duration_ms=duration_ms,
                error=str(exc),
                **extra_fields,
            )

            enriched = self._enrich_error(exc)
            context: dict[str, Any] = {
                "tool": "dune_query",
                "query": q_use,
                "query_type": q_type,
            }
            if enriched:
                context.update(enriched)

            # Add debugging information for raw SQL failures
            if q_type == "raw_sql" and "could not determine execution" in str(exc):
                context.update({
                    "debug_info": "Raw SQL execution failed - check template query configuration and API key",
                    "template_query_id": template_id_value,
                    "environment_vars": {
                        "SPICE_RAW_SQL_QUERY_ID": os.getenv("SPICE_RAW_SQL_QUERY_ID"),
                        "DUNE_API_KEY_present": bool(os.getenv("DUNE_API_KEY")),
                    },
                    "suggested_action": "Retry or check if the template query (4060379) is accessible",
                })

            return error_response(exc, context=context)

    @staticmethod
    def _resolve_query_ids(query: str, q_type: str) -> tuple[int | None, int | None]:
        """Return ``(query_id, template_query_id)`` for ``query``; either may be None.

        Resolution is best-effort: any failure while converting or parsing
        yields ``(None, None)`` instead of raising.
        """
        try:
            if q_type == "query_id":
                return int(query), None
            if q_type == "url":
                return dune_urls.get_query_id(query), None
            # raw SQL: only use template id if using template engine
            if os.getenv("SPICE_DUNE_RAW_SQL_ENGINE", "execution_sql") == "template":
                return None, int(os.getenv("SPICE_RAW_SQL_QUERY_ID", "4060379"))
        except Exception:
            # Ids are informational only; never fail the caller over them.
            pass
        return None, None

    @staticmethod
    def _history_id_fields(
        query_id: int | None, template_query_id: int | None
    ) -> dict[str, Any]:
        """Build the optional id keyword arguments, omitting ids that are None."""
        fields: dict[str, Any] = {}
        if query_id is not None:
            fields["query_id"] = query_id
        if template_query_id is not None:
            fields["template_query_id"] = template_query_id
        return fields
  • Defines the JSON schema for the dune_query tool parameters, including query, parameters, pagination, formatting options, and advanced extras.
    def get_parameter_schema(self) -> dict[str, Any]:
        """Return the JSON Schema object describing the tool's arguments.

        Only ``query`` is required, and top-level keys outside ``properties``
        are disallowed (``additionalProperties`` is False).
        """
        performance_schema: dict[str, Any] = {
            "type": "string",
            "enum": ["medium", "large"],
            "default": "medium",
            "description": "Request the medium (default) or large performance tier.",
        }
        format_schema: dict[str, Any] = {
            "type": "string",
            "enum": ["preview", "raw", "metadata", "poll"],
            "default": "preview",
            "description": "Preview returns compact data; raw returns all rows; poll returns execution handle only",
        }
        # ``extras`` names two known flags but still accepts arbitrary keys.
        extras_schema: dict[str, Any] = {
            "type": "object",
            "description": "Optional advanced flags passed through to results (e.g., allow_partial_results)",
            "additionalProperties": True,
            "properties": {
                "allow_partial_results": {"type": "boolean"},
                "ignore_max_datapoints_per_request": {"type": "boolean"},
            },
        }

        # Insertion order mirrors the order in which the arguments are listed.
        properties: dict[str, Any] = {}
        properties["query"] = {"type": "string", "description": "Query ID, URL, or raw SQL"}
        properties["parameters"] = {"type": "object", "description": "Query parameters"}
        properties["refresh"] = {"type": "boolean", "default": False}
        properties["max_age"] = {"type": "number"}
        properties["limit"] = {"type": "integer"}
        properties["offset"] = {"type": "integer"}
        properties["sample_count"] = {"type": "integer"}
        properties["sort_by"] = {"type": "string"}
        properties["columns"] = {"type": "array", "items": {"type": "string"}}
        properties["performance"] = performance_schema
        properties["format"] = format_schema
        properties["extras"] = extras_schema
        properties["timeout_seconds"] = {
            "type": "number",
            "description": "Polling timeout in seconds",
        }

        schema: dict[str, Any] = {"type": "object"}
        schema["properties"] = properties
        schema["required"] = ["query"]
        schema["additionalProperties"] = False
        return schema
  • FastMCP registration of the dune_query tool using the @app.tool decorator, defining the entry-point function that delegates to the core implementation.
    @app.tool(
        name="dune_query",
        title="Run Dune Query",
        description="Execute Dune queries and return agent-optimized preview.",
        tags={"dune", "query"},
    )
    def dune_query(
        query: str,
        parameters: dict[str, Any] | None = None,
        refresh: bool = False,
        max_age: float | None = None,
        limit: int | None = None,
        offset: int | None = None,
        sample_count: int | None = None,
        sort_by: str | None = None,
        columns: list[str] | None = None,
        format: Literal["preview", "raw", "metadata", "poll"] = "preview",
        extras: dict[str, Any] | None = None,
        timeout_seconds: float | None = None,
    ) -> dict[str, Any]:
        """Execute Dune queries (by ID, URL, or raw SQL) and return agent-optimized preview.

        ⚠️ IMPORTANT: ALWAYS use dune_discover FIRST to find verified table names.
        Do not guess table names or query information_schema directly.

        The query parameter accepts:
        - Query IDs (e.g., "123456")
        - Query URLs (e.g., "https://dune.com/queries/123456")
        - Raw SQL using tables discovered via dune_discover

        For Spellbook models, use the 'dune_table' field returned by dune_discover.
        Example: dune_discover(keyword="walrus") → returns dune_table="sui_walrus.base_table"
        Then use: dune_query(query="SELECT * FROM sui_walrus.base_table LIMIT 10")

        This wrapper ensures FastMCP doesn't detect overloads in imported functions.
        """
        # Every argument is forwarded by keyword, including ``parameters`` when it
        # is None, so FastMCP never sees a call shape with the keyword omitted.
        forwarded: dict[str, Any] = {
            "query": query,
            "parameters": parameters,
            "refresh": refresh,
            "max_age": max_age,
            "limit": limit,
            "offset": offset,
            "sample_count": sample_count,
            "sort_by": sort_by,
            "columns": columns,
            "format": format,
            "extras": extras,
            "timeout_seconds": timeout_seconds,
        }
        return _dune_query_impl(**forwarded)
  • Internal implementation wrapper that normalizes inputs and calls the core ExecuteQueryTool.execute handler.
    def _dune_query_impl(
        query: str,
        parameters: dict[str, Any] | None = None,
        refresh: bool = False,
        max_age: float | None = None,
        limit: int | None = None,
        offset: int | None = None,
        sample_count: int | None = None,
        sort_by: str | None = None,
        columns: list[str] | None = None,
        format: Literal["preview", "raw", "metadata", "poll"] = "preview",
        extras: dict[str, Any] | None = None,
        timeout_seconds: float | None = None,
    ) -> dict[str, Any]:
        """Internal implementation of dune_query to avoid FastMCP overload detection.

        Normalizes ``parameters`` and ``extras`` (either may arrive as a JSON
        string instead of a dict) and delegates to ``EXECUTE_QUERY_TOOL.execute``.

        Returns:
            The result of ``EXECUTE_QUERY_TOOL.execute``, or an ``error_response``
            payload when ``parameters`` is a string that is not valid JSON, when
            it decodes to something other than an object or null, or when
            ``execute`` raises.
        """
        # Function-scope import: one import serves both normalization steps.
        import json

        _ensure_initialized()
        assert EXECUTE_QUERY_TOOL is not None

        # Normalize parameters: handle case where MCP client passes JSON string
        # This can happen if FastMCP's schema generation doesn't match client expectations
        normalized_parameters = parameters
        if isinstance(parameters, str):
            try:
                normalized_parameters = json.loads(parameters)
            except (json.JSONDecodeError, TypeError):
                return error_response(
                    ValueError(f"parameters must be a dict or JSON string, got {type(parameters).__name__}"),
                    context={
                        "tool": "dune_query",
                        "query": query,
                        "parameters_type": type(parameters).__name__,
                    },
                )
            # Valid JSON that is not an object (e.g. "[1, 2]" or "5") cannot be
            # used as query parameters; reject it here rather than forwarding it.
            if normalized_parameters is not None and not isinstance(normalized_parameters, dict):
                decoded_type = type(normalized_parameters).__name__
                return error_response(
                    ValueError(f"parameters JSON string must decode to an object, got {decoded_type}"),
                    context={
                        "tool": "dune_query",
                        "query": query,
                        "parameters_type": decoded_type,
                    },
                )

        # Normalize extras similarly. Extras are optional flags, so an invalid or
        # non-object value is dropped instead of failing the request.
        normalized_extras = extras
        if isinstance(extras, str):
            try:
                decoded_extras = json.loads(extras)
            except (json.JSONDecodeError, TypeError):
                decoded_extras = None
            normalized_extras = decoded_extras if isinstance(decoded_extras, dict) else None

        try:
            # Execute query synchronously
            return EXECUTE_QUERY_TOOL.execute(
                query=query,
                parameters=normalized_parameters,
                refresh=refresh,
                max_age=max_age,
                limit=limit,
                offset=offset,
                sample_count=sample_count,
                sort_by=sort_by,
                columns=columns,
                format=format,
                extras=normalized_extras,
                timeout_seconds=timeout_seconds,
            )
        except Exception as e:
            return error_response(e, context={
                "tool": "dune_query",
                "query": query,
                "limit": limit,
                "offset": offset,
            })
  • Instantiation of the ExecuteQueryTool instance used by the dune_query tool.
    # ExecuteQueryTool instance built from CONFIG, QUERY_SERVICE and QUERY_HISTORY;
    # _dune_query_impl calls its execute() method to run the dune_query tool.
    EXECUTE_QUERY_TOOL = ExecuteQueryTool(CONFIG, QUERY_SERVICE, QUERY_HISTORY)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/spice-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.