execute_query
Starts execution of a Dune Analytics query identified by its query ID and returns a job ID for tracking the results.
Instructions
Execute a query by ID. Returns Job ID.
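The job ID comes back embedded in a human-readable message rather than as a bare value (see the handler's return statements under Implementation Reference). A caller that needs the ID on its own could pull it out as in this minimal sketch; `extract_job_id` is a hypothetical helper, and the pattern assumes the success message keeps the exact wording shown in the handler.

```python
import re
from typing import Optional

# Assumed success format, taken from the handler's return string:
# "Execution started. Job ID: <id>. Use 'get_job_status' to check progress."
_JOB_ID_PATTERN = re.compile(r"Job ID: (?P<job_id>\S+?)\. Use ")


def extract_job_id(message: str) -> Optional[str]:
    """Return the job ID from a success message, or None for denial/error strings."""
    match = _JOB_ID_PATTERN.search(message)
    return match.group("job_id") if match else None
```

Denial and error messages ("EXECUTION DENIED: ..." and "Error executing query: ...") contain no job ID, so the helper returns `None` for them and the caller can branch on that.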
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query_id | Yes | | |
| params | No | | |
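The schema leaves `params` undescribed. In the implementation listed under Implementation Reference, the handler annotates it as `Optional[Dict[str, Any]]`, while the service helper reads `p["name"]` and `p["value"]` from each element, which suggests a list of name/value entries. Which shape the deployed server actually accepts is not confirmed here. The sketch below shows one way a caller or wrapper might normalize either shape into the flat `query_parameters` mapping the helper sends; `to_query_parameters` is a hypothetical name, not part of the project.

```python
import collections.abc
from typing import Any, Dict, List, Mapping, Union

# Either {"name": value, ...} or [{"name": ..., "value": ...}, ...]; both are assumptions.
ParamsInput = Union[Mapping[str, Any], List[Mapping[str, Any]], None]


def to_query_parameters(params: ParamsInput) -> Dict[str, Any]:
    """Normalize `params` into a flat name -> value mapping."""
    if not params:
        # None, {} and [] all mean "no parameters".
        return {}
    if isinstance(params, collections.abc.Mapping):
        # Already a flat mapping: copy it so the caller's object is not shared.
        return dict(params)
    # Otherwise treat it as a list of {"name": ..., "value": ...} entries.
    return {entry["name"]: entry["value"] for entry in params}
```

For example, `to_query_parameters([{"name": "chain", "value": "ethereum"}])` and `to_query_parameters({"chain": "ethereum"})` produce the same mapping.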
Implementation Reference
- src/main.py:187-209 (handler): The primary MCP tool handler for 'execute_query'. Decorated with @mcp.tool(), it checks the session budget, delegates execution to DuneService, tracks usage, and returns the job ID with instructions.

      @mcp.tool()
      def execute_query(query_id: int, params: Optional[Dict[str, Any]] = None) -> str:
          """
          Execute a query by ID. Returns Job ID.
          """
          try:
              # 1. Check Session Budget (Query Count)
              # We don't know exact cost yet, so we pass 0 for now, or an estimate if we had one.
              budget_manager.check_can_execute_query(estimated_cost=0)
              # 2. Execute
              job_id = dune_service.execute_query(query_id, params)
              # 3. Track (Assume minimal cost for now until we get result metadata)
              budget_manager.track_execution(cost=0)
              return f"Execution started. Job ID: {job_id}. Use 'get_job_status' to check progress."
          except BudgetExceededError as e:
              return f"EXECUTION DENIED: {str(e)}"
          except Exception as e:
              return f"Error executing query: {str(e)}"
- src/services/dune_client.py:464-494 (helper): Supporting service method in DuneService that performs the actual API POST to /query/{query_id}/execute and returns the execution_id (job ID). Called by the MCP handler.

      def execute_query(self, query_id: int, params: Optional[Dict[str, Any]] = None) -> str:
          # We want to start execution and return ID, NOT wait.
          # client.execute_query() waits.
          # client.execute() (base method) usually returns the response with job_id.
          query = QueryBase(
              query_id=query_id,
              params=[QueryParameter.from_dict(p) for p in (params or [])]
          )
          # Using the lower-level execute method to get job_id without waiting
          # The SDK implementation of execute() typically performs the POST /execute call
          try:
              # We construct the execution payload manually if SDK doesn't expose non-blocking nicely
              # Or checking SDK: client.execute(query) -> ExecutionResult (which contains job_id)
              # wait... client.execute() waits for completion loop.
              # Use raw request for async trigger to be safe and efficient
              url = f"{self.base_url}/query/{query_id}/execute"
              payload = {"query_parameters": {p["name"]: p["value"] for p in (params or [])}}
              headers = {"X-Dune-Api-Key": self.api_key}
              response = requests.post(url, json=payload, headers=headers)
              response.raise_for_status()
              data = response.json()
              return data["execution_id"]
          except Exception as e:
              logger.error(f"Error executing query {query_id}: {e}")
              raise
- src/services/budget_manager.py:23-30 (helper): BudgetManager helper method used by the handler to check whether a query execution is allowed under the session limits before proceeding.

      def check_can_execute_query(self, estimated_cost: float = 0.0) -> None:
          """
          Check if a new query can be executed within the session budget.
          Raises BudgetExceededError if limits would be breached.
          """
          # Check query count limit
          if self.query_count >= self.config.max_queries:
              raise BudgetExceededError(
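The three pieces above follow a check, execute, track sequence. The following self-contained sketch illustrates that sequence with in-memory stand-ins and no network access. Everything in it is an assumption made for illustration: `SessionBudget` is not the project's BudgetManager, the error message text is invented, and treating `track_execution` as a simple counter increment is a guess, since that method is not shown in the listing.

```python
from dataclasses import dataclass
from typing import Callable


class BudgetExceededError(Exception):
    """Raised when the session would exceed its query limit."""


@dataclass
class SessionBudget:
    """Hypothetical stand-in that models only the query-count limit."""
    max_queries: int
    query_count: int = 0

    def check_can_execute_query(self) -> None:
        # Refuse once the number of tracked executions reaches the limit.
        if self.query_count >= self.max_queries:
            raise BudgetExceededError(
                f"query limit reached ({self.query_count}/{self.max_queries})"
            )

    def track_execution(self) -> None:
        # Assumed behaviour: each started execution counts once.
        self.query_count += 1


def run_with_budget(
    budget: SessionBudget,
    start_execution: Callable[[int], str],
    query_id: int,
) -> str:
    """Check the budget, start the execution, then record it."""
    try:
        budget.check_can_execute_query()
        job_id = start_execution(query_id)
        budget.track_execution()
        return f"Execution started. Job ID: {job_id}."
    except BudgetExceededError as exc:
        return f"EXECUTION DENIED: {exc}"
```

Passing the execution function in as `start_execution` keeps the sketch testable without a Dune API key: a stub such as `lambda query_id: f"job-{query_id}"` stands in for the real POST request. Because tracking happens only after the execution has started, a failed start does not consume budget in this sketch.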