Skip to main content
Glama
dune_client.py (21.7 kB)
"""Service layer around the Dune SDK, Dune's public GraphQL endpoint and GitHub.

Exposes :class:`DuneService`, which combines the official ``dune_client`` SDK,
raw HTTP calls to the Dune REST API, Dune's GraphQL endpoint and GitHub lookups
against the ``duneanalytics/spellbook`` repository, with results stored through
an injected cache manager.
"""

import logging
import os
import re
from typing import Any, Dict, List, Optional, Union

import requests
from dotenv import load_dotenv
from dune_client.client import DuneClient
from dune_client.query import QueryBase
from dune_client.types import QueryParameter

from .cache import CacheManager

load_dotenv()

logger = logging.getLogger(__name__)

# Dune's GraphQL endpoint; the public REST API v1 has no keyword search,
# so search/list operations go through this endpoint instead.
_GRAPHQL_URL = "https://core-api.dune.com/public/graphql"

# Sentinel payload returned by _get_graphql_response when the request is blocked.
_WAF_BLOCK = "WAF_BLOCK"

# A table reference is one or more dot-separated segments; each segment is
# either a bare identifier or a double-quoted identifier without embedded quotes.
# Anything else is rejected before being interpolated into SQL.
_TABLE_NAME_RE = re.compile(
    r'(?:[A-Za-z0-9_]+|"[^"]+")(?:\.(?:[A-Za-z0-9_]+|"[^"]+"))*'
)


