import os
import logging
import requests
from typing import List, Dict, Any, Optional
from dune_client.client import DuneClient
from dune_client.query import QueryBase
from dune_client.types import QueryParameter
from dotenv import load_dotenv
from .cache import CacheManager
load_dotenv()
logger = logging.getLogger(__name__)
class DuneService:
def __init__(self, cache_manager: CacheManager):
self.api_key = os.getenv("DUNE_API_KEY")
self.base_url = os.getenv("DUNE_API_BASE_URL", "https://api.dune.com/api/v1")
if not self.api_key:
raise ValueError("DUNE_API_KEY environment variable not set")
self.client = DuneClient(self.api_key)
self.cache = cache_manager
def _get_graphql_response(self, payload: Dict[str, Any], timeout: int = 30) -> Optional[Dict[str, Any]]:
url = "https://core-api.dune.com/public/graphql"
try:
from curl_cffi import requests as cffi_requests
response = cffi_requests.post(
url,
json=payload,
impersonate="chrome",
headers={
"Referer": "https://dune.com/browse/queries",
"Content-Type": "application/json",
},
timeout=timeout
)
# Check for Cloudflare/WAF blocks
if response.status_code == 403 or "Access Denied" in response.text:
logger.warning("Dune GraphQL endpoint blocked by Cloudflare (403 Forbidden).")
return {"error": "WAF_BLOCK"}
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
return response.json()
except Exception as e:
logger.error(f"GraphQL request failed: {e}")
return None
def _github_api_request(self, url: str) -> Optional[Dict[str, Any]]:
headers = {"Accept": "application/vnd.github.v3+json"}
# For higher rate limits, user can set GITHUB_TOKEN in .env
github_token = os.getenv("GITHUB_TOKEN")
if github_token:
headers["Authorization"] = f"token {github_token}"
try:
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"GitHub API request failed for {url}: {e}")
return None
def search_spellbook(self, keyword: str) -> List[Dict[str, Any]]:
"""
Searches the duneanalytics/spellbook GitHub repository for SQL models and schema files.
Results are cached for 24 hours.
"""
cache_key = f"search:{keyword}"
cached_results = self.cache.get("github", cache_key)
if cached_results:
return cached_results
base_url = "https://api.github.com/search/code"
repo = "repo:duneanalytics/spellbook"
# Search for .sql files
sql_query = f"{keyword} {repo} in:file extension:sql"
sql_url = f"{base_url}?q={sql_query}"
sql_results = self._github_api_request(sql_url)
# Search for schema.yml files (which might contain definitions)
yaml_query = f"{keyword} {repo} in:file filename:schema.yml"
yaml_url = f"{base_url}?q={yaml_query}"
yaml_results = self._github_api_request(yaml_url)
found_files = []
if sql_results and sql_results.get("items"):
for item in sql_results["items"]:
found_files.append({
"name": item["name"],
"path": item["path"],
"url": item["html_url"],
"type": "sql_model",
"repo_url": item["url"] # API URL for file content
})
if yaml_results and yaml_results.get("items"):
for item in yaml_results["items"]:
found_files.append({
"name": item["name"],
"path": item["path"],
"url": item["html_url"],
"type": "schema_definition",
"repo_url": item["url"]
})
if found_files:
self.cache.set("github", cache_key, found_files)
return found_files
def get_spellbook_file_content(self, path: str) -> Optional[str]:
"""
Fetches the raw content of a file from the duneanalytics/spellbook GitHub repository.
'path' should be the full path within the repository (e.g., 'models/dex/uniswap/trades.sql').
Content is cached for 24 hours.
"""
cache_key = f"content:{path}"
cached_content = self.cache.get("github", cache_key)
if cached_content:
return cached_content
# GitHub raw content URL pattern
raw_url = f"https://raw.githubusercontent.com/duneanalytics/spellbook/main/{path}"
try:
response = requests.get(raw_url, timeout=15)
response.raise_for_status()
content = response.text
self.cache.set("github", cache_key, content)
return content
except Exception as e:
logger.error(f"Failed to fetch content for {path} from GitHub: {e}")
return None
def get_user_id_by_handle(self, handle: str) -> Optional[int]:
payload = {
"operationName": "FindUser",
"variables": {"name": handle},
"query": """
query FindUser($name: String!) {
users(filters: { name: { equals: $name } }) {
edges {
node {
id
name
handle
}
}
}
}
"""
}
response_data = self._get_graphql_response(payload)
if response_data and "error" in response_data and response_data["error"] == "WAF_BLOCK":
return -1 # Special sentinel for Blocked
if response_data:
edges = response_data.get("data", {}).get("users", {}).get("edges", [])
if edges:
# Assuming handle is unique, take the first result
user_id = edges[0].get("node", {}).get("id")
try:
return int(user_id)
except (ValueError, TypeError):
logger.error(f"Could not convert user ID '{user_id}' to int for handle '{handle}'")
return None
return None
def search_queries(self, query: str) -> Any: # Changed return hint to Any to support error dict
"""
Search for public queries using Dune's GraphQL endpoint.
Returns List[Dict] on success, or Dict with error on failure.
"""
# We use the internal GraphQL API because the Public API V1
# doesn't support generic keyword search yet.
url = "https://core-api.dune.com/public/graphql"
payload = {
"operationName": "SearchQueries",
"variables": {"term": query},
"query": """
query SearchQueries($term: String!) {
queries(
filters: { name: { contains: $term } }
pagination: { first: 10 }
) {
edges {
node {
id
name
description
user {
name
handle
}
}
}
}
}
"""
}
response_data = self._get_graphql_response(payload)
if response_data and "error" in response_data and response_data["error"] == "WAF_BLOCK":
return {"error": "Public search is currently blocked by Dune's security. Please use 'search_spellbook' to find tables or 'get_query_details' if you have an ID."}
if response_data:
edges = response_data.get("data", {}).get("queries", {}).get("edges", [])
results = []
for edge in edges:
node = edge.get("node", {})
if not node:
continue
results.append({
"id": node.get("id"),
"name": node.get("name"),
"owner": node.get("user", {}).get("handle", "unknown"),
"description": node.get("description", "")
})
return results
return []
def list_user_queries(self, user_id: int, limit: int = 10) -> Any: # Changed return hint
"""
List queries for a given user ID using Dune's GraphQL endpoint.
"""
if user_id == -1: # WAF Block sentinel
return {"error": "User lookup failed due to Cloudflare block. Cannot list queries."}
url = "https://core-api.dune.com/public/graphql" # Redundant, but good for clarity.
payload = {
"operationName": "ListUserQueries",
"variables": {"userId": user_id, "limit": limit},
"query": """
query ListUserQueries($userId: Int!, $limit: Int!) {
queries(
filters: { userId: { equals: $userId } }
pagination: { first: $limit }
) {
edges {
node {
id
name
description
user {
name
handle
}
}
}
}
}
"""
}
response_data = self._get_graphql_response(payload)
if response_data and "error" in response_data and response_data["error"] == "WAF_BLOCK":
return {"error": "Public search is currently blocked by Dune's security."}
if response_data:
edges = response_data.get("data", {}).get("queries", {}).get("edges", [])
results = []
for edge in edges:
node = edge.get("node", {})
if not node:
continue
results.append({
"id": node.get("id"),
"name": node.get("name"),
"owner": node.get("user", {}).get("handle", "unknown"),
"description": node.get("description", "")
})
return results
return []
def get_table_schema(self, table_name: str) -> Dict[str, Any]:
"""
Fetches the schema (columns and types) for a table by running a
'SELECT * FROM table LIMIT 0' query.
WARNING: This consumes Dune credits!
"""
sql = f"SELECT * FROM {table_name} LIMIT 0"
# We need to execute and wait for the result to get metadata
# create_query usually requires a name, but we can use execute_query with raw sql
# via the query_id (if we had one) or generic execute.
# dune-client 1.x allows executing raw SQL via `execute_query` if we pass a Query object
# but usually we need a query ID.
# Actually, the official SDK/API usually requires a Query ID to execute anything.
# We can't just send raw SQL without an existing Query ID container unless we create one.
# BUT, we can use the "Query ID 0" or "Ad-hoc" mode if supported, or we must create a query.
# Let's check if we can use `client.run_sql` or similar (from our introspection earlier).
# We saw `run_sql`. Let's try that.
try:
# run_sql takes (query_sql, performance=...)
# It returns a ResultsResponse
result = self.client.run_sql(
query_sql=sql,
performance="medium" # medium is usually fine/cheapest
)
# The result object has 'meta' -> 'columns'
# We need to inspect the structure of 'result'
# Usually result.result.metadata.column_names / column_types
if not result or not result.result:
return {"error": "No result returned"}
meta = result.result.metadata
columns = []
if meta:
# Based on debugging, ResultMetadata has column_names and column_types directly
if hasattr(meta, 'column_names') and hasattr(meta, 'column_types'):
for i, name in enumerate(meta.column_names):
# Ensure meta.column_types is also indexed by i
col_type = meta.column_types[i] if i < len(meta.column_types) else "unknown"
columns.append({"name": name, "type": col_type})
else:
logger.warning("Could not find column_names/column_types in ResultMetadata.")
return {
"table": table_name,
"columns": columns
}
except Exception as e:
logger.error(f"Error getting schema for {table_name}: {e}")
raise
def _get_query_graphql(self, query_id: int) -> Optional[Dict[str, Any]]:
"""
Fallback: Fetch query details via GraphQL if the official API returns 403.
This often happens for 'Community' queries that aren't explicitly published
but are visible on the web.
"""
payload = {
"operationName": "GetQuery",
"variables": {"id": query_id},
"query": """
query GetQuery($id: Int!) {
query(id: $id) {
id
name
description
parameters
ownerFields {
query
}
}
}
"""
}
response_data = self._get_graphql_response(payload)
if response_data and "data" in response_data:
q = response_data["data"].get("query")
if q:
# Map GraphQL structure to expected SDK structure
return {
"id": q["id"],
"name": q["name"],
"description": q["description"] or "",
"sql": q.get("ownerFields", {}).get("query", ""), # The SQL is here!
"parameters": q.get("parameters", []) # JSON scalar
}
return None
def create_query(self, name: str, sql: str, description: str = "") -> int:
"""
Creates a new query in Dune. Returns the new Query ID.
"""
try:
# client.create_query returns a Query object, we need its ID
# Removed 'description' arg as it's not supported in current SDK version
query = self.client.create_query(name=name, query_sql=sql)
return query.base.query_id
except Exception as e:
logger.error(f"Error creating query '{name}': {e}")
raise
def update_query(self, query_id: int, sql: str, description: str = None, name: str = None) -> int:
"""
Updates an existing query. Returns the Query ID.
"""
try:
# client.update_query takes query_id and optional fields
# Removed 'description' arg as it's not supported in current SDK version
self.client.update_query(query_id, query_sql=sql, name=name)
return query_id
except Exception as e:
logger.error(f"Error updating query {query_id}: {e}")
raise
def archive_query(self, query_id: int) -> bool:
"""
Archives a query. Returns True on success.
"""
try:
return self.client.archive_query(query_id)
except Exception as e:
logger.error(f"Error archiving query {query_id}: {e}")
raise
def get_query(self, query_id: int) -> Dict[str, Any]:
cache_key = str(query_id)
cached = self.cache.get("query", cache_key)
if cached:
return cached
try:
query = self.client.get_query(query_id)
# Serialize
data = {
"id": query.base.query_id,
"name": query.base.name,
"description": query.base.description or "",
"sql": query.sql,
"parameters": [p.to_dict() for p in query.base.parameters] if query.base.parameters else []
}
self.cache.set("query", cache_key, data)
return data
except Exception as e:
# Check for 403 Forbidden (common for public-but-not-published queries)
is_forbidden = "403" in str(e) or "Forbidden" in str(e)
if is_forbidden:
logger.info(f"Access Forbidden via SDK for Query {query_id}. Attempting GraphQL fallback...")
fallback_data = self._get_query_graphql(query_id)
if fallback_data:
self.cache.set("query", cache_key, fallback_data)
return fallback_data
logger.error(f"Error fetching query {query_id}: {e}")
raise
def execute_query(self, query_id: int, params: Optional[Dict[str, Any]] = None) -> str:
# We want to start execution and return ID, NOT wait.
# client.execute_query() waits.
# client.execute() (base method) usually returns the response with job_id.
query = QueryBase(
query_id=query_id,
params=[QueryParameter.from_dict(p) for p in (params or [])]
)
# Using the lower-level execute method to get job_id without waiting
# The SDK implementation of execute() typically performs the POST /execute call
try:
# We construct the execution payload manually if SDK doesn't expose non-blocking nicely
# Or checking SDK: client.execute(query) -> ExecutionResult (which contains job_id)
# wait... client.execute() waits for completion loop.
# Use raw request for async trigger to be safe and efficient
url = f"{self.base_url}/query/{query_id}/execute"
payload = {"query_parameters": {p["name"]: p["value"] for p in (params or [])}}
headers = {"X-Dune-Api-Key": self.api_key}
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
data = response.json()
return data["execution_id"]
except Exception as e:
logger.error(f"Error executing query {query_id}: {e}")
raise
def get_status(self, job_id: str) -> Dict[str, Any]:
"""
Returns the status of a job, including credit usage if available.
"""
cached = self.cache.get("status", job_id)
if cached:
# Check if cached value is old string format or new dict format
if isinstance(cached, str):
return {"state": cached}
return cached
status_response = self.client.get_execution_status(job_id)
state = str(status_response.state).replace("ExecutionState.", "")
result = {
"state": state,
"credits_used": getattr(status_response, "execution_cost_credits", None),
"execution_time": getattr(status_response, "execution_time_millis", 0) # sometimes available
}
if state in ["COMPLETED", "FAILED", "CANCELLED"]:
self.cache.set("status", job_id, result)
return result
def get_result(self, job_id: str) -> Any:
return self.client.get_execution_results(job_id)
def analyze_result(self, job_id: str, data_processor: Any) -> Dict[str, Any]:
"""
Fetches results and runs advanced analysis using the DataProcessor.
"""
result = self.get_result(job_id)
return data_processor.analyze_dataframe(result)
def get_usage(self) -> Dict[str, Any]:
"""Returns the credit usage info."""
# Raw request as get_usage might return complex object
url = f"{self.base_url}/auth/usage" # Verify endpoint
# The SDK has client.get_usage(), let's use that if it works
try:
# Note: SDK get_usage might be for generic usage or specific endpoint
# Let's rely on SDK
# Inspecting check_dune.py output: 'get_usage' exists!
# It likely returns a pydantic model or dict.
return self.client.get_usage() # We will inspect this return type at runtime
except Exception as e:
logger.error(f"Error getting usage: {e}")
# Fallback mock
return {"error": str(e)}