execute_query
Run queries on Dune Analytics by ID to retrieve blockchain data, returning a job ID for tracking results.
Instructions
Execute a query by ID. Returns Job ID.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query_id | Yes | ||
| params | No |
Implementation Reference
- src/main.py:187-208 (handler)The primary MCP tool handler for 'execute_query'. Decorated with @mcp.tool() for automatic registration. Performs budget check, delegates to DuneService, tracks usage, handles errors, and returns job ID.@mcp.tool() def execute_query(query_id: int, params: Optional[Dict[str, Any]] = None) -> str: """ Execute a query by ID. Returns Job ID. """ try: # 1. Check Session Budget (Query Count) # We don't know exact cost yet, so we pass 0 for now, or an estimate if we had one. budget_manager.check_can_execute_query(estimated_cost=0) # 2. Execute job_id = dune_service.execute_query(query_id, params) # 3. Track (Assume minimal cost for now until we get result metadata) budget_manager.track_execution(cost=0) return f"Execution started. Job ID: {job_id}. Use 'get_job_status' to check progress." except BudgetExceededError as e: return f"EXECUTION DENIED: {str(e)}" except Exception as e: return f"Error executing query: {str(e)}"
- src/services/dune_client.py:454-484 (helper)DuneService.execute_query method: core implementation that triggers async query execution via Dune API POST /execute, returns execution_id (job_id). Called by the main handler.def execute_query(self, query_id: int, params: Optional[Dict[str, Any]] = None) -> str: # We want to start execution and return ID, NOT wait. # client.execute_query() waits. # client.execute() (base method) usually returns the response with job_id. query = QueryBase( query_id=query_id, params=[QueryParameter.from_dict(p) for p in (params or [])] ) # Using the lower-level execute method to get job_id without waiting # The SDK implementation of execute() typically performs the POST /execute call try: # We construct the execution payload manually if SDK doesn't expose non-blocking nicely # Or checking SDK: client.execute(query) -> ExecutionResult (which contains job_id) # wait... client.execute() waits for completion loop. # Use raw request for async trigger to be safe and efficient url = f"{self.base_url}/query/{query_id}/execute" payload = {"query_parameters": {p["name"]: p["value"] for p in (params or [])}} headers = {"X-Dune-Api-Key": self.api_key} response = requests.post(url, json=payload, headers=headers) response.raise_for_status() data = response.json() return data["execution_id"] except Exception as e: logger.error(f"Error executing query {query_id}: {e}") raise
- src/services/budget_manager.py:23-43 (helper)BudgetManager.check_can_execute_query: Validates session budget before query execution. Raises BudgetExceededError if query count or credits exceeded. Called from handler.def check_can_execute_query(self, estimated_cost: float = 0.0) -> None: """ Check if a new query can be executed within the session budget. Raises BudgetExceededError if limits would be breached. """ # Check query count limit if self.query_count >= self.config.max_queries: raise BudgetExceededError( f"Session limit reached: Maximum {self.config.max_queries} queries allowed. " "Please search for existing queries or optimize your workflow." ) # Check credit limit if self.used_credits + estimated_cost > self.config.max_credits: remaining = self.config.max_credits - self.used_credits raise BudgetExceededError( f"Credit limit exceeded: Query costs ~{estimated_cost} credits, " f"but only {remaining:.2f} remaining in session budget." ) def check_can_access_schema(self) -> None:
- src/main.py:136-136 (helper)Usage of budget check in get_table_schema tool (another tool that executes queries). Shows pattern but not core to execute_query.budget_manager.check_can_execute_query(estimated_cost=1) # Assume 1 credit base cost