class DuneService:
    """Facade over the Dune SDK, Dune GraphQL and the Spellbook GitHub repo."""

    def __init__(self, cache_manager: CacheManager):
        """Read configuration from the environment and build the SDK client.

        Args:
            cache_manager: Cache used for GitHub, query and status lookups.

        Raises:
            ValueError: If ``DUNE_API_KEY`` is not set.
        """
        self.api_key = os.getenv("DUNE_API_KEY")
        self.base_url = os.getenv("DUNE_API_BASE_URL", "https://api.dune.com/api/v1")

        if not self.api_key:
            raise ValueError("DUNE_API_KEY environment variable not set")

        self.client = DuneClient(self.api_key)
        self.cache = cache_manager

    # ------------------------------------------------------------------
    # Low-level HTTP helpers
    # ------------------------------------------------------------------

    def _get_graphql_response(
        self, payload: Dict[str, Any], timeout: int = 30
    ) -> Optional[Dict[str, Any]]:
        """POST ``payload`` to Dune's GraphQL endpoint.

        Returns:
            The decoded JSON body on success, ``{"error": "WAF_BLOCK"}`` when the
            response is a 403 or contains "Access Denied", or ``None`` when the
            request fails for any other reason (the failure is logged).
        """
        try:
            # Imported lazily so a missing curl_cffi is logged and reported as
            # a failed request (None) instead of breaking module import.
            from curl_cffi import requests as cffi_requests

            response = cffi_requests.post(
                _GRAPHQL_URL,
                json=payload,
                impersonate="chrome",
                headers={
                    "Referer": "https://dune.com/browse/queries",
                    "Content-Type": "application/json",
                },
                timeout=timeout,
            )

            # Check for Cloudflare/WAF blocks before raising on the status code.
            if response.status_code == 403 or "Access Denied" in response.text:
                logger.warning("Dune GraphQL endpoint blocked by Cloudflare (403 Forbidden).")
                return {"error": _WAF_BLOCK}

            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error("GraphQL request failed: %s", e)
            return None

    def _github_api_request(
        self, url: str, params: Optional[Dict[str, Any]] = None
    ) -> Optional[Dict[str, Any]]:
        """GET a GitHub API URL and return the decoded JSON, or ``None`` on failure.

        Args:
            url: GitHub API URL.
            params: Optional query-string parameters; passed to ``requests`` so
                that values are URL-encoded correctly.
        """
        headers = {"Accept": "application/vnd.github.v3+json"}
        # For higher rate limits, user can set GITHUB_TOKEN in .env
        github_token = os.getenv("GITHUB_TOKEN")
        if github_token:
            headers["Authorization"] = f"token {github_token}"

        try:
            response = requests.get(url, headers=headers, params=params, timeout=15)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error("GitHub API request failed for %s: %s", url, e)
            return None

    # ------------------------------------------------------------------
    # GraphQL response helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_waf_block(response_data: Optional[Dict[str, Any]]) -> bool:
        """Return True when ``response_data`` is the WAF-block sentinel payload."""
        return bool(response_data) and response_data.get("error") == _WAF_BLOCK

    @staticmethod
    def _extract_edges(
        response_data: Optional[Dict[str, Any]], field: str
    ) -> List[Dict[str, Any]]:
        """Return ``data.<field>.edges`` from a GraphQL response, or ``[]``.

        Tolerates missing keys as well as explicit ``null`` values at any level.
        """
        if not response_data:
            return []
        data = response_data.get("data") or {}
        connection = data.get(field) or {}
        return connection.get("edges") or []

    @staticmethod
    def _summarize_query_edges(edges: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Flatten GraphQL query edges into ``id``/``name``/``owner``/``description`` dicts."""
        summaries = []
        for edge in edges:
            node = (edge or {}).get("node") or {}
            if not node:
                continue
            owner = (node.get("user") or {}).get("handle", "unknown")
            summaries.append({
                "id": node.get("id"),
                "name": node.get("name"),
                "owner": owner,
                "description": node.get("description", ""),
            })
        return summaries

    @staticmethod
    def _normalize_query_parameters(
        params: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]
    ) -> Dict[str, Any]:
        """Convert ``params`` into the ``{name: value}`` mapping sent on execution.

        Accepts either a plain ``{name: value}`` mapping or a list of
        ``{"name": ..., "value": ...}`` dicts.
        """
        if not params:
            return {}
        if isinstance(params, dict):
            return dict(params)
        return {entry["name"]: entry["value"] for entry in params}

    # ------------------------------------------------------------------
    # Spellbook (GitHub)
    # ------------------------------------------------------------------

    def search_spellbook(self, keyword: str) -> List[Dict[str, Any]]:
        """Search the duneanalytics/spellbook repository for SQL models and schema files.

        Two GitHub code searches are issued: one for ``.sql`` files and one for
        ``schema.yml`` files. Non-empty results are stored in the "github" cache
        namespace (expiry is decided by the cache manager).

        Returns:
            A list of dicts with ``name``, ``path``, ``url``, ``type`` and
            ``repo_url`` keys; SQL models come first, then schema definitions.
        """
        cache_key = f"search:{keyword}"
        cached_results = self.cache.get("github", cache_key)
        if cached_results:
            return cached_results

        search_url = "https://api.github.com/search/code"
        repo = "repo:duneanalytics/spellbook"
        searches = (
            (f"{keyword} {repo} in:file extension:sql", "sql_model"),
            # schema.yml files might contain table definitions
            (f"{keyword} {repo} in:file filename:schema.yml", "schema_definition"),
        )

        found_files = []
        for query, file_type in searches:
            # The query goes through `params` so characters such as '&' or '#'
            # in the keyword are encoded instead of corrupting the URL.
            results = self._github_api_request(search_url, params={"q": query})
            if not (results and results.get("items")):
                continue
            for item in results["items"]:
                found_files.append({
                    "name": item["name"],
                    "path": item["path"],
                    "url": item["html_url"],
                    "type": file_type,
                    "repo_url": item["url"],  # API URL for file content
                })

        if found_files:
            self.cache.set("github", cache_key, found_files)
        return found_files

    def get_spellbook_file_content(self, path: str) -> Optional[str]:
        """Fetch the raw content of a file on the spellbook repository's ``main`` branch.

        Args:
            path: Full path within the repository
                (e.g., 'models/dex/uniswap/trades.sql').

        Returns:
            The file text (also stored in the "github" cache namespace), or
            ``None`` if the download fails.
        """
        cache_key = f"content:{path}"
        cached_content = self.cache.get("github", cache_key)
        if cached_content:
            return cached_content

        # GitHub raw content URL pattern
        raw_url = f"https://raw.githubusercontent.com/duneanalytics/spellbook/main/{path}"

        try:
            response = requests.get(raw_url, timeout=15)
            response.raise_for_status()
            content = response.text
            self.cache.set("github", cache_key, content)
            return content
        except Exception as e:
            logger.error("Failed to fetch content for %s from GitHub: %s", path, e)
            return None

    # ------------------------------------------------------------------
    # Users and query discovery (GraphQL)
    # ------------------------------------------------------------------

    def get_user_id_by_handle(self, handle: str) -> Optional[int]:
        """Look up a Dune user's numeric ID.

        Returns:
            The integer user ID of the first match, ``-1`` when the GraphQL
            endpoint is blocked (sentinel understood by ``list_user_queries``),
            or ``None`` when no user is found or the ID is not an integer.
        """
        payload = {
            "operationName": "FindUser",
            "variables": {"name": handle},
            "query": """
                query FindUser($name: String!) {
                    users(filters: { name: { equals: $name } }) {
                        edges {
                            node {
                                id
                                name
                                handle
                            }
                        }
                    }
                }
            """,
        }
        response_data = self._get_graphql_response(payload)

        if self._is_waf_block(response_data):
            return -1  # Special sentinel for Blocked

        edges = self._extract_edges(response_data, "users")
        if not edges:
            return None

        # Assuming handle is unique, take the first result
        user_id = ((edges[0] or {}).get("node") or {}).get("id")
        try:
            return int(user_id)
        except (ValueError, TypeError):
            logger.error(
                "Could not convert user ID '%s' to int for handle '%s'", user_id, handle
            )
            return None

    def search_queries(self, query: str) -> Any:
        """Search public queries by name using Dune's GraphQL endpoint.

        Returns:
            A list of query summaries on success (possibly empty), or a dict
            with an ``error`` key when the endpoint is blocked.
        """
        payload = {
            "operationName": "SearchQueries",
            "variables": {"term": query},
            "query": """
                query SearchQueries($term: String!) {
                    queries(
                        filters: { name: { contains: $term } }
                        pagination: { first: 10 }
                    ) {
                        edges {
                            node {
                                id
                                name
                                description
                                user {
                                    name
                                    handle
                                }
                            }
                        }
                    }
                }
            """,
        }
        response_data = self._get_graphql_response(payload)

        if self._is_waf_block(response_data):
            return {"error": "Public search is currently blocked by Dune's security. Please use 'search_spellbook' to find tables or 'get_query_details' if you have an ID."}

        return self._summarize_query_edges(self._extract_edges(response_data, "queries"))

    def list_user_queries(self, user_id: int, limit: int = 10) -> Any:
        """List queries owned by ``user_id`` using Dune's GraphQL endpoint.

        Args:
            user_id: Numeric user ID; ``-1`` is the blocked-lookup sentinel
                produced by ``get_user_id_by_handle``.
            limit: Maximum number of queries requested.

        Returns:
            A list of query summaries on success (possibly empty), or a dict
            with an ``error`` key when the lookup or the request was blocked.
        """
        if user_id == -1:  # WAF Block sentinel
            return {"error": "User lookup failed due to Cloudflare block. Cannot list queries."}

        payload = {
            "operationName": "ListUserQueries",
            "variables": {"userId": user_id, "limit": limit},
            "query": """
                query ListUserQueries($userId: Int!, $limit: Int!) {
                    queries(
                        filters: { userId: { equals: $userId } }
                        pagination: { first: $limit }
                    ) {
                        edges {
                            node {
                                id
                                name
                                description
                                user {
                                    name
                                    handle
                                }
                            }
                        }
                    }
                }
            """,
        }
        response_data = self._get_graphql_response(payload)

        if self._is_waf_block(response_data):
            return {"error": "Public search is currently blocked by Dune's security."}

        return self._summarize_query_edges(self._extract_edges(response_data, "queries"))

    # ------------------------------------------------------------------
    # Schema inspection
    # ------------------------------------------------------------------

    def get_table_schema(self, table_name: str) -> Dict[str, Any]:
        """Fetch column names and types by running ``SELECT * FROM table LIMIT 0``.

        WARNING: This consumes Dune credits!

        Args:
            table_name: Dot-separated table reference; each segment must be a
                bare identifier or a double-quoted identifier.

        Returns:
            ``{"table": ..., "columns": [{"name", "type"}, ...]}``, or
            ``{"error": "No result returned"}`` when the SDK yields no result.

        Raises:
            ValueError: If ``table_name`` is not a valid table reference.
            Exception: Any SDK error is logged and re-raised.
        """
        # The name is interpolated into SQL, so reject anything that is not a
        # plain (optionally quoted) dotted identifier.
        if not isinstance(table_name, str) or not _TABLE_NAME_RE.fullmatch(table_name):
            raise ValueError(f"Invalid table name: {table_name!r}")

        sql = f"SELECT * FROM {table_name} LIMIT 0"

        try:
            result = self.client.run_sql(
                query_sql=sql,
                performance="medium",
            )

            if not result or not result.result:
                return {"error": "No result returned"}

            meta = result.result.metadata
            columns = []

            if meta:
                if hasattr(meta, "column_names") and hasattr(meta, "column_types"):
                    for i, name in enumerate(meta.column_names):
                        # Guard against a shorter column_types list.
                        col_type = meta.column_types[i] if i < len(meta.column_types) else "unknown"
                        columns.append({"name": name, "type": col_type})
                else:
                    logger.warning("Could not find column_names/column_types in ResultMetadata.")

            return {
                "table": table_name,
                "columns": columns,
            }
        except Exception as e:
            logger.error("Error getting schema for %s: %s", table_name, e)
            raise

    # ------------------------------------------------------------------
    # Query CRUD
    # ------------------------------------------------------------------

    def _get_query_graphql(self, query_id: int) -> Optional[Dict[str, Any]]:
        """Fetch query details via GraphQL; used as a fallback by ``get_query``.

        Returns:
            A dict with ``id``, ``name``, ``description``, ``sql`` and
            ``parameters`` keys (the same shape ``get_query`` builds from the
            SDK), or ``None`` when the query is not available.
        """
        payload = {
            "operationName": "GetQuery",
            "variables": {"id": query_id},
            "query": """
                query GetQuery($id: Int!) {
                    query(id: $id) {
                        id
                        name
                        description
                        parameters
                        ownerFields {
                            query
                        }
                    }
                }
            """,
        }
        response_data = self._get_graphql_response(payload)
        # `data` (and nested objects) may be present but null, so avoid chaining
        # .get() on default dicts.
        q = ((response_data or {}).get("data") or {}).get("query")
        if not q:
            return None

        # Map GraphQL structure to expected SDK structure
        return {
            "id": q["id"],
            "name": q["name"],
            "description": q["description"] or "",
            "sql": (q.get("ownerFields") or {}).get("query", ""),
            "parameters": q.get("parameters") or [],  # JSON scalar
        }

    def create_query(self, name: str, sql: str, description: str = "") -> int:
        """Create a new query in Dune and return its query ID.

        ``description`` is accepted for interface compatibility but is not
        forwarded to the SDK call.
        """
        try:
            query = self.client.create_query(name=name, query_sql=sql)
            return query.base.query_id
        except Exception as e:
            logger.error("Error creating query '%s': %s", name, e)
            raise

    def update_query(
        self,
        query_id: int,
        sql: str,
        description: Optional[str] = None,
        name: Optional[str] = None,
    ) -> int:
        """Update an existing query's SQL (and optionally its name); return the ID.

        ``description`` is accepted for interface compatibility but is not
        forwarded to the SDK call.
        """
        try:
            self.client.update_query(query_id, query_sql=sql, name=name)
            return query_id
        except Exception as e:
            logger.error("Error updating query %s: %s", query_id, e)
            raise

    def archive_query(self, query_id: int) -> bool:
        """Archive a query; returns whatever the SDK's ``archive_query`` returns."""
        try:
            return self.client.archive_query(query_id)
        except Exception as e:
            logger.error("Error archiving query %s: %s", query_id, e)
            raise

    def get_query(self, query_id: int) -> Dict[str, Any]:
        """Return query metadata and SQL, using the cache when possible.

        The SDK is tried first. If it fails with an error mentioning "403" or
        "Forbidden", the GraphQL fallback is attempted before re-raising.

        Raises:
            Exception: The original SDK error when no fallback data is available.
        """
        cache_key = str(query_id)
        cached = self.cache.get("query", cache_key)
        if cached:
            return cached

        try:
            query = self.client.get_query(query_id)
            parameters = query.base.parameters
            data = {
                "id": query.base.query_id,
                "name": query.base.name,
                "description": query.base.description or "",
                "sql": query.sql,
                "parameters": [p.to_dict() for p in parameters] if parameters else [],
            }
            self.cache.set("query", cache_key, data)
            return data
        except Exception as e:
            # Check for 403 Forbidden (common for public-but-not-published queries)
            message = str(e)
            if "403" in message or "Forbidden" in message:
                logger.info(
                    "Access Forbidden via SDK for Query %s. Attempting GraphQL fallback...",
                    query_id,
                )
                fallback_data = self._get_query_graphql(query_id)
                if fallback_data:
                    self.cache.set("query", cache_key, fallback_data)
                    return fallback_data

            logger.error("Error fetching query %s: %s", query_id, e)
            raise

    # ------------------------------------------------------------------
    # Execution
    # ------------------------------------------------------------------

    def execute_query(
        self,
        query_id: int,
        params: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
    ) -> str:
        """Start a query execution without waiting and return its execution ID.

        Uses a raw POST to ``{base_url}/query/{query_id}/execute`` so the call
        returns as soon as the execution is accepted.

        Args:
            query_id: ID of the query to run.
            params: Query parameters, either as a ``{name: value}`` mapping or
                as a list of ``{"name": ..., "value": ...}`` dicts.

        Returns:
            The ``execution_id`` field of the API response.

        Raises:
            Exception: Any request/parsing error is logged and re-raised.
        """
        try:
            url = f"{self.base_url}/query/{query_id}/execute"
            payload = {"query_parameters": self._normalize_query_parameters(params)}
            headers = {"X-Dune-Api-Key": self.api_key}

            # A timeout keeps a stalled connection from hanging the caller.
            response = requests.post(url, json=payload, headers=headers, timeout=30)
            response.raise_for_status()
            data = response.json()
            return data["execution_id"]
        except Exception as e:
            logger.error("Error executing query %s: %s", query_id, e)
            raise

    def get_status(self, job_id: str) -> Dict[str, Any]:
        """Return the status of a job, including credit usage if available.

        Results are cached only once the state is COMPLETED, FAILED or
        CANCELLED, so jobs in any other state are always re-fetched.
        """
        cached = self.cache.get("status", job_id)
        if cached:
            # Older cache entries stored the bare state string.
            if isinstance(cached, str):
                return {"state": cached}
            return cached

        status_response = self.client.get_execution_status(job_id)
        state = str(status_response.state).replace("ExecutionState.", "")

        result = {
            "state": state,
            "credits_used": getattr(status_response, "execution_cost_credits", None),
            "execution_time": getattr(status_response, "execution_time_millis", 0),
        }

        if state in ["COMPLETED", "FAILED", "CANCELLED"]:
            self.cache.set("status", job_id, result)

        return result

    def get_result(self, job_id: str) -> Any:
        """Return the SDK's execution results for ``job_id``."""
        return self.client.get_execution_results(job_id)

    def analyze_result(self, job_id: str, data_processor: Any) -> Dict[str, Any]:
        """Fetch results for ``job_id`` and pass them to ``data_processor.analyze_dataframe``."""
        result = self.get_result(job_id)
        return data_processor.analyze_dataframe(result)

    def get_usage(self) -> Dict[str, Any]:
        """Return the SDK's usage info, or ``{"error": <message>}`` if the call fails."""
        try:
            return self.client.get_usage()
        except Exception as e:
            logger.error("Error getting usage: %s", e)
            return {"error": str(e)}

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nice-bills/dune-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